Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/matter-server/src/MatterServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ async function start() {
disableDclSeed: cliOptions.disableDclSeed,
serverId: legacyData.serverId,
serverVersion: MATTER_SERVER_VERSION,
enableTimeSync: cliOptions.enableTimeSync,
},
legacyServerData,
);
Expand Down
14 changes: 14 additions & 0 deletions packages/matter-server/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ export interface CliOptions {
disableOta: boolean;
otaProviderDir: string | null;

// Time synchronization configuration
enableTimeSync: boolean;

// Dashboard configuration
disableDashboard: boolean;
productionMode: boolean;
Expand Down Expand Up @@ -178,6 +181,16 @@ export function parseCliArgs(argv?: string[]): CliOptions {
.env("DISABLE_OTA"),
)
.addOption(new Option("--ota-provider-dir <path>", "Directory for OTA Provider files").env("OTA_PROVIDER_DIR"))
.addOption(
new Option(
"--enable-time-sync [value]",
"Enable time synchronization for nodes with the TimeSynchronization cluster. Only enable when host NTP is reliable.",
)
.argParser(parseBooleanEnv)
.preset(true)
.default(false)
.env("ENABLE_TIME_SYNC"),
)
.addOption(
new Option("--disable-dashboard [value]", "Disable the web dashboard")
.argParser(parseBooleanEnv)
Expand Down Expand Up @@ -260,6 +273,7 @@ export function parseCliArgs(argv?: string[]): CliOptions {
bluetoothAdapter: opts.bluetoothAdapter ?? null,
disableOta: opts.disableOta,
otaProviderDir: opts.otaProviderDir ?? null,
enableTimeSync: opts.enableTimeSync,
disableDashboard: opts.disableDashboard,
productionMode: opts.productionMode,
};
Expand Down
95 changes: 84 additions & 11 deletions packages/ws-controller/src/controller/ControllerCommandHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
Minutes,
NodeId,
Observable,
ObserverGroup,
Seconds,
ServerAddress,
SoftwareUpdateInfo,
Expand All @@ -36,6 +37,7 @@ import {
BridgedDeviceBasicInformation,
GeneralCommissioning,
OperationalCredentials,
TimeSynchronization,
} from "@matter/main/clusters";
import { WebRtcTransportDefinitions } from "@matter/main/clusters/web-rtc-transport-definitions";
import { WebRtcTransportProvider } from "@matter/main/clusters/web-rtc-transport-provider";
Expand Down Expand Up @@ -104,13 +106,18 @@ import { pingIp } from "../util/network.js";
import { WebRtcTransportRequestorServer } from "./behaviors/WebRtcTransportRequestorServer.js";
import { CustomClusterPoller } from "./CustomClusterPoller.js";
import { Nodes } from "./Nodes.js";
import { TimeSyncManager } from "./TimeSyncManager.js";
import { attachWebRtcCallbackBridge } from "./WebRtcCallbackBridge.js";

const logger = Logger.get("ControllerCommandHandler");

/** Grace period after leaving Connected before a node is declared unavailable. */
const RECONNECT_TIMEOUT = Minutes(3);

// timeFailure event ID within TimeSynchronization cluster (0x0038)
const TIME_SYNC_CLUSTER_ID = 0x0038;
const TIME_FAILURE_EVENT_ID = 0x03;

/**
* Cluster IDs whose attribute changes should trigger a full node_updated broadcast.
* BasicInformation (0x28) covers firmware version, product name, etc.
Expand Down Expand Up @@ -162,6 +169,10 @@ export class ControllerCommandHandler {
#availableUpdates = new Map<NodeId, SoftwareUpdateInfo>();
/** Poller for custom cluster attributes (Eve energy, etc.) */
#customClusterPoller: CustomClusterPoller;
/** Manages time synchronization for nodes with the TimeSynchronization cluster */
#timeSyncManager?: TimeSyncManager;
/** Per-node ObserverGroups for cleanup on decommission */
#nodeObservers = new Map<NodeId, ObserverGroup>();
/** Per-node timers that fire when Reconnecting state exceeds the timeout */
#reconnectTimers = new Map<NodeId, Timer>();
/** Track in-flight invoke-commands for deduplication across all WebSocket connections */
Expand All @@ -182,7 +193,12 @@ export class ControllerCommandHandler {
};
#peers?: PeerSet;

constructor(controllerInstance: CommissioningController, bleEnabled: boolean, otaEnabled: boolean) {
constructor(
controllerInstance: CommissioningController,
bleEnabled: boolean,
otaEnabled: boolean,
timeSyncEnabled = false,
) {
this.#controller = controllerInstance;

this.#bleEnabled = bleEnabled;
Expand All @@ -196,6 +212,14 @@ export class ControllerCommandHandler {
handleReadAttributes: (peer, paths, fabricFiltered) =>
this.handleReadAttributes(peer.nodeId, paths, fabricFiltered),
});

if (timeSyncEnabled) {
logger.info("Time synchronization enabled");
this.#timeSyncManager = new TimeSyncManager({
syncTime: peer => this.#syncNodeTime(peer.nodeId),
nodeConnected: peer => !!(this.#nodes.has(peer.nodeId) && this.#nodes.get(peer.nodeId).isConnected),
});
}
}

/**
Expand Down Expand Up @@ -373,12 +397,33 @@ export class ControllerCommandHandler {
return response;
}

/**
* Send a setUtcTime command to a node's TimeSynchronization cluster.
*/
async #syncNodeTime(nodeId: NodeId): Promise<void> {
Comment thread
markvp marked this conversation as resolved.
await this.#invokeCommand(this.#nodes.get(nodeId).node, {
endpoint: EndpointNumber(0),
cluster: TimeSynchronization.Cluster,
command: "setUtcTime",
fields: {
utcTime: Time.nowMs * 1000,
granularity: TimeSynchronization.Granularity.MillisecondsGranularity,
timeSource: TimeSynchronization.TimeSource.Admin,
},
});
}

async close() {
for (const timer of this.#reconnectTimers.values()) {
timer.stop();
}
this.#reconnectTimers.clear();
await this.#customClusterPoller.stop();
await this.#timeSyncManager?.stop();
for (const observers of this.#nodeObservers.values()) {
observers.close();
}
this.#nodeObservers.clear();
if (!this.#started) {
return;
}
Expand All @@ -389,30 +434,49 @@ export class ControllerCommandHandler {
const node = await this.#controller.getNode(nodeId);
const attributeCache = this.#nodes.attributeCache;

// Defer the full node_updated until the subscription batch ends (connectionAlive)
// so consumers see one update per batch rather than one per attribute.
// Per-node ObserverGroup so all subscriptions are cleaned up on decommission
const nodeObservers = new ObserverGroup();
this.#nodeObservers.set(nodeId, nodeObservers);

// Wire all Events to the Event emitters
// Track if a BasicInformation or BridgedDeviceBasicInformation attribute changed during
// a subscription batch. When the batch ends (connectionAlive), emit a full node_updated.
let basicInfoChangedInBatch = false;
node.events.attributeChanged.on(data => {
nodeObservers.on(node.events.attributeChanged, data => {
// Update the attribute cache with the new value in WebSocket format
attributeCache.updateAttribute(nodeId, data);
this.events.attributeChanged.emit(nodeId, data);
if (FULL_UPDATE_CLUSTER_IDS.has(data.path.clusterId)) {
basicInfoChangedInBatch = true;
}
});
node.events.connectionAlive.on(() => {
nodeObservers.on(node.events.connectionAlive, () => {
if (basicInfoChangedInBatch) {
basicInfoChangedInBatch = false;
logger.info(`Node ${this.formatNode(nodeId)} basic information changed, sending full node_updated`);
this.events.nodeStructureChanged.emit(nodeId);
}
});
node.events.eventTriggered.on(data => this.events.eventChanged.emit(nodeId, data));
node.events.stateChanged.on(state => {
nodeObservers.on(node.events.eventTriggered, data => {
this.events.eventChanged.emit(nodeId, data);
// Filter timeFailure events to trigger time sync
if (
this.#timeSyncManager !== undefined &&
data.path.clusterId === TIME_SYNC_CLUSTER_ID &&
data.path.eventId === TIME_FAILURE_EVENT_ID
) {
logger.debug(`Received timeFailure event from node ${this.formatNode(nodeId)}, triggering time sync`);
this.#timeSyncManager.syncNode(this.#peerOf(nodeId));
}
});
nodeObservers.on(node.events.stateChanged, state => {
// Only refresh cache on Connected state
if (state === NodeStates.Connected) {
attributeCache.update(node);
const attributes = attributeCache.get(nodeId);
if (attributes) {
this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes);
this.#timeSyncManager?.registerNode(this.#peerOf(nodeId), attributes);
}
}

Expand Down Expand Up @@ -452,7 +516,8 @@ export class ControllerCommandHandler {
this.events.nodeAvailabilityChanged.emit(nodeId, result.available);
}
});
node.events.structureChanged.on(() => {
nodeObservers.on(node.events.structureChanged, () => {
// Structure changed means endpoints may have been added/removed, refresh cache
if (node.isConnected) {
attributeCache.update(node);
}
Expand All @@ -462,12 +527,16 @@ export class ControllerCommandHandler {
this.events.nodeEndpointAdded.emit(nodeId, endpointId);
}
});
node.events.decommissioned.on(() => {
nodeObservers.on(node.events.decommissioned, () => {
this.#cleanupNodeAfterRemoval(nodeId);
this.events.nodeDecommissioned.emit(nodeId);
});
node.events.nodeEndpointAdded.on(endpointId => this.#nodes.queueEndpointAdded(nodeId, endpointId));
node.events.nodeEndpointRemoved.on(endpointId => this.events.nodeEndpointRemoved.emit(nodeId, endpointId));
nodeObservers.on(node.events.nodeEndpointAdded, endpointId =>
this.#nodes.queueEndpointAdded(nodeId, endpointId),
);
nodeObservers.on(node.events.nodeEndpointRemoved, endpointId =>
this.events.nodeEndpointRemoved.emit(nodeId, endpointId),
);

this.#nodes.set(nodeId, node);

Expand All @@ -478,6 +547,7 @@ export class ControllerCommandHandler {
const attributes = attributeCache.get(nodeId);
if (attributes) {
this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes);
this.#timeSyncManager?.registerNode(this.#peerOf(nodeId), attributes);
}
}

Expand Down Expand Up @@ -1153,8 +1223,11 @@ export class ControllerCommandHandler {
#cleanupNodeAfterRemoval(nodeId: NodeId) {
this.#reconnectTimers.get(nodeId)?.stop();
this.#reconnectTimers.delete(nodeId);
this.#nodeObservers.get(nodeId)?.close();
this.#nodeObservers.delete(nodeId);
this.#nodes.delete(nodeId);
this.#customClusterPoller.unregisterNode(this.#peerOf(nodeId));
this.#timeSyncManager?.unregisterNode(this.#peerOf(nodeId));
this.#availableUpdates.delete(nodeId);
}

Expand Down
Loading
Loading