From 639eaee6d48a8d53255642a58acbaeab807bc49b Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 13:07:00 +0900 Subject: [PATCH 01/14] feat: implement polling fallback for event communication - Added testWebSocket to detect protocol availability - Implemented polling fallback in MeshV2Service - Updated gql-operations.js with new polling mutations and queries - Exported GRAPHQL_ENDPOINT from mesh-client.js - Added unit tests for polling functionality Co-Authored-By: Gemini --- .../scratch3_mesh_v2/gql-operations.js | 41 +++- .../scratch3_mesh_v2/mesh-client.js | 3 +- .../scratch3_mesh_v2/mesh-service.js | 194 ++++++++++++++++-- test/unit/mesh_service_v2_polling.js | 170 +++++++++++++++ 4 files changed, 391 insertions(+), 17 deletions(-) create mode 100644 test/unit/mesh_service_v2_polling.js diff --git a/src/extensions/scratch3_mesh_v2/gql-operations.js b/src/extensions/scratch3_mesh_v2/gql-operations.js index 19ee27b244..c9b36d0a4b 100644 --- a/src/extensions/scratch3_mesh_v2/gql-operations.js +++ b/src/extensions/scratch3_mesh_v2/gql-operations.js @@ -22,8 +22,13 @@ const CREATE_DOMAIN = gql` `; const CREATE_GROUP = gql` - mutation CreateGroup($name: String!, $hostId: ID!, $domain: String!, $maxConnectionTimeSeconds: Int) { - createGroup(name: $name, hostId: $hostId, domain: $domain, maxConnectionTimeSeconds: $maxConnectionTimeSeconds) { + mutation CreateGroup( + $name: String!, $hostId: ID!, $domain: String!, $maxConnectionTimeSeconds: Int, $useWebSocket: Boolean! + ) { + createGroup( + name: $name, hostId: $hostId, domain: $domain, + maxConnectionTimeSeconds: $maxConnectionTimeSeconds, useWebSocket: $useWebSocket + ) { id domain fullId @@ -32,6 +37,8 @@ const CREATE_GROUP = gql` createdAt expiresAt heartbeatIntervalSeconds + useWebSocket + pollingIntervalSeconds } } `; @@ -45,6 +52,8 @@ const JOIN_GROUP = gql` domain expiresAt heartbeatIntervalSeconds + useWebSocket + pollingIntervalSeconds } } `; @@ -139,6 +148,32 @@ const FIRE_EVENTS = gql` } `; +const RECORD_EVENTS = gql` + mutation RecordEventsByNode($groupId: ID!, $domain: String!, $nodeId: ID!, $events: [EventInput!]!) { + recordEventsByNode( + groupId: $groupId, domain: $domain, nodeId: $nodeId, events: $events + ) { + groupId + domain + recordedCount + nextSince + } + } +`; + +const GET_EVENTS_SINCE = gql` + query GetEventsSince($groupId: ID!, $domain: String!, $since: AWSDateTime!) { + getEventsSince(groupId: $groupId, domain: $domain, since: $since) { + name + firedByNodeId + groupId + domain + payload + timestamp + } + } +`; + const ON_MESSAGE_IN_GROUP = gql` subscription OnMessageInGroup($groupId: ID!, $domain: String!) { onMessageInGroup(groupId: $groupId, domain: $domain) { @@ -203,6 +238,8 @@ module.exports = { SEND_MEMBER_HEARTBEAT, REPORT_DATA, FIRE_EVENTS, + RECORD_EVENTS, + GET_EVENTS_SINCE, ON_MESSAGE_IN_GROUP, LIST_GROUP_STATUSES }; diff --git a/src/extensions/scratch3_mesh_v2/mesh-client.js b/src/extensions/scratch3_mesh_v2/mesh-client.js index 8c5f0e38f0..737ec34a69 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-client.js +++ b/src/extensions/scratch3_mesh_v2/mesh-client.js @@ -32,5 +32,6 @@ const getClient = () => client; module.exports = { createClient, getClient, - gql + gql, + GRAPHQL_ENDPOINT }; diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index fec9e7a8ad..70b9bdf5d0 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -14,10 +14,14 @@ const { SEND_MEMBER_HEARTBEAT, REPORT_DATA, FIRE_EVENTS, + RECORD_EVENTS, + GET_EVENTS_SINCE, ON_MESSAGE_IN_GROUP, LIST_GROUP_STATUSES } = require('./gql-operations'); +const {GRAPHQL_ENDPOINT} = require('./mesh-client'); + /** * Parses an environment variable as an integer with validation. * @param {string} envVar - The environment variable value. @@ -58,11 +62,15 @@ class MeshV2Service { this.groupName = null; this.expiresAt = null; this.isHost = false; + this.useWebSocket = true; + this.pollingIntervalSeconds = 2; + this.lastFetchTime = null; this.subscriptions = []; this.connectionTimer = null; this.heartbeatTimer = null; this.dataSyncTimer = null; + this.pollingTimer = null; // Last data send promise to track completion of the most recent data transmission this.lastDataSendPromise = Promise.resolve(); @@ -198,6 +206,45 @@ class MeshV2Service { } } + /** + * Test if WebSocket connection is possible in the current environment. + * @returns {Promise} True if WebSocket is available. + */ + testWebSocket () { + return new Promise(resolve => { + try { + // Derived from https://xxx.appsync-api.region.amazonaws.com/graphql + // to wss://xxx.appsync-realtime-api.region.amazonaws.com/graphql + const wsUrl = GRAPHQL_ENDPOINT + .replace('https://', 'wss://') + .replace('appsync-api', 'appsync-realtime-api'); + + const socket = new WebSocket(wsUrl, 'graphql-ws'); + const timeout = setTimeout(() => { + log.warn('Mesh V2: WebSocket test timed out'); + socket.close(); + resolve(false); + }, 3000); // 3 seconds timeout for test + + socket.onopen = () => { + log.info('Mesh V2: WebSocket test successful'); + clearTimeout(timeout); + socket.close(); + resolve(true); + }; + + socket.onerror = err => { + log.warn(`Mesh V2: WebSocket test failed: ${err}`); + clearTimeout(timeout); + resolve(false); + }; + } catch (error) { + log.warn(`Mesh V2: WebSocket not supported or failed to initialize: ${error}`); + resolve(false); + } + }); + } + async createDomain () { if (!this.client) throw new Error('Client not initialized'); @@ -224,13 +271,18 @@ class MeshV2Service { await this.createDomain(); } + // Test WebSocket availability + this.useWebSocket = await this.testWebSocket(); + log.info(`Mesh V2: WebSocket available: ${this.useWebSocket}`); + this.costTracking.mutationCount++; const result = await this.client.mutate({ mutation: CREATE_GROUP, variables: { name: groupName, hostId: this.meshId, - domain: this.domain + domain: this.domain, + useWebSocket: this.useWebSocket } }); @@ -239,13 +291,21 @@ class MeshV2Service { this.groupName = group.name; this.domain = group.domain; // Update domain from server this.expiresAt = group.expiresAt; + this.useWebSocket = group.useWebSocket; + if (group.pollingIntervalSeconds) { + this.pollingIntervalSeconds = group.pollingIntervalSeconds; + } this.isHost = true; if (group.heartbeatIntervalSeconds) { this.hostHeartbeatInterval = group.heartbeatIntervalSeconds; } this.costTracking.connectionStartTime = Date.now(); - this.startSubscriptions(); + if (this.useWebSocket) { + this.startSubscriptions(); + } else { + this.startPolling(); + } this.startHeartbeat(); this.startEventBatchTimer(); this.startConnectionTimer(); @@ -253,7 +313,8 @@ class MeshV2Service { await this.sendAllGlobalVariables(); - log.info(`Mesh V2: Created group ${this.groupName} (${this.groupId}) in domain ${this.domain}`); + log.info(`Mesh V2: Created group ${this.groupName} (${this.groupId}) in domain ${this.domain} ` + + `(Protocol: ${this.useWebSocket ? 'WebSocket' : 'Polling'})`); return group; } catch (error) { log.error(`Mesh V2: Failed to create group: ${error}`); @@ -305,13 +366,21 @@ class MeshV2Service { this.groupName = groupName || groupId; this.domain = node.domain; // Update domain from server this.expiresAt = node.expiresAt; + this.useWebSocket = node.useWebSocket; + if (node.pollingIntervalSeconds) { + this.pollingIntervalSeconds = node.pollingIntervalSeconds; + } this.isHost = false; if (node.heartbeatIntervalSeconds) { this.memberHeartbeatInterval = node.heartbeatIntervalSeconds; } this.costTracking.connectionStartTime = Date.now(); - this.startSubscriptions(); + if (this.useWebSocket) { + this.startSubscriptions(); + } else { + this.startPolling(); + } this.startHeartbeat(); // Start heartbeat for member too this.startEventBatchTimer(); this.startConnectionTimer(); @@ -320,7 +389,8 @@ class MeshV2Service { await this.sendAllGlobalVariables(); await this.fetchAllNodesData(); - log.info(`Mesh V2: Joined group ${this.groupId} in domain ${this.domain}`); + log.info(`Mesh V2: Joined group ${this.groupId} in domain ${this.domain} ` + + `(Protocol: ${this.useWebSocket ? 'WebSocket' : 'Polling'})`); return node; } catch (error) { log.error(`Mesh V2: Failed to join group: ${error}`); @@ -426,6 +496,7 @@ class MeshV2Service { this.lastBroadcastOffset = 0; this.stopSubscriptions(); + this.stopPolling(); this.stopHeartbeat(); this.stopEventBatchTimer(); this.stopConnectionTimer(); @@ -483,6 +554,83 @@ class MeshV2Service { this.subscriptions = []; } + /** + * Start polling for events when WebSocket is not available. + */ + startPolling () { + this.stopPolling(); + if (!this.groupId) return; + + log.info(`Mesh V2: Starting event polling (Interval: ${this.pollingIntervalSeconds}s)`); + // Initial fetch time + if (!this.lastFetchTime) { + this.lastFetchTime = new Date().toISOString(); + } + + this.pollingTimer = setInterval(() => { + this.pollEvents(); + }, this.pollingIntervalSeconds * 1000); + } + + /** + * Stop event polling. + */ + stopPolling () { + if (this.pollingTimer) { + log.info('Mesh V2: Stopping event polling'); + clearInterval(this.pollingTimer); + this.pollingTimer = null; + } + this.lastFetchTime = null; + } + + /** + * Fetch new events from the server since the last fetch time. + */ + async pollEvents () { + if (!this.groupId || !this.client || this.useWebSocket) return; + + try { + this.costTracking.queryCount++; + const result = await this.client.query({ + query: GET_EVENTS_SINCE, + variables: { + groupId: this.groupId, + domain: this.domain, + since: this.lastFetchTime + }, + fetchPolicy: 'network-only' + }); + + const events = result.data.getEventsSince; + if (events && events.length > 0) { + log.info(`Mesh V2: Polled ${events.length} events`); + // Process events (similar to handleBatchEvent but with direct event objects) + const batchEvent = { + firedByNodeId: 'polling-server', // dummy + events: events.map(e => ({ + name: e.name, + firedByNodeId: e.firedByNodeId, + groupId: e.groupId, + domain: e.domain, + payload: e.payload, + timestamp: e.timestamp + })) + }; + this.handleBatchEvent(batchEvent); + + // Update lastFetchTime to the cursor of the last received event + this.lastFetchTime = events[events.length - 1].cursor; + } + } catch (error) { + log.error(`Mesh V2: Event polling failed: ${error}`); + const reason = this.shouldDisconnectOnError(error); + if (reason) { + this.cleanupAndDisconnect(reason); + } + } + } + handleDataUpdate (nodeStatus) { if (!nodeStatus || nodeStatus.nodeId === this.meshId) return; @@ -679,16 +827,34 @@ class MeshV2Service { this.costTracking.mutationCount++; this.costTracking.fireEventsCount++; - log.info(`Mesh V2: Sending batch of ${events.length} events to group ${this.groupId}`); - await this.client.mutate({ - mutation: FIRE_EVENTS, - variables: { - groupId: this.groupId, - domain: this.domain, - nodeId: this.meshId, - events: events + log.info(`Mesh V2: Sending batch of ${events.length} events to group ${this.groupId} ` + + `(Protocol: ${this.useWebSocket ? 'WebSocket' : 'Polling'})`); + + if (this.useWebSocket) { + await this.client.mutate({ + mutation: FIRE_EVENTS, + variables: { + groupId: this.groupId, + domain: this.domain, + nodeId: this.meshId, + events: events + } + }); + } else { + const result = await this.client.mutate({ + mutation: RECORD_EVENTS, + variables: { + groupId: this.groupId, + domain: this.domain, + nodeId: this.meshId, + events: events + } + }); + // Update lastFetchTime if it's currently null + if (!this.lastFetchTime) { + this.lastFetchTime = result.data.recordEventsByNode.nextSince; } - }); + } } catch (error) { log.error(`Mesh V2: Failed to fire batch events: ${error}`); const reason = this.shouldDisconnectOnError(error); diff --git a/test/unit/mesh_service_v2_polling.js b/test/unit/mesh_service_v2_polling.js new file mode 100644 index 0000000000..022d4711d9 --- /dev/null +++ b/test/unit/mesh_service_v2_polling.js @@ -0,0 +1,170 @@ +const test = require('tap').test; +const minilog = require('minilog'); +// Suppress debug and info logs during tests +minilog.suggest.deny('vm', 'debug'); +minilog.suggest.deny('vm', 'info'); + +const MeshV2Service = require('../../src/extensions/scratch3_mesh_v2/mesh-service'); +const {GET_EVENTS_SINCE, RECORD_EVENTS} = require('../../src/extensions/scratch3_mesh_v2/gql-operations'); + +const createMockBlocks = () => ({ + runtime: { + sequencer: {}, + emit: () => {}, + on: () => {}, + off: () => {} + }, + opcodeFunctions: { + event_broadcast: () => {} + } +}); + +test('MeshV2Service Polling', t => { + t.test('pollEvents fetches and handles events', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.lastFetchTime = 'T1'; + + const events = [ + { + name: 'e1', + firedByNodeId: 'node2', + groupId: 'group1', + domain: 'domain1', + payload: 'p1', + timestamp: 'T2', + cursor: 'C2' + }, + { + name: 'e2', + firedByNodeId: 'node2', + groupId: 'group1', + domain: 'domain1', + payload: 'p2', + timestamp: 'T3', + cursor: 'C3' + } + ]; + + service.client = { + query: options => { + st.equal(options.query, GET_EVENTS_SINCE); + st.equal(options.variables.since, 'T1'); + return Promise.resolve({ + data: { + getEventsSince: events + } + }); + } + }; + + // Spy on handleBatchEvent + let handledBatch = null; + service.handleBatchEvent = batch => { + handledBatch = batch; + }; + + await service.pollEvents(); + + st.ok(handledBatch); + st.equal(handledBatch.events.length, 2); + st.equal(handledBatch.events[0].name, 'e1'); + st.equal(service.lastFetchTime, 'C3'); + + st.end(); + }); + + t.test('fireEventsBatch uses RECORD_EVENTS when useWebSocket is false', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.lastFetchTime = null; + + const events = [{eventName: 'e1', payload: 'p1', firedAt: 't1'}]; + + service.client = { + mutate: options => { + st.equal(options.mutation, RECORD_EVENTS); + return Promise.resolve({ + data: { + recordEventsByNode: { + nextSince: 'T_NEW' + } + } + }); + } + }; + + await service.fireEventsBatch(events); + + st.equal(service.lastFetchTime, 'T_NEW'); + + st.end(); + }); + + t.test('startPolling sets up interval', st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.pollingIntervalSeconds = 0.01; // 10ms + + let pollCount = 0; + service.pollEvents = () => { + pollCount++; + }; + + service.startPolling(); + st.ok(service.pollingTimer); + + setTimeout(() => { + service.stopPolling(); + st.ok(pollCount > 0); + st.equal(service.pollingTimer, null); + st.end(); + }, 50); + }); + + t.test('testWebSocket success', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + + // Mock WebSocket + global.WebSocket = class { + constructor () { + setTimeout(() => this.onopen(), 10); + } + close () {} + }; + + const result = await service.testWebSocket(); + st.equal(result, true); + + delete global.WebSocket; + st.end(); + }); + + t.test('testWebSocket failure', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + + // Mock WebSocket + global.WebSocket = class { + constructor () { + setTimeout(() => this.onerror(new Error('fail')), 10); + } + close () {} + }; + + const result = await service.testWebSocket(); + st.equal(result, false); + + delete global.WebSocket; + st.end(); + }); + + t.end(); +}); From 69b544af318211ec3bcd1a6f514595a106a4edd7 Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 18:51:01 +0900 Subject: [PATCH 02/14] fix: resolve VariableTypeMismatch and WebSocket failure - Change GetEventsSince type from AWSDateTime! to String! - Improve testWebSocket URL construction to support custom domains --- src/extensions/scratch3_mesh_v2/gql-operations.js | 2 +- src/extensions/scratch3_mesh_v2/mesh-service.js | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/gql-operations.js b/src/extensions/scratch3_mesh_v2/gql-operations.js index c9b36d0a4b..829388632a 100644 --- a/src/extensions/scratch3_mesh_v2/gql-operations.js +++ b/src/extensions/scratch3_mesh_v2/gql-operations.js @@ -162,7 +162,7 @@ const RECORD_EVENTS = gql` `; const GET_EVENTS_SINCE = gql` - query GetEventsSince($groupId: ID!, $domain: String!, $since: AWSDateTime!) { + query GetEventsSince($groupId: ID!, $domain: String!, $since: String!) { getEventsSince(groupId: $groupId, domain: $domain, since: $since) { name firedByNodeId diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 70b9bdf5d0..4eabd06030 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -215,9 +215,13 @@ class MeshV2Service { try { // Derived from https://xxx.appsync-api.region.amazonaws.com/graphql // to wss://xxx.appsync-realtime-api.region.amazonaws.com/graphql - const wsUrl = GRAPHQL_ENDPOINT - .replace('https://', 'wss://') - .replace('appsync-api', 'appsync-realtime-api'); + // or for custom domains, to wss://api.example.com/graphql/realtime + let wsUrl = GRAPHQL_ENDPOINT.replace('https://', 'wss://'); + if (wsUrl.includes('appsync-api')) { + wsUrl = wsUrl.replace('appsync-api', 'appsync-realtime-api'); + } else { + wsUrl = wsUrl.replace(/\/graphql$/, '/graphql/realtime'); + } const socket = new WebSocket(wsUrl, 'graphql-ws'); const timeout = setTimeout(() => { From f250f0bae2b980d063f56c82bbbb00408ca9d0ae Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 19:17:32 +0900 Subject: [PATCH 03/14] fix: resolve Null value error for 'since' variable during polling - Added 'cursor' field to GET_EVENTS_SINCE query to correctly update lastFetchTime - Added guard in pollEvents to prevent execution if lastFetchTime is null --- src/extensions/scratch3_mesh_v2/gql-operations.js | 1 + src/extensions/scratch3_mesh_v2/mesh-service.js | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/extensions/scratch3_mesh_v2/gql-operations.js b/src/extensions/scratch3_mesh_v2/gql-operations.js index 829388632a..0ceba8ea74 100644 --- a/src/extensions/scratch3_mesh_v2/gql-operations.js +++ b/src/extensions/scratch3_mesh_v2/gql-operations.js @@ -170,6 +170,7 @@ const GET_EVENTS_SINCE = gql` domain payload timestamp + cursor } } `; diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 4eabd06030..33c4698ee5 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -592,7 +592,7 @@ class MeshV2Service { * Fetch new events from the server since the last fetch time. */ async pollEvents () { - if (!this.groupId || !this.client || this.useWebSocket) return; + if (!this.groupId || !this.client || this.useWebSocket || !this.lastFetchTime) return; try { this.costTracking.queryCount++; From 1b295d725114e1963057982c052dcb7963abe460 Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 19:24:55 +0900 Subject: [PATCH 04/14] fix: initialize lastFetchTime during group create/join - Set lastFetchTime to createdAt in createGroup - Set lastFetchTime to current time in joinGroup - Remove redundant initializations in startPolling and fireEventsBatch --- src/extensions/scratch3_mesh_v2/mesh-service.js | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 33c4698ee5..939b2e034e 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -295,6 +295,7 @@ class MeshV2Service { this.groupName = group.name; this.domain = group.domain; // Update domain from server this.expiresAt = group.expiresAt; + this.lastFetchTime = group.createdAt; this.useWebSocket = group.useWebSocket; if (group.pollingIntervalSeconds) { this.pollingIntervalSeconds = group.pollingIntervalSeconds; @@ -370,6 +371,7 @@ class MeshV2Service { this.groupName = groupName || groupId; this.domain = node.domain; // Update domain from server this.expiresAt = node.expiresAt; + this.lastFetchTime = new Date().toISOString(); this.useWebSocket = node.useWebSocket; if (node.pollingIntervalSeconds) { this.pollingIntervalSeconds = node.pollingIntervalSeconds; @@ -566,10 +568,6 @@ class MeshV2Service { if (!this.groupId) return; log.info(`Mesh V2: Starting event polling (Interval: ${this.pollingIntervalSeconds}s)`); - // Initial fetch time - if (!this.lastFetchTime) { - this.lastFetchTime = new Date().toISOString(); - } this.pollingTimer = setInterval(() => { this.pollEvents(); @@ -592,7 +590,7 @@ class MeshV2Service { * Fetch new events from the server since the last fetch time. */ async pollEvents () { - if (!this.groupId || !this.client || this.useWebSocket || !this.lastFetchTime) return; + if (!this.groupId || !this.client || this.useWebSocket) return; try { this.costTracking.queryCount++; @@ -845,7 +843,7 @@ class MeshV2Service { } }); } else { - const result = await this.client.mutate({ + await this.client.mutate({ mutation: RECORD_EVENTS, variables: { groupId: this.groupId, @@ -854,10 +852,6 @@ class MeshV2Service { events: events } }); - // Update lastFetchTime if it's currently null - if (!this.lastFetchTime) { - this.lastFetchTime = result.data.recordEventsByNode.nextSince; - } } } catch (error) { log.error(`Mesh V2: Failed to fire batch events: ${error}`); From 43e24745ae01e0c3ab5570d33b8c325e711aeae8 Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 19:34:18 +0900 Subject: [PATCH 05/14] feat: add mesh_polling query parameter to force polling mode - Added getForcePollingFromUrl to utils.js - Use forcePolling setting in MeshV2Service to bypass WebSocket test - Override useWebSocket to false if forcePolling is enabled during group join --- src/extensions/scratch3_mesh_v2/mesh-service.js | 16 +++++++++++++--- src/extensions/scratch3_mesh_v2/utils.js | 12 +++++++++++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 939b2e034e..623560a047 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -20,6 +20,8 @@ const { LIST_GROUP_STATUSES } = require('./gql-operations'); +const {getForcePollingFromUrl} = require('./utils'); + const {GRAPHQL_ENDPOINT} = require('./mesh-client'); /** @@ -62,7 +64,11 @@ class MeshV2Service { this.groupName = null; this.expiresAt = null; this.isHost = false; - this.useWebSocket = true; + this.forcePolling = getForcePollingFromUrl(); + this.useWebSocket = !this.forcePolling; + if (this.forcePolling) { + log.info('Mesh V2: Forced polling mode enabled via URL parameter'); + } this.pollingIntervalSeconds = 2; this.lastFetchTime = null; @@ -276,7 +282,11 @@ class MeshV2Service { } // Test WebSocket availability - this.useWebSocket = await this.testWebSocket(); + if (this.forcePolling) { + this.useWebSocket = false; + } else { + this.useWebSocket = await this.testWebSocket(); + } log.info(`Mesh V2: WebSocket available: ${this.useWebSocket}`); this.costTracking.mutationCount++; @@ -372,7 +382,7 @@ class MeshV2Service { this.domain = node.domain; // Update domain from server this.expiresAt = node.expiresAt; this.lastFetchTime = new Date().toISOString(); - this.useWebSocket = node.useWebSocket; + this.useWebSocket = this.forcePolling ? false : node.useWebSocket; if (node.pollingIntervalSeconds) { this.pollingIntervalSeconds = node.pollingIntervalSeconds; } diff --git a/src/extensions/scratch3_mesh_v2/utils.js b/src/extensions/scratch3_mesh_v2/utils.js index ad3eb5bd92..194d1cdb98 100644 --- a/src/extensions/scratch3_mesh_v2/utils.js +++ b/src/extensions/scratch3_mesh_v2/utils.js @@ -53,10 +53,20 @@ const saveDomainToLocalStorage = domain => { /* istanbul ignore next */ const getDomain = () => getDomainFromUrl() || getDomainFromLocalStorage(); +/* istanbul ignore next */ +const getForcePollingFromUrl = () => { + /* istanbul ignore next */ + if (typeof window === 'undefined') return false; + const urlParams = new URLSearchParams(window.location.search); + const polling = urlParams.get('mesh_polling'); + return !!polling && polling !== '0' && polling !== 'false'; +}; + module.exports = { getDomainFromUrl, getDomainFromLocalStorage, saveDomainToLocalStorage, getDomain, - validateDomain + validateDomain, + getForcePollingFromUrl }; From 666090654d5159484e8ff40bcddf2ea2b10db470 Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 20:14:25 +0900 Subject: [PATCH 06/14] fix: correct lastFetchTime initialization and polling cursor handling - Updated JOIN_GROUP, RENEW_HEARTBEAT, and SEND_MEMBER_HEARTBEAT queries to include createdAt - Initialize lastFetchTime to empty string in constructor - Use node.createdAt from server when joining group - Use event.cursor to update lastFetchTime during polling - Added fallback for lastFetchTime in pollEvents to avoid GraphQL error - Updated tests with createdAt mock data --- .../scratch3_mesh_v2/gql-operations.js | 3 + .../scratch3_mesh_v2/mesh-service.js | 55 ++++++++++--------- test/unit/extension_mesh_v2_service.js | 4 ++ 3 files changed, 36 insertions(+), 26 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/gql-operations.js b/src/extensions/scratch3_mesh_v2/gql-operations.js index 0ceba8ea74..ab364d470d 100644 --- a/src/extensions/scratch3_mesh_v2/gql-operations.js +++ b/src/extensions/scratch3_mesh_v2/gql-operations.js @@ -50,6 +50,7 @@ const JOIN_GROUP = gql` name groupId domain + createdAt expiresAt heartbeatIntervalSeconds useWebSocket @@ -88,6 +89,7 @@ const RENEW_HEARTBEAT = gql` renewHeartbeat(groupId: $groupId, domain: $domain, hostId: $hostId) { groupId domain + createdAt expiresAt heartbeatIntervalSeconds } @@ -100,6 +102,7 @@ const SEND_MEMBER_HEARTBEAT = gql` nodeId groupId domain + createdAt expiresAt heartbeatIntervalSeconds } diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 623560a047..6cb36619b1 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -70,7 +70,7 @@ class MeshV2Service { log.info('Mesh V2: Forced polling mode enabled via URL parameter'); } this.pollingIntervalSeconds = 2; - this.lastFetchTime = null; + this.lastFetchTime = ''; this.subscriptions = []; this.connectionTimer = null; @@ -306,6 +306,7 @@ class MeshV2Service { this.domain = group.domain; // Update domain from server this.expiresAt = group.expiresAt; this.lastFetchTime = group.createdAt; + log.info(`Mesh V2: Initialized lastFetchTime to ${this.lastFetchTime} (from group.createdAt)`); this.useWebSocket = group.useWebSocket; if (group.pollingIntervalSeconds) { this.pollingIntervalSeconds = group.pollingIntervalSeconds; @@ -381,7 +382,8 @@ class MeshV2Service { this.groupName = groupName || groupId; this.domain = node.domain; // Update domain from server this.expiresAt = node.expiresAt; - this.lastFetchTime = new Date().toISOString(); + this.lastFetchTime = node.createdAt; + log.info(`Mesh V2: Initialized lastFetchTime to ${this.lastFetchTime} (from node.createdAt)`); this.useWebSocket = this.forcePolling ? false : node.useWebSocket; if (node.pollingIntervalSeconds) { this.pollingIntervalSeconds = node.pollingIntervalSeconds; @@ -602,8 +604,14 @@ class MeshV2Service { async pollEvents () { if (!this.groupId || !this.client || this.useWebSocket) return; + if (!this.lastFetchTime) { + log.warn('Mesh V2: pollEvents called but lastFetchTime is empty. Falling back to current time.'); + this.lastFetchTime = new Date().toISOString(); + } + + log.debug(`Mesh V2: pollEvents for group ${this.groupId}. since=${this.lastFetchTime}`); + try { - this.costTracking.queryCount++; const result = await this.client.query({ query: GET_EVENTS_SINCE, variables: { @@ -614,32 +622,21 @@ class MeshV2Service { fetchPolicy: 'network-only' }); - const events = result.data.getEventsSince; - if (events && events.length > 0) { - log.info(`Mesh V2: Polled ${events.length} events`); - // Process events (similar to handleBatchEvent but with direct event objects) - const batchEvent = { - firedByNodeId: 'polling-server', // dummy - events: events.map(e => ({ - name: e.name, - firedByNodeId: e.firedByNodeId, - groupId: e.groupId, - domain: e.domain, - payload: e.payload, - timestamp: e.timestamp - })) - }; - this.handleBatchEvent(batchEvent); - - // Update lastFetchTime to the cursor of the last received event - this.lastFetchTime = events[events.length - 1].cursor; + if (result.data && result.data.getEventsSince) { + const events = result.data.getEventsSince; + if (events.length > 0) { + log.info(`Mesh V2: Polled ${events.length} events`); + events.forEach(event => { + this.handleEvent(event); + // Update lastFetchTime to the latest event's cursor + if (event.cursor) { + this.lastFetchTime = event.cursor; + } + }); + } } } catch (error) { log.error(`Mesh V2: Event polling failed: ${error}`); - const reason = this.shouldDisconnectOnError(error); - if (reason) { - this.cleanupAndDisconnect(reason); - } } } @@ -912,6 +909,9 @@ class MeshV2Service { } }); this.expiresAt = result.data.renewHeartbeat.expiresAt; + if (result.data.renewHeartbeat.createdAt) { + this.lastFetchTime = result.data.renewHeartbeat.createdAt; + } log.info(`Mesh V2: Heartbeat renewed. Expires at: ${this.expiresAt}`); if (result.data.renewHeartbeat.heartbeatIntervalSeconds) { const newInterval = result.data.renewHeartbeat.heartbeatIntervalSeconds; @@ -946,6 +946,9 @@ class MeshV2Service { } }); log.info('Mesh V2: Member heartbeat sent'); + if (result.data.sendMemberHeartbeat.createdAt) { + this.lastFetchTime = result.data.sendMemberHeartbeat.createdAt; + } if (result.data.sendMemberHeartbeat.expiresAt) { this.expiresAt = result.data.sendMemberHeartbeat.expiresAt; this.startConnectionTimer(); diff --git a/test/unit/extension_mesh_v2_service.js b/test/unit/extension_mesh_v2_service.js index 364d704c17..6254284520 100644 --- a/test/unit/extension_mesh_v2_service.js +++ b/test/unit/extension_mesh_v2_service.js @@ -39,20 +39,24 @@ test('MeshV2Service Cost Tracking', t => { id: 'g1', name: 'G1', domain: 'd1', + createdAt: '2026-01-11T10:00:00Z', expiresAt: '2099-01-01T00:00:00Z', heartbeatIntervalSeconds: 60 }, joinGroup: { id: 'n1', domain: 'd1', + createdAt: '2026-01-11T10:00:00Z', expiresAt: '2099-01-01T00:00:00Z', heartbeatIntervalSeconds: 120 }, renewHeartbeat: { + createdAt: '2026-01-11T10:00:00Z', expiresAt: '2099-01-01T00:00:00Z', heartbeatIntervalSeconds: 60 }, sendMemberHeartbeat: { + createdAt: '2026-01-11T10:00:00Z', expiresAt: '2099-01-01T00:00:00Z', heartbeatIntervalSeconds: 120 } From f70f3a166882996729581895ea71d1deab537b26 Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 20:40:09 +0900 Subject: [PATCH 07/14] fix: initialize lastFetchTime on client side before group operations - Set lastFetchTime to current client time just before createGroup/joinGroup mutations - Reverted server-side createdAt dependency in GraphQL queries - Ensure stopPolling is called during cleanup to minimize requests - Updated tests to match reverted schema --- .../scratch3_mesh_v2/gql-operations.js | 3 -- .../scratch3_mesh_v2/mesh-service.js | 52 ++++++++----------- test/unit/extension_mesh_v2_service.js | 4 -- 3 files changed, 21 insertions(+), 38 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/gql-operations.js b/src/extensions/scratch3_mesh_v2/gql-operations.js index ab364d470d..0ceba8ea74 100644 --- a/src/extensions/scratch3_mesh_v2/gql-operations.js +++ b/src/extensions/scratch3_mesh_v2/gql-operations.js @@ -50,7 +50,6 @@ const JOIN_GROUP = gql` name groupId domain - createdAt expiresAt heartbeatIntervalSeconds useWebSocket @@ -89,7 +88,6 @@ const RENEW_HEARTBEAT = gql` renewHeartbeat(groupId: $groupId, domain: $domain, hostId: $hostId) { groupId domain - createdAt expiresAt heartbeatIntervalSeconds } @@ -102,7 +100,6 @@ const SEND_MEMBER_HEARTBEAT = gql` nodeId groupId domain - createdAt expiresAt heartbeatIntervalSeconds } diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 6cb36619b1..0ada746883 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -290,6 +290,8 @@ class MeshV2Service { log.info(`Mesh V2: WebSocket available: ${this.useWebSocket}`); this.costTracking.mutationCount++; + this.lastFetchTime = new Date().toISOString(); + log.info(`Mesh V2: Initialized lastFetchTime to ${this.lastFetchTime} (before createGroup)`); const result = await this.client.mutate({ mutation: CREATE_GROUP, variables: { @@ -305,8 +307,6 @@ class MeshV2Service { this.groupName = group.name; this.domain = group.domain; // Update domain from server this.expiresAt = group.expiresAt; - this.lastFetchTime = group.createdAt; - log.info(`Mesh V2: Initialized lastFetchTime to ${this.lastFetchTime} (from group.createdAt)`); this.useWebSocket = group.useWebSocket; if (group.pollingIntervalSeconds) { this.pollingIntervalSeconds = group.pollingIntervalSeconds; @@ -368,6 +368,8 @@ class MeshV2Service { try { this.costTracking.mutationCount++; + this.lastFetchTime = new Date().toISOString(); + log.info(`Mesh V2: Initialized lastFetchTime to ${this.lastFetchTime} (before joinGroup)`); const result = await this.client.mutate({ mutation: JOIN_GROUP, variables: { @@ -382,8 +384,6 @@ class MeshV2Service { this.groupName = groupName || groupId; this.domain = node.domain; // Update domain from server this.expiresAt = node.expiresAt; - this.lastFetchTime = node.createdAt; - log.info(`Mesh V2: Initialized lastFetchTime to ${this.lastFetchTime} (from node.createdAt)`); this.useWebSocket = this.forcePolling ? false : node.useWebSocket; if (node.pollingIntervalSeconds) { this.pollingIntervalSeconds = node.pollingIntervalSeconds; @@ -595,7 +595,7 @@ class MeshV2Service { clearInterval(this.pollingTimer); this.pollingTimer = null; } - this.lastFetchTime = null; + this.lastFetchTime = ''; } /** @@ -898,21 +898,20 @@ class MeshV2Service { if (!this.groupId || !this.client || !this.isHost) return; try { - this.costTracking.mutationCount++; - this.costTracking.heartbeatCount++; - const result = await this.client.mutate({ - mutation: RENEW_HEARTBEAT, - variables: { - groupId: this.groupId, - domain: this.domain, - hostId: this.meshId - } - }); - this.expiresAt = result.data.renewHeartbeat.expiresAt; - if (result.data.renewHeartbeat.createdAt) { - this.lastFetchTime = result.data.renewHeartbeat.createdAt; - } - log.info(`Mesh V2: Heartbeat renewed. Expires at: ${this.expiresAt}`); + this.costTracking.mutationCount++; + this.costTracking.heartbeatCount++; + const result = await this.client.mutate({ + mutation: RENEW_HEARTBEAT, + variables: { + groupId: this.groupId, + domain: this.domain, + hostId: this.meshId + } + }); + + this.expiresAt = result.data.renewHeartbeat.expiresAt; + log.info(`Mesh V2: Heartbeat renewed. Expires at: ${this.expiresAt}`); + if (result.data.renewHeartbeat.heartbeatIntervalSeconds) { const newInterval = result.data.renewHeartbeat.heartbeatIntervalSeconds; if (newInterval !== this.hostHeartbeatInterval) { @@ -945,21 +944,12 @@ class MeshV2Service { nodeId: this.meshId } }); + log.info('Mesh V2: Member heartbeat sent'); - if (result.data.sendMemberHeartbeat.createdAt) { - this.lastFetchTime = result.data.sendMemberHeartbeat.createdAt; - } if (result.data.sendMemberHeartbeat.expiresAt) { this.expiresAt = result.data.sendMemberHeartbeat.expiresAt; - this.startConnectionTimer(); - } - if (result.data.sendMemberHeartbeat.heartbeatIntervalSeconds) { - const newInterval = result.data.sendMemberHeartbeat.heartbeatIntervalSeconds; - if (newInterval !== this.memberHeartbeatInterval) { - this.memberHeartbeatInterval = newInterval; - this.startHeartbeat(); // Restart with new interval - } } + return result.data.sendMemberHeartbeat; } catch (error) { log.error(`Mesh V2: Member heartbeat failed: ${error}`); diff --git a/test/unit/extension_mesh_v2_service.js b/test/unit/extension_mesh_v2_service.js index 6254284520..364d704c17 100644 --- a/test/unit/extension_mesh_v2_service.js +++ b/test/unit/extension_mesh_v2_service.js @@ -39,24 +39,20 @@ test('MeshV2Service Cost Tracking', t => { id: 'g1', name: 'G1', domain: 'd1', - createdAt: '2026-01-11T10:00:00Z', expiresAt: '2099-01-01T00:00:00Z', heartbeatIntervalSeconds: 60 }, joinGroup: { id: 'n1', domain: 'd1', - createdAt: '2026-01-11T10:00:00Z', expiresAt: '2099-01-01T00:00:00Z', heartbeatIntervalSeconds: 120 }, renewHeartbeat: { - createdAt: '2026-01-11T10:00:00Z', expiresAt: '2099-01-01T00:00:00Z', heartbeatIntervalSeconds: 60 }, sendMemberHeartbeat: { - createdAt: '2026-01-11T10:00:00Z', expiresAt: '2099-01-01T00:00:00Z', heartbeatIntervalSeconds: 120 } From 4d9e3087fb2f1470b10a3861f979f5ae5e3e4d3e Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 20:47:10 +0900 Subject: [PATCH 08/14] fix: resolve TypeError in pollEvents by inlining event queuing logic - Removed hallucinated handleEvent call - Filter out self-fired events in pollEvents - Sort and queue other nodes' events with correct offsets for playback - Update lastFetchTime consistently from the latest polled event --- .../scratch3_mesh_v2/mesh-service.js | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 0ada746883..0f30ded6bb 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -626,13 +626,41 @@ class MeshV2Service { const events = result.data.getEventsSince; if (events.length > 0) { log.info(`Mesh V2: Polled ${events.length} events`); - events.forEach(event => { - this.handleEvent(event); - // Update lastFetchTime to the latest event's cursor - if (event.cursor) { - this.lastFetchTime = event.cursor; + + // Filter out events from self and sort by timestamp to preserve order + const otherEvents = events + .filter(event => event.firedByNodeId !== this.meshId) + .sort((a, b) => new Date(a.timestamp) - new Date(b.timestamp)); + + if (otherEvents.length > 0) { + // Use the first event's timestamp as base for relative timing within this polled batch + const baseTime = new Date(otherEvents[0].timestamp).getTime(); + + otherEvents.forEach(event => { + const eventTime = new Date(event.timestamp).getTime(); + const offsetMs = eventTime - baseTime; + + this.pendingBroadcasts.push({ + event: event, + offsetMs: offsetMs + }); + log.info(`Mesh V2: Queued event: ${event.name} ` + + `(offset: ${offsetMs}ms, original timestamp: ${event.timestamp})`); + }); + + // Start playback if not already started + if (this.batchStartTime === null && this.pendingBroadcasts.length > 0) { + this.batchStartTime = Date.now(); + this.lastBroadcastOffset = 0; } - }); + } + + // ALWAYS update lastFetchTime from the LAST event in the result (including our own) + // to ensure we don't fetch the same events again. + const lastEvent = events[events.length - 1]; + if (lastEvent.cursor) { + this.lastFetchTime = lastEvent.cursor; + } } } } catch (error) { From e12003bbdea4c231873a741fd6a8f1559f330a4d Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 21:39:09 +0900 Subject: [PATCH 09/14] refactor: extract _queueEventsForPlayback to unify event queuing logic - Added _queueEventsForPlayback private method - Simplified pollEvents and handleBatchEvent by using the new method - Ensured consistent sorting, offset calculation, and playback start trigger --- .../scratch3_mesh_v2/mesh-service.js | 45 ++++++++----------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 0f30ded6bb..f54c2762d7 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -629,30 +629,10 @@ class MeshV2Service { // Filter out events from self and sort by timestamp to preserve order const otherEvents = events - .filter(event => event.firedByNodeId !== this.meshId) - .sort((a, b) => new Date(a.timestamp) - new Date(b.timestamp)); + .filter(event => event.firedByNodeId !== this.meshId); if (otherEvents.length > 0) { - // Use the first event's timestamp as base for relative timing within this polled batch - const baseTime = new Date(otherEvents[0].timestamp).getTime(); - - otherEvents.forEach(event => { - const eventTime = new Date(event.timestamp).getTime(); - const offsetMs = eventTime - baseTime; - - this.pendingBroadcasts.push({ - event: event, - offsetMs: offsetMs - }); - log.info(`Mesh V2: Queued event: ${event.name} ` + - `(offset: ${offsetMs}ms, original timestamp: ${event.timestamp})`); - }); - - // Start playback if not already started - if (this.batchStartTime === null && this.pendingBroadcasts.length > 0) { - this.batchStartTime = Date.now(); - this.lastBroadcastOffset = 0; - } + this._queueEventsForPlayback(otherEvents); } // ALWAYS update lastFetchTime from the LAST event in the result (including our own) @@ -699,26 +679,37 @@ class MeshV2Service { log.info(`Mesh V2: Received ${events.length} events from ${batchEvent.firedByNodeId}`); - // タイムスタンプでソート - const sortedEvents = events.sort((a, b) => new Date(a.timestamp) - new Date(b.timestamp)); + this._queueEventsForPlayback(events); + } + + /** + * Internal method to queue events for playback with relative timing. + * @param {Array} events - Array of events to queue. + * @private + */ + _queueEventsForPlayback (events) { + if (!events || events.length === 0) return; + + // タイムスタンプでソート(副作用を避けるためコピーを作成) + const sortedEvents = [...events].sort((a, b) => new Date(a.timestamp) - new Date(b.timestamp)); // 最初のイベントを基準にオフセットを計算 const baseTime = new Date(sortedEvents[0].timestamp).getTime(); - // キューに追加(setTimeoutは使わない) + // キューに追加 sortedEvents.forEach(event => { const eventTime = new Date(event.timestamp).getTime(); const offsetMs = eventTime - baseTime; this.pendingBroadcasts.push({ event: event, - offsetMs: offsetMs // 元のタイミング情報を保持 + offsetMs: offsetMs }); log.info(`Mesh V2: Queued event: ${event.name} ` + `(offset: ${offsetMs}ms, original timestamp: ${event.timestamp})`); }); - // バッチ処理開始時刻を記録(最初のイベント追加時のみ) + // バッチ処理開始時刻を記録(未開始の場合のみ) if (this.batchStartTime === null && this.pendingBroadcasts.length > 0) { this.batchStartTime = Date.now(); this.lastBroadcastOffset = 0; From 298b01c86a2d55e45d0974b9d1bec984b296868e Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 21:46:09 +0900 Subject: [PATCH 10/14] style: fix indentation in renewHeartbeat to resolve CI failure --- .../scratch3_mesh_v2/mesh-service.js | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index f54c2762d7..4a6ac05225 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -917,20 +917,20 @@ class MeshV2Service { if (!this.groupId || !this.client || !this.isHost) return; try { - this.costTracking.mutationCount++; - this.costTracking.heartbeatCount++; - const result = await this.client.mutate({ - mutation: RENEW_HEARTBEAT, - variables: { - groupId: this.groupId, - domain: this.domain, - hostId: this.meshId - } - }); - - this.expiresAt = result.data.renewHeartbeat.expiresAt; - log.info(`Mesh V2: Heartbeat renewed. Expires at: ${this.expiresAt}`); - + this.costTracking.mutationCount++; + this.costTracking.heartbeatCount++; + const result = await this.client.mutate({ + mutation: RENEW_HEARTBEAT, + variables: { + groupId: this.groupId, + domain: this.domain, + hostId: this.meshId + } + }); + + this.expiresAt = result.data.renewHeartbeat.expiresAt; + log.info(`Mesh V2: Heartbeat renewed. Expires at: ${this.expiresAt}`); + if (result.data.renewHeartbeat.heartbeatIntervalSeconds) { const newInterval = result.data.renewHeartbeat.heartbeatIntervalSeconds; if (newInterval !== this.hostHeartbeatInterval) { From f2b817f64cd7d070dff674014a090e197849a06b Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 22:04:28 +0900 Subject: [PATCH 11/14] fix: resolve CI failures in mesh_service_v2_polling.js - Update lastFetchTime in fireEventsBatch - Check pendingBroadcasts in pollEvents test --- src/extensions/scratch3_mesh_v2/mesh-service.js | 5 ++++- test/unit/mesh_service_v2_polling.js | 11 ++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 4a6ac05225..23b14493b8 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -869,7 +869,7 @@ class MeshV2Service { } }); } else { - await this.client.mutate({ + const result = await this.client.mutate({ mutation: RECORD_EVENTS, variables: { groupId: this.groupId, @@ -878,6 +878,9 @@ class MeshV2Service { events: events } }); + if (result.data && result.data.recordEventsByNode && result.data.recordEventsByNode.nextSince) { + this.lastFetchTime = result.data.recordEventsByNode.nextSince; + } } } catch (error) { log.error(`Mesh V2: Failed to fire batch events: ${error}`); diff --git a/test/unit/mesh_service_v2_polling.js b/test/unit/mesh_service_v2_polling.js index 022d4711d9..bc77284ef9 100644 --- a/test/unit/mesh_service_v2_polling.js +++ b/test/unit/mesh_service_v2_polling.js @@ -60,17 +60,10 @@ test('MeshV2Service Polling', t => { } }; - // Spy on handleBatchEvent - let handledBatch = null; - service.handleBatchEvent = batch => { - handledBatch = batch; - }; - await service.pollEvents(); - st.ok(handledBatch); - st.equal(handledBatch.events.length, 2); - st.equal(handledBatch.events[0].name, 'e1'); + st.equal(service.pendingBroadcasts.length, 2); + st.equal(service.pendingBroadcasts[0].event.name, 'e1'); st.equal(service.lastFetchTime, 'C3'); st.end(); From 0c715b73facd6e264681e5ce18014f4abaa3a393 Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 22:16:32 +0900 Subject: [PATCH 12/14] feat: improve cost tracking and test coverage for polling - Add queryCount tracking to pollEvents - Add tests for self-event filtering and lastFetchTime fallback --- .../scratch3_mesh_v2/mesh-service.js | 1 + test/unit/mesh_service_v2_polling.js | 57 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index 23b14493b8..ca20b09067 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -612,6 +612,7 @@ class MeshV2Service { log.debug(`Mesh V2: pollEvents for group ${this.groupId}. since=${this.lastFetchTime}`); try { + this.costTracking.queryCount++; const result = await this.client.query({ query: GET_EVENTS_SINCE, variables: { diff --git a/test/unit/mesh_service_v2_polling.js b/test/unit/mesh_service_v2_polling.js index bc77284ef9..7c5ea0fc0c 100644 --- a/test/unit/mesh_service_v2_polling.js +++ b/test/unit/mesh_service_v2_polling.js @@ -159,5 +159,62 @@ test('MeshV2Service Polling', t => { st.end(); }); + t.test('pollEvents filters out self-fired events', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.lastFetchTime = 'T1'; + + const events = [ + { + name: 'self-event', + firedByNodeId: 'node1', // self + timestamp: 'T2', + cursor: 'C2' + }, + { + name: 'other-event', + firedByNodeId: 'node2', + timestamp: 'T3', + cursor: 'C3' + } + ]; + + service.client = { + query: () => { + return Promise.resolve({data: {getEventsSince: events}}); + } + }; + + await service.pollEvents(); + + st.equal(service.pendingBroadcasts.length, 1); + st.equal(service.pendingBroadcasts[0].event.name, 'other-event'); + st.equal(service.lastFetchTime, 'C3'); // cursor still updates + st.equal(service.costTracking.queryCount, 1); + + st.end(); + }); + + t.test('pollEvents falls back to current time if lastFetchTime is empty', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.lastFetchTime = ''; // empty + + service.client = { + query: options => { + st.ok(options.variables.since); + st.ok(new Date(options.variables.since).getTime() > 0); + return Promise.resolve({data: {getEventsSince: []}}); + } + }; + + await service.pollEvents(); + st.end(); + }); + t.end(); }); From 1c524d78d34369008ba27050ff05f20f76aaa711 Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 22:20:50 +0900 Subject: [PATCH 13/14] fix: remove createdAt from LIST_GROUPS_BY_DOMAIN and CREATE_GROUP - Matched client operations with updated server schema - Ensured consistency across all group-related GraphQL operations --- src/extensions/scratch3_mesh_v2/gql-operations.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/extensions/scratch3_mesh_v2/gql-operations.js b/src/extensions/scratch3_mesh_v2/gql-operations.js index 0ceba8ea74..75cf5e264d 100644 --- a/src/extensions/scratch3_mesh_v2/gql-operations.js +++ b/src/extensions/scratch3_mesh_v2/gql-operations.js @@ -9,7 +9,6 @@ const LIST_GROUPS_BY_DOMAIN = gql` fullId name hostId - createdAt expiresAt } } @@ -34,7 +33,6 @@ const CREATE_GROUP = gql` fullId name hostId - createdAt expiresAt heartbeatIntervalSeconds useWebSocket From 3f43b3e595a6b53179a9257f0e271777d1544a42 Mon Sep 17 00:00:00 2001 From: Kouji Takao Date: Sun, 11 Jan 2026 22:24:47 +0900 Subject: [PATCH 14/14] style: fix arrow-body-style lint error in tests --- test/unit/mesh_service_v2_polling.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/unit/mesh_service_v2_polling.js b/test/unit/mesh_service_v2_polling.js index 7c5ea0fc0c..03da178e70 100644 --- a/test/unit/mesh_service_v2_polling.js +++ b/test/unit/mesh_service_v2_polling.js @@ -182,9 +182,7 @@ test('MeshV2Service Polling', t => { ]; service.client = { - query: () => { - return Promise.resolve({data: {getEventsSince: events}}); - } + query: () => Promise.resolve({data: {getEventsSince: events}}) }; await service.pollEvents();