From 7e576e3769c9faf111d9c104d5cd826d30c1fd3e Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Wed, 27 Mar 2024 14:14:30 -0300 Subject: [PATCH 1/8] Add "includeOnly" field to core configuration The "includeOnly" field has been added to the core configuration and schema. This is an optional array that takes in glob-like patterns specifying the streams that should be tracked by the node. It's used as a filter; if the list is empty, all streams fetched from the contract will be tracked. --- packages/core/src/config/config.schema.json | 7 +++++++ packages/core/src/config/config.ts | 2 ++ 2 files changed, 9 insertions(+) diff --git a/packages/core/src/config/config.schema.json b/packages/core/src/config/config.schema.json index a5a863a..16b5f25 100644 --- a/packages/core/src/config/config.schema.json +++ b/packages/core/src/config/config.schema.json @@ -101,6 +101,13 @@ "type": "string", "enum": ["network"] }, + "includeOnly": { + "type": "array", + "description": "List of glob-like patterns representing the streams to be tracked by this node. Note: it act as a filter, if the list is empty, all streams fetched from the contract will be tracked.", + "items": { + "type": "string" + } + }, "pool": { "type": "object", "description": "Kyve Pool configuration for network participant mode", diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 90cd25c..1cfa237 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -15,6 +15,8 @@ export type NetworkParticipantMode = { url: string; pollInterval: number; }; + // glob pattern like list of streams that should be tracked by this node + includeOnly?: string[]; }; type StandaloneMode = { type: 'standalone'; From 2f67a758a899ea6c7effbb664a47de23fe40dc7b Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Wed, 27 Mar 2024 14:42:07 -0300 Subject: [PATCH 2/8] Add stream filtering based on glob patterns The update introduces glob-based stream filtering in the `LogStoreNetworkConfig` and `LogStoreNetworkPlugin`. This was achieved by adding 'micromatch' to the dependencies and creating a utility function 'globsMatch' that checks if a string matches any provided glob patterns. This function is now used to filter streams based on the newly added 'includeOnly' option in the plugins. --- packages/core/package.json | 2 ++ packages/core/src/Plugin.ts | 1 + .../logStore/network/LogStoreNetworkConfig.ts | 9 +++++---- .../logStore/network/LogStoreNetworkPlugin.ts | 17 +++++++++++++++-- packages/core/src/utils/filterByGlob.ts | 11 +++++++++++ 5 files changed, 34 insertions(+), 6 deletions(-) create mode 100644 packages/core/src/utils/filterByGlob.ts diff --git a/packages/core/package.json b/packages/core/package.json index 8a29fed..b123bd6 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -58,6 +58,7 @@ "js-base64": "^3.7.5", "lodash": "^4.17.21", "merge2": "^1.4.1", + "micromatch": "^4.0.5", "p-limit": "^3.1.0", "p-memoize": "4.0.3", "rxjs": "8.0.0-alpha.12", @@ -85,6 +86,7 @@ "@types/jest": "^29.5.0", "@types/lodash": "^4.14.191", "@types/merge2": "^1.4.0", + "@types/micromatch": "^4.0.6", "@types/node": "^16.18.39", "@types/node-fetch": "^2.6.2", "@types/secp256k1": "^4.0.3", diff --git a/packages/core/src/Plugin.ts b/packages/core/src/Plugin.ts index 9ac0286..a3fc76f 100644 --- a/packages/core/src/Plugin.ts +++ b/packages/core/src/Plugin.ts @@ -14,6 +14,7 @@ export type NetworkModeConfig = { recoveryStream: Stream; systemStream: Stream; nodeManager: LogStoreNodeManager; + includeOnly?: string[]; }; export type StandaloneModeConfig = { diff --git a/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts b/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts index 02a25f7..9579e2c 100644 --- a/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts +++ b/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts @@ -48,7 +48,8 @@ export class LogStoreNetworkConfig implements LogStoreConfig { pollInterval: number, logStoreClient: LogStoreClient, streamrClient: StreamrClient, - listener: LogStoreConfigListener + listener: LogStoreConfigListener, + streamFilter?: (streamPart: StreamPartID) => boolean ) { this.clusterSize = clusterSize; this.myIndexInCluster = myIndexInCluster; @@ -57,9 +58,9 @@ export class LogStoreNetworkConfig implements LogStoreConfig { pollInterval, logStoreClient, (streams, block) => { - const streamParts = streams.flatMap((stream: Stream) => [ - ...this.createMyStreamParts(stream), - ]); + const streamParts = streams + .flatMap((stream: Stream) => [...this.createMyStreamParts(stream)]) + .filter((streamPart) => streamFilter?.(streamPart) ?? true); this.handleDiff( this.synchronizer.ingestSnapshot( new Set(streamParts), diff --git a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts index 4bba71a..b038bc4 100644 --- a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts +++ b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts @@ -1,5 +1,5 @@ import { QueryRequest } from '@logsn/protocol'; -import { StreamPartIDUtils } from '@streamr/protocol'; +import { StreamPartID, StreamPartIDUtils } from '@streamr/protocol'; import { EthereumAddress, Logger } from '@streamr/utils'; import { Schema } from 'ajv'; import { Stream } from 'streamr-client'; @@ -7,6 +7,7 @@ import { Stream } from 'streamr-client'; import { NetworkModeConfig, PluginOptions } from '../../../Plugin'; import { BroadbandPublisher } from '../../../shared/BroadbandPublisher'; import { BroadbandSubscriber } from '../../../shared/BroadbandSubscriber'; +import { globsMatch } from '../../../utils/filterByGlob'; import PLUGIN_CONFIG_SCHEMA from '../config.schema.json'; import { createRecoveryEndpoint } from '../http/recoveryEndpoint'; import { LogStorePlugin } from '../LogStorePlugin'; @@ -217,6 +218,17 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { ): Promise { const node = await this.streamrClient.getNode(); + const streamFilter = (streamPart: StreamPartID): boolean => { + const includeOnlyGlobs = this.networkConfig.includeOnly; + if (!includeOnlyGlobs) { + return true; + } + return globsMatch( + StreamPartIDUtils.getStreamID(streamPart), + ...includeOnlyGlobs + ); + }; + const logStoreConfig = new LogStoreNetworkConfig( this.pluginConfig.cluster.clusterSize, this.pluginConfig.cluster.myIndexInCluster, @@ -257,7 +269,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { StreamPartIDUtils.getStreamID(streamPart) ); }, - } + }, + streamFilter ); await logStoreConfig.start(); return logStoreConfig; diff --git a/packages/core/src/utils/filterByGlob.ts b/packages/core/src/utils/filterByGlob.ts new file mode 100644 index 0000000..719f78a --- /dev/null +++ b/packages/core/src/utils/filterByGlob.ts @@ -0,0 +1,11 @@ +import micromatch from 'micromatch'; + +/** + * Check if the original string matches any of the globs + */ +export const globsMatch = ( + originalStr: string, + ...globs: string[] +): boolean => { + return globs.some((glob) => micromatch.isMatch(originalStr, glob)); +}; From 0e994d6385fd185276eec9f533978aa6d6881cfe Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Wed, 27 Mar 2024 15:11:08 -0300 Subject: [PATCH 3/8] Refactor filter logic for stream inclusion Refactored the filter logic for stream inclusion from the streamFilter() function to a newly introduced isStreamIncluded() function, improving code reusability. In addition, we introduced the shouldPublishQueryResponseFn() function in order to decide whether the query response should be published if the node opts to not participate in the propagation of the query response. --- .../logStore/network/LogStoreNetworkPlugin.ts | 28 +++++++++---------- .../network/NetworkQueryRequestManager.ts | 12 ++++++-- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts index b038bc4..8499577 100644 --- a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts +++ b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts @@ -1,8 +1,8 @@ import { QueryRequest } from '@logsn/protocol'; -import { StreamPartID, StreamPartIDUtils } from '@streamr/protocol'; +import { StreamPartID, StreamPartIDUtils, toStreamID } from '@streamr/protocol'; import { EthereumAddress, Logger } from '@streamr/utils'; import { Schema } from 'ajv'; -import { Stream } from 'streamr-client'; +import { Stream, StreamID } from 'streamr-client'; import { NetworkModeConfig, PluginOptions } from '../../../Plugin'; import { BroadbandPublisher } from '../../../shared/BroadbandPublisher'; @@ -122,7 +122,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { this.queryResponseManager, this.propagationResolver, this.systemPublisher, - this.systemSubscriber + this.systemSubscriber, + (queryRequest) => this.isStreamIncluded(toStreamID(queryRequest.streamId)) ); this.reportPoller = new ReportPoller( @@ -134,6 +135,14 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { ); } + private isStreamIncluded(streamId: StreamID): boolean { + const includeOnlyGlobs = this.networkConfig.includeOnly; + if (!includeOnlyGlobs) { + return true; + } + return globsMatch(streamId, ...includeOnlyGlobs); + } + get networkConfig(): NetworkModeConfig { if (this.modeConfig.type !== 'network') { throw new Error('Something went wrong, this should be a network plugin'); @@ -176,7 +185,6 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { ) ); } - this.metricsTimer = setInterval( this.logMetrics.bind(this), METRICS_INTERVAL @@ -218,16 +226,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { ): Promise { const node = await this.streamrClient.getNode(); - const streamFilter = (streamPart: StreamPartID): boolean => { - const includeOnlyGlobs = this.networkConfig.includeOnly; - if (!includeOnlyGlobs) { - return true; - } - return globsMatch( - StreamPartIDUtils.getStreamID(streamPart), - ...includeOnlyGlobs - ); - }; + const streamFilter = (streamPartId: StreamPartID) => + this.isStreamIncluded(StreamPartIDUtils.getStreamID(streamPartId)); const logStoreConfig = new LogStoreNetworkConfig( this.pluginConfig.cluster.clusterSize, diff --git a/packages/core/src/plugins/logStore/network/NetworkQueryRequestManager.ts b/packages/core/src/plugins/logStore/network/NetworkQueryRequestManager.ts index 56a54bd..61cc848 100644 --- a/packages/core/src/plugins/logStore/network/NetworkQueryRequestManager.ts +++ b/packages/core/src/plugins/logStore/network/NetworkQueryRequestManager.ts @@ -7,6 +7,7 @@ import { import { createSignaturePayload, StreamMessage } from '@streamr/protocol'; import { Logger } from '@streamr/utils'; import { keccak256 } from 'ethers/lib/utils'; +import { EMPTY } from 'rxjs'; import { Readable } from 'stream'; import { MessageMetadata } from 'streamr-client'; @@ -24,7 +25,12 @@ export class NetworkQueryRequestManager extends BaseQueryRequestManager { private readonly queryResponseManager: QueryResponseManager, private readonly propagationResolver: PropagationResolver, private readonly publisher: BroadbandPublisher, - private readonly subscriber: BroadbandSubscriber + private readonly subscriber: BroadbandSubscriber, + // This function is used to determine whether the query response should be published. + // There are cases in which this node can choose not to participate in the query response propagation. + private readonly shouldPublishQueryResponseFn: ( + queryRequest: QueryRequest + ) => boolean ) { // super(); @@ -51,7 +57,9 @@ export class NetworkQueryRequestManager extends BaseQueryRequestManager { content, metadata, }); - const readableStream = this.getDataForQueryRequest(queryRequest); + const readableStream = this.shouldPublishQueryResponseFn(queryRequest) + ? this.getDataForQueryRequest(queryRequest) + : Readable.from(EMPTY); const hashMap = await this.getHashMap(readableStream); const queryResponse = new QueryResponse({ From 24cdfc1dad6e870da12dfe38be710b27455ea8af Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Wed, 27 Mar 2024 15:22:52 -0300 Subject: [PATCH 4/8] Refactor query validation and add error codes The method validateUserQueryAccess has been changed to validateQueryRequest, now taking a Request object as parameter. This allows more comprehensive validation such as checking if a stream is included in the network configuration. Error codes have also been added to the response, providing richer error handling capabilities. --- .../src/plugins/logStore/LogStorePlugin.ts | 7 +++--- .../logStore/http/dataQueryEndpoint.ts | 13 +++------- .../src/plugins/logStore/http/httpHelpers.ts | 8 ++++-- .../logStore/network/LogStoreNetworkPlugin.ts | 25 +++++++++++++++---- .../standalone/LogStoreStandalonePlugin.ts | 2 +- 5 files changed, 34 insertions(+), 21 deletions(-) diff --git a/packages/core/src/plugins/logStore/LogStorePlugin.ts b/packages/core/src/plugins/logStore/LogStorePlugin.ts index 044f55c..7545705 100644 --- a/packages/core/src/plugins/logStore/LogStorePlugin.ts +++ b/packages/core/src/plugins/logStore/LogStorePlugin.ts @@ -19,6 +19,7 @@ import { MessageListener } from './MessageListener'; import { MessageProcessor } from './MessageProcessor'; import { NodeStreamsRegistry } from './NodeStreamsRegistry'; import { ValidationSchemaManager } from './validation-schema/ValidationSchemaManager'; +import { Request } from 'express'; const logger = new Logger(module); @@ -156,9 +157,9 @@ export abstract class LogStorePlugin extends Plugin { data: Readable; }>; - public abstract validateUserQueryAccess( - address: EthereumAddress - ): Promise<{ valid: true } | { valid: false; message: string }>; + public abstract validateQueryRequest( + request: Request + ): Promise<{ valid: true } | { valid: false; message: string, errorCode?: number }>; private async startStorage( metricsContext: MetricsContext diff --git a/packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts b/packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts index 6ba30ae..b1fccc3 100644 --- a/packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts +++ b/packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts @@ -1,12 +1,7 @@ /** * Endpoints for RESTful data requests */ -import { - MetricsContext, - MetricsDefinition, - RateMetric, - toEthereumAddress, -} from '@streamr/utils'; +import { MetricsContext, MetricsDefinition, RateMetric } from '@streamr/utils'; import { Request, RequestHandler, Response } from 'express'; import { HttpServerEndpoint } from '../../../Plugin'; @@ -111,8 +106,6 @@ const createHandler = (metrics: MetricsDefinition): RequestHandler => { const format = getFormat(req.query.format as string | undefined); - const consumer = toEthereumAddress(req.consumer!); - const store = logStoreContext.getStore(); if (!store) { throw new Error('LogStore context was not initialized'); @@ -120,9 +113,9 @@ const createHandler = (metrics: MetricsDefinition): RequestHandler => { const { logStorePlugin } = store; - const validation = await logStorePlugin.validateUserQueryAccess(consumer); + const validation = await logStorePlugin.validateQueryRequest(req); if (!validation.valid) { - sendError(validation.message, res); + sendError(validation.message, res, validation.errorCode); return; } diff --git a/packages/core/src/plugins/logStore/http/httpHelpers.ts b/packages/core/src/plugins/logStore/http/httpHelpers.ts index df29aca..f94dbcf 100644 --- a/packages/core/src/plugins/logStore/http/httpHelpers.ts +++ b/packages/core/src/plugins/logStore/http/httpHelpers.ts @@ -74,9 +74,13 @@ export const sendSuccess = ( }); }; -export const sendError = (message: string, res: Response) => { +export const sendError = ( + message: string, + res: Response, + errorCode?: number +) => { logger.error(message); - res.status(400).json({ + res.status(errorCode ?? 400).json({ error: message, }); }; diff --git a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts index 8499577..ca7b9a0 100644 --- a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts +++ b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts @@ -1,7 +1,8 @@ import { QueryRequest } from '@logsn/protocol'; import { StreamPartID, StreamPartIDUtils, toStreamID } from '@streamr/protocol'; -import { EthereumAddress, Logger } from '@streamr/utils'; +import { Logger, toEthereumAddress } from '@streamr/utils'; import { Schema } from 'ajv'; +import { Request } from 'express'; import { Stream, StreamID } from 'streamr-client'; import { NetworkModeConfig, PluginOptions } from '../../../Plugin'; @@ -291,18 +292,32 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { }; } - public async validateUserQueryAccess(address: EthereumAddress) { - const balance = await this.logStoreClient.getQueryBalanceOf(address); + public async validateQueryRequest(req: Request) { + const consumerAddress = toEthereumAddress(req.consumer!); + + const balance = await this.logStoreClient.getQueryBalanceOf( + consumerAddress + ); if (balance <= 0) { return { valid: false, message: 'Not enough balance staked for query', + errorCode: 402, } as const; - } else { + } + + const streamId = req.params.id; + const isStreamIncluded = this.isStreamIncluded(toStreamID(streamId)); + + if (!isStreamIncluded) { return { - valid: true, + valid: false, + message: 'Stream is excluded by the network configuration', + errorCode: 404, } as const; } + + return { valid: true } as const; } private logMetrics() { diff --git a/packages/core/src/plugins/logStore/standalone/LogStoreStandalonePlugin.ts b/packages/core/src/plugins/logStore/standalone/LogStoreStandalonePlugin.ts index 4a1e358..5e6b973 100644 --- a/packages/core/src/plugins/logStore/standalone/LogStoreStandalonePlugin.ts +++ b/packages/core/src/plugins/logStore/standalone/LogStoreStandalonePlugin.ts @@ -50,7 +50,7 @@ export class LogStoreStandalonePlugin extends LogStorePlugin { }; } - public async validateUserQueryAccess() { + public async validateQueryRequest() { return { valid: true } as const; } From 88c7337f0da3723fa191fc01eae9122b3cde2daf Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Wed, 27 Mar 2024 15:27:30 -0300 Subject: [PATCH 5/8] include helper comments on LogStoreNetworkConfig.ts --- .../core/src/plugins/logStore/network/LogStoreNetworkConfig.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts b/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts index 9579e2c..922cda1 100644 --- a/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts +++ b/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts @@ -49,6 +49,7 @@ export class LogStoreNetworkConfig implements LogStoreConfig { logStoreClient: LogStoreClient, streamrClient: StreamrClient, listener: LogStoreConfigListener, + // helps us not even include stream parts that we're not interested in streamFilter?: (streamPart: StreamPartID) => boolean ) { this.clusterSize = clusterSize; @@ -60,6 +61,7 @@ export class LogStoreNetworkConfig implements LogStoreConfig { (streams, block) => { const streamParts = streams .flatMap((stream: Stream) => [...this.createMyStreamParts(stream)]) + // if there's a filter, apply it, otherwise include everything .filter((streamPart) => streamFilter?.(streamPart) ?? true); this.handleDiff( this.synchronizer.ingestSnapshot( From 9b2a8715a8a672633f4ef433ab26aae50ead18f8 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Wed, 27 Mar 2024 16:12:48 -0300 Subject: [PATCH 6/8] Add check for incompatible node configuration Added a periodic check to ensure that the node configuration doesn't include specific streams while having an HTTP server enabled. This feature helps in generating meaningful alerts as these configurations might produce undesired 404 responses from clients. The check is implemented using a new 'createIncompatibleNodeUrlLogger' utility. The logger is subscribed in the LogStoreNetworkPlugin's start method and unsubscribed in its stop method. --- .../logStore/network/LogStoreNetworkPlugin.ts | 13 +++++ .../network/log-utils/checkConfigLogger.ts | 48 +++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 packages/core/src/plugins/logStore/network/log-utils/checkConfigLogger.ts diff --git a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts index ca7b9a0..3d8d73c 100644 --- a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts +++ b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts @@ -3,6 +3,7 @@ import { StreamPartID, StreamPartIDUtils, toStreamID } from '@streamr/protocol'; import { Logger, toEthereumAddress } from '@streamr/utils'; import { Schema } from 'ajv'; import { Request } from 'express'; +import { Subscription } from 'rxjs'; import { Stream, StreamID } from 'streamr-client'; import { NetworkModeConfig, PluginOptions } from '../../../Plugin'; @@ -14,6 +15,7 @@ import { createRecoveryEndpoint } from '../http/recoveryEndpoint'; import { LogStorePlugin } from '../LogStorePlugin'; import { Heartbeat } from './Heartbeat'; import { KyvePool } from './KyvePool'; +import { createIncompatibleNodeUrlLogger } from './log-utils/checkConfigLogger'; import { LogStoreNetworkConfig } from './LogStoreNetworkConfig'; import { MessageMetricsCollector } from './MessageMetricsCollector'; import { NetworkQueryRequestManager } from './NetworkQueryRequestManager'; @@ -43,6 +45,7 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { private readonly propagationResolver: PropagationResolver; private readonly propagationDispatcher: PropagationDispatcher; private readonly reportPoller: ReportPoller; + private readonly otherSubscriptions: Subscription[] = []; private metricsTimer?: NodeJS.Timer; @@ -134,6 +137,14 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { this.systemPublisher, this.systemSubscriber ); + + this.otherSubscriptions.push( + createIncompatibleNodeUrlLogger( + this.logStoreClient, + this.streamrClient, + this.networkConfig + ).subscribe() + ); } private isStreamIncluded(streamId: StreamID): boolean { @@ -214,6 +225,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { : Promise.resolve(), ]); + this.otherSubscriptions.forEach((s) => s.unsubscribe()); + await super.stop(); } diff --git a/packages/core/src/plugins/logStore/network/log-utils/checkConfigLogger.ts b/packages/core/src/plugins/logStore/network/log-utils/checkConfigLogger.ts new file mode 100644 index 0000000..8daf1aa --- /dev/null +++ b/packages/core/src/plugins/logStore/network/log-utils/checkConfigLogger.ts @@ -0,0 +1,48 @@ +import { LogStoreClient } from '@logsn/client'; +import { Logger } from '@streamr/utils'; +import { defer, EMPTY, filter, interval, map, shareReplay, tap } from 'rxjs'; +import { switchMap } from 'rxjs/internal/operators/switchMap'; +import StreamrClient from 'streamr-client'; + +import { NetworkModeConfig } from '../../../../Plugin'; + +const logger = new Logger(module); + +/** + * Reports if the node is configured to include only specific streams, but also has an HTTP server enabled. + * This is not acceptable as it may produce undesired 404 responses from clients. + * + * However, this should not shutdown the node, as it's not a critical issue for its operation. + */ +export const createIncompatibleNodeUrlLogger = ( + logStoreClient: LogStoreClient, + streamrClient: StreamrClient, + config: NetworkModeConfig +) => { + const address$ = defer(() => streamrClient.getAddress()).pipe( + // won't change, we cache it + shareReplay(1) + ); + const hasNodeUrl$ = address$.pipe( + map((nodeAddress) => logStoreClient.getNodeUrl(nodeAddress)) + ); + + const hasIncludeOnly = + config.includeOnly !== undefined && config.includeOnly.length > 0; + + if (!hasIncludeOnly) { + // this config is static, won't change, so we can already return it + return EMPTY; + } + + // run periodically + return interval(5 * 60 * 1000).pipe( + switchMap(() => hasNodeUrl$), + filter(Boolean), + tap(() => { + logger.error( + `This node is configured to include only specific streams, but also has an HTTP server enabled. This may produce undesired 404 responses from clients.` + ); + }) + ); +}; From 96623a944e9a693e65f56b2ea8136284261c3f98 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Wed, 27 Mar 2024 16:14:08 -0300 Subject: [PATCH 7/8] Rename log utility function for clarity --- .../core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts | 2 +- .../{checkConfigLogger.ts => checkNodeUrlCompatibility.ts} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename packages/core/src/plugins/logStore/network/log-utils/{checkConfigLogger.ts => checkNodeUrlCompatibility.ts} (100%) diff --git a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts index 3d8d73c..17fb55d 100644 --- a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts +++ b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts @@ -15,7 +15,7 @@ import { createRecoveryEndpoint } from '../http/recoveryEndpoint'; import { LogStorePlugin } from '../LogStorePlugin'; import { Heartbeat } from './Heartbeat'; import { KyvePool } from './KyvePool'; -import { createIncompatibleNodeUrlLogger } from './log-utils/checkConfigLogger'; +import { createIncompatibleNodeUrlLogger } from './log-utils/checkNodeUrlCompatibility'; import { LogStoreNetworkConfig } from './LogStoreNetworkConfig'; import { MessageMetricsCollector } from './MessageMetricsCollector'; import { NetworkQueryRequestManager } from './NetworkQueryRequestManager'; diff --git a/packages/core/src/plugins/logStore/network/log-utils/checkConfigLogger.ts b/packages/core/src/plugins/logStore/network/log-utils/checkNodeUrlCompatibility.ts similarity index 100% rename from packages/core/src/plugins/logStore/network/log-utils/checkConfigLogger.ts rename to packages/core/src/plugins/logStore/network/log-utils/checkNodeUrlCompatibility.ts From 1cc82199accd807b4335a91a2add2ee828362e46 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Wed, 27 Mar 2024 19:10:38 -0300 Subject: [PATCH 8/8] Add tests and utilities for log store node The commit introduces utilities needed for effective management of 'logStoreNode' - start, stop, setup, and tear down of nodes. There's also enhancement on the utilities for account setup and cleanup. Alongside the utility scripts, a new integration test was added to verify if queries from a partially included stream are handled correctly by the 'logStoreNode'. --- .../logStore/includeOnlyBehavior.test.ts | 98 +++++++++++++ packages/core/test/resourceUtils.ts | 138 ++++++++++++++++++ packages/core/test/utils.ts | 1 + 3 files changed, 237 insertions(+) create mode 100644 packages/core/test/integration/plugins/logStore/includeOnlyBehavior.test.ts create mode 100644 packages/core/test/resourceUtils.ts diff --git a/packages/core/test/integration/plugins/logStore/includeOnlyBehavior.test.ts b/packages/core/test/integration/plugins/logStore/includeOnlyBehavior.test.ts new file mode 100644 index 0000000..e5ceae8 --- /dev/null +++ b/packages/core/test/integration/plugins/logStore/includeOnlyBehavior.test.ts @@ -0,0 +1,98 @@ +import { Tracker } from '@streamr/network-tracker'; +import { KeyServer } from '@streamr/test-utils'; +import { providers, Wallet } from 'ethers'; +import { Stream } from 'streamr-client'; +import { CONFIG_TEST as STREAMR_CLIENT_CONFIG_TEST } from 'streamr-client/types/src/ConfigTest'; + +import { accountUtils, getLogStoreNodeTestUtils } from '../../../resourceUtils'; +import { CONTRACT_OWNER_PRIVATE_KEY, createTestStream, startTestTracker } from '../../../utils'; + +jest.setTimeout(60000); + +// There are two options to run the test managed by a value of the TRACKER_PORT constant: +// 1. TRACKER_PORT = undefined - run the test against the brokers running in dev-env and brokers run by the test script. +// 2. TRACKER_PORT = 17771 - run the test against only brokers run by the test script. +// In this case dev-env doesn't run any brokers and there is no brokers joined the network (NodeManager.totalNodes == 0) +const TRACKER_PORT = undefined; + +describe('Network Mode Queries', () => { + const provider = new providers.JsonRpcProvider( + STREAMR_CLIENT_CONFIG_TEST.contracts?.streamRegistryChainRPCs?.rpcs[0].url, + STREAMR_CLIENT_CONFIG_TEST.contracts?.streamRegistryChainRPCs?.chainId + ); + + const fullNodeUtils = getLogStoreNodeTestUtils( + provider, + { + plugins: { + logStore: { db: { type: 'sqlite' } } + }, + httpServerPort: 7171 + }, + TRACKER_PORT + ); + const partialNodeUtils = getLogStoreNodeTestUtils( + provider, + { + plugins: { + logStore: { db: { type: 'sqlite' } } + }, + httpServerPort: 7172, + mode: { + includeOnly: ['**/*-include-test-stream'], + type: 'network' + } + }, + TRACKER_PORT + ); + + // Accounts + const client = accountUtils(provider); + + let tracker: Tracker; + + let nonIncludedStream: Stream; + let includedStream: Stream; + const includedStreamId = `/${Date.now()}-include-test-stream`; + + beforeAll(async () => { + await client.setup(); + nonIncludedStream = await createTestStream(client.streamrClient, module); + includedStream = await client.streamrClient.createStream(includedStreamId); + + await client.stakeStream(nonIncludedStream.id); + await client.stakeStream(includedStream.id); + await client.stakeQuery(); + }); + + afterAll(async () => { + // TODO: Setup global tear-down + await KeyServer.stopIfRunning(); + await client.teardown(); + }); + + beforeEach(async () => { + if (TRACKER_PORT) { + tracker = await startTestTracker(TRACKER_PORT); + } + const adminWallet = new Wallet(CONTRACT_OWNER_PRIVATE_KEY, provider); + await fullNodeUtils.setup(adminWallet, { http: 'http://127.0.0.1:7171' }); + await partialNodeUtils.setup(adminWallet, {}); + }); + + afterEach(async () => { + await fullNodeUtils.teardown(); + await partialNodeUtils.teardown(); + await tracker?.stop(); + }); + + test('query partial node - included stream', async () => { + const messageStream = await client + .logStoreClientForNodeUrl('http://localhost:7172') + .query({ streamId: includedStreamId }, { last: 1 }); + + for await (const message of messageStream) { + expect(message).toBeDefined(); + } + }); +}); diff --git a/packages/core/test/resourceUtils.ts b/packages/core/test/resourceUtils.ts new file mode 100644 index 0000000..1a2e4e9 --- /dev/null +++ b/packages/core/test/resourceUtils.ts @@ -0,0 +1,138 @@ +import { LogStoreClient, NodeMetadata } from '@logsn/client'; +import { LogStoreNodeManager } from '@logsn/contracts'; +import { + getNodeManagerContract, + getQueryManagerContract, + getStoreManagerContract, + getTokenManagerContract, + prepareStakeForNodeManager, + prepareStakeForQueryManager, + prepareStakeForStoreManager, +} from '@logsn/shared'; +import { fetchPrivateKeyWithGas } from '@streamr/test-utils'; +import { providers, Wallet } from 'ethers'; +import StreamrClient from 'streamr-client'; + +import { LogStoreNode } from '../src/node'; +import { LogStoreBrokerTestConfig, sleep, startLogStoreBroker } from './utils'; + +const STAKE_AMOUNT = BigInt('1000000000000000000'); + +const getOrError = (value: T | undefined): T => { + if (value === undefined) { + throw new Error('Something is not initialized'); + } + return value; +}; +export const getLogStoreNodeTestUtils = ( + provider: providers.Provider, + logStoreConfig: Omit, + trackerPort: number | undefined +) => { + let logstoreNode: LogStoreNode | undefined; + let wallet: Wallet | undefined; + let nodeManager: LogStoreNodeManager | undefined; + return { + teardown: async () => { + await getOrError(nodeManager) + .leave() + .then((tx) => tx.wait()); + await logstoreNode?.stop(); + }, + wallet: () => getOrError(wallet), + setup: async ( + adminWallet: Wallet, + metadata: NodeMetadata + ): Promise => { + wallet = new Wallet(await fetchPrivateKeyWithGas(), provider); + const nodeAdminManager = await getNodeManagerContract(adminWallet); + const tokenAdminManager = await getTokenManagerContract(adminWallet); + nodeManager = await getNodeManagerContract(wallet); + + await nodeAdminManager + .whitelistApproveNode(wallet.address) + .then((tx) => tx.wait()); + await tokenAdminManager + .addWhitelist(wallet.address, nodeManager.address) + .then((tx) => tx.wait()); + + await prepareStakeForNodeManager(wallet, STAKE_AMOUNT); + await nodeManager + .join(STAKE_AMOUNT, JSON.stringify(metadata)) + .then((tx) => tx.wait()); + + await sleep(5000); + + logstoreNode = await startLogStoreBroker({ + privateKey: wallet.privateKey, + trackerPort: trackerPort, + ...logStoreConfig, + }); + await logstoreNode.start(); + }, + }; +}; +export const accountUtils = ( + provider: providers.Provider, + initialWallet?: Wallet +) => { + let wallet: Wallet | undefined = initialWallet; + let logStoreClient: LogStoreClient | undefined; + let streamrClient: StreamrClient | undefined; + + const getWallet = () => getOrError(wallet); + + function getStreamrClient() { + if (!streamrClient) { + streamrClient = new StreamrClient({ + auth: { + privateKey: getWallet().privateKey, + }, + }); + } + return streamrClient; + } + + const logStoreNodeForUrlMap = new Map(); + + return { + setup: async () => { + wallet = new Wallet(await fetchPrivateKeyWithGas(), provider); + }, + teardown: async () => { + logStoreClient?.destroy(); + await streamrClient?.destroy(); + Array.from(logStoreNodeForUrlMap.values()).map((client) => + client.destroy() + ); + }, + get streamrClient() { + return getStreamrClient(); + }, + get logStoreClient() { + if (!logStoreClient) { + logStoreClient = new LogStoreClient(getStreamrClient()); + } + return logStoreClient; + }, + logStoreClientForNodeUrl: (nodeUrl: string) => { + if (!logStoreNodeForUrlMap.has(nodeUrl)) { + logStoreNodeForUrlMap.set( + nodeUrl, + new LogStoreClient(getStreamrClient(), { nodeUrl }) + ); + } + return logStoreNodeForUrlMap.get(nodeUrl)!; + }, + stakeStream: async (streamId: string) => { + const storeManager = await getStoreManagerContract(getWallet()); + await prepareStakeForStoreManager(getWallet(), STAKE_AMOUNT); + await storeManager.stake(streamId, '1').then((tx) => tx.wait()); + }, + stakeQuery: async () => { + const queryManager = await getQueryManagerContract(getWallet()); + await prepareStakeForQueryManager(getWallet(), STAKE_AMOUNT); + await queryManager.stake('1').then((tx) => tx.wait()); + }, + }; +}; diff --git a/packages/core/test/utils.ts b/packages/core/test/utils.ts index 0b151f9..51921fc 100644 --- a/packages/core/test/utils.ts +++ b/packages/core/test/utils.ts @@ -58,6 +58,7 @@ export interface LogStorePluginTestConfig { type: 'sqlite'; dbPath?: string; }; + refreshInterval?: number; }