From aad065ee5abba9191602ff4c85ac025ef0d2a19f Mon Sep 17 00:00:00 2001 From: konard Date: Sat, 17 Jan 2026 00:59:02 +0100 Subject: [PATCH 1/4] Initial commit with task details Adding CLAUDE.md with task information for AI processing. This file will be removed when the task is complete. Issue: https://github.com/link-foundation/links-queue/issues/16 --- CLAUDE.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..a941524 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,5 @@ +Issue to solve: https://github.com/link-foundation/links-queue/issues/16 +Your prepared branch: issue-16-8f27ebbd262d +Your prepared working directory: /tmp/gh-issue-solver-1768607940943 + +Proceed. From 20a50afd31b390c7bb7aea05f7f8625b2100eee9 Mon Sep 17 00:00:00 2001 From: konard Date: Sat, 17 Jan 2026 01:09:18 +0100 Subject: [PATCH 2/4] feat(js): Integrate links-notation for parsing and serialization Implement Phase 4 requirements for Links Notation integration: Protocol Notation Module: - Add LinksNotation class with parse() and stringify() methods - Add NotationParser for custom parser configurations - Add NotationStreamParser for streaming large inputs - Add NotationParseError for detailed parse error information Protocol Messages Module: - Add RequestType enum (enqueue, dequeue, ack, reject, query, etc.) - Add ResponseStatus enum (ok, error) - Add ErrorCode enum (QUEUE_EMPTY, QUEUE_NOT_FOUND, etc.) - Add Message class for protocol message representation - Add MessageBuilder class with fluent API - Add helper functions for creating request/response messages Integration: - Add links-notation ^0.13.0 as production dependency - Export all protocol types from main package entry point - Full TypeScript type definitions for all new exports - 177 unit tests including parsing, serialization, and messages Closes #16 Co-Authored-By: Claude Opus 4.5 --- js/.changeset/integrate-links-notation.md | 16 + js/package-lock.json | 9 + js/package.json | 3 + js/src/index.d.ts | 44 ++ js/src/index.js | 31 + js/src/protocol/messages.d.ts | 445 ++++++++++++++ js/src/protocol/messages.js | 698 ++++++++++++++++++++++ js/src/protocol/notation.d.ts | 317 ++++++++++ js/src/protocol/notation.js | 600 +++++++++++++++++++ js/tests/messages.test.js | 391 ++++++++++++ js/tests/notation.test.js | 281 +++++++++ 11 files changed, 2835 insertions(+) create mode 100644 js/.changeset/integrate-links-notation.md create mode 100644 js/src/protocol/messages.d.ts create mode 100644 js/src/protocol/messages.js create mode 100644 js/src/protocol/notation.d.ts create mode 100644 js/src/protocol/notation.js create mode 100644 js/tests/messages.test.js create mode 100644 js/tests/notation.test.js diff --git a/js/.changeset/integrate-links-notation.md b/js/.changeset/integrate-links-notation.md new file mode 100644 index 0000000..b3bf779 --- /dev/null +++ b/js/.changeset/integrate-links-notation.md @@ -0,0 +1,16 @@ +--- +"links-queue-js": minor +--- + +Integrate links-notation library for parsing and serialization + +- Add `links-notation` as a production dependency +- Add `LinksNotation` class with `parse()` and `stringify()` methods +- Add `NotationParser` for custom parser configurations +- Add `NotationStreamParser` for streaming large inputs +- Add `NotationParseError` for detailed parse error information +- Add protocol message types (`RequestType`, `ResponseStatus`, `ErrorCode`) +- Add `Message` and `MessageBuilder` classes for protocol communication +- Add helper functions for creating request/response messages +- Full TypeScript type definitions for all new exports +- Comprehensive unit tests for parsing, serialization, and messages diff --git a/js/package-lock.json b/js/package-lock.json index f2c528a..c72310b 100644 --- a/js/package-lock.json +++ b/js/package-lock.json @@ -8,6 +8,9 @@ "name": "links-queue-js", "version": "0.3.0", "license": "Unlicense", + "dependencies": { + "links-notation": "^0.13.0" + }, "devDependencies": { "@changesets/cli": "^2.29.7", "eslint": "^9.38.0", @@ -2612,6 +2615,12 @@ "node": ">= 0.8.0" } }, + "node_modules/links-notation": { + "version": "0.13.0", + "resolved": "https://registry.npmjs.org/links-notation/-/links-notation-0.13.0.tgz", + "integrity": "sha512-52FoUAVOhjfC23bdvrxv6b4Eosp02WeM3qTN9Rsb3ouPof5YVQ5fOMlRPMJmKMmdseVzEnewNLFq3lADRpWTFg==", + "license": "Unlicense" + }, "node_modules/lint-staged": { "version": "16.2.7", "resolved": "https://registry.npmjs.org/lint-staged/-/lint-staged-16.2.7.tgz", diff --git a/js/package.json b/js/package.json index 9322405..067b362 100644 --- a/js/package.json +++ b/js/package.json @@ -63,5 +63,8 @@ "prettier --write", "prettier --check" ] + }, + "dependencies": { + "links-notation": "^0.13.0" } } diff --git a/js/src/index.d.ts b/js/src/index.d.ts index 3c77105..cb319b2 100644 --- a/js/src/index.d.ts +++ b/js/src/index.d.ts @@ -165,3 +165,47 @@ export declare const delay: (ms: number) => Promise; // ============================================================================= export { MemoryLinkStore } from './backends/memory.d.ts'; + +// ============================================================================= +// Protocol Exports +// ============================================================================= + +export { + LinksNotation, + NotationParser, + NotationStreamParser, + NotationParseError, + ParseOptions, + StringifyOptions, + ParserOptions, + StreamParserOptions, + ParseLocation, + StreamParserEvent, + StreamParserHandler, +} from './protocol/notation.d.ts'; + +export { + Message, + MessageBuilder, + RequestType, + ResponseStatus, + ErrorCode, + RequestTypeValue, + ResponseStatusValue, + ErrorCodeValue, + MessageData, + ErrorInfo, + EnqueueOptions, + createOkResponse, + createErrorResponse, + createEnqueueRequest, + createDequeueRequest, + createPeekRequest, + createAckRequest, + createRejectRequest, + createQueryRequest, + createGetStatsRequest, + createListQueuesRequest, + createCreateQueueRequest, + createDeleteQueueRequest, +} from './protocol/messages.d.ts'; diff --git a/js/src/index.js b/js/src/index.js index 828ec8f..642bee4 100644 --- a/js/src/index.js +++ b/js/src/index.js @@ -175,3 +175,34 @@ export const delay = (ms) => // ============================================================================= export { MemoryLinkStore } from './backends/memory.js'; + +// ============================================================================= +// Protocol Exports +// ============================================================================= + +export { + LinksNotation, + NotationParser, + NotationStreamParser, + NotationParseError, +} from './protocol/notation.js'; + +export { + Message, + MessageBuilder, + RequestType, + ResponseStatus, + ErrorCode, + createOkResponse, + createErrorResponse, + createEnqueueRequest, + createDequeueRequest, + createPeekRequest, + createAckRequest, + createRejectRequest, + createQueryRequest, + createGetStatsRequest, + createListQueuesRequest, + createCreateQueueRequest, + createDeleteQueueRequest, +} from './protocol/messages.js'; diff --git a/js/src/protocol/messages.d.ts b/js/src/protocol/messages.d.ts new file mode 100644 index 0000000..41316cb --- /dev/null +++ b/js/src/protocol/messages.d.ts @@ -0,0 +1,445 @@ +/** + * Protocol message types for Links Queue communication. + * + * This module defines the message types used for inter-node communication + * and client-server interaction in Links Queue. + * + * @module protocol/messages + */ + +import { Link } from '../index.d.ts'; +import { StringifyOptions } from './notation.d.ts'; + +// ============================================================================= +// Enum Types +// ============================================================================= + +/** + * Request message types for queue operations. + */ +export declare const RequestType: { + /** Add item to queue */ + readonly ENQUEUE: 'enqueue'; + /** Remove item from queue */ + readonly DEQUEUE: 'dequeue'; + /** Acknowledge message processing */ + readonly ACK: 'ack'; + /** Reject message processing */ + readonly REJECT: 'reject'; + /** Query queue status */ + readonly QUERY: 'query'; + /** Synchronization between nodes */ + readonly SYNC: 'sync'; + /** Peek at next message without removing */ + readonly PEEK: 'peek'; + /** Create a new queue */ + readonly CREATE_QUEUE: 'create_queue'; + /** Delete a queue */ + readonly DELETE_QUEUE: 'delete_queue'; + /** List all queues */ + readonly LIST_QUEUES: 'list_queues'; + /** Get queue statistics */ + readonly GET_STATS: 'get_stats'; +}; + +/** + * Request type values. + */ +export type RequestTypeValue = (typeof RequestType)[keyof typeof RequestType]; + +/** + * Response status types. + */ +export declare const ResponseStatus: { + /** Operation succeeded */ + readonly OK: 'ok'; + /** Operation failed */ + readonly ERROR: 'error'; +}; + +/** + * Response status values. + */ +export type ResponseStatusValue = + (typeof ResponseStatus)[keyof typeof ResponseStatus]; + +/** + * Standard error codes. + */ +export declare const ErrorCode: { + /** Queue is empty */ + readonly QUEUE_EMPTY: 'QUEUE_EMPTY'; + /** Queue not found */ + readonly QUEUE_NOT_FOUND: 'QUEUE_NOT_FOUND'; + /** Queue already exists */ + readonly QUEUE_EXISTS: 'QUEUE_EXISTS'; + /** Message not found */ + readonly MESSAGE_NOT_FOUND: 'MESSAGE_NOT_FOUND'; + /** Invalid request format */ + readonly INVALID_REQUEST: 'INVALID_REQUEST'; + /** Operation not allowed */ + readonly NOT_ALLOWED: 'NOT_ALLOWED'; + /** Internal server error */ + readonly INTERNAL_ERROR: 'INTERNAL_ERROR'; + /** Request timeout */ + readonly TIMEOUT: 'TIMEOUT'; + /** Connection error */ + readonly CONNECTION_ERROR: 'CONNECTION_ERROR'; +}; + +/** + * Error code values. + */ +export type ErrorCodeValue = (typeof ErrorCode)[keyof typeof ErrorCode]; + +// ============================================================================= +// Data Types +// ============================================================================= + +/** + * Message data structure. + */ +export interface MessageData { + type?: RequestTypeValue | null; + status?: ResponseStatusValue | null; + queue?: string | null; + messageId?: string | null; + payload?: unknown; + result?: unknown; + error?: ErrorInfo | string | null; + requeue?: boolean | null; + priority?: number | null; + options?: Record | null; +} + +/** + * Error information structure. + */ +export interface ErrorInfo { + code: ErrorCodeValue | string; + message: string; +} + +/** + * Enqueue request options. + */ +export interface EnqueueOptions { + /** + * Message priority (0-9). + */ + priority?: number; + + /** + * Additional options. + */ + options?: Record; +} + +// ============================================================================= +// MessageBuilder Class +// ============================================================================= + +/** + * Builder class for creating protocol messages. + * + * Provides a fluent API for constructing request and response messages + * that can be serialized to Links Notation. + * + * @example + * const msg = new MessageBuilder() + * .type(RequestType.ENQUEUE) + * .queue('tasks') + * .payload({ action: 'process', data: '...' }) + * .build(); + */ +export declare class MessageBuilder { + /** + * Sets the message type. + * + * @param type - Request type from RequestType enum + * @returns Builder instance for chaining + */ + type(type: RequestTypeValue): this; + + /** + * Sets the target queue name. + * + * @param name - Queue name + * @returns Builder instance for chaining + */ + queue(name: string): this; + + /** + * Sets the message payload. + * + * @param payload - Message payload data + * @returns Builder instance for chaining + */ + payload(payload: unknown): this; + + /** + * Sets the message ID (for ack/reject operations). + * + * @param id - Message identifier + * @returns Builder instance for chaining + */ + messageId(id: string): this; + + /** + * Sets whether to requeue on reject. + * + * @param requeue - True to requeue + * @returns Builder instance for chaining + */ + requeue(requeue: boolean): this; + + /** + * Sets message priority. + * + * @param priority - Priority level (0-9) + * @returns Builder instance for chaining + */ + priority(priority: number): this; + + /** + * Sets message options. + * + * @param options - Additional options + * @returns Builder instance for chaining + */ + options(options: Record): this; + + /** + * Builds the message object. + * + * @returns The constructed message + */ + build(): Message; +} + +// ============================================================================= +// Message Class +// ============================================================================= + +/** + * Represents a protocol message. + * + * Messages can be converted to/from Links Notation strings for + * wire transport. + */ +export declare class Message { + /** + * Request type. + */ + readonly type: RequestTypeValue | null; + + /** + * Response status. + */ + readonly status: ResponseStatusValue | null; + + /** + * Target queue name. + */ + readonly queue: string | null; + + /** + * Message identifier. + */ + readonly messageId: string | null; + + /** + * Message payload. + */ + readonly payload: unknown; + + /** + * Response result. + */ + readonly result: unknown; + + /** + * Error information. + */ + readonly error: ErrorInfo | string | null; + + /** + * Whether to requeue on reject. + */ + readonly requeue: boolean | null; + + /** + * Message priority (0-9). + */ + readonly priority: number | null; + + /** + * Additional options. + */ + readonly options: Record | null; + + /** + * Creates a new Message. + * + * @param data - Message data + */ + constructor(data: MessageData); + + /** + * Converts the message to a Link object. + * + * @returns Link representation of message + */ + toLink(): Link; + + /** + * Converts the message to Links Notation string. + * + * @param options - Serialization options + * @returns Links Notation representation + */ + toNotation(options?: StringifyOptions): string; + + /** + * Creates a Message from a Link object. + * + * @param link - Link to parse + * @returns Parsed message + */ + static fromLink(link: Link): Message; + + /** + * Creates a Message from a Links Notation string. + * + * @param notation - Links Notation string + * @returns Parsed message + */ + static fromNotation(notation: string): Message; +} + +// ============================================================================= +// Response Builders +// ============================================================================= + +/** + * Creates a success response message. + * + * @param result - Result data + * @returns Success response message + */ +export declare function createOkResponse(result?: unknown): Message; + +/** + * Creates an error response message. + * + * @param code - Error code from ErrorCode enum + * @param message - Human-readable error message + * @returns Error response message + */ +export declare function createErrorResponse( + code: ErrorCodeValue | string, + message: string +): Message; + +// ============================================================================= +// Request Builders +// ============================================================================= + +/** + * Creates an enqueue request message. + * + * @param queue - Target queue name + * @param payload - Message payload + * @param options - Additional options + * @returns Enqueue request message + */ +export declare function createEnqueueRequest( + queue: string, + payload: unknown, + options?: EnqueueOptions +): Message; + +/** + * Creates a dequeue request message. + * + * @param queue - Target queue name + * @returns Dequeue request message + */ +export declare function createDequeueRequest(queue: string): Message; + +/** + * Creates a peek request message. + * + * @param queue - Target queue name + * @returns Peek request message + */ +export declare function createPeekRequest(queue: string): Message; + +/** + * Creates an acknowledge request message. + * + * @param queue - Target queue name + * @param messageId - ID of message to acknowledge + * @returns Acknowledge request message + */ +export declare function createAckRequest( + queue: string, + messageId: string +): Message; + +/** + * Creates a reject request message. + * + * @param queue - Target queue name + * @param messageId - ID of message to reject + * @param requeue - Whether to requeue the message + * @returns Reject request message + */ +export declare function createRejectRequest( + queue: string, + messageId: string, + requeue?: boolean +): Message; + +/** + * Creates a query request message. + * + * @param queue - Target queue name + * @returns Query request message + */ +export declare function createQueryRequest(queue: string): Message; + +/** + * Creates a get stats request message. + * + * @param queue - Target queue name + * @returns Get stats request message + */ +export declare function createGetStatsRequest(queue: string): Message; + +/** + * Creates a list queues request message. + * + * @returns List queues request message + */ +export declare function createListQueuesRequest(): Message; + +/** + * Creates a create queue request message. + * + * @param name - Queue name to create + * @param options - Queue options + * @returns Create queue request message + */ +export declare function createCreateQueueRequest( + name: string, + options?: Record +): Message; + +/** + * Creates a delete queue request message. + * + * @param name - Queue name to delete + * @returns Delete queue request message + */ +export declare function createDeleteQueueRequest(name: string): Message; diff --git a/js/src/protocol/messages.js b/js/src/protocol/messages.js new file mode 100644 index 0000000..a74979c --- /dev/null +++ b/js/src/protocol/messages.js @@ -0,0 +1,698 @@ +/** + * Protocol message types for Links Queue communication. + * + * This module defines the message types used for inter-node communication + * and client-server interaction in Links Queue. + * + * @module protocol/messages + */ + +import { LinksNotation } from './notation.js'; +import { createLink } from '../index.js'; + +// ============================================================================= +// Message Types +// ============================================================================= + +/** + * Request message types for queue operations. + * @readonly + * @enum {string} + */ +export const RequestType = Object.freeze({ + /** Add item to queue */ + ENQUEUE: 'enqueue', + /** Remove item from queue */ + DEQUEUE: 'dequeue', + /** Acknowledge message processing */ + ACK: 'ack', + /** Reject message processing */ + REJECT: 'reject', + /** Query queue status */ + QUERY: 'query', + /** Synchronization between nodes */ + SYNC: 'sync', + /** Peek at next message without removing */ + PEEK: 'peek', + /** Create a new queue */ + CREATE_QUEUE: 'create_queue', + /** Delete a queue */ + DELETE_QUEUE: 'delete_queue', + /** List all queues */ + LIST_QUEUES: 'list_queues', + /** Get queue statistics */ + GET_STATS: 'get_stats', +}); + +/** + * Response status types. + * @readonly + * @enum {string} + */ +export const ResponseStatus = Object.freeze({ + /** Operation succeeded */ + OK: 'ok', + /** Operation failed */ + ERROR: 'error', +}); + +/** + * Standard error codes. + * @readonly + * @enum {string} + */ +export const ErrorCode = Object.freeze({ + /** Queue is empty */ + QUEUE_EMPTY: 'QUEUE_EMPTY', + /** Queue not found */ + QUEUE_NOT_FOUND: 'QUEUE_NOT_FOUND', + /** Queue already exists */ + QUEUE_EXISTS: 'QUEUE_EXISTS', + /** Message not found */ + MESSAGE_NOT_FOUND: 'MESSAGE_NOT_FOUND', + /** Invalid request format */ + INVALID_REQUEST: 'INVALID_REQUEST', + /** Operation not allowed */ + NOT_ALLOWED: 'NOT_ALLOWED', + /** Internal server error */ + INTERNAL_ERROR: 'INTERNAL_ERROR', + /** Request timeout */ + TIMEOUT: 'TIMEOUT', + /** Connection error */ + CONNECTION_ERROR: 'CONNECTION_ERROR', +}); + +// ============================================================================= +// Message Builders +// ============================================================================= + +/** + * Builder class for creating protocol messages. + * + * Provides a fluent API for constructing request and response messages + * that can be serialized to Links Notation. + * + * @example + * // Build an enqueue request + * const msg = new MessageBuilder() + * .type(RequestType.ENQUEUE) + * .queue('tasks') + * .payload({ action: 'process', data: '...' }) + * .build(); + * + * // Convert to notation + * const notation = msg.toNotation(); + */ +export class MessageBuilder { + /** @type {Object} */ + #data = {}; + + /** + * Sets the message type. + * + * @param {string} type - Request type from RequestType enum + * @returns {this} Builder instance for chaining + */ + type(type) { + this.#data.type = type; + return this; + } + + /** + * Sets the target queue name. + * + * @param {string} name - Queue name + * @returns {this} Builder instance for chaining + */ + queue(name) { + this.#data.queue = name; + return this; + } + + /** + * Sets the message payload. + * + * @param {*} payload - Message payload data + * @returns {this} Builder instance for chaining + */ + payload(payload) { + this.#data.payload = payload; + return this; + } + + /** + * Sets the message ID (for ack/reject operations). + * + * @param {string} id - Message identifier + * @returns {this} Builder instance for chaining + */ + messageId(id) { + this.#data.messageId = id; + return this; + } + + /** + * Sets whether to requeue on reject. + * + * @param {boolean} requeue - True to requeue + * @returns {this} Builder instance for chaining + */ + requeue(requeue) { + this.#data.requeue = requeue; + return this; + } + + /** + * Sets message priority. + * + * @param {number} priority - Priority level (0-9) + * @returns {this} Builder instance for chaining + */ + priority(priority) { + this.#data.priority = priority; + return this; + } + + /** + * Sets message options. + * + * @param {Object} options - Additional options + * @returns {this} Builder instance for chaining + */ + options(options) { + this.#data.options = options; + return this; + } + + /** + * Builds the message object. + * + * @returns {Message} The constructed message + */ + build() { + return new Message(this.#data); + } +} + +// ============================================================================= +// Message Class +// ============================================================================= + +/** + * Represents a protocol message. + * + * Messages can be converted to/from Links Notation strings for + * wire transport. + */ +export class Message { + /** + * Creates a new Message. + * + * @param {Object} data - Message data + */ + constructor(data) { + this.type = data.type || null; + this.queue = data.queue || null; + this.payload = data.payload || null; + this.messageId = data.messageId || null; + this.requeue = data.requeue ?? null; + this.priority = data.priority ?? null; + this.options = data.options || null; + this.status = data.status || null; + this.result = data.result || null; + this.error = data.error || null; + } + + /** + * Converts the message to a Link object. + * + * @returns {import('../index.js').Link} Link representation of message + */ + toLink() { + const values = []; + + // Add type + if (this.type !== null) { + values.push(createLink(values.length, 'type', this.type)); + } + + // Add status for responses + if (this.status !== null) { + values.push(createLink(values.length, 'status', this.status)); + } + + // Add queue + if (this.queue !== null) { + values.push(createLink(values.length, 'queue', this.queue)); + } + + // Add message ID + if (this.messageId !== null) { + values.push(createLink(values.length, 'id', this.messageId)); + } + + // Add payload + if (this.payload !== null) { + const payloadLink = this.#convertPayloadToLink(this.payload); + values.push(createLink(values.length, 'payload', payloadLink)); + } + + // Add result + if (this.result !== null) { + const resultLink = this.#convertPayloadToLink(this.result); + values.push(createLink(values.length, 'result', resultLink)); + } + + // Add error + if (this.error !== null) { + if (typeof this.error === 'object') { + const errorValues = []; + if (this.error.code) { + errorValues.push( + createLink(errorValues.length, 'code', this.error.code) + ); + } + if (this.error.message) { + errorValues.push( + createLink(errorValues.length, 'message', this.error.message) + ); + } + const errorLink = createLink( + values.length, + 'error', + 'error', + errorValues + ); + values.push(errorLink); + } else { + values.push(createLink(values.length, 'error', String(this.error))); + } + } + + // Add additional fields + if (this.requeue !== null) { + values.push(createLink(values.length, 'requeue', String(this.requeue))); + } + + if (this.priority !== null) { + values.push(createLink(values.length, 'priority', String(this.priority))); + } + + if (this.options !== null) { + const optionsLink = this.#convertPayloadToLink(this.options); + values.push(createLink(values.length, 'options', optionsLink)); + } + + // Create the root message link + if (values.length === 0) { + return createLink(0, '', ''); + } + + if (values.length === 1) { + return values[0]; + } + + // Create a compound link containing all message parts + return createLink( + 0, + values[0], + values.length > 1 ? values[1] : values[0], + values.slice(2) + ); + } + + /** + * Converts a payload object to a Link structure. + * + * @param {*} payload - Payload to convert + * @returns {import('../index.js').Link|string} Converted payload + * @private + */ + #convertPayloadToLink(payload) { + if (payload === null || payload === undefined) { + return ''; + } + + if ( + typeof payload === 'string' || + typeof payload === 'number' || + typeof payload === 'boolean' + ) { + return String(payload); + } + + if (Array.isArray(payload)) { + const values = payload.map((item, i) => { + const converted = this.#convertPayloadToLink(item); + return typeof converted === 'string' + ? createLink(i, converted, converted) + : converted; + }); + return createLink( + 0, + values[0] || '', + values[1] || values[0] || '', + values.slice(2) + ); + } + + if (typeof payload === 'object') { + const entries = Object.entries(payload); + const values = entries.map(([key, value], i) => { + const converted = this.#convertPayloadToLink(value); + return createLink(i, key, converted); + }); + if (values.length === 0) { + return ''; + } + if (values.length === 1) { + return values[0]; + } + return createLink( + 0, + values[0], + values.length > 1 ? values[1] : values[0], + values.slice(2) + ); + } + + return String(payload); + } + + /** + * Converts the message to Links Notation string. + * + * @param {Object} [options] - Serialization options + * @param {boolean} [options.pretty=false] - Use pretty formatting + * @returns {string} Links Notation representation + */ + toNotation(options = {}) { + const link = this.toLink(); + return LinksNotation.stringify(link, options); + } + + /** + * Creates a Message from a Link object. + * + * @param {import('../index.js').Link} link - Link to parse + * @returns {Message} Parsed message + */ + static fromLink(link) { + const data = {}; + + // Helper to get the source value from a link (handles nested Link objects) + const getSourceValue = (l) => { + if (l === null || l === undefined) { + return null; + } + if (typeof l === 'object' && 'source' in l) { + return l.source; + } + return l; + }; + + // Helper to get the target value from a link (handles nested Link objects) + const getTargetValue = (l) => { + if (l === null || l === undefined) { + return null; + } + if (typeof l === 'object' && 'target' in l) { + return l.target; + } + return l; + }; + + // Recursively extract a field by name from a link structure + const extractField = (l, fieldName) => { + if (l === null || l === undefined) { + return undefined; + } + + // Check if this link's source matches the field name + const source = getSourceValue(l); + if (source === fieldName) { + return getTargetValue(l); + } + + // Check if source is itself a Link with matching source + if (typeof source === 'object' && source !== null && 'source' in source) { + const result = extractField(source, fieldName); + if (result !== undefined) { + return result; + } + } + + // Check if target is a Link we should search + const target = getTargetValue(l); + if (typeof target === 'object' && target !== null && 'source' in target) { + const result = extractField(target, fieldName); + if (result !== undefined) { + return result; + } + } + + // Check values array + if (l.values && Array.isArray(l.values)) { + for (const v of l.values) { + const result = extractField(v, fieldName); + if (result !== undefined) { + return result; + } + } + } + + return undefined; + }; + + data.type = extractField(link, 'type'); + data.status = extractField(link, 'status'); + data.queue = extractField(link, 'queue'); + data.messageId = extractField(link, 'id'); + data.payload = extractField(link, 'payload'); + data.result = extractField(link, 'result'); + data.error = extractField(link, 'error'); + data.requeue = extractField(link, 'requeue'); + data.priority = extractField(link, 'priority'); + + // Convert requeue to boolean + if (typeof data.requeue === 'string') { + data.requeue = data.requeue === 'true'; + } + + // Convert priority to number + if (typeof data.priority === 'string') { + data.priority = parseInt(data.priority, 10); + } + + return new Message(data); + } + + /** + * Creates a Message from a Links Notation string. + * + * @param {string} notation - Links Notation string + * @returns {Message} Parsed message + */ + static fromNotation(notation) { + const links = LinksNotation.parse(notation); + if (links.length === 0) { + return new Message({}); + } + return Message.fromLink(links[0]); + } +} + +// ============================================================================= +// Response Builders +// ============================================================================= + +/** + * Creates a success response message. + * + * @param {*} [result] - Result data + * @returns {Message} Success response message + * + * @example + * const response = createOkResponse({ id: 'abc123', position: 42 }); + */ +export function createOkResponse(result = null) { + return new Message({ + status: ResponseStatus.OK, + result, + }); +} + +/** + * Creates an error response message. + * + * @param {string} code - Error code from ErrorCode enum + * @param {string} message - Human-readable error message + * @returns {Message} Error response message + * + * @example + * const response = createErrorResponse(ErrorCode.QUEUE_EMPTY, 'No messages available'); + */ +export function createErrorResponse(code, message) { + return new Message({ + status: ResponseStatus.ERROR, + error: { code, message }, + }); +} + +// ============================================================================= +// Request Builders +// ============================================================================= + +/** + * Creates an enqueue request message. + * + * @param {string} queue - Target queue name + * @param {*} payload - Message payload + * @param {Object} [options] - Additional options + * @param {number} [options.priority] - Message priority (0-9) + * @returns {Message} Enqueue request message + * + * @example + * const request = createEnqueueRequest('tasks', { action: 'process' }); + */ +export function createEnqueueRequest(queue, payload, options = {}) { + return new Message({ + type: RequestType.ENQUEUE, + queue, + payload, + priority: options.priority ?? null, + options: options.options ?? null, + }); +} + +/** + * Creates a dequeue request message. + * + * @param {string} queue - Target queue name + * @returns {Message} Dequeue request message + * + * @example + * const request = createDequeueRequest('tasks'); + */ +export function createDequeueRequest(queue) { + return new Message({ + type: RequestType.DEQUEUE, + queue, + }); +} + +/** + * Creates a peek request message. + * + * @param {string} queue - Target queue name + * @returns {Message} Peek request message + */ +export function createPeekRequest(queue) { + return new Message({ + type: RequestType.PEEK, + queue, + }); +} + +/** + * Creates an acknowledge request message. + * + * @param {string} queue - Target queue name + * @param {string} messageId - ID of message to acknowledge + * @returns {Message} Acknowledge request message + * + * @example + * const request = createAckRequest('tasks', 'abc123'); + */ +export function createAckRequest(queue, messageId) { + return new Message({ + type: RequestType.ACK, + queue, + messageId, + }); +} + +/** + * Creates a reject request message. + * + * @param {string} queue - Target queue name + * @param {string} messageId - ID of message to reject + * @param {boolean} [requeue=false] - Whether to requeue the message + * @returns {Message} Reject request message + * + * @example + * const request = createRejectRequest('tasks', 'abc123', true); + */ +export function createRejectRequest(queue, messageId, requeue = false) { + return new Message({ + type: RequestType.REJECT, + queue, + messageId, + requeue, + }); +} + +/** + * Creates a query request message. + * + * @param {string} queue - Target queue name + * @returns {Message} Query request message + */ +export function createQueryRequest(queue) { + return new Message({ + type: RequestType.QUERY, + queue, + }); +} + +/** + * Creates a get stats request message. + * + * @param {string} queue - Target queue name + * @returns {Message} Get stats request message + */ +export function createGetStatsRequest(queue) { + return new Message({ + type: RequestType.GET_STATS, + queue, + }); +} + +/** + * Creates a list queues request message. + * + * @returns {Message} List queues request message + */ +export function createListQueuesRequest() { + return new Message({ + type: RequestType.LIST_QUEUES, + }); +} + +/** + * Creates a create queue request message. + * + * @param {string} name - Queue name to create + * @param {Object} [options] - Queue options + * @returns {Message} Create queue request message + */ +export function createCreateQueueRequest(name, options = {}) { + return new Message({ + type: RequestType.CREATE_QUEUE, + queue: name, + options, + }); +} + +/** + * Creates a delete queue request message. + * + * @param {string} name - Queue name to delete + * @returns {Message} Delete queue request message + */ +export function createDeleteQueueRequest(name) { + return new Message({ + type: RequestType.DELETE_QUEUE, + queue: name, + }); +} diff --git a/js/src/protocol/notation.d.ts b/js/src/protocol/notation.d.ts new file mode 100644 index 0000000..8701518 --- /dev/null +++ b/js/src/protocol/notation.d.ts @@ -0,0 +1,317 @@ +/** + * Links Notation integration module + * + * Provides parsing and serialization of Links Notation strings + * for protocol communication and data exchange. + * + * @module protocol/notation + */ + +import { Link, LinkRef } from '../index.d.ts'; + +// ============================================================================= +// Options Types +// ============================================================================= + +/** + * Options for parsing Links Notation. + */ +export interface ParseOptions { + /** + * Custom parser instance to use. + */ + parser?: NotationParser; +} + +/** + * Options for serializing to Links Notation. + */ +export interface StringifyOptions { + /** + * Use pretty formatting (less parentheses, indentation). + * @default false + */ + pretty?: boolean; + + /** + * Remove optional parentheses. + * @default false + */ + lessParentheses?: boolean; + + /** + * Maximum line length before wrapping. + * @default 80 + */ + maxLineLength?: number; + + /** + * Indent lines exceeding maxLineLength. + * @default false + */ + indentLongLines?: boolean; + + /** + * Group consecutive links with same ID. + * @default false + */ + groupConsecutive?: boolean; +} + +/** + * Options for creating a parser. + */ +export interface ParserOptions { + /** + * Maximum input size in bytes. + * @default 10485760 (10MB) + */ + maxInputSize?: number; + + /** + * Maximum nesting depth. + * @default 1000 + */ + maxDepth?: number; +} + +/** + * Options for stream parser. + */ +export interface StreamParserOptions { + /** + * Maximum buffer size in bytes. + * @default 10485760 (10MB) + */ + maxBufferSize?: number; +} + +/** + * Location information for parse errors. + */ +export interface ParseLocation { + /** + * Line number (1-based). + */ + line: number; + + /** + * Column number (1-based). + */ + column: number; + + /** + * Character offset from start. + */ + offset: number; +} + +// ============================================================================= +// LinksNotation Class +// ============================================================================= + +/** + * Wrapper class for Links Notation parsing and serialization. + * + * Integrates the links-notation library with links-queue's Link type, + * providing bidirectional conversion between notation strings and Link objects. + * + * @example + * // Parse notation string + * const links = LinksNotation.parse('((hello world) (foo bar))'); + * + * // Serialize to notation string + * const notation = LinksNotation.stringify(links); + * const pretty = LinksNotation.stringify(links, { pretty: true }); + */ +export declare class LinksNotation { + /** + * Parses a Links Notation string into an array of Link objects. + * + * @param input - The Links Notation string to parse + * @param options - Optional parsing options + * @returns Array of parsed Link objects + * @throws {NotationParseError} If parsing fails + * + * @example + * const links = LinksNotation.parse('(type: enqueue)'); + */ + static parse(input: string, options?: ParseOptions): Link[]; + + /** + * Serializes Link objects to a Links Notation string. + * + * @param links - Link or array of links to serialize + * @param options - Serialization options + * @returns The Links Notation string representation + * + * @example + * const str = LinksNotation.stringify(links); + * const pretty = LinksNotation.stringify(links, { pretty: true }); + */ + static stringify(links: Link | Link[], options?: StringifyOptions): string; + + /** + * Creates a parser instance with custom options. + * + * @param options - Parser configuration + * @returns A configured parser instance + */ + static createParser(options?: ParserOptions): NotationParser; + + /** + * Creates a stream parser for processing large inputs. + * + * @param options - Stream parser configuration + * @returns A stream parser instance + */ + static createStreamParser( + options?: StreamParserOptions + ): NotationStreamParser; +} + +// ============================================================================= +// NotationParser Class +// ============================================================================= + +/** + * Stateful parser for Links Notation with custom configuration. + * + * @example + * const parser = new NotationParser({ maxInputSize: 1024 * 1024 }); + * const links = parser.parse(input); + */ +export declare class NotationParser { + /** + * Creates a new NotationParser instance. + * + * @param options - Parser configuration + */ + constructor(options?: ParserOptions); + + /** + * Parses a Links Notation string. + * + * @param input - Input to parse + * @returns Parsed links + */ + parse(input: string): Link[]; +} + +// ============================================================================= +// NotationStreamParser Class +// ============================================================================= + +/** + * Stream parser event types. + */ +export type StreamParserEvent = 'link' | 'error' | 'end'; + +/** + * Stream parser event handler types. + */ +export type StreamParserHandler = E extends 'link' + ? (link: Link) => void + : E extends 'error' + ? (error: Error) => void + : E extends 'end' + ? () => void + : never; + +/** + * Stream parser for processing large Links Notation inputs incrementally. + * + * @example + * const parser = new NotationStreamParser(); + * parser.on('link', (link) => console.log('Got link:', link)); + * parser.on('error', (err) => console.error('Parse error:', err)); + * parser.on('end', () => console.log('Parsing complete')); + * + * parser.write('((type: '); + * parser.write('enqueue)'); + * parser.write(' (queue: tasks))'); + * parser.end(); + */ +export declare class NotationStreamParser { + /** + * Parser configuration options. + */ + readonly options: Required; + + /** + * Creates a new NotationStreamParser instance. + * + * @param options - Parser configuration + */ + constructor(options?: StreamParserOptions); + + /** + * Registers an event listener. + * + * @param event - Event name + * @param callback - Event handler + * @returns The parser instance for chaining + */ + on( + event: E, + callback: StreamParserHandler + ): this; + + /** + * Removes an event listener. + * + * @param event - Event name + * @param callback - Event handler to remove + * @returns The parser instance for chaining + */ + off( + event: E, + callback: StreamParserHandler + ): this; + + /** + * Writes data to the parser buffer. + * + * @param chunk - Data chunk to add + * @returns The parser instance for chaining + * @throws {Error} If parser has ended or buffer overflow + */ + write(chunk: string): this; + + /** + * Signals end of input and processes the buffer. + */ + end(): void; +} + +// ============================================================================= +// NotationParseError Class +// ============================================================================= + +/** + * Error thrown when parsing Links Notation fails. + */ +export declare class NotationParseError extends Error { + /** + * Error name. + */ + readonly name: 'NotationParseError'; + + /** + * Location of the error in the input. + */ + readonly location: ParseLocation | null; + + /** + * The underlying error that caused this error. + */ + cause?: Error; + + /** + * Creates a new NotationParseError. + * + * @param message - Error message + * @param location - Error location in input + */ + constructor(message: string, location?: ParseLocation); +} diff --git a/js/src/protocol/notation.js b/js/src/protocol/notation.js new file mode 100644 index 0000000..0a137e7 --- /dev/null +++ b/js/src/protocol/notation.js @@ -0,0 +1,600 @@ +/** + * Links Notation integration module + * + * Provides parsing and serialization of Links Notation strings + * for protocol communication and data exchange. + * + * @module protocol/notation + * @see https://github.com/link-foundation/links-notation + */ + +import { + Parser as LinoParser, + Link as LinoLink, + formatLinks, + FormatOptions as LinoFormatOptions, +} from 'links-notation'; +import { createLink, isLink, getLinkId } from '../index.js'; + +// ============================================================================= +// LinksNotation Class +// ============================================================================= + +/** + * Wrapper class for Links Notation parsing and serialization. + * + * Integrates the links-notation library with links-queue's Link type, + * providing bidirectional conversion between notation strings and Link objects. + * + * @example + * // Parse notation string + * const links = LinksNotation.parse('((hello world) (foo bar))'); + * + * // Serialize to notation string + * const notation = LinksNotation.stringify(links); + * const pretty = LinksNotation.stringify(links, { pretty: true }); + */ +export class LinksNotation { + /** + * Default parser instance (singleton). + * @type {LinoParser} + * @private + */ + static #parser = null; + + /** + * Gets or creates the default parser instance. + * @returns {LinoParser} The parser instance + * @private + */ + static #getParser() { + if (!this.#parser) { + this.#parser = new LinoParser(); + } + return this.#parser; + } + + /** + * Parses a Links Notation string into an array of Link objects. + * + * @param {string} input - The Links Notation string to parse + * @param {ParseOptions} [options] - Optional parsing options + * @returns {import('../index.js').Link[]} Array of parsed Link objects + * @throws {NotationParseError} If parsing fails + * + * @example + * // Simple reference + * const links1 = LinksNotation.parse('hello'); + * // -> [{ id: 0, source: 'hello', target: 'hello' }] + * + * // Link with values + * const links2 = LinksNotation.parse('(type: enqueue)'); + * // -> [{ id: 0, source: 'type', target: 'enqueue' }] + * + * // Nested structure + * const links3 = LinksNotation.parse('((type: enqueue) (queue: tasks))'); + */ + static parse(input, options = {}) { + if (typeof input !== 'string') { + throw new TypeError('Input must be a string'); + } + + const parser = options.parser || this.#getParser(); + + try { + const linoLinks = parser.parse(input); + return linoLinks.map((linoLink, index) => + this.#convertLinoLinkToLink(linoLink, index) + ); + } catch (error) { + const parseError = new NotationParseError( + `Failed to parse Links Notation: ${error.message}`, + error.location + ); + parseError.cause = error; + throw parseError; + } + } + + /** + * Serializes Link objects to a Links Notation string. + * + * @param {import('../index.js').Link | import('../index.js').Link[]} links - Link or array of links to serialize + * @param {StringifyOptions} [options] - Serialization options + * @returns {string} The Links Notation string representation + * + * @example + * // Compact output (default) + * const str = LinksNotation.stringify(links); + * // -> "(type: enqueue)" + * + * // Pretty output with less parentheses + * const pretty = LinksNotation.stringify(links, { pretty: true }); + * // -> "type: enqueue" + * + * // Custom formatting + * const custom = LinksNotation.stringify(links, { + * lessParentheses: true, + * maxLineLength: 40 + * }); + */ + static stringify(links, options = {}) { + const linksArray = Array.isArray(links) ? links : [links]; + const linoLinks = linksArray.map((link) => + this.#convertLinkToLinoLink(link) + ); + + const formatOpts = this.#buildFormatOptions(options); + return formatLinks(linoLinks, formatOpts); + } + + /** + * Creates a parser instance with custom options. + * + * @param {ParserOptions} [options] - Parser configuration + * @returns {NotationParser} A configured parser instance + * + * @example + * const parser = LinksNotation.createParser({ maxInputSize: 1024 * 1024 }); + * const links = parser.parse(input); + */ + static createParser(options = {}) { + return new NotationParser(options); + } + + /** + * Creates a stream parser for processing large inputs. + * + * @param {StreamParserOptions} [options] - Stream parser configuration + * @returns {NotationStreamParser} A stream parser instance + * + * @example + * const parser = LinksNotation.createStreamParser(); + * parser.on('link', (link) => console.log(link)); + * parser.write(chunk1); + * parser.write(chunk2); + * parser.end(); + */ + static createStreamParser(options = {}) { + return new NotationStreamParser(options); + } + + /** + * Converts a links-notation Link to a links-queue Link. + * + * @param {LinoLink} linoLink - The links-notation Link object + * @param {number} index - Index used for generating ID + * @returns {import('../index.js').Link} The links-queue Link object + * @private + */ + static #convertLinoLinkToLink(linoLink, index) { + if (!linoLink) { + return createLink(index, null, null); + } + + // Simple reference (id only, no values) + if (!linoLink.values || linoLink.values.length === 0) { + const id = linoLink.id || null; + return createLink(index, id, id); + } + + // Link with values - convert to source/target model + const id = linoLink.id; + const values = linoLink.values; + + if (values.length === 1) { + const firstValue = values[0]; + + // Check if this is a wrapper around a simple reference (id: null with single value) + if (id === null) { + // This is "hello" parsed as { id: null, values: [{ id: "hello", values: [] }] } + // Return as a self-referencing link: source=hello, target=hello + const ref = this.#extractValue(firstValue); + return createLink(index, ref, ref); + } + + // Named link with value: (id: value) -> source=id, target=value + const target = this.#extractValue(firstValue); + return createLink(index, id, target); + } + + if (values.length === 2) { + // Two values: (source target) or (id: source target) + const source = this.#extractValue(values[0]); + const target = this.#extractValue(values[1]); + + if (id !== null) { + // (id: source target) -> has additional values + return createLink(index, source, target, [id]); + } + return createLink(index, source, target); + } + + // More than 2 values: first two are source/target, rest are values + const source = this.#extractValue(values[0]); + const target = this.#extractValue(values[1]); + const additionalValues = + id !== null + ? [id, ...values.slice(2).map((v) => this.#extractValue(v))] + : values.slice(2).map((v) => this.#extractValue(v)); + + return createLink( + index, + source, + target, + additionalValues.length > 0 ? additionalValues : undefined + ); + } + + /** + * Extracts value from a links-notation element. + * + * @param {LinoLink|string} element - Element to extract from + * @returns {import('../index.js').LinkRef} Extracted value + * @private + */ + static #extractValue(element) { + if (!element) { + return null; + } + if (typeof element === 'string') { + return element; + } + if (element instanceof LinoLink || (element.id !== undefined && element)) { + // If it's a nested link with values, recursively convert + if (element.values && element.values.length > 0) { + return this.#convertLinoLinkToLink(element, 0); + } + // Simple reference + return element.id || null; + } + return String(element); + } + + /** + * Converts a links-queue Link to a links-notation Link. + * + * @param {import('../index.js').Link} link - The links-queue Link object + * @returns {LinoLink} The links-notation Link object + * @private + */ + static #convertLinkToLinoLink(link) { + if (!link) { + return new LinoLink(null, []); + } + + if (!isLink(link)) { + // If it's a primitive value, create a simple reference + return new LinoLink(String(link), []); + } + + const source = this.#convertRefToLinoValue(link.source); + const target = this.#convertRefToLinoValue(link.target); + + // Check if this is a self-referencing (named) link + const sourceId = getLinkId(link.source); + const targetId = getLinkId(link.target); + + if (sourceId === targetId && !link.values?.length) { + // Named link: (name: name) -> (name) + return new LinoLink(source, []); + } + + // Build values array + const values = [ + typeof source === 'string' + ? new LinoLink(source, []) + : this.#convertLinkToLinoLink(source), + typeof target === 'string' + ? new LinoLink(target, []) + : this.#convertLinkToLinoLink(target), + ]; + + // Add additional values if present + if (link.values && link.values.length > 0) { + for (const val of link.values) { + const converted = this.#convertRefToLinoValue(val); + values.push( + typeof converted === 'string' + ? new LinoLink(converted, []) + : this.#convertLinkToLinoLink(converted) + ); + } + } + + return new LinoLink(null, values); + } + + /** + * Converts a LinkRef to a value suitable for links-notation. + * + * @param {import('../index.js').LinkRef} ref - Reference to convert + * @returns {string|import('../index.js').Link} Converted value + * @private + */ + static #convertRefToLinoValue(ref) { + if (ref === null || ref === undefined) { + return ''; + } + if (isLink(ref)) { + return ref; + } + return String(ref); + } + + /** + * Builds FormatOptions from stringify options. + * + * @param {StringifyOptions} options - Stringify options + * @returns {LinoFormatOptions|boolean} Format options or boolean + * @private + */ + static #buildFormatOptions(options) { + if (options.pretty) { + return new LinoFormatOptions({ + lessParentheses: true, + indentLongLines: true, + maxLineLength: 80, + }); + } + + if ( + options.lessParentheses !== undefined || + options.maxLineLength !== undefined || + options.indentLongLines !== undefined || + options.groupConsecutive !== undefined + ) { + return new LinoFormatOptions({ + lessParentheses: options.lessParentheses ?? false, + maxLineLength: options.maxLineLength ?? 80, + indentLongLines: options.indentLongLines ?? false, + groupConsecutive: options.groupConsecutive ?? false, + }); + } + + return false; // Default: full parentheses + } +} + +// ============================================================================= +// NotationParser Class +// ============================================================================= + +/** + * Stateful parser for Links Notation with custom configuration. + * + * @example + * const parser = new NotationParser({ maxInputSize: 1024 * 1024 }); + * const links = parser.parse(input); + */ +export class NotationParser { + /** @type {LinoParser} */ + #parser; + + /** + * Creates a new NotationParser instance. + * + * @param {ParserOptions} [options] - Parser configuration + */ + constructor(options = {}) { + this.#parser = new LinoParser({ + maxInputSize: options.maxInputSize || 10 * 1024 * 1024, + maxDepth: options.maxDepth || 1000, + }); + } + + /** + * Parses a Links Notation string. + * + * @param {string} input - Input to parse + * @returns {import('../index.js').Link[]} Parsed links + */ + parse(input) { + return LinksNotation.parse(input, { parser: this.#parser }); + } +} + +// ============================================================================= +// NotationStreamParser Class +// ============================================================================= + +/** + * Stream parser for processing large Links Notation inputs incrementally. + * + * Note: This is a basic implementation. Full streaming support requires + * the underlying links-notation library to support incremental parsing. + * + * @example + * const parser = new NotationStreamParser(); + * parser.on('link', (link) => console.log('Got link:', link)); + * parser.on('error', (err) => console.error('Parse error:', err)); + * parser.on('end', () => console.log('Parsing complete')); + * + * parser.write('((type: '); + * parser.write('enqueue)'); + * parser.write(' (queue: tasks))'); + * parser.end(); + */ +export class NotationStreamParser { + /** @type {string} */ + #buffer = ''; + + /** @type {Map} */ + #listeners = new Map(); + + /** @type {boolean} */ + #ended = false; + + /** + * Creates a new NotationStreamParser instance. + * + * @param {StreamParserOptions} [options] - Parser configuration + */ + constructor(options = {}) { + this.options = { + maxBufferSize: options.maxBufferSize || 10 * 1024 * 1024, + }; + } + + /** + * Registers an event listener. + * + * @param {'link'|'error'|'end'} event - Event name + * @param {Function} callback - Event handler + * @returns {this} The parser instance for chaining + */ + on(event, callback) { + if (!this.#listeners.has(event)) { + this.#listeners.set(event, []); + } + this.#listeners.get(event).push(callback); + return this; + } + + /** + * Removes an event listener. + * + * @param {'link'|'error'|'end'} event - Event name + * @param {Function} callback - Event handler to remove + * @returns {this} The parser instance for chaining + */ + off(event, callback) { + const listeners = this.#listeners.get(event); + if (listeners) { + const index = listeners.indexOf(callback); + if (index !== -1) { + listeners.splice(index, 1); + } + } + return this; + } + + /** + * Writes data to the parser buffer. + * + * @param {string} chunk - Data chunk to add + * @returns {this} The parser instance for chaining + * @throws {Error} If parser has ended or buffer overflow + */ + write(chunk) { + if (this.#ended) { + throw new Error('Cannot write to ended stream parser'); + } + + this.#buffer += chunk; + + if (this.#buffer.length > this.options.maxBufferSize) { + const error = new Error('Buffer size exceeded maximum'); + this.#emit('error', error); + throw error; + } + + return this; + } + + /** + * Signals end of input and processes the buffer. + */ + end() { + if (this.#ended) { + return; + } + + this.#ended = true; + + try { + const links = LinksNotation.parse(this.#buffer); + for (const link of links) { + this.#emit('link', link); + } + this.#emit('end'); + } catch (error) { + this.#emit('error', error); + } + + this.#buffer = ''; + } + + /** + * Emits an event to registered listeners. + * + * @param {string} event - Event name + * @param {*} data - Event data + * @private + */ + #emit(event, data) { + const listeners = this.#listeners.get(event); + if (listeners) { + for (const listener of listeners) { + try { + listener(data); + } catch { + // Ignore listener errors + } + } + } + } +} + +// ============================================================================= +// NotationParseError Class +// ============================================================================= + +/** + * Error thrown when parsing Links Notation fails. + * + * @extends Error + */ +export class NotationParseError extends Error { + /** + * Creates a new NotationParseError. + * + * @param {string} message - Error message + * @param {Object} [location] - Error location in input + * @param {number} [location.line] - Line number (1-based) + * @param {number} [location.column] - Column number (1-based) + * @param {number} [location.offset] - Character offset + */ + constructor(message, location) { + super(message); + this.name = 'NotationParseError'; + this.location = location || null; + } +} + +// ============================================================================= +// Type Definitions (JSDoc) +// ============================================================================= + +/** + * Options for parsing Links Notation. + * + * @typedef {Object} ParseOptions + * @property {LinoParser} [parser] - Custom parser instance to use + */ + +/** + * Options for serializing to Links Notation. + * + * @typedef {Object} StringifyOptions + * @property {boolean} [pretty=false] - Use pretty formatting (less parentheses, indentation) + * @property {boolean} [lessParentheses=false] - Remove optional parentheses + * @property {number} [maxLineLength=80] - Maximum line length before wrapping + * @property {boolean} [indentLongLines=false] - Indent lines exceeding maxLineLength + * @property {boolean} [groupConsecutive=false] - Group consecutive links with same ID + */ + +/** + * Options for creating a parser. + * + * @typedef {Object} ParserOptions + * @property {number} [maxInputSize=10485760] - Maximum input size in bytes (default 10MB) + * @property {number} [maxDepth=1000] - Maximum nesting depth + */ + +/** + * Options for stream parser. + * + * @typedef {Object} StreamParserOptions + * @property {number} [maxBufferSize=10485760] - Maximum buffer size in bytes + */ diff --git a/js/tests/messages.test.js b/js/tests/messages.test.js new file mode 100644 index 0000000..121e23e --- /dev/null +++ b/js/tests/messages.test.js @@ -0,0 +1,391 @@ +/** + * Unit tests for protocol messages. + * + * Tests the creation and serialization of protocol messages. + */ + +import { describe, it } from 'node:test'; +import assert from 'node:assert/strict'; +import { + Message, + MessageBuilder, + RequestType, + ResponseStatus, + ErrorCode, + createOkResponse, + createErrorResponse, + createEnqueueRequest, + createDequeueRequest, + createPeekRequest, + createAckRequest, + createRejectRequest, + createQueryRequest, + createGetStatsRequest, + createListQueuesRequest, + createCreateQueueRequest, + createDeleteQueueRequest, +} from '../src/protocol/messages.js'; + +describe('RequestType', () => { + it('should have correct values', () => { + assert.equal(RequestType.ENQUEUE, 'enqueue'); + assert.equal(RequestType.DEQUEUE, 'dequeue'); + assert.equal(RequestType.ACK, 'ack'); + assert.equal(RequestType.REJECT, 'reject'); + assert.equal(RequestType.QUERY, 'query'); + assert.equal(RequestType.SYNC, 'sync'); + assert.equal(RequestType.PEEK, 'peek'); + assert.equal(RequestType.CREATE_QUEUE, 'create_queue'); + assert.equal(RequestType.DELETE_QUEUE, 'delete_queue'); + assert.equal(RequestType.LIST_QUEUES, 'list_queues'); + assert.equal(RequestType.GET_STATS, 'get_stats'); + }); + + it('should be frozen', () => { + assert.ok(Object.isFrozen(RequestType)); + }); +}); + +describe('ResponseStatus', () => { + it('should have correct values', () => { + assert.equal(ResponseStatus.OK, 'ok'); + assert.equal(ResponseStatus.ERROR, 'error'); + }); + + it('should be frozen', () => { + assert.ok(Object.isFrozen(ResponseStatus)); + }); +}); + +describe('ErrorCode', () => { + it('should have correct values', () => { + assert.equal(ErrorCode.QUEUE_EMPTY, 'QUEUE_EMPTY'); + assert.equal(ErrorCode.QUEUE_NOT_FOUND, 'QUEUE_NOT_FOUND'); + assert.equal(ErrorCode.QUEUE_EXISTS, 'QUEUE_EXISTS'); + assert.equal(ErrorCode.MESSAGE_NOT_FOUND, 'MESSAGE_NOT_FOUND'); + assert.equal(ErrorCode.INVALID_REQUEST, 'INVALID_REQUEST'); + assert.equal(ErrorCode.NOT_ALLOWED, 'NOT_ALLOWED'); + assert.equal(ErrorCode.INTERNAL_ERROR, 'INTERNAL_ERROR'); + assert.equal(ErrorCode.TIMEOUT, 'TIMEOUT'); + assert.equal(ErrorCode.CONNECTION_ERROR, 'CONNECTION_ERROR'); + }); + + it('should be frozen', () => { + assert.ok(Object.isFrozen(ErrorCode)); + }); +}); + +describe('MessageBuilder', () => { + it('should build a message with all fields', () => { + const msg = new MessageBuilder() + .type(RequestType.ENQUEUE) + .queue('tasks') + .payload({ action: 'process' }) + .messageId('abc123') + .requeue(true) + .priority(5) + .options({ ttl: 60000 }) + .build(); + + assert.equal(msg.type, RequestType.ENQUEUE); + assert.equal(msg.queue, 'tasks'); + assert.deepEqual(msg.payload, { action: 'process' }); + assert.equal(msg.messageId, 'abc123'); + assert.equal(msg.requeue, true); + assert.equal(msg.priority, 5); + assert.deepEqual(msg.options, { ttl: 60000 }); + }); + + it('should support chaining', () => { + const builder = new MessageBuilder(); + const result = builder.type(RequestType.ENQUEUE).queue('test'); + assert.strictEqual(result, builder); + }); + + it('should build message with minimal fields', () => { + const msg = new MessageBuilder().type(RequestType.DEQUEUE).build(); + + assert.equal(msg.type, RequestType.DEQUEUE); + assert.equal(msg.queue, null); + assert.equal(msg.payload, null); + }); +}); + +describe('Message', () => { + describe('constructor', () => { + it('should create message with data', () => { + const msg = new Message({ + type: RequestType.ENQUEUE, + queue: 'tasks', + payload: { data: 'test' }, + }); + + assert.equal(msg.type, RequestType.ENQUEUE); + assert.equal(msg.queue, 'tasks'); + assert.deepEqual(msg.payload, { data: 'test' }); + }); + + it('should handle missing fields', () => { + const msg = new Message({}); + + assert.equal(msg.type, null); + assert.equal(msg.queue, null); + assert.equal(msg.payload, null); + }); + }); + + describe('toLink', () => { + it('should convert message to link', () => { + const msg = new Message({ + type: RequestType.ENQUEUE, + queue: 'tasks', + }); + + const link = msg.toLink(); + assert.ok(link); + assert.ok('id' in link); + assert.ok('source' in link); + assert.ok('target' in link); + }); + + it('should include all message fields in link', () => { + const msg = new Message({ + type: RequestType.ENQUEUE, + queue: 'tasks', + payload: { action: 'process' }, + priority: 5, + }); + + const link = msg.toLink(); + assert.ok(link); + }); + }); + + describe('toNotation', () => { + it('should convert message to notation string', () => { + const msg = new Message({ + type: RequestType.ENQUEUE, + queue: 'tasks', + }); + + const notation = msg.toNotation(); + assert.ok(typeof notation === 'string'); + assert.ok(notation.includes('type')); + assert.ok(notation.includes('enqueue')); + }); + + it('should support pretty option', () => { + const msg = new Message({ + type: RequestType.ENQUEUE, + queue: 'tasks', + }); + + const notation = msg.toNotation({ pretty: true }); + assert.ok(typeof notation === 'string'); + }); + }); + + describe('fromLink', () => { + it('should create message from link', () => { + const link = { id: 0, source: 'type', target: 'enqueue' }; + const msg = Message.fromLink(link); + + assert.equal(msg.type, 'enqueue'); + }); + }); + + describe('fromNotation', () => { + it('should create message from notation string', () => { + const notation = '(type: enqueue)'; + const msg = Message.fromNotation(notation); + + assert.equal(msg.type, 'enqueue'); + }); + + it('should handle empty input', () => { + const msg = Message.fromNotation(''); + assert.ok(msg instanceof Message); + }); + }); +}); + +describe('Response builders', () => { + describe('createOkResponse', () => { + it('should create ok response without result', () => { + const response = createOkResponse(); + + assert.equal(response.status, ResponseStatus.OK); + assert.equal(response.result, null); + }); + + it('should create ok response with result', () => { + const result = { id: 'abc123', position: 42 }; + const response = createOkResponse(result); + + assert.equal(response.status, ResponseStatus.OK); + assert.deepEqual(response.result, result); + }); + }); + + describe('createErrorResponse', () => { + it('should create error response', () => { + const response = createErrorResponse( + ErrorCode.QUEUE_EMPTY, + 'No messages available' + ); + + assert.equal(response.status, ResponseStatus.ERROR); + assert.deepEqual(response.error, { + code: ErrorCode.QUEUE_EMPTY, + message: 'No messages available', + }); + }); + }); +}); + +describe('Request builders', () => { + describe('createEnqueueRequest', () => { + it('should create enqueue request', () => { + const request = createEnqueueRequest('tasks', { action: 'process' }); + + assert.equal(request.type, RequestType.ENQUEUE); + assert.equal(request.queue, 'tasks'); + assert.deepEqual(request.payload, { action: 'process' }); + }); + + it('should support options', () => { + const request = createEnqueueRequest( + 'tasks', + { action: 'process' }, + { priority: 5 } + ); + + assert.equal(request.priority, 5); + }); + }); + + describe('createDequeueRequest', () => { + it('should create dequeue request', () => { + const request = createDequeueRequest('tasks'); + + assert.equal(request.type, RequestType.DEQUEUE); + assert.equal(request.queue, 'tasks'); + }); + }); + + describe('createPeekRequest', () => { + it('should create peek request', () => { + const request = createPeekRequest('tasks'); + + assert.equal(request.type, RequestType.PEEK); + assert.equal(request.queue, 'tasks'); + }); + }); + + describe('createAckRequest', () => { + it('should create ack request', () => { + const request = createAckRequest('tasks', 'abc123'); + + assert.equal(request.type, RequestType.ACK); + assert.equal(request.queue, 'tasks'); + assert.equal(request.messageId, 'abc123'); + }); + }); + + describe('createRejectRequest', () => { + it('should create reject request without requeue', () => { + const request = createRejectRequest('tasks', 'abc123'); + + assert.equal(request.type, RequestType.REJECT); + assert.equal(request.queue, 'tasks'); + assert.equal(request.messageId, 'abc123'); + assert.equal(request.requeue, false); + }); + + it('should create reject request with requeue', () => { + const request = createRejectRequest('tasks', 'abc123', true); + + assert.equal(request.requeue, true); + }); + }); + + describe('createQueryRequest', () => { + it('should create query request', () => { + const request = createQueryRequest('tasks'); + + assert.equal(request.type, RequestType.QUERY); + assert.equal(request.queue, 'tasks'); + }); + }); + + describe('createGetStatsRequest', () => { + it('should create get stats request', () => { + const request = createGetStatsRequest('tasks'); + + assert.equal(request.type, RequestType.GET_STATS); + assert.equal(request.queue, 'tasks'); + }); + }); + + describe('createListQueuesRequest', () => { + it('should create list queues request', () => { + const request = createListQueuesRequest(); + + assert.equal(request.type, RequestType.LIST_QUEUES); + }); + }); + + describe('createCreateQueueRequest', () => { + it('should create create queue request', () => { + const request = createCreateQueueRequest('new-queue'); + + assert.equal(request.type, RequestType.CREATE_QUEUE); + assert.equal(request.queue, 'new-queue'); + }); + + it('should support options', () => { + const request = createCreateQueueRequest('new-queue', { maxSize: 1000 }); + + assert.deepEqual(request.options, { maxSize: 1000 }); + }); + }); + + describe('createDeleteQueueRequest', () => { + it('should create delete queue request', () => { + const request = createDeleteQueueRequest('old-queue'); + + assert.equal(request.type, RequestType.DELETE_QUEUE); + assert.equal(request.queue, 'old-queue'); + }); + }); +}); + +describe('Message round-trip', () => { + it('should round-trip enqueue request through notation', () => { + const original = createEnqueueRequest('tasks', { action: 'process' }); + const notation = original.toNotation(); + const restored = Message.fromNotation(notation); + + assert.equal(restored.type, original.type); + assert.equal(restored.queue, original.queue); + }); + + it('should round-trip ok response through notation', () => { + const original = createOkResponse({ id: 'abc123' }); + const notation = original.toNotation(); + const restored = Message.fromNotation(notation); + + assert.equal(restored.status, original.status); + }); + + it('should round-trip error response through notation', () => { + const original = createErrorResponse( + ErrorCode.QUEUE_EMPTY, + 'No messages available' + ); + const notation = original.toNotation(); + const restored = Message.fromNotation(notation); + + assert.equal(restored.status, original.status); + }); +}); diff --git a/js/tests/notation.test.js b/js/tests/notation.test.js new file mode 100644 index 0000000..06d6cf7 --- /dev/null +++ b/js/tests/notation.test.js @@ -0,0 +1,281 @@ +/** + * Unit tests for Links Notation integration. + * + * Tests the parsing and serialization of Links Notation strings. + */ + +import { describe, it } from 'node:test'; +import assert from 'node:assert/strict'; +import { + LinksNotation, + NotationParser, + NotationStreamParser, + NotationParseError, +} from '../src/protocol/notation.js'; + +describe('LinksNotation', () => { + describe('parse', () => { + it('should parse a simple reference', () => { + const links = LinksNotation.parse('hello'); + assert.equal(links.length, 1); + assert.equal(links[0].source, 'hello'); + assert.equal(links[0].target, 'hello'); + }); + + it('should parse a link with id and value', () => { + const links = LinksNotation.parse('(type: enqueue)'); + assert.equal(links.length, 1); + assert.equal(links[0].source, 'type'); + assert.equal(links[0].target, 'enqueue'); + }); + + it('should parse space-separated values as a link', () => { + // In links-notation, "hello world" forms a single link with source=hello, target=world + const links = LinksNotation.parse('hello world'); + assert.equal(links.length, 1); + assert.equal(links[0].source, 'hello'); + assert.equal(links[0].target, 'world'); + }); + + it('should parse parenthesized links', () => { + // (hello) (world) forms a single link containing two nested self-referencing links + const links = LinksNotation.parse('(hello) (world)'); + assert.equal(links.length, 1); + // The source and target are nested Link objects + assert.ok(typeof links[0].source === 'object'); + assert.ok(typeof links[0].target === 'object'); + }); + + it('should parse links with multiple values', () => { + const links = LinksNotation.parse('(id: value1 value2)'); + assert.equal(links.length, 1); + // The first two values become source/target + assert.equal(links[0].source, 'value1'); + assert.equal(links[0].target, 'value2'); + }); + + it('should handle nested structures', () => { + const links = LinksNotation.parse('((hello world))'); + assert.equal(links.length, 1); + }); + + it('should throw NotationParseError on invalid input', () => { + assert.throws( + () => LinksNotation.parse('((unclosed'), + (err) => err instanceof NotationParseError + ); + }); + + it('should throw TypeError on non-string input', () => { + assert.throws(() => LinksNotation.parse(123), TypeError); + assert.throws(() => LinksNotation.parse(null), TypeError); + assert.throws(() => LinksNotation.parse(undefined), TypeError); + }); + + it('should parse quoted strings', () => { + const links = LinksNotation.parse('"hello world"'); + assert.equal(links.length, 1); + assert.equal(links[0].source, 'hello world'); + }); + + it('should parse empty input as empty array', () => { + const links = LinksNotation.parse(''); + assert.equal(links.length, 0); + }); + }); + + describe('stringify', () => { + it('should serialize a simple link', () => { + const link = { id: 0, source: 'hello', target: 'hello' }; + const notation = LinksNotation.stringify(link); + // Result should contain 'hello' + assert.ok(notation.includes('hello')); + }); + + it('should serialize a link with different source and target', () => { + const link = { id: 0, source: 'type', target: 'enqueue' }; + const notation = LinksNotation.stringify(link); + assert.ok(notation.includes('type')); + assert.ok(notation.includes('enqueue')); + }); + + it('should serialize an array of links', () => { + const links = [ + { id: 0, source: 'hello', target: 'hello' }, + { id: 1, source: 'world', target: 'world' }, + ]; + const notation = LinksNotation.stringify(links); + assert.ok(notation.includes('hello')); + assert.ok(notation.includes('world')); + }); + + it('should support pretty option', () => { + const link = { id: 0, source: 'type', target: 'enqueue' }; + const notation = LinksNotation.stringify(link, { pretty: true }); + // Pretty output should have less parentheses + assert.ok(typeof notation === 'string'); + }); + + it('should support lessParentheses option', () => { + const link = { id: 0, source: 'type', target: 'enqueue' }; + const notation = LinksNotation.stringify(link, { lessParentheses: true }); + assert.ok(typeof notation === 'string'); + }); + + it('should handle null/undefined gracefully', () => { + const link = { id: 0, source: null, target: null }; + const notation = LinksNotation.stringify(link); + assert.ok(typeof notation === 'string'); + }); + + it('should handle links with values', () => { + const link = { id: 0, source: 'a', target: 'b', values: ['c', 'd'] }; + const notation = LinksNotation.stringify(link); + assert.ok(typeof notation === 'string'); + }); + }); + + describe('createParser', () => { + it('should create a parser with default options', () => { + const parser = LinksNotation.createParser(); + assert.ok(parser instanceof NotationParser); + }); + + it('should create a parser with custom options', () => { + const parser = LinksNotation.createParser({ + maxInputSize: 1024, + maxDepth: 100, + }); + assert.ok(parser instanceof NotationParser); + }); + + it('should parse input using custom parser', () => { + const parser = LinksNotation.createParser(); + const links = parser.parse('hello'); + assert.equal(links.length, 1); + }); + }); + + describe('createStreamParser', () => { + it('should create a stream parser', () => { + const parser = LinksNotation.createStreamParser(); + assert.ok(parser instanceof NotationStreamParser); + }); + + it('should emit links on end', (_, done) => { + const parser = LinksNotation.createStreamParser(); + const received = []; + + parser.on('link', (link) => { + received.push(link); + }); + + parser.on('end', () => { + assert.ok(received.length > 0); + done(); + }); + + parser.write('hello'); + parser.end(); + }); + + it('should emit error on parse failure', (_, done) => { + const parser = LinksNotation.createStreamParser(); + + parser.on('error', (err) => { + assert.ok(err instanceof Error); + done(); + }); + + parser.write('((unclosed'); + parser.end(); + }); + + it('should throw on write after end', () => { + const parser = LinksNotation.createStreamParser(); + parser.end(); + + assert.throws(() => parser.write('data'), /ended/); + }); + + it('should support chained writes', () => { + const parser = LinksNotation.createStreamParser(); + const result = parser.write('hel').write('lo'); + assert.strictEqual(result, parser); + }); + + it('should support off to remove listeners', () => { + const parser = LinksNotation.createStreamParser(); + const handler = () => {}; + + parser.on('link', handler); + parser.off('link', handler); + // No assertion needed - just checking it doesn't throw + }); + }); +}); + +describe('NotationParser', () => { + it('should create instance with default options', () => { + const parser = new NotationParser(); + assert.ok(parser); + }); + + it('should parse valid input', () => { + const parser = new NotationParser(); + const links = parser.parse('(test: value)'); + assert.equal(links.length, 1); + }); +}); + +describe('NotationStreamParser', () => { + it('should create instance with default options', () => { + const parser = new NotationStreamParser(); + assert.ok(parser); + assert.ok(parser.options.maxBufferSize > 0); + }); + + it('should create instance with custom options', () => { + const parser = new NotationStreamParser({ maxBufferSize: 1024 }); + assert.equal(parser.options.maxBufferSize, 1024); + }); +}); + +describe('NotationParseError', () => { + it('should have correct name', () => { + const error = new NotationParseError('test error'); + assert.equal(error.name, 'NotationParseError'); + }); + + it('should store location', () => { + const location = { line: 1, column: 5, offset: 4 }; + const error = new NotationParseError('test error', location); + assert.deepEqual(error.location, location); + }); + + it('should handle null location', () => { + const error = new NotationParseError('test error'); + assert.equal(error.location, null); + }); +}); + +describe('Round-trip conversion', () => { + it('should round-trip simple link', () => { + const original = '(type: enqueue)'; + const links = LinksNotation.parse(original); + const serialized = LinksNotation.stringify(links); + + // Re-parse and verify structure is preserved + const reparsed = LinksNotation.parse(serialized); + assert.equal(reparsed.length, links.length); + }); + + it('should round-trip multiple links', () => { + const original = 'hello world'; + const links = LinksNotation.parse(original); + const serialized = LinksNotation.stringify(links); + + const reparsed = LinksNotation.parse(serialized); + assert.equal(reparsed.length, links.length); + }); +}); From 0c883d3506d5ef5465b1255e311ab68a045189cf Mon Sep 17 00:00:00 2001 From: konard Date: Sat, 17 Jan 2026 01:11:22 +0100 Subject: [PATCH 3/4] fix: Apply formatting to changeset file Co-Authored-By: Claude Opus 4.5 --- js/.changeset/integrate-links-notation.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/.changeset/integrate-links-notation.md b/js/.changeset/integrate-links-notation.md index b3bf779..5a8d27f 100644 --- a/js/.changeset/integrate-links-notation.md +++ b/js/.changeset/integrate-links-notation.md @@ -1,5 +1,5 @@ --- -"links-queue-js": minor +'links-queue-js': minor --- Integrate links-notation library for parsing and serialization From 03356e5e9835ce22bfe588ec4bfc1a2b37171799 Mon Sep 17 00:00:00 2001 From: konard Date: Sat, 17 Jan 2026 01:14:00 +0100 Subject: [PATCH 4/4] Revert "Initial commit with task details" This reverts commit aad065ee5abba9191602ff4c85ac025ef0d2a19f. --- CLAUDE.md | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index a941524..0000000 --- a/CLAUDE.md +++ /dev/null @@ -1,5 +0,0 @@ -Issue to solve: https://github.com/link-foundation/links-queue/issues/16 -Your prepared branch: issue-16-8f27ebbd262d -Your prepared working directory: /tmp/gh-issue-solver-1768607940943 - -Proceed.