diff --git a/Tokenization/backend/wrapper/proto/wrapper.proto b/Tokenization/backend/proto/wrapper.proto similarity index 86% rename from Tokenization/backend/wrapper/proto/wrapper.proto rename to Tokenization/backend/proto/wrapper.proto index 9cb03f258..ad2cac80a 100644 --- a/Tokenization/backend/wrapper/proto/wrapper.proto +++ b/Tokenization/backend/proto/wrapper.proto @@ -35,17 +35,18 @@ message EmptyMessage {} message Token { string token = 1; string targetAddress = 2; + ConnectionDirection connectionDirection = 3; } // Stream message that can contain one of specific messages message Payload { // Message event type MessageEvent event = 1; + // Data related to specific event type oneof data { EmptyMessage emptyMessage = 2; - Token newToken = 3; - Token revokeToken = 4; + Token payload = 3; } } @@ -63,3 +64,14 @@ enum MessageEvent { // Revoke token message type, contains a token to be revoked MESSAGE_EVENT_REVOKE_TOKEN = 2; } + +enum ConnectionDirection { + // Direction from client to server + SENDING = 1; + + // Direction from server to client + RECEIVING = 2; + + // Duplex connection, both sending and receiving + DUPLEX = 3; +} \ No newline at end of file diff --git a/Tokenization/backend/wrapper/central/CentralSystemWrapper.ts b/Tokenization/backend/wrapper/central/CentralSystemWrapper.ts deleted file mode 100644 index 95c9c7e40..000000000 --- a/Tokenization/backend/wrapper/central/CentralSystemWrapper.ts +++ /dev/null @@ -1,113 +0,0 @@ -/** - * @license - * Copyright 2019-2020 CERN and copyright holders of ALICE O2. - * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. - * All rights not expressly granted are reserved. - * - * This software is distributed under the terms of the GNU General Public - * License v3 (GPL Version 3), copied verbatim in the file "COPYING". - * - * In applying this license CERN does not waive the privileges and immunities - * granted to it by virtue of its status as an Intergovernmental Organization - * or submit itself to any jurisdiction. - */ - -import * as grpc from "@grpc/grpc-js"; -import * as protoLoader from "@grpc/proto-loader"; -import { LogManager } from "@aliceo2/web-ui"; -import { CentralSystemConfig } from "../models/config.model"; - -/** - * @description Central System gRPC wrapper that manages client connections and handles gRPC streams with them. - */ -export class CentralSystemWrapper { - // utilities - private logger = LogManager.getLogger("CentralSystemWrapper"); - - // class properties - private server: grpc.Server; - private protoPath: string; - private host: string; - private port: number; - - /** - * Initializes the Wrapper for CentralSystem. - * @param port The port number to bind the gRPC server to. - */ - constructor(config: CentralSystemConfig) { - this.protoPath = config.protoPath; - this.host = config.host || "0.0.0.0"; - this.port = config.port || 50051; - this.server = new grpc.Server(); - this.setupService(); - } - - /** - * @description Loads the gRPC proto definition and sets up the CentralSystem service. - */ - private setupService(): void { - // Load the proto definition with options - const packageDef = protoLoader.loadSync(this.protoPath, { - keepCase: true, - longs: String, - enums: String, - defaults: true, - oneofs: true, - }); - - // Load the package definition into a gRPC object - const proto = grpc.loadPackageDefinition(packageDef) as any; - const wrapper = proto.webui.tokenization; - - // Add the CentralSystem service and bind the stream handler - this.server.addService(wrapper.CentralSystem.service, { - ClientStream: this.clientStreamHandler.bind(this), - }); - } - - /** - * @description Handles the duplex stream from the client. - * @param call The duplex stream call object. - */ - private clientStreamHandler(call: grpc.ServerDuplexStream): void { - this.logger.infoMessage( - `Client ${call.getPeer()} connected to CentralSystem stream stream` - ); - - // Listen for data events from the client - call.on("data", (payload: any) => { - // TODO: Implement data handling logic - }); - - // Handle stream end event - call.on("end", () => { - this.logger.infoMessage(`Client ${call.getPeer()} ended stream.`); - call.end(); - }); - - // Handle stream error event - call.on("error", (err) => - this.logger.errorMessage( - `Stream error from client ${call.getPeer()}: ${err}` - ) - ); - } - - /** - * @desciprion Starts the gRPC server and binds it to the specified in class port. - */ - public listen() { - const addr = `${this.host}:${this.port}`; - this.server.bindAsync( - addr, - grpc.ServerCredentials.createInsecure(), - (err, _port) => { - if (err) { - this.logger.errorMessage(`Server bind error: ${err}`); - return; - } - this.logger.infoMessage(`CentralSytem started listening on ${addr}`); - } - ); - } -} diff --git a/Tokenization/backend/wrapper/client/ConnectionManager/ConnectionManager.ts b/Tokenization/backend/wrapper/client/ConnectionManager/ConnectionManager.ts deleted file mode 100644 index 8f0339f07..000000000 --- a/Tokenization/backend/wrapper/client/ConnectionManager/ConnectionManager.ts +++ /dev/null @@ -1,98 +0,0 @@ -/** - * @license - * Copyright 2019-2020 CERN and copyright holders of ALICE O2. - * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. - * All rights not expressly granted are reserved. - * - * This software is distributed under the terms of the GNU General Public - * License v3 (GPL Version 3), copied verbatim in the file "COPYING". - * - * In applying this license CERN does not waive the privileges and immunities - * granted to it by virtue of its status as an Intergovernmental Organization - * or submit itself to any jurisdiction. - */ -import * as grpc from "@grpc/grpc-js"; -import * as protoLoader from "@grpc/proto-loader"; -import { CentralConnection } from "./CentralConnection"; -import { EventDispatcher } from "../ConnectionManager/EventManagement/EventDispatcher"; -import { Connection } from "../Connection/Connection"; -import { LogManager } from "@aliceo2/web-ui"; - -/** - * - Managing a duplex stream (`stream`) for bidirectional communication. - * - Handling automatic reconnection with exponential backoff on stream errors or disconnects. - * - Providing methods to start (`connectToCentralSystem`) and stop (`disconnect`) the connection with central system. - * - * @remarks - * - `client`: The gRPC client instance for communicating with the central system. - * - `stream`: The active duplex stream for sending and receiving messages (optional). - * - `address`: The address of the central gRPC server. - * - `reconnectAttempts`: The number of consecutive reconnection attempts made after a disconnect or error. - */ -export class ConnectionManager { - // utilities - private logger = LogManager.getLogger("ConnectionManager"); - - // class properties - private client: any; - private stream?: grpc.ClientDuplexStream; - private reconnectAttempts = 0; - - private centralConnection: CentralConnection; - private sendingConnections = new Map(); - private receivingConnections = new Map(); - - /** - * @description Initializes a new instance of the ConnectionManager class. - * - * This constructor sets up the gRPC client for communication with the central system. - * - * @param protoPath - The file path to the gRPC proto definition. - * @param centralAddress - The address of the central gRPC server (default: "localhost:50051"). - */ - constructor( - protoPath: string, - private centralAddress: string = "localhost:50051" - ) { - const packageDef = protoLoader.loadSync(protoPath, { - keepCase: true, - longs: String, - enums: String, - defaults: true, - oneofs: true, - }); - - const proto = grpc.loadPackageDefinition(packageDef) as any; - const wrapper = proto.webui.tokenization; - - const client = new wrapper.CentralSystem( - centralAddress, - grpc.credentials.createInsecure() - ); - - const dispatcher = new EventDispatcher(); - this.centralConnection = new CentralConnection(client, dispatcher); - - this.sendingConnections.set("a", new Connection("1", "a")); - this.sendingConnections.set("b", new Connection("2", "b")); - // Create gRPC client - this.client = new wrapper.CentralSystem( - this.centralAddress, - grpc.credentials.createInsecure() - ); - } - - /** - * @description Starts the connection to the central system. - */ - connectToCentralSystem() { - this.centralConnection.start(); - } - - /** - * @description Disconnects from the central system. - */ - disconnectFromCentralSystem() { - this.centralConnection.disconnect(); - } -} diff --git a/Tokenization/backend/wrapper/client/ConnectionManager/EventManagement/EventDispatcher.ts b/Tokenization/backend/wrapper/client/ConnectionManager/EventManagement/EventDispatcher.ts deleted file mode 100644 index 1f9d17be0..000000000 --- a/Tokenization/backend/wrapper/client/ConnectionManager/EventManagement/EventDispatcher.ts +++ /dev/null @@ -1,37 +0,0 @@ -/** - * @license - * Copyright 2019-2020 CERN and copyright holders of ALICE O2. - * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. - * All rights not expressly granted are reserved. - * - * This software is distributed under the terms of the GNU General Public - * License v3 (GPL Version 3), copied verbatim in the file "COPYING". - * - * In applying this license CERN does not waive the privileges and immunities - * granted to it by virtue of its status as an Intergovernmental Organization - * or submit itself to any jurisdiction. - */ -import type { MessageHandler } from "../../../models/events.model"; -import { LogManager } from "@aliceo2/web-ui"; - -/** - * @description Dispatches gRPC stream events received from CentralSystem. - */ -export class EventDispatcher implements MessageHandler { - private logger = LogManager.getLogger("ConnectionManager"); - - /** - * @description Handles incoming events from the gRPC stream. - * - * @param event - The event object received from the stream. - */ - handle(event: any): void { - switch (event.event) { - case "EMPTY_EVENT": - // handle empty event - break; - default: - this.logger.infoMessage("Unknown event type:", event.event); - } - } -} diff --git a/Tokenization/backend/wrapper/jest.config.js b/Tokenization/backend/wrapper/jest.config.js deleted file mode 100644 index 3a5675ad5..000000000 --- a/Tokenization/backend/wrapper/jest.config.js +++ /dev/null @@ -1,13 +0,0 @@ -/** @type {import('jest').Config} */ -const config = { - verbose: true, - transform: { - "^.+\\.ts$": ["ts-jest", { useESM: true }], - }, - extensionsToTreatAsEsm: [".ts"], - moduleNameMapper: { - "^(\\.{1,2}/.*)\\.js$": "$1", - }, -}; - -export default config; diff --git a/Tokenization/backend/wrapper/jest.config.ts b/Tokenization/backend/wrapper/jest.config.ts new file mode 100644 index 000000000..9187fb239 --- /dev/null +++ b/Tokenization/backend/wrapper/jest.config.ts @@ -0,0 +1,13 @@ +import type { Config } from "jest"; + +const config: Config = { + preset: "ts-jest", + testEnvironment: "node", + testMatch: ["**/test/**/*.test.ts"], + moduleFileExtensions: ["ts", "js", "json"], + moduleNameMapper: { + "^@/(.*)$": "/src/$1", + }, +}; + +export default config; diff --git a/Tokenization/backend/wrapper/models/config.model.ts b/Tokenization/backend/wrapper/models/config.model.ts deleted file mode 100644 index 5f4c82a0f..000000000 --- a/Tokenization/backend/wrapper/models/config.model.ts +++ /dev/null @@ -1,29 +0,0 @@ -/** - * @license - * Copyright 2019-2020 CERN and copyright holders of ALICE O2. - * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. - * All rights not expressly granted are reserved. - * - * This software is distributed under the terms of the GNU General Public - * License v3 (GPL Version 3), copied verbatim in the file "COPYING". - * - * In applying this license CERN does not waive the privileges and immunities - * granted to it by virtue of its status as an Intergovernmental Organization - * or submit itself to any jurisdiction. - */ - -export interface CentralSystemConfig { - /** Path to the proto file defining the services. */ - protoPath: string; - /** Host/IP to bind the gRPC server on. Defaults to "0.0.0.0" which is docker-friendly. */ - host?: string; - /** Port to bind. Defaults to 50051. */ - port?: number; -} - -export interface gRPCWrapperConfig { - /** Path to the proto file defining the services. */ - protoPath: string; - /** Address of the CentralSystem server. */ - centralAddress: string; -} diff --git a/Tokenization/backend/wrapper/package.json b/Tokenization/backend/wrapper/package.json index 060701af5..7f472886a 100644 --- a/Tokenization/backend/wrapper/package.json +++ b/Tokenization/backend/wrapper/package.json @@ -1,14 +1,12 @@ { "name": "grpc-wrapper", "version": "1.0.0", - "type": "module", + "type": "commonjs", "scripts": { "test": "jest", - "build": "tsc && tsc-alias --resolve-full-paths && cp -r proto dist", - "start-client": "node dist/client/gRPCWrapper.js", - "start-central": "node dist/central/CentralSystem.js", - "dev-client": "ts-node-esm client/gRPCWrapper.ts", - "dev-central": "ts-node-esm central/CentralSystem.ts" + "build": "tsc -p tsconfig.build.json && cp -r src/proto dist", + "client": "node dist/client/gRPCWrapper.js", + "central": "node dist/central/CentralSystem.js" }, "author": "ALICEO2", "devDependencies": { diff --git a/Tokenization/backend/wrapper/src/central/CentralSystemWrapper.ts b/Tokenization/backend/wrapper/src/central/CentralSystemWrapper.ts new file mode 100644 index 000000000..a007c2ed9 --- /dev/null +++ b/Tokenization/backend/wrapper/src/central/CentralSystemWrapper.ts @@ -0,0 +1,181 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ +import * as grpc from "@grpc/grpc-js"; +import * as protoLoader from "@grpc/proto-loader"; +import { LogManager } from "@aliceo2/web-ui"; +import { DuplexMessageModel } from "../models/message.model"; + +/** + * @description Central System gRPC wrapper that manages client connections and handles gRPC streams with them. + */ +export class CentralSystemWrapper { + // utilities + private logger = LogManager.getLogger("CentralSystemWrapper"); + + // class properties + private server: grpc.Server; + + // clients management + private clients = new Map>(); + private clientIps = new Map(); // Peer -> IP map + + /** + * Initializes the Wrapper for CentralSystem. + * @param port The port number to bind the gRPC server to. + */ + constructor(private protoPath: string, private port: number) { + this.server = new grpc.Server(); + this.setupService(); + } + + /** + * @description Loads the gRPC proto definition and sets up the CentralSystem service. + */ + private setupService(): void { + // Load the proto definition with options + const packageDef = protoLoader.loadSync(this.protoPath, { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + }); + + // Load the package definition into a gRPC object + const proto = grpc.loadPackageDefinition(packageDef) as any; + const wrapper = proto.webui.tokenization; + + // Add the CentralSystem service and bind the stream handler + this.server.addService(wrapper.CentralSystem.service, { + ClientStream: this.clientStreamHandler.bind(this), + }); + } + + /** + * @description Extracts IP address from peer string + * @param peer string e.g. ipv4:127.0.0.1:12345 + * @returns Extracted IP address + */ + private extractIpFromPeer(peer: string): string { + // Context + // IPv4 format: "ipv4:127.0.0.1:12345" + // IPv6 format: "ipv6:[::1]:12345" + + const ipv4Match = peer.match(/^ipv4:(.+?):\d+$/); + if (ipv4Match) return ipv4Match[1]; + + const ipv6Match = peer.match(/^ipv6:\[(.+?)\]:\d+$/); + if (ipv6Match) return ipv6Match[1]; + + // fallback to original peer if pattern doesn't match any + return peer; + } + + /** + * @description Handles the duplex stream from the client. + * @param call The duplex stream call object. + */ + private clientStreamHandler(call: grpc.ServerDuplexStream): void { + const peer = call.getPeer(); + const clientIp = this.extractIpFromPeer(peer); + + this.logger.infoMessage( + `Client ${clientIp} (${peer}) connected to CentralSystem stream` + ); + + // Add client to maps + this.clients.set(clientIp, call); + this.clientIps.set(peer, clientIp); + + // Listen for data events from the client + call.on("data", (payload: any) => { + this.logger.infoMessage(`Received from ${clientIp}:`, payload); + }); + + // Handle stream end event + call.on("end", () => { + this.logger.infoMessage(`Client ${clientIp} ended stream.`); + this.cleanupClient(peer); + call.end(); + }); + + // Handle stream error event + call.on("error", (err) => { + this.logger.infoMessage(`Stream error from client ${clientIp}:`, err); + this.cleanupClient(peer); + }); + } + + /** + * @description Cleans up client resources + * @param peer Original peer string + */ + private cleanupClient(peer: string): void { + const clientIp = this.clientIps.get(peer); + if (clientIp) { + this.clients.delete(clientIp); + this.clientIps.delete(peer); + this.logger.infoMessage(`Cleaned up resources of ${clientIp}`); + } + } + + /** + * @description Sends data to a specific client by IP address + * @param ip Client IP address + * @param data Data to send + * @returns Whether the data was successfully sent + */ + public sendEvent(ip: string, data: DuplexMessageModel): boolean { + const client = this.clients.get(ip); + if (!client) { + this.logger.warnMessage(`Client ${ip} not found for sending event`); + return false; + } + + try { + client.write(data); + this.logger.infoMessage(`Sent event to ${ip}:`, data); + return true; + } catch (err) { + this.logger.errorMessage(`Error sending to ${ip}:`, err); + return false; + } + } + + /** + * @description Gets all connected client IPs + * @returns Array of connected client IPs + */ + public get connectedClients(): string[] { + return Array.from(this.clients.keys()); + } + + /** + * @desciprion Starts the gRPC server and binds it to the specified in class port. + */ + public listen() { + const addr = `localhost:${this.port}`; + this.server.bindAsync( + addr, + grpc.ServerCredentials.createInsecure(), + (err, _port) => { + if (err) { + this.logger.infoMessage("Server bind error:", err); + return; + } + this.logger.infoMessage(`CentralSytem started listening on ${addr}`); + } + ); + } +} diff --git a/Tokenization/backend/wrapper/client/Connection/Connection.ts b/Tokenization/backend/wrapper/src/client/Commands/revokeToken.command.ts similarity index 58% rename from Tokenization/backend/wrapper/client/Connection/Connection.ts rename to Tokenization/backend/wrapper/src/client/Commands/revokeToken.command.ts index b45dd354e..0e6668ccc 100644 --- a/Tokenization/backend/wrapper/client/Connection/Connection.ts +++ b/Tokenization/backend/wrapper/src/client/Commands/revokeToken.command.ts @@ -11,19 +11,11 @@ * granted to it by virtue of its status as an Intergovernmental Organization * or submit itself to any jurisdiction. */ -/** - * @description This class represents a connection to a target client and manages sending messages to it. - */ -export class Connection { - private token: string; - private targetAddress: string; - constructor(token: string, targetAddress: string) { - this.token = token; - this.targetAddress = targetAddress; - } +import { Command } from "../../models/commands.model"; +import { DuplexMessageEvent, TokenMessage } from "../../models/message.model"; - public handleNewToken(token: string): void { - this.token = token; - } +export class RevokeTokenCommand implements Command { + readonly event = DuplexMessageEvent.MESSAGE_EVENT_REVOKE_TOKEN; + constructor(public payload: TokenMessage) {} } diff --git a/Tokenization/backend/wrapper/src/client/Commands/revokeToken.handler.ts b/Tokenization/backend/wrapper/src/client/Commands/revokeToken.handler.ts new file mode 100644 index 000000000..1cf9d00d8 --- /dev/null +++ b/Tokenization/backend/wrapper/src/client/Commands/revokeToken.handler.ts @@ -0,0 +1,34 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ +import { CommandHandler } from "../../models/commands.model"; +import { RevokeTokenCommand } from "./revokeToken.command"; +import { ConnectionManager } from "../ConnectionManager/ConnectionManager"; + +export class RevokeTokenHandler implements CommandHandler { + constructor(private manager: ConnectionManager) {} + + async handle(command: RevokeTokenCommand): Promise { + const { targetAddress } = command.payload || {}; + if (!targetAddress) { + throw new Error("Target address is required to revoke token."); + } + + const conn = this.manager.getConnectionByAddress( + targetAddress, + command.payload.connectionDirection + ); + + conn?.handleRevokeToken(); + } +} diff --git a/Tokenization/backend/wrapper/src/client/Connection/Connection.ts b/Tokenization/backend/wrapper/src/client/Connection/Connection.ts new file mode 100644 index 000000000..1d287fd5c --- /dev/null +++ b/Tokenization/backend/wrapper/src/client/Connection/Connection.ts @@ -0,0 +1,72 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ +import { ConnectionDirection } from "../../models/message.model"; +import { ConnectionStatus } from "../../models/connection.model"; + +/** + * @description This class represents a connection to a target client and manages sending messages to it. + */ +export class Connection { + private _token: string; + private _targetAddress: string; + private _status: ConnectionStatus; + + constructor( + token: string, + targetAddress: string, + public direction: ConnectionDirection + ) { + this._token = token; + this._targetAddress = targetAddress; + + this._status = ConnectionStatus.CONNECTED; + } + + /** + * @description Replace newly generated token + * @param token New token to be replaced + */ + public set token(token: string) { + this._token = token; + } + + public handleRevokeToken(): void { + this._token = ""; + this._status = ConnectionStatus.UNAUTHORIZED; + } + + /** + * @description Returns token for this Connection object + * @returns Connection token + */ + public get token(): string { + return this._token; + } + + /** + * @description Returns status for specific + * @returns Connection status + */ + public get status(): string { + return this._status; + } + + /** + * @description Returns target address for this Connection object + * @returns Target address + */ + public get targetAddress(): string { + return this._targetAddress; + } +} diff --git a/Tokenization/backend/wrapper/client/ConnectionManager/CentralConnection.ts b/Tokenization/backend/wrapper/src/client/ConnectionManager/CentralConnection.ts similarity index 84% rename from Tokenization/backend/wrapper/client/ConnectionManager/CentralConnection.ts rename to Tokenization/backend/wrapper/src/client/ConnectionManager/CentralConnection.ts index fee5a82b5..30cc02b96 100644 --- a/Tokenization/backend/wrapper/client/ConnectionManager/CentralConnection.ts +++ b/Tokenization/backend/wrapper/src/client/ConnectionManager/CentralConnection.ts @@ -13,7 +13,8 @@ */ import * as grpc from "@grpc/grpc-js"; import { LogManager } from "@aliceo2/web-ui"; -import type { MessageHandler } from "../../models/events.model"; +import { CentralCommandDispatcher } from "./EventManagement/CentralCommandDispatcher"; +import { DuplexMessageModel } from "../../models/message.model"; /** * @description This class manages the duplex stream with the CentralSystem gRPC service. @@ -23,7 +24,10 @@ export class CentralConnection { private logger = LogManager.getLogger("CentralConnection"); private stream?: grpc.ClientDuplexStream; - constructor(private client: any, private handler: MessageHandler) {} + constructor( + private client: any, + private dispatcher: CentralCommandDispatcher + ) {} /** * @description Initializes the duplex stream and sets up event handlers. @@ -33,8 +37,9 @@ export class CentralConnection { this.stream = this.client.ClientStream(); - this.stream!.on("data", (payload) => { - this.handler.handle(payload); + this.stream!.on("data", (payload: DuplexMessageModel) => { + console.log("Received payload:", JSON.stringify(payload)); + this.dispatcher.dispatch(payload); }); this.stream!.on("end", () => { diff --git a/Tokenization/backend/wrapper/src/client/ConnectionManager/ConnectionManager.ts b/Tokenization/backend/wrapper/src/client/ConnectionManager/ConnectionManager.ts new file mode 100644 index 000000000..28457195f --- /dev/null +++ b/Tokenization/backend/wrapper/src/client/ConnectionManager/ConnectionManager.ts @@ -0,0 +1,149 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ +import * as grpc from "@grpc/grpc-js"; +import * as protoLoader from "@grpc/proto-loader"; +import { CentralConnection } from "./CentralConnection"; +import { CentralCommandDispatcher } from "./EventManagement/CentralCommandDispatcher"; +import { Connection } from "../Connection/Connection"; +import { LogManager } from "@aliceo2/web-ui"; +import { Command, CommandHandler } from "models/commands.model"; +import { + ConnectionDirection, + DuplexMessageEvent, +} from "../../models/message.model"; + +/** + * @description Manages all the connection between clients and central system. + */ +/** + * Manages the lifecycle and connection logic for a gRPC client communicating with the central system. + * + * This class is responsible for: + * - Initializing the gRPC client using the provided proto definition and address. + * - Delegating stream handling to CentralConnection. + * - Managing sending/receiving connections to other clients. + * + * @remarks + * - `centralConnection`: Handles the duplex stream with the central gRPC server. + * - `sendingConnections`: Map of active outbound connections. + * - `receivingConnections`: Map of active inbound connections. + */ +export class ConnectionManager { + private logger = LogManager.getLogger("ConnectionManager"); + private centralDispatcher: CentralCommandDispatcher; + private centralConnection: CentralConnection; + private sendingConnections = new Map(); + private receivingConnections = new Map(); + + /** + * @description Initializes a new instance of the ConnectionManager class. + * + * This constructor sets up the gRPC client for communication with the central system. + * + * @param protoPath - The file path to the gRPC proto definition. + * @param centralAddress - The address of the central gRPC server (default: "localhost:50051"). + */ + constructor(protoPath: string, centralAddress: string = "localhost:50051") { + const packageDef = protoLoader.loadSync(protoPath, { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + }); + + const proto = grpc.loadPackageDefinition(packageDef) as any; + const wrapper = proto.webui.tokenization; + + const client = new wrapper.CentralSystem( + centralAddress, + grpc.credentials.createInsecure() + ); + + this.centralDispatcher = new CentralCommandDispatcher(); + + this.centralConnection = new CentralConnection( + client, + this.centralDispatcher + ); + + this.sendingConnections.set( + "a", + new Connection("1", "a", ConnectionDirection.SENDING) + ); + this.sendingConnections.set( + "b", + new Connection("2", "b", ConnectionDirection.SENDING) + ); + } + + registerCommandHandlers( + commandHandlers: { + event: DuplexMessageEvent; + handler: CommandHandler; + }[] + ): void { + commandHandlers.forEach(({ event, handler }) => { + this.centralDispatcher.register(event, handler); + }); + } + + /** + * @description Starts the connection to the central system. + */ + connectToCentralSystem(): void { + this.centralConnection.start(); + } + + /** + * @description Disconnects from the central system. + */ + disconnectFromCentralSystem(): void { + this.centralConnection.disconnect(); + } + + /** + * @description Gets the connection instance by address. + * @returns{Connection} connection instance. + */ + getConnectionByAddress( + address: string, + direction: ConnectionDirection + ): Connection | undefined { + switch (direction) { + case ConnectionDirection.SENDING: + return this.sendingConnections.get(address); + case ConnectionDirection.RECEIVING: + return this.receivingConnections.get(address); + default: + this.logger.errorMessage(`Invalid connection direction: ${direction}`); + return undefined; + } + } + + /** + * @description Returns all saved connections. + * + * @returns An object containing the sending and receiving connections. + */ + public get connections(): { + sending: Connection[]; + receiving: Connection[]; + } { + return { + sending: [...this.sendingConnections.values()], + receiving: [...this.receivingConnections.values()], + }; + } +} diff --git a/Tokenization/backend/wrapper/src/client/ConnectionManager/EventManagement/CentralCommandDispatcher.ts b/Tokenization/backend/wrapper/src/client/ConnectionManager/EventManagement/CentralCommandDispatcher.ts new file mode 100644 index 000000000..451f275b8 --- /dev/null +++ b/Tokenization/backend/wrapper/src/client/ConnectionManager/EventManagement/CentralCommandDispatcher.ts @@ -0,0 +1,50 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +import { LogManager } from "@aliceo2/web-ui"; +import { Command, CommandHandler } from "models/commands.model"; +import { DuplexMessageEvent } from "../../../models/message.model"; + +export class CentralCommandDispatcher { + private handlers = new Map(); + private logger = LogManager.getLogger("CentralCommandDispatcher"); + + register( + event: DuplexMessageEvent, + handler: CommandHandler + ): void { + console.log(`Registering handler for command type: ${event}`); + this.handlers.set(event, handler); + } + + async dispatch(command: Command): Promise { + const handler = this.handlers.get(command.event); + this.logger.debugMessage(`Dispatching command: ${command.event}`); + if (!handler) { + this.logger.warnMessage( + `No handler registered for command type: ${command.event}` + ); + return; + } + + try { + await handler.handle(command); + } catch (error) { + this.logger.errorMessage( + `Error handling command ${command.event}:`, + error + ); + } + } +} diff --git a/Tokenization/backend/wrapper/client/gRPCWrapper.ts b/Tokenization/backend/wrapper/src/client/gRPCWrapper.ts similarity index 53% rename from Tokenization/backend/wrapper/client/gRPCWrapper.ts rename to Tokenization/backend/wrapper/src/client/gRPCWrapper.ts index 7ac5a4bae..c3c4f3225 100644 --- a/Tokenization/backend/wrapper/client/gRPCWrapper.ts +++ b/Tokenization/backend/wrapper/src/client/gRPCWrapper.ts @@ -11,9 +11,10 @@ * granted to it by virtue of its status as an Intergovernmental Organization * or submit itself to any jurisdiction. */ - -import { ConnectionManager } from "./ConnectionManager/ConnectionManager.ts"; -import { gRPCWrapperConfig } from "../models/config.model.ts"; +import { ConnectionManager } from "./ConnectionManager/ConnectionManager"; +import { RevokeTokenHandler } from "./Commands/revokeToken.handler"; +import { DuplexMessageEvent } from "../models/message.model"; +import { Connection } from "./Connection/Connection"; /** * @description Wrapper class for managing secure gRPC wrapper. @@ -38,17 +39,47 @@ export class gRPCWrapper { * @param protoPath - The file path to the gRPC proto definition. * @param centralAddress - The address of the central gRPC server (default: "localhost:50051"). */ - constructor(config: gRPCWrapperConfig) { - this.ConnectionManager = new ConnectionManager( - config.protoPath, - config.centralAddress - ); + constructor(protoPath: string, centralAddress: string = "localhost:50051") { + this.ConnectionManager = new ConnectionManager(protoPath, centralAddress); + this.ConnectionManager.registerCommandHandlers([ + { + event: DuplexMessageEvent.MESSAGE_EVENT_REVOKE_TOKEN, + handler: new RevokeTokenHandler(this.ConnectionManager), + }, + ]); } /** * @description Starts the Connection Manager stream connection with Central System */ - public connectToCentralSystem(): void { + public connectToCentralSystem() { this.ConnectionManager.connectToCentralSystem(); } + + /** + * @description Returns all saved connections. + * + * @returns An object containing the sending and receiving connections. + */ + public get connections(): { + sending: Connection[]; + receiving: Connection[]; + } { + return this.ConnectionManager.connections; + } + + public getSummary(): string { + const conn = this.ConnectionManager.connections; + return ( + `Wrapper Summary: ` + + `\nSending Connections: ${conn.sending.length}` + + `\nReceiving Connections: ${conn.receiving.length}` + + conn.sending + .map((c) => `\n- ${c.targetAddress} - ${c.direction}\n\t(${c.status})`) + .join("") + + conn.receiving + .map((c) => `\n- ${c.targetAddress} - ${c.direction}\n\t(${c.status})`) + .join("") + ); + } } diff --git a/Tokenization/backend/wrapper/models/events.model.ts b/Tokenization/backend/wrapper/src/models/commands.model.ts similarity index 76% rename from Tokenization/backend/wrapper/models/events.model.ts rename to Tokenization/backend/wrapper/src/models/commands.model.ts index ff20b71ff..874a301d4 100644 --- a/Tokenization/backend/wrapper/models/events.model.ts +++ b/Tokenization/backend/wrapper/src/models/commands.model.ts @@ -12,12 +12,20 @@ * or submit itself to any jurisdiction. */ +import { DuplexMessageEvent } from "./message.model"; + /** * Interface representing a handler for processing events. * * @remarks * The `handle` method receives an event object and performs the necessary processing. */ -export interface MessageHandler { - handle(event: any): void; + +export interface Command { + event: DuplexMessageEvent; + payload: any; +} + +export interface CommandHandler { + handle(command: T): Promise; } diff --git a/Tokenization/backend/wrapper/src/models/connection.model.ts b/Tokenization/backend/wrapper/src/models/connection.model.ts new file mode 100644 index 000000000..8bfde9da3 --- /dev/null +++ b/Tokenization/backend/wrapper/src/models/connection.model.ts @@ -0,0 +1,31 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +export enum ConnectionStatus { + // The connection is in the process of being established + CONNECTING = "CONNECTING", + // The connection has been successfully established + CONNECTED = "CONNECTED", + // The connection attempt failed due to authorization issues + // or token has expired/been revoked + UNAUTHORIZED = "UNAUTHORIZED", + // The connection has been closed + CLOSED = "CLOSED", + // An error occurred with the connection + ERROR = "ERROR", + // The connection is attempting to re-establish after a disruption + RECONNECTING = "RECONNECTING", + // The connection is refreshing its authentication token + TOKEN_REFRESH = "TOKEN_REFRESH", +} diff --git a/Tokenization/backend/wrapper/models/message.model.ts b/Tokenization/backend/wrapper/src/models/message.model.ts similarity index 60% rename from Tokenization/backend/wrapper/models/message.model.ts rename to Tokenization/backend/wrapper/src/models/message.model.ts index d74c98872..fb64a4002 100644 --- a/Tokenization/backend/wrapper/models/message.model.ts +++ b/Tokenization/backend/wrapper/src/models/message.model.ts @@ -22,10 +22,22 @@ * @property MESSAGE_EVENT_NEW_TOKEN: Event for replacing with newly generated token. * @property MESSAGE_EVENT_REVOKE_TOKEN: Event for revoking an existing token. */ -enum DuplexMessageEvent { - MESSAGE_EVENT_EMPTY = 0, - MESSAGE_EVENT_NEW_TOKEN = 1, - MESSAGE_EVENT_REVOKE_TOKEN = 2, +export enum DuplexMessageEvent { + MESSAGE_EVENT_EMPTY = "MESSAGE_EVENT_EMPTY", + MESSAGE_EVENT_NEW_TOKEN = "MESSAGE_EVENT_NEW_TOKEN", + MESSAGE_EVENT_REVOKE_TOKEN = "MESSAGE_EVENT_REVOKE_TOKEN", +} + +/** + * @enum Represents the direction of a connection in the system. + * @property SENDING: Indicates a connection where messages are sent to another client. + * @property RECEIVING: Indicates a connection where messages are received from another client. + * @property DUPLEX: Indicates a connection that can both send and receive messages. + */ +export enum ConnectionDirection { + SENDING = "SENDING", + RECEIVING = "RECEIVING", + DUPLEX = "DUPLEX", } // ====================================== @@ -37,18 +49,25 @@ enum DuplexMessageEvent { * @property {string} token - The token to be replaced or revoked. * @property {string} targetAddress - The address of connection binded to this token. */ -interface TokenMessage { - token: string; +export interface TokenMessage { + token?: string; + connectionDirection: ConnectionDirection; targetAddress: string; } /** * @description Model for duplex stream messages between client and central system. * @property {DuplexMessageEvent} event - The event type of the message. + * @property {ConnectionDirection} connectionDirection - The direction of the connection, optional for some events. * @property {TokenMessage} data - The data associated with the event, it may be undefined for some events. - * @example {event: DuplexMessageEvent.MESSAGE_EVENT_NEW_TOKEN, data: {token: '', targetAddress: ''}} + * @example + * { + * event: DuplexMessageEvent.MESSAGE_EVENT_NEW_TOKEN, + * connectionDirection: ConnectionDirection.SENDING, + * payload: {token: 'abc', targetAddress: 'localhost:50051'} + * } */ -interface DuplexMessageModel { +export interface DuplexMessageModel { event: DuplexMessageEvent; - data?: TokenMessage; + payload: TokenMessage; } diff --git a/Tokenization/backend/wrapper/test/central/CentralSystemWrapper.test.ts b/Tokenization/backend/wrapper/src/test/central/CentralSystemWrapper.test.ts similarity index 90% rename from Tokenization/backend/wrapper/test/central/CentralSystemWrapper.test.ts rename to Tokenization/backend/wrapper/src/test/central/CentralSystemWrapper.test.ts index 8f6be6203..3da27e23e 100644 --- a/Tokenization/backend/wrapper/test/central/CentralSystemWrapper.test.ts +++ b/Tokenization/backend/wrapper/src/test/central/CentralSystemWrapper.test.ts @@ -21,7 +21,6 @@ const mockServerInstance = { const logger = { infoMessage: jest.fn(), - errorMessage: jest.fn(), }; jest.mock("@aliceo2/web-ui", () => ({ @@ -64,10 +63,7 @@ describe("CentralSystemWrapper", () => { beforeEach(() => { jest.clearAllMocks(); - wrapper = new CentralSystemWrapper({ - protoPath: "dummy.proto", - port: 12345, - }); + wrapper = new CentralSystemWrapper("dummy.proto", 12345); }); test("should set up gRPC service and add it to the server", () => { @@ -83,7 +79,7 @@ describe("CentralSystemWrapper", () => { wrapper.listen(); expect(mockBindAsync).toHaveBeenCalledWith( - "0.0.0.0:12345", + "localhost:12345", "mock-credentials", expect.any(Function) ); @@ -95,8 +91,9 @@ describe("CentralSystemWrapper", () => { wrapper.listen(); - expect(logger.errorMessage).toHaveBeenCalledWith( - "Server bind error: Error: bind failed" + expect(logger.infoMessage).toHaveBeenCalledWith( + "Server bind error:", + error ); }); @@ -121,10 +118,15 @@ describe("CentralSystemWrapper", () => { expect(mockCall.end).toHaveBeenCalled(); expect(logger.infoMessage).toHaveBeenCalledWith( - "Client client123 connected to CentralSystem stream stream" + expect.stringContaining("Client client123") ); + expect(logger.infoMessage).toHaveBeenCalledWith( "Client client123 ended stream." ); + expect(logger.infoMessage).toHaveBeenCalledWith( + "Stream error from client client123:", + expect.any(Error) + ); }); }); diff --git a/Tokenization/backend/wrapper/src/test/client/Commands/revokeToken.test.ts b/Tokenization/backend/wrapper/src/test/client/Commands/revokeToken.test.ts new file mode 100644 index 000000000..f34dc68d1 --- /dev/null +++ b/Tokenization/backend/wrapper/src/test/client/Commands/revokeToken.test.ts @@ -0,0 +1,127 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +import { RevokeTokenCommand } from "../../../client/Commands/revokeToken.command"; +import { RevokeTokenHandler } from "../../../client/Commands/revokeToken.handler"; +import { Connection } from "../../../client/Connection/Connection"; +import { ConnectionManager } from "../../../client/ConnectionManager/ConnectionManager"; +import { + ConnectionDirection, + DuplexMessageEvent, +} from "../../../models/message.model"; +import { ConnectionStatus } from "../../../models/connection.model"; +import { Command } from "models/commands.model"; + +describe("RevokeToken", () => { + function createEventMessage(targetAddress: string) { + return { + event: DuplexMessageEvent.MESSAGE_EVENT_REVOKE_TOKEN, + payload: { + targetAddress: targetAddress, + token: "test-token", + }, + } as Command; + } + + let manager: ConnectionManager; + + beforeEach(() => { + manager = { + sendingConnections: new Map(), + receivingConnections: new Map(), + getConnectionByAddress: jest.fn(function (this: any, address: string) { + return ( + this.sendingConnections.get(address) || + this.receivingConnections.get(address) + ); + }), + } as unknown as ConnectionManager; + }); + + it("should revoke token when connection found in sendingConnections", async () => { + const targetAddress = "peer-123"; + const conn = new Connection( + "valid-token", + targetAddress, + ConnectionDirection.SENDING + ); + (manager as any).sendingConnections!.set(targetAddress, conn); + + const handler = new RevokeTokenHandler(manager); + const command = new RevokeTokenCommand( + createEventMessage(targetAddress).payload + ); + + await handler.handle(command); + + expect(conn.token).toBe(""); + expect(conn.status).toBe(ConnectionStatus.UNAUTHORIZED); + }); + + it("should revoke token when connection found in receivingConnections", async () => { + const targetAddress = "peer-456"; + const conn = new Connection( + "valid-token", + targetAddress, + ConnectionDirection.RECEIVING + ); + (manager as any).receivingConnections.set(targetAddress, conn); + + const handler = new RevokeTokenHandler(manager); + const command = new RevokeTokenCommand( + createEventMessage(targetAddress).payload + ); + + await handler.handle(command); + + expect(conn.token).toBe(""); + expect(conn.status).toBe(ConnectionStatus.UNAUTHORIZED); + }); + + it("should do nothing when connection not found", async () => { + const targetAddress = "non-existent"; + const handler = new RevokeTokenHandler(manager); + const command = new RevokeTokenCommand( + createEventMessage(targetAddress).payload + ); + + await expect(handler.handle(command)).resolves.toBeUndefined(); + expect(manager.getConnectionByAddress).toHaveBeenCalledWith( + targetAddress, + undefined + ); + }); + + it("should throw error when targetAddress is missing", async () => { + const invalidMessage = { + event: DuplexMessageEvent.MESSAGE_EVENT_REVOKE_TOKEN, + revokeToken: { token: "test-token" }, + }; + + const handler = new RevokeTokenHandler(manager); + const command = new RevokeTokenCommand(invalidMessage as any); + + await expect(handler.handle(command)).rejects.toThrow( + "Target address is required to revoke token." + ); + }); + + it("should create command with correct type and payload", () => { + const eventMessage = createEventMessage("peer-001"); + const command = new RevokeTokenCommand(eventMessage.payload); + + expect(command.event).toBe(DuplexMessageEvent.MESSAGE_EVENT_REVOKE_TOKEN); + expect(command).toEqual(eventMessage); + }); +}); diff --git a/Tokenization/backend/wrapper/test/client/ConnectionManager/ConnectionManager.test.ts b/Tokenization/backend/wrapper/src/test/client/ConnectionManager/ConnectionManager.test..ts similarity index 61% rename from Tokenization/backend/wrapper/test/client/ConnectionManager/ConnectionManager.test.ts rename to Tokenization/backend/wrapper/src/test/client/ConnectionManager/ConnectionManager.test..ts index 6421333ee..f60739154 100644 --- a/Tokenization/backend/wrapper/test/client/ConnectionManager/ConnectionManager.test.ts +++ b/Tokenization/backend/wrapper/src/test/client/ConnectionManager/ConnectionManager.test..ts @@ -14,18 +14,34 @@ import * as grpc from "@grpc/grpc-js"; import { ConnectionManager } from "../../../client/ConnectionManager/ConnectionManager"; -import { CentralConnection } from "../../../client/ConnectionManager/CentralConnection"; +import { DuplexMessageEvent } from "../../../models/message.model"; -// Mock of client and stream +// Mock duplex stream const mockStream = { on: jest.fn(), end: jest.fn(), }; +// Mock gRPC client const mockClient = { ClientStream: jest.fn(() => mockStream), }; +// Mock CentralSystem constructor +const CentralSystemMock = jest.fn(() => mockClient); + +// Mock dispatcher +const mockDispatch = jest.fn(); +jest.mock( + "../../../client/ConnectionManager/EventManagement/CentralCommandDispatcher", + () => ({ + CentralCommandDispatcher: jest.fn(() => ({ + dispatch: mockDispatch, + })), + }) +); + +// Mock logger jest.mock("@aliceo2/web-ui", () => ({ LogManager: { getLogger: () => ({ @@ -34,6 +50,7 @@ jest.mock("@aliceo2/web-ui", () => ({ }, })); +// Mock gRPC proto loader and client jest.mock("@grpc/proto-loader", () => ({ loadSync: jest.fn(() => { return {}; @@ -50,7 +67,7 @@ jest.mock("@grpc/grpc-js", () => { loadPackageDefinition: jest.fn(() => ({ webui: { tokenization: { - CentralSystem: jest.fn(() => mockClient), + CentralSystem: CentralSystemMock, }, }, })), @@ -68,9 +85,13 @@ describe("ConnectionManager", () => { test("should initialize client with correct address", () => { expect(conn).toBeDefined(); expect(grpc.loadPackageDefinition).toHaveBeenCalled(); + expect(CentralSystemMock).toHaveBeenCalledWith( + "localhost:12345", + undefined + ); }); - test("connectToCentralSystem() should create stream and log message", () => { + test("connectToCentralSystem() should set up stream listeners", () => { conn.connectToCentralSystem(); expect(mockClient.ClientStream).toHaveBeenCalled(); @@ -79,52 +100,59 @@ describe("ConnectionManager", () => { expect(mockStream.on).toHaveBeenCalledWith("error", expect.any(Function)); }); - test("disconnect() should end stream and reset reconnectAttempts", () => { + test("disconnectFromCentralSystem() should end stream", () => { conn.connectToCentralSystem(); conn.disconnectFromCentralSystem(); expect(mockStream.end).toHaveBeenCalled(); }); - test("scheduleReconnect() should call connect after delay", () => { - jest.useFakeTimers(); - const spy = jest.spyOn(CentralConnection.prototype, "connect"); - - const centralConnection = new CentralConnection(mockClient, {} as any); - (centralConnection as any).scheduleReconnect(); - - jest.advanceTimersByTime(1000 * 2); - expect(spy).toHaveBeenCalled(); - jest.useRealTimers(); - }); - test("should reconnect on stream 'end'", () => { + jest.useFakeTimers(); conn.connectToCentralSystem(); const onEnd = mockStream.on.mock.calls.find( ([event]) => event === "end" - )[1]; + )?.[1]; - const reconnectSpy = jest.spyOn( - CentralConnection.prototype, - "scheduleReconnect" - ); - onEnd(); + onEnd?.(); // simulate 'end' + jest.advanceTimersByTime(2000); - expect(reconnectSpy).toHaveBeenCalled(); + expect(mockClient.ClientStream).toHaveBeenCalledTimes(2); + jest.useRealTimers(); }); test("should reconnect on stream 'error'", () => { + jest.useFakeTimers(); conn.connectToCentralSystem(); const onError = mockStream.on.mock.calls.find( ([event]) => event === "error" - )[1]; + )?.[1]; - const reconnectSpy = jest.spyOn( - CentralConnection.prototype, - "scheduleReconnect" - ); - onError(new Error("Stream failed")); + onError?.(new Error("Simulated error")); + jest.advanceTimersByTime(2000); + + expect(mockClient.ClientStream).toHaveBeenCalledTimes(2); + jest.useRealTimers(); + }); + + test("should dispatch event when 'data' is received", () => { + conn.connectToCentralSystem(); + const onData = mockStream.on.mock.calls.find( + ([event]) => event === "data" + )?.[1]; + + const mockMessage = { + event: DuplexMessageEvent.MESSAGE_EVENT_REVOKE_TOKEN, + data: { + revokeToken: { + token: "abc123", + targetAddress: "peer-123", + }, + }, + }; + + onData?.(mockMessage); - expect(reconnectSpy).toHaveBeenCalled(); + expect(mockDispatch).toHaveBeenCalledWith(mockMessage); }); }); diff --git a/Tokenization/backend/wrapper/utils/types/webui.d.ts b/Tokenization/backend/wrapper/src/utils/types/webui.d.ts similarity index 100% rename from Tokenization/backend/wrapper/utils/types/webui.d.ts rename to Tokenization/backend/wrapper/src/utils/types/webui.d.ts diff --git a/Tokenization/backend/wrapper/tsconfig.build.json b/Tokenization/backend/wrapper/tsconfig.build.json new file mode 100644 index 000000000..9478b5399 --- /dev/null +++ b/Tokenization/backend/wrapper/tsconfig.build.json @@ -0,0 +1,12 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "noEmit": false, + "outDir": "dist", + "declaration": true, + "declarationMap": true, + "sourceMap": true + }, + "include": ["src/**/*.ts"], + "exclude": ["src/**/test", "node_modules"] +} diff --git a/Tokenization/backend/wrapper/tsconfig.json b/Tokenization/backend/wrapper/tsconfig.json index 17a9fe200..eda421baa 100644 --- a/Tokenization/backend/wrapper/tsconfig.json +++ b/Tokenization/backend/wrapper/tsconfig.json @@ -1,14 +1,20 @@ { "compilerOptions": { - "target": "ES2020", - "module": "ESNext", - "moduleResolution": "node", - "rootDir": "./", - "outDir": "./dist", + "target": "es2020", + "module": "CommonJS", + "moduleResolution": "Node", "esModuleInterop": true, - "forceConsistentCasingInFileNames": true, "strict": true, - "skipLibCheck": true + "skipLibCheck": true, + "resolveJsonModule": true, + "baseUrl": ".", + "paths": { + "*": ["src/*"] + }, + "noEmit": true, + "allowImportingTsExtensions": false, + "forceConsistentCasingInFileNames": true }, - "include": ["client", "central", "utils", "types"] + "include": ["src/**/*.ts", "tests/**/*.ts"], + "exclude": ["dist", "node_modules"] }