From 369353c481c2c49f3d53bb4d4d15b16b5d3b5bd0 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Sun, 1 Mar 2026 17:33:31 +0100 Subject: [PATCH 01/24] Setup database dependencies --- apps/ensindexer/package.json | 3 +++ pnpm-lock.yaml | 49 +++++++++++++++++++++++++----------- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/apps/ensindexer/package.json b/apps/ensindexer/package.json index 0f22cb35c..40d2ac337 100644 --- a/apps/ensindexer/package.json +++ b/apps/ensindexer/package.json @@ -30,10 +30,12 @@ "@ensnode/ensrainbow-sdk": "workspace:*", "@ensnode/ponder-metadata": "workspace:*", "@ensnode/ponder-sdk": "workspace:*", + "@ponder/client": "catalog:", "caip": "catalog:", "date-fns": "catalog:", "deepmerge-ts": "^7.1.5", "dns-packet": "^5.6.1", + "drizzle-orm": "catalog:", "pg-connection-string": "catalog:", "hono": "catalog:", "ponder": "catalog:", @@ -44,6 +46,7 @@ "@ensnode/shared-configs": "workspace:*", "@types/dns-packet": "^5.6.5", "@types/node": "catalog:", + "@types/pg": "8.16.0", "typescript": "catalog:", "vitest": "catalog:" } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 032fefb25..c83b6701a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -362,7 +362,7 @@ importers: version: 1.37.0 '@ponder/client': specifier: 'catalog:' - version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) + version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) '@ponder/utils': specifier: 'catalog:' version: 0.2.16(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6)) @@ -386,7 +386,7 @@ importers: version: 4.1.0 drizzle-orm: specifier: 'catalog:' - version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) graphql: specifier: ^16.11.0 version: 16.11.0 @@ -475,6 +475,9 @@ importers: '@ensnode/ponder-sdk': specifier: workspace:* version: link:../../packages/ponder-sdk + '@ponder/client': + specifier: 'catalog:' + version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) caip: specifier: 'catalog:' version: 1.1.1 @@ -487,6 +490,9 @@ importers: dns-packet: specifier: ^5.6.1 version: 5.6.1 + drizzle-orm: + specifier: 'catalog:' + version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) hono: specifier: 'catalog:' version: 4.12.2 @@ -495,7 +501,7 @@ importers: version: 2.9.1 ponder: specifier: 'catalog:' - version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6))(zod@4.3.6) + version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(@types/pg@8.16.0)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6))(zod@4.3.6) viem: specifier: 'catalog:' version: 2.38.5(typescript@5.9.3)(zod@4.3.6) @@ -512,6 +518,9 @@ importers: '@types/node': specifier: 'catalog:' version: 24.10.9 + '@types/pg': + specifier: 8.16.0 + version: 8.16.0 typescript: specifier: 'catalog:' version: 5.9.3 @@ -856,7 +865,7 @@ importers: dependencies: ponder: specifier: 'catalog:' - version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76) + version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(@types/pg@8.16.0)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76) viem: specifier: 'catalog:' version: 2.38.5(typescript@5.9.3)(zod@3.25.76) @@ -1037,7 +1046,7 @@ importers: version: link:../ensrainbow-sdk drizzle-orm: specifier: 'catalog:' - version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) parse-prometheus-text-format: specifier: ^1.1.1 version: 1.1.1 @@ -1056,7 +1065,7 @@ importers: version: 4.12.2 ponder: specifier: 'catalog:' - version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76) + version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(@types/pg@8.16.0)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76) tsup: specifier: 'catalog:' version: 8.5.0(jiti@2.6.1)(postcss@8.5.6)(tsx@4.20.6)(typescript@5.9.3)(yaml@2.8.1) @@ -1108,13 +1117,13 @@ importers: version: 2.5.1 '@ponder/client': specifier: 'catalog:' - version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) + version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) dataloader: specifier: ^2.2.3 version: 2.2.3 drizzle-orm: specifier: 0.41.0 - version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) graphql: specifier: ^16.10.0 version: 16.11.0 @@ -4263,6 +4272,9 @@ packages: '@types/node@24.10.9': resolution: {integrity: sha512-ne4A0IpG3+2ETuREInjPNhUGis1SFjv1d5asp8MzEAGtOZeTeHVDOYqOgqfhvseqg/iXty2hjBf1zAOb7RNiNw==} + '@types/pg@8.16.0': + resolution: {integrity: sha512-RmhMd/wD+CF8Dfo+cVIy3RR5cl8CyfXQ0tGgW6XBL8L4LM/UTEbNXYRbLwU6w+CgrKBNbrQWt4FUtTfaU5jSYQ==} + '@types/prismjs@1.26.6': resolution: {integrity: sha512-vqlvI7qlMvcCBbVe0AKAb4f97//Hy0EBTaiW8AalRnG/xAN5zOiWWyrNqNXeq8+KAuvRewjCVY1+IPxk4RdNYw==} @@ -11281,9 +11293,9 @@ snapshots: '@pkgjs/parseargs@0.11.0': optional: true - '@ponder/client@0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3)': + '@ponder/client@0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3)': dependencies: - drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) eventsource: 3.0.7 superjson: 2.2.6 optionalDependencies: @@ -12723,6 +12735,12 @@ snapshots: dependencies: undici-types: 7.16.0 + '@types/pg@8.16.0': + dependencies: + '@types/node': 24.10.9 + pg-protocol: 1.10.3 + pg-types: 2.2.0 + '@types/prismjs@1.26.6': {} '@types/progress@2.0.7': @@ -14014,10 +14032,11 @@ snapshots: dotenv@8.6.0: {} - drizzle-orm@0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3): + drizzle-orm@0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3): optionalDependencies: '@electric-sql/pglite': 0.2.13 '@opentelemetry/api': 1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a) + '@types/pg': 8.16.0 kysely: 0.26.3 pg: 8.16.3 @@ -16193,7 +16212,7 @@ snapshots: graphql: 16.11.0 hono: 4.12.2 - ponder@0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76): + ponder@0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(@types/pg@8.16.0)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76): dependencies: '@babel/code-frame': 7.27.1 '@commander-js/extra-typings': 12.1.0(commander@12.1.0) @@ -16210,7 +16229,7 @@ snapshots: dataloader: 2.2.3 detect-package-manager: 3.0.2 dotenv: 16.6.1 - drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) glob: 10.5.0 graphql: 16.8.2 graphql-yoga: 5.17.1(graphql@16.8.2) @@ -16275,7 +16294,7 @@ snapshots: - utf-8-validate - zod - ponder@0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6))(zod@4.3.6): + ponder@0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(@types/pg@8.16.0)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6))(zod@4.3.6): dependencies: '@babel/code-frame': 7.27.1 '@commander-js/extra-typings': 12.1.0(commander@12.1.0) @@ -16292,7 +16311,7 @@ snapshots: dataloader: 2.2.3 detect-package-manager: 3.0.2 dotenv: 16.6.1 - drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) glob: 10.5.0 graphql: 16.8.2 graphql-yoga: 5.17.1(graphql@16.8.2) From 90ebd288086624ebc8050716709419683cfaefa7 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Sun, 1 Mar 2026 18:10:10 +0100 Subject: [PATCH 02/24] Create `EnsDbClient` for ENSIndexer ENSIndexer modules can import the singleton instance from `@/lib/ensdb-client`. --- .../src/lib/ensdb-client/drizzle.ts | 24 +++ .../src/lib/ensdb-client/ensdb-client.ts | 189 ++++++++++++++++++ .../src/lib/ensdb-client/singleton.ts | 8 + 3 files changed, 221 insertions(+) create mode 100644 apps/ensindexer/src/lib/ensdb-client/drizzle.ts create mode 100644 apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts create mode 100644 apps/ensindexer/src/lib/ensdb-client/singleton.ts diff --git a/apps/ensindexer/src/lib/ensdb-client/drizzle.ts b/apps/ensindexer/src/lib/ensdb-client/drizzle.ts new file mode 100644 index 000000000..cee05366b --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/drizzle.ts @@ -0,0 +1,24 @@ +// This file was copied 1-to-1 from ENSApi. + +import { setDatabaseSchema } from "@ponder/client"; +import { drizzle } from "drizzle-orm/node-postgres"; + +type Schema = { [name: string]: unknown }; + +/** + * Makes a Drizzle DB object. + */ +export const makeDrizzle = ({ + schema, + databaseUrl, + databaseSchema, +}: { + schema: SCHEMA; + databaseUrl: string; + databaseSchema: string; +}) => { + // monkeypatch schema onto tables + setDatabaseSchema(schema, databaseSchema); + + return drizzle(databaseUrl, { schema, casing: "snake_case" }); +}; diff --git a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts new file mode 100644 index 000000000..c23161d5b --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts @@ -0,0 +1,189 @@ +import type { NodePgDatabase } from "drizzle-orm/node-postgres"; +import { eq } from "drizzle-orm/sql"; + +import { ensNodeMetadata } from "@ensnode/ensnode-schema"; +import { + type CrossChainIndexingStatusSnapshot, + deserializeCrossChainIndexingStatusSnapshot, + deserializeEnsIndexerPublicConfig, + type EnsDbClientMutation, + type EnsDbClientQuery, + type EnsIndexerPublicConfig, + EnsNodeMetadataKeys, + type SerializedEnsNodeMetadata, + type SerializedEnsNodeMetadataEnsDbVersion, + type SerializedEnsNodeMetadataEnsIndexerIndexingStatus, + type SerializedEnsNodeMetadataEnsIndexerPublicConfig, + serializeCrossChainIndexingStatusSnapshot, + serializeEnsIndexerPublicConfig, +} from "@ensnode/ensnode-sdk"; + +import { makeDrizzle } from "./drizzle"; + +/** + * ENSDb Client Schema + * + * Includes schema definitions for {@link EnsDbClient} queries and mutations. + */ +const schema = { + ensNodeMetadata, +}; + +/** + * Drizzle database + * + * Allows interacting with Postgres database for ENSDb, using Drizzle ORM. + */ +interface DrizzleDb extends NodePgDatabase {} + +/** + * ENSDb Client + * + * This client exists to provide an abstraction layer for interacting with ENSDb. + * It enables ENSIndexer and ENSApi to decouple from each other, and use + * the ENSDb Client as the integration point between the two. + * + * Enables querying and mutating ENSDb data, such as: + * - ENSDb version + * - ENSIndexer Public Config, and Indexing Status Snapshot and CrossChainIndexingStatusSnapshot. + */ +export class EnsDbClient implements EnsDbClientQuery, EnsDbClientMutation { + /** + * Drizzle database instance for ENSDb. + */ + private db: DrizzleDb; + + /** + * @param databaseUrl connection string for ENSDb Postgres database + * @param databaseSchemaName Postgres schema name for ENSDb tables + */ + constructor(databaseUrl: string, databaseSchemaName: string) { + this.db = makeDrizzle({ + databaseSchema: databaseSchemaName, + databaseUrl, + schema, + }); + } + + /** + * @inheritdoc + */ + async getEnsDbVersion(): Promise { + const record = await this.getEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsDbVersion, + }); + + return record; + } + + /** + * @inheritdoc + */ + async getEnsIndexerPublicConfig(): Promise { + const record = await this.getEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsIndexerPublicConfig, + }); + + if (!record) { + return undefined; + } + + return deserializeEnsIndexerPublicConfig(record); + } + + /** + * @inheritdoc + */ + async getIndexingStatusSnapshot(): Promise { + const record = await this.getEnsNodeMetadata( + { + key: EnsNodeMetadataKeys.EnsIndexerIndexingStatus, + }, + ); + + if (!record) { + return undefined; + } + + return deserializeCrossChainIndexingStatusSnapshot(record); + } + + /** + * @inheritdoc + */ + async upsertEnsDbVersion(ensDbVersion: string): Promise { + await this.upsertEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsDbVersion, + value: ensDbVersion, + }); + } + + /** + * @inheritdoc + */ + async upsertEnsIndexerPublicConfig( + ensIndexerPublicConfig: EnsIndexerPublicConfig, + ): Promise { + await this.upsertEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsIndexerPublicConfig, + value: serializeEnsIndexerPublicConfig(ensIndexerPublicConfig), + }); + } + + /** + * @inheritdoc + */ + async upsertIndexingStatusSnapshot( + indexingStatus: CrossChainIndexingStatusSnapshot, + ): Promise { + await this.upsertEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsIndexerIndexingStatus, + value: serializeCrossChainIndexingStatusSnapshot(indexingStatus), + }); + } + + /** + * Get ENSNode metadata record + * + * @returns selected record in ENSDb. + * @throws when exactly one matching metadata record was not found + */ + private async getEnsNodeMetadata( + metadata: Pick, + ): Promise { + const result = await this.db + .select() + .from(ensNodeMetadata) + .where(eq(ensNodeMetadata.key, metadata.key)); + + if (result.length === 0) { + return undefined; + } + + if (result.length === 1 && result[0]) { + return result[0].value as EnsNodeMetadataType["value"]; + } + + throw new Error(`There must be exactly one ENSNodeMetadata record for '${metadata.key}' key`); + } + + /** + * Upsert ENSNode metadata + * + * @throws when upsert operation failed. + */ + private async upsertEnsNodeMetadata< + EnsNodeMetadataType extends SerializedEnsNodeMetadata = SerializedEnsNodeMetadata, + >(metadata: EnsNodeMetadataType): Promise { + await this.db + .insert(ensNodeMetadata) + .values({ + key: metadata.key, + value: metadata.value, + }) + .onConflictDoUpdate({ + target: ensNodeMetadata.key, + set: { value: metadata.value }, + }); + } +} diff --git a/apps/ensindexer/src/lib/ensdb-client/singleton.ts b/apps/ensindexer/src/lib/ensdb-client/singleton.ts new file mode 100644 index 000000000..3ab2225fd --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/singleton.ts @@ -0,0 +1,8 @@ +import config from "@/config"; + +import { EnsDbClient } from "./ensdb-client"; + +/** + * Singleton instance of {@link EnsDbClient} for use in ENSIndexer. + */ +export const ensDbClient = new EnsDbClient(config.databaseUrl, config.databaseSchemaName); From 354804028a9f491f30ff3dd6239a595816882deb Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Sun, 1 Mar 2026 18:11:44 +0100 Subject: [PATCH 03/24] Add unit tests for `EnsDbClient` --- .../src/lib/ensdb-client/ensdb-client.mock.ts | 66 ++++++ .../src/lib/ensdb-client/ensdb-client.test.ts | 200 ++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 apps/ensindexer/src/lib/ensdb-client/ensdb-client.mock.ts create mode 100644 apps/ensindexer/src/lib/ensdb-client/ensdb-client.test.ts diff --git a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.mock.ts b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.mock.ts new file mode 100644 index 000000000..4b7054aa6 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.mock.ts @@ -0,0 +1,66 @@ +import { + type BlockRef, + ChainIndexingStatusIds, + CrossChainIndexingStrategyIds, + type EnsIndexerPublicConfig, + OmnichainIndexingStatusIds, + PluginName, + RangeTypeIds, + type SerializedCrossChainIndexingStatusSnapshot, +} from "@ensnode/ensnode-sdk"; + +export const earlierBlockRef = { + timestamp: 1672531199, + number: 1024, +} as const satisfies BlockRef; + +export const laterBlockRef = { + timestamp: 1672531200, + number: 1025, +} as const satisfies BlockRef; + +export const databaseUrl = "postgres://user:pass@localhost:5432/ensdb"; + +export const databaseSchemaName = "public"; + +export const publicConfig = { + databaseSchemaName, + labelSet: { + labelSetId: "subgraph", + labelSetVersion: 0, + }, + indexedChainIds: new Set([1]), + isSubgraphCompatible: true, + namespace: "mainnet", + plugins: [PluginName.Subgraph], + versionInfo: { + nodejs: "v22.10.12", + ponder: "0.11.25", + ensDb: "0.32.0", + ensIndexer: "0.32.0", + ensNormalize: "1.11.1", + ensRainbow: "0.32.0", + ensRainbowSchema: 2, + }, +} satisfies EnsIndexerPublicConfig; + +export const serializedSnapshot = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: earlierBlockRef.timestamp, + snapshotTime: earlierBlockRef.timestamp + 20, + omnichainSnapshot: { + omnichainStatus: OmnichainIndexingStatusIds.Following, + chains: { + "1": { + chainStatus: ChainIndexingStatusIds.Following, + config: { + rangeType: RangeTypeIds.LeftBounded, + startBlock: earlierBlockRef, + }, + latestIndexedBlock: earlierBlockRef, + latestKnownBlock: laterBlockRef, + }, + }, + omnichainIndexingCursor: earlierBlockRef.timestamp, + }, +} satisfies SerializedCrossChainIndexingStatusSnapshot; diff --git a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.test.ts b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.test.ts new file mode 100644 index 000000000..d67bca8a9 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.test.ts @@ -0,0 +1,200 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { ensNodeMetadata } from "@ensnode/ensnode-schema"; +import { + deserializeCrossChainIndexingStatusSnapshot, + EnsNodeMetadataKeys, + serializeCrossChainIndexingStatusSnapshot, + serializeEnsIndexerPublicConfig, +} from "@ensnode/ensnode-sdk"; + +import { makeDrizzle } from "./drizzle"; +import { EnsDbClient } from "./ensdb-client"; +import * as ensDbClientMock from "./ensdb-client.mock"; + +// Mock the config module to prevent it from trying to load actual environment variables during tests +vi.mock("@/config", () => ({ default: {} })); + +// Mock the makeDrizzle function to return a mock database instance +vi.mock("./drizzle", () => ({ makeDrizzle: vi.fn() })); + +describe("EnsDbClient", () => { + // Mock database query results and methods + const selectResult = { current: [] as Array<{ value: unknown }> }; + const whereMock = vi.fn(async () => selectResult.current); + const fromMock = vi.fn(() => ({ where: whereMock })); + const selectMock = vi.fn(() => ({ from: fromMock })); + const onConflictDoUpdateMock = vi.fn(async () => undefined); + const valuesMock = vi.fn(() => ({ onConflictDoUpdate: onConflictDoUpdateMock })); + const insertMock = vi.fn(() => ({ values: valuesMock })); + const dbMock = { select: selectMock, insert: insertMock }; + + beforeEach(() => { + selectResult.current = []; + whereMock.mockClear(); + fromMock.mockClear(); + selectMock.mockClear(); + onConflictDoUpdateMock.mockClear(); + valuesMock.mockClear(); + insertMock.mockClear(); + vi.mocked(makeDrizzle).mockReturnValue(dbMock as unknown as ReturnType); + }); + + describe("getEnsDbVersion", () => { + it("returns undefined when no record exists", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsDbVersion()).resolves.toBeUndefined(); + + expect(selectMock).toHaveBeenCalledTimes(1); + expect(fromMock).toHaveBeenCalledWith(ensNodeMetadata); + }); + + it("returns value when one record exists", async () => { + // arrange + selectResult.current = [{ value: "0.1.0" }]; + + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsDbVersion()).resolves.toBe("0.1.0"); + }); + + // This scenario should be impossible due to the primary key constraint on + // the 'key' column of 'ensnode_metadata' table. + it("throws when multiple records exist", async () => { + // arrange + selectResult.current = [{ value: "0.1.0" }, { value: "0.1.1" }]; + + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsDbVersion()).rejects.toThrowError(/ensdb_version/i); + }); + }); + + describe("getEnsIndexerPublicConfig", () => { + it("returns undefined when no record exists", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsIndexerPublicConfig()).resolves.toBeUndefined(); + }); + + it("deserializes the stored config", async () => { + // arrange + const serializedConfig = serializeEnsIndexerPublicConfig(ensDbClientMock.publicConfig); + selectResult.current = [{ value: serializedConfig }]; + + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsIndexerPublicConfig()).resolves.toStrictEqual( + ensDbClientMock.publicConfig, + ); + }); + }); + + describe("getIndexingStatusSnapshot", () => { + it("deserializes the stored indexing status snapshot", async () => { + // arrange + selectResult.current = [{ value: ensDbClientMock.serializedSnapshot }]; + + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + const expected = deserializeCrossChainIndexingStatusSnapshot( + ensDbClientMock.serializedSnapshot, + ); + + // act & assert + await expect(client.getIndexingStatusSnapshot()).resolves.toStrictEqual(expected); + }); + }); + + describe("upsertEnsDbVersion", () => { + it("writes the database version metadata", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act + await client.upsertEnsDbVersion("0.2.0"); + + // assert + expect(insertMock).toHaveBeenCalledWith(ensNodeMetadata); + expect(valuesMock).toHaveBeenCalledWith({ + key: EnsNodeMetadataKeys.EnsDbVersion, + value: "0.2.0", + }); + expect(onConflictDoUpdateMock).toHaveBeenCalledWith({ + target: ensNodeMetadata.key, + set: { value: "0.2.0" }, + }); + }); + }); + + describe("upsertEnsIndexerPublicConfig", () => { + it("serializes and writes the public config", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + const expectedValue = serializeEnsIndexerPublicConfig(ensDbClientMock.publicConfig); + + // act + await client.upsertEnsIndexerPublicConfig(ensDbClientMock.publicConfig); + + // assert + expect(valuesMock).toHaveBeenCalledWith({ + key: EnsNodeMetadataKeys.EnsIndexerPublicConfig, + value: expectedValue, + }); + }); + }); + + describe("upsertIndexingStatusSnapshot", () => { + it("serializes and writes the indexing status snapshot", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + const snapshot = deserializeCrossChainIndexingStatusSnapshot( + ensDbClientMock.serializedSnapshot, + ); + const expectedValue = serializeCrossChainIndexingStatusSnapshot(snapshot); + + // act + await client.upsertIndexingStatusSnapshot(snapshot); + + // assert + expect(valuesMock).toHaveBeenCalledWith({ + key: EnsNodeMetadataKeys.EnsIndexerIndexingStatus, + value: expectedValue, + }); + }); + }); +}); From df97f7e4e1047faa2de55cbba0295f344536f457 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Sun, 1 Mar 2026 18:53:08 +0100 Subject: [PATCH 04/24] Apply singleton file imports Makes using singleton instances for various classes easier --- apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts | 4 +--- apps/ensindexer/src/lib/ensindexer-client/singleton.ts | 5 +++++ apps/ensindexer/src/lib/indexing-status-builder/singleton.ts | 5 +++++ 3 files changed, 11 insertions(+), 3 deletions(-) create mode 100644 apps/ensindexer/src/lib/ensindexer-client/singleton.ts create mode 100644 apps/ensindexer/src/lib/indexing-status-builder/singleton.ts diff --git a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts index 7a4346c1d..33281c43e 100644 --- a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts +++ b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts @@ -15,11 +15,9 @@ import { import { buildENSIndexerPublicConfig } from "@/config/public"; import { createCrossChainIndexingStatusSnapshotOmnichain } from "@/lib/indexing-status/build-index-status"; -import { IndexingStatusBuilder } from "@/lib/indexing-status-builder/indexing-status-builder"; -import { localPonderClient } from "@/lib/local-ponder-client"; +import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton"; const app = new Hono(); -const indexingStatusBuilder = new IndexingStatusBuilder(localPonderClient); // include ENSIndexer Public Config endpoint app.get("/config", async (c) => { diff --git a/apps/ensindexer/src/lib/ensindexer-client/singleton.ts b/apps/ensindexer/src/lib/ensindexer-client/singleton.ts new file mode 100644 index 000000000..47c09b9ff --- /dev/null +++ b/apps/ensindexer/src/lib/ensindexer-client/singleton.ts @@ -0,0 +1,5 @@ +import config from "@/config"; + +import { EnsIndexerClient } from "@ensnode/ensnode-sdk"; + +export const ensIndexerClient = new EnsIndexerClient({ url: config.ensIndexerUrl }); diff --git a/apps/ensindexer/src/lib/indexing-status-builder/singleton.ts b/apps/ensindexer/src/lib/indexing-status-builder/singleton.ts new file mode 100644 index 000000000..d5d1f26dd --- /dev/null +++ b/apps/ensindexer/src/lib/indexing-status-builder/singleton.ts @@ -0,0 +1,5 @@ +import { localPonderClient } from "@/lib/local-ponder-client"; + +import { IndexingStatusBuilder } from "./indexing-status-builder"; + +export const indexingStatusBuilder = new IndexingStatusBuilder(localPonderClient); From b0a189d6591af671f0ad6a4633313d3ffb4ef6d5 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Sun, 1 Mar 2026 18:53:40 +0100 Subject: [PATCH 05/24] Create `EnsDbWriterWorker` --- .../ensdb-writer-worker.ts | 244 ++++++++++++++++++ 1 file changed, 244 insertions(+) create mode 100644 apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts new file mode 100644 index 000000000..53aa07950 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts @@ -0,0 +1,244 @@ +import { getUnixTime, secondsToMilliseconds } from "date-fns"; + +import { + type CrossChainIndexingStatusSnapshot, + type Duration, + type EnsIndexerClient, + type EnsIndexerPublicConfig, + OmnichainIndexingStatusIds, + validateEnsIndexerPublicConfigCompatibility, +} from "@ensnode/ensnode-sdk"; + +import type { EnsDbClient } from "@/lib/ensdb-client/ensdb-client"; +// TODO: replace with ENSNode SDK export when available (see PR 1701) +import { buildCrossChainIndexingStatusSnapshotOmnichain } from "@/lib/indexing-status-builder/cross-chain-indexing-status-snapshot"; +import type { IndexingStatusBuilder } from "@/lib/indexing-status-builder/indexing-status-builder"; + +/** + * Interval in seconds between two consecutive attempts to upsert + * the Indexing Status Snapshot record into ENSDb. + */ +const INDEXING_STATUS_RECORD_UPDATE_INTERVAL: Duration = 5; + +/** + * ENSDb Writer Worker + * + * A worker responsible for writing ENSIndexer-related data into ENSDb, including: + * - ENSDb version + * - ENSIndexer Public Config + * - ENSIndexer Indexing Status Snapshots + */ +export class EnsDbWriterWorker { + /** + * Indicates whether the worker is stopped. When `true`, all recurring tasks + * in the worker stop functioning. The worker can be stopped by calling + * the {@link stop} method. + */ + private isStopped = false; + + /** + * ENSDb Client instance used by the worker to interact with ENSDb. + */ + private ensDbClient: EnsDbClient; + + /** + * ENSIndexer Client instance used by the worker to read ENSIndexer Public Config. + */ + private ensIndexerClient: EnsIndexerClient; + + /** + * Indexing Status Builder instance used by the worker to read ENSIndexer Indexing Status. + */ + private indexingStatusBuilder: IndexingStatusBuilder; + + /** + * @param ensDbClient ENSDb Client instance used by the worker to interact with ENSDb. + * @param ensIndexerClient ENSIndexer Client instance used by the worker to read ENSIndexer Public Config. + * @param indexingStatusBuilder Indexing Status Builder instance used by the worker to read ENSIndexer Indexing Status. + */ + constructor( + ensDbClient: EnsDbClient, + ensIndexerClient: EnsIndexerClient, + indexingStatusBuilder: IndexingStatusBuilder, + ) { + this.ensDbClient = ensDbClient; + this.ensIndexerClient = ensIndexerClient; + this.indexingStatusBuilder = indexingStatusBuilder; + } + + /** + * Run the ENSDb Writer Worker + * + * The worker performs the following tasks: + * 1) A single attempt to upsert ENSDb version into ENSDb. + * 2) A single attempt to upsert serialized representation of + * {@link EnsIndexerPublicConfig} into ENSDb. + * 3) A recurring attempt to upsert serialized representation of + * {@link CrossChainIndexingStatusSnapshot} into ENSDb. + */ + public async run(): Promise { + const inMemoryConfig = await this.getValidatedEnsIndexerPublicConfig(); + + // Task 1: upsert ENSDb version into ENSDb. + console.log(`[EnsDbWriterWorker]: Upserting ENSDb version into ENSDb...`); + await this.ensDbClient.upsertEnsDbVersion(inMemoryConfig.versionInfo.ensDb); + console.log( + `[EnsDbWriterWorker]: ENSDb version upserted successfully: ${inMemoryConfig.versionInfo.ensDb}`, + ); + + // Task 2: upsert of EnsIndexerPublicConfig into ENSDb. + console.log(`[EnsDbWriterWorker]: Upserting ENSIndexer Public Config into ENSDb...`); + await this.ensDbClient.upsertEnsIndexerPublicConfig(inMemoryConfig); + console.log(`[EnsDbWriterWorker]: ENSIndexer Public Config upserted successfully`); + + // Task 3: recurring upsert of Indexing Status Snapshot into ENSDb. + const indexingStatusSnapshotStream = this.getIndexingStatusSnapshotStream(); + + for await (const snapshot of indexingStatusSnapshotStream) { + await this.ensDbClient.upsertIndexingStatusSnapshot(snapshot); + } + } + + /** + * Run the ENSDb Writer Worker with retry behavior. + * + * Retries the {@link run} method up to `maxRetries` times if it throws, + * waiting the same interval used for indexing status updates between attempts. + * + * @param maxRetries Maximum number of attempts before throwing. + * @throws Error if the number of attempts exceeds `maxRetries` or if the worker is stopped before a successful run. + */ + public async runWithRetries({ maxRetries }: { maxRetries: number }): Promise { + let attempt = 0; + + while (!this.isStopped) { + attempt += 1; + + try { + await this.run(); + return; + } catch (error) { + if (attempt >= maxRetries) { + throw new Error(`ENSDb Writer Worker failed after ${attempt} attempts.`, { + cause: error, + }); + } + + await new Promise((resolve) => + setTimeout(resolve, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL)), + ); + } + } + + throw new Error("ENSDb Writer Worker could not process all tasks successfully."); + } + + /** + * Stop the ENSDb Writer Worker + * + * Stops all recurring tasks in the worker. + */ + public stop(): void { + this.isStopped = true; + } + + /** + * Get validated ENSIndexer Public Config object for the ENSDb Writer Worker. + * + * The function retrieves the ENSIndexer Public Config object from both: + * - stored config in ENSDb, if available, and + * - in-memory config from ENSIndexer Client. + * + * If, and only if, a stored config is available in ENSDb, then the function + * validates the compatibility of the in-memory config object against + * the stored one. Validation criteria are defined in the function body. + * + * @returns In-memory config object, if the validation is successful or + * if there is no stored config. + * @throws Error if the in-memory config object is incompatible with + * the stored one. + */ + private async getValidatedEnsIndexerPublicConfig(): Promise { + const [storedConfig, inMemoryConfig] = await Promise.all([ + this.ensDbClient.getEnsIndexerPublicConfig(), + this.ensIndexerClient.config(), + ]); + + // Validate in-memory config object compatibility with the stored one, + // if the stored one is available + if (storedConfig) { + try { + validateEnsIndexerPublicConfigCompatibility(storedConfig, inMemoryConfig); + } catch (error) { + const errorMessage = `In-memory ENSIndexer Public Config object is not compatible with its counterpart stored in ENSDb.`; + + console.error(`[EnsDbWriterWorker]: ${errorMessage}`); + + // Throw the error to terminate the ENSIndexer process due to + // found config incompatibility + throw new Error(errorMessage, { + cause: error, + }); + } + } + + return inMemoryConfig; + } + + /** + * Get Indexing Status Snapshot Stream + * + * An async generator function that yields validated Indexing Status Snapshots + * retrieved from Indexing Status Builder at a regular interval defined by + * `INDEXING_STATUS_RECORD_UPDATE_INTERVAL`. Validation criteria are defined + * in the function body. The generator stops yielding snapshots when the + * worker is stopped. + * + * Note: failure to retrieve the Indexing Status from Indexing Status Builder + * or failure to validate the retrieved Indexing Status Snapshot does not + * cause the generator to throw an error. Instead, the generator continues + * with the next attempt after the specified delay. + * + * @yields validated Indexing Status Snapshots retrieved from Indexing Status Builder. + * Validation criteria are defined in the function body. + * @returns void when the worker is stopped. + */ + private async *getIndexingStatusSnapshotStream(): AsyncGenerator { + while (!this.isStopped) { + try { + // get system timestamp for the current iteration of the loop + const snapshotTime = getUnixTime(new Date()); + + const omnichainSnapshot = + await this.indexingStatusBuilder.getOmnichainIndexingStatusSnapshot(); + + // Invariant: the Omnichain Status must indicate that indexing has started already. + if (omnichainSnapshot.omnichainStatus === OmnichainIndexingStatusIds.Unstarted) { + throw new Error("Omnichain Status must not be 'Unstarted'."); + } + + const crossChainSnapshot = buildCrossChainIndexingStatusSnapshotOmnichain( + omnichainSnapshot, + snapshotTime, + ); + + // Yield the validated indexing status snapshot + yield crossChainSnapshot; + } catch (error) { + console.error( + `[EnsDbWriterWorker]: Error retrieving or validating Indexing Status Snapshot:`, + error, + ); + // Do not throw the error, as failure to retrieve the Indexing Status + // should not cause the ENSDb Writer Worker to stop functioning. + // Instead, continue with the next attempt after the delay. + } finally { + // Regardless of success or failure of the attempt to retrieve the Indexing Status, + // wait for the specified interval before the next attempt. + await new Promise((resolve) => + setTimeout(resolve, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL)), + ); + } + } + } +} From ef35eeb2e53746120a9e710ea2ea6237ff471ab1 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Sun, 1 Mar 2026 18:53:54 +0100 Subject: [PATCH 06/24] Cover `EnsDbWriterWorker` with unit tests` --- .../ensdb-writer-worker.test.ts | 252 ++++++++++++++++++ 1 file changed, 252 insertions(+) create mode 100644 apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts new file mode 100644 index 000000000..4556bf247 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts @@ -0,0 +1,252 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { + type CrossChainIndexingStatusSnapshot, + CrossChainIndexingStrategyIds, + type EnsIndexerClient, + type EnsIndexerPublicConfig, + OmnichainIndexingStatusIds, + type OmnichainIndexingStatusSnapshot, + validateEnsIndexerPublicConfigCompatibility, +} from "@ensnode/ensnode-sdk"; + +import type { EnsDbClient } from "@/lib/ensdb-client/ensdb-client"; +import { publicConfig } from "@/lib/ensdb-client/ensdb-client.mock"; +import { EnsDbWriterWorker } from "@/lib/ensdb-writer-worker/ensdb-writer-worker"; +import { buildCrossChainIndexingStatusSnapshotOmnichain } from "@/lib/indexing-status-builder/cross-chain-indexing-status-snapshot"; +import type { IndexingStatusBuilder } from "@/lib/indexing-status-builder/indexing-status-builder"; + +vi.mock("@ensnode/ensnode-sdk", async () => { + const actual = + await vi.importActual("@ensnode/ensnode-sdk"); + + return { + ...actual, + validateEnsIndexerPublicConfigCompatibility: vi.fn(), + }; +}); + +vi.mock("@/lib/indexing-status-builder/cross-chain-indexing-status-snapshot", () => ({ + buildCrossChainIndexingStatusSnapshotOmnichain: vi.fn(), +})); + +describe("EnsDbWriterWorker", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + it("upserts version, config, and indexing status snapshots", async () => { + // arrange + const omnichainSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 100, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const snapshot = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 100, + snapshotTime: 200, + omnichainSnapshot, + } as CrossChainIndexingStatusSnapshot; + + const buildSnapshot = vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain); + buildSnapshot.mockReturnValue(snapshot); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn(), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn().mockResolvedValue(omnichainSnapshot), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + vi.mocked(ensDbClient.upsertIndexingStatusSnapshot).mockImplementation(async () => { + worker.stop(); + }); + + // act + const runPromise = worker.run(); + + await vi.runAllTimersAsync(); + await runPromise; + + // assert + expect(ensDbClient.upsertEnsDbVersion).toHaveBeenCalledWith(publicConfig.versionInfo.ensDb); + expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(publicConfig); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(snapshot); + expect(buildSnapshot).toHaveBeenCalledWith(omnichainSnapshot, expect.any(Number)); + }); + + it("throws when stored config is incompatible", async () => { + // arrange + const incompatibleError = new Error("incompatible"); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(publicConfig), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig as EnsIndexerPublicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + vi.mocked(validateEnsIndexerPublicConfigCompatibility).mockImplementation(() => { + throw incompatibleError; + }); + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act + await expect(worker.run()).rejects.toThrow( + "In-memory ENSIndexer Public Config object is not compatible with its counterpart stored in ENSDb.", + ); + + // assert + expect(ensDbClient.upsertEnsDbVersion).not.toHaveBeenCalled(); + }); + + it("continues after snapshot validation errors", async () => { + // arrange + const unstartedSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Unstarted, + } as OmnichainIndexingStatusSnapshot; + + const validSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 200, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const snapshot = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 200, + snapshotTime: 300, + omnichainSnapshot: validSnapshot, + } as CrossChainIndexingStatusSnapshot; + + vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(snapshot); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn(), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi + .fn() + .mockResolvedValueOnce(unstartedSnapshot) + .mockResolvedValueOnce(validSnapshot), + } as unknown as IndexingStatusBuilder; + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + vi.mocked(ensDbClient.upsertIndexingStatusSnapshot).mockImplementation(async () => { + worker.stop(); + }); + + // act + const runPromise = worker.run(); + + await vi.runOnlyPendingTimersAsync(); + await vi.runOnlyPendingTimersAsync(); + await runPromise; + + // assert + expect(indexingStatusBuilder.getOmnichainIndexingStatusSnapshot).toHaveBeenCalledTimes(2); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(snapshot); + }); + + it("retries run until success", async () => { + // arrange + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + const runSpy = vi + .spyOn(worker, "run") + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValueOnce(undefined); + + // act + const runPromise = worker.runWithRetries({ maxRetries: 2 }); + + await vi.runOnlyPendingTimersAsync(); + await runPromise; + + // assert + expect(runSpy).toHaveBeenCalledTimes(2); + }); + + it("throws after exceeding max retries", async () => { + // arrange + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + const error = new Error("still failing"); + vi.spyOn(worker, "run").mockRejectedValue(error); + + // act + const runPromise = worker.runWithRetries({ maxRetries: 2 }); + const rejection = expect(runPromise).rejects.toThrow( + "ENSDb Writer Worker failed after 2 attempts.", + ); + + await vi.runAllTimersAsync(); + + // assert + await rejection; + }); +}); From 42332321a9cb8df73585413822ad770e89aaff26 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Sun, 1 Mar 2026 18:59:13 +0100 Subject: [PATCH 07/24] Integrate `EnsDbWriterWorker` with ENSIndexer runtime Uses retries to ensure that worker has a chance to perform its tasks, regardless of transient network issues. --- .../ponder/src/api/handlers/ensnode-api.ts | 12 ++++++++++++ .../src/lib/ensdb-writer-worker/singleton.ts | 11 +++++++++++ 2 files changed, 23 insertions(+) create mode 100644 apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts diff --git a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts index 33281c43e..6f509984f 100644 --- a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts +++ b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts @@ -14,11 +14,23 @@ import { } from "@ensnode/ensnode-sdk"; import { buildENSIndexerPublicConfig } from "@/config/public"; +import { ensDbWriterWorker } from "@/lib/ensdb-writer-worker/singleton"; import { createCrossChainIndexingStatusSnapshotOmnichain } from "@/lib/indexing-status/build-index-status"; import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton"; const app = new Hono(); +// Start ENSDb Writer Worker with retries and error handling +ensDbWriterWorker.runWithRetries({ maxRetries: 3 }).catch((error) => { + // Stop the worker to prevent further attempts to upsert data into ENSDb. + ensDbWriterWorker.stop(); + + console.error("Error running ENSDb Writer Worker:", error); + + // Trigger graceful shutdown of Ponder app by throwing an error from the top-level scope of the module. + throw error; +}); + // include ENSIndexer Public Config endpoint app.get("/config", async (c) => { // prepare the public config object, including dependency info diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts new file mode 100644 index 000000000..20f640197 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -0,0 +1,11 @@ +import { ensDbClient } from "@/lib/ensdb-client/singleton"; +import { ensIndexerClient } from "@/lib/ensindexer-client/singleton"; +import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton"; + +import { EnsDbWriterWorker } from "./ensdb-writer-worker"; + +export const ensDbWriterWorker = new EnsDbWriterWorker( + ensDbClient, + ensIndexerClient, + indexingStatusBuilder, +); From 09d4cfeb6715823e39c933177fe8b169e95bd75d Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Mon, 2 Mar 2026 14:42:34 +0100 Subject: [PATCH 08/24] Improve ENSDb Writer Worker lifecycle management Implement `AbortSignal`, and ensure only one worker instance can be started. --- .../ponder/src/api/handlers/ensnode-api.ts | 13 +-- .../ensdb-writer-worker.ts | 87 +++++++++++++++---- .../src/lib/ensdb-writer-worker/singleton.ts | 39 +++++++-- 3 files changed, 104 insertions(+), 35 deletions(-) diff --git a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts index 2e0af34ed..2fd0d7f27 100644 --- a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts +++ b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts @@ -14,21 +14,12 @@ import { } from "@ensnode/ensnode-sdk"; import { buildENSIndexerPublicConfig } from "@/config/public"; -import { ensDbWriterWorker } from "@/lib/ensdb-writer-worker/singleton"; +import { startEnsDbWriterWorker } from "@/lib/ensdb-writer-worker/singleton"; import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton"; const app = new Hono(); -// Start ENSDb Writer Worker with retries and error handling -ensDbWriterWorker.runWithRetries({ maxRetries: 3 }).catch((error) => { - // Stop the worker to prevent further attempts to upsert data into ENSDb. - ensDbWriterWorker.stop(); - - console.error("Error running ENSDb Writer Worker:", error); - - // Trigger graceful shutdown of Ponder app by throwing an error from the top-level scope of the module. - throw error; -}); +startEnsDbWriterWorker(); // include ENSIndexer Public Config endpoint app.get("/config", async (c) => { diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts index 35b74ae03..1f1ba7741 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts @@ -29,11 +29,9 @@ const INDEXING_STATUS_RECORD_UPDATE_INTERVAL: Duration = 5; */ export class EnsDbWriterWorker { /** - * Indicates whether the worker is stopped. When `true`, all recurring tasks - * in the worker stop functioning. The worker can be stopped by calling - * the {@link stop} method. + * AbortController instance used to signal the worker to stop its recurring tasks. */ - private isStopped = false; + private abortController: AbortController = new AbortController(); /** * ENSDb Client instance used by the worker to interact with ENSDb. @@ -76,6 +74,7 @@ export class EnsDbWriterWorker { * {@link CrossChainIndexingStatusSnapshot} into ENSDb. */ public async run(): Promise { + // Fetch data required for task 1 and task 2. const inMemoryConfig = await this.getValidatedEnsIndexerPublicConfig(); // Task 1: upsert ENSDb version into ENSDb. @@ -94,7 +93,9 @@ export class EnsDbWriterWorker { const indexingStatusSnapshotStream = this.getIndexingStatusSnapshotStream(); for await (const snapshot of indexingStatusSnapshotStream) { + console.log(`[EnsDbWriterWorker]: Upserting Indexing Status Snapshot into ENSDb...`); await this.ensDbClient.upsertIndexingStatusSnapshot(snapshot); + console.log(`[EnsDbWriterWorker]: Indexing Status Snapshot upserted successfully`); } } @@ -104,10 +105,10 @@ export class EnsDbWriterWorker { * Retries the {@link run} method up to `maxRetries` times if it throws, * waiting the same interval used for indexing status updates between attempts. * - * @param maxRetries Maximum number of attempts before throwing. + * @param options.maxRetries Maximum number of attempts before throwing. * @throws Error if the number of attempts exceeds `maxRetries` or if the worker is stopped before a successful run. */ - public async runWithRetries({ maxRetries }: { maxRetries: number }): Promise { + public async runWithRetries(options: { maxRetries: number }): Promise { let attempt = 0; while (!this.isStopped) { @@ -117,19 +118,30 @@ export class EnsDbWriterWorker { await this.run(); return; } catch (error) { - if (attempt >= maxRetries) { + if (this.isStopped) { + return; + } + + console.error(`[EnsDbWriterWorker]: Error in run attempt #${attempt}:`, error); + + if (attempt >= options.maxRetries) { throw new Error(`ENSDb Writer Worker failed after ${attempt} attempts.`, { cause: error, }); } - await new Promise((resolve) => - setTimeout(resolve, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL)), - ); + // Wait for the configured delay before the next attempt. This also + // ensures that if the worker is stopped while waiting, it will + // not start a new attempt. + await this.nextTryDelay(); } } - throw new Error("ENSDb Writer Worker could not process all tasks successfully."); + // If the loop exits due to the worker being stopped, + // ensure that we do not treat it as an error case. + if (!this.isStopped) { + throw new Error("ENSDb Writer Worker could not process all tasks successfully."); + } } /** @@ -138,7 +150,16 @@ export class EnsDbWriterWorker { * Stops all recurring tasks in the worker. */ public stop(): void { - this.isStopped = true; + if (!this.isStopped) { + this.abortController.abort(); + } + } + + /** + * Indicates whether the ENSDb Writer Worker is stopped. + */ + get isStopped(): boolean { + return this.abortController.signal.aborted; } /** @@ -189,9 +210,8 @@ export class EnsDbWriterWorker { * * An async generator function that yields validated Indexing Status Snapshots * retrieved from Indexing Status Builder at a regular interval defined by - * `INDEXING_STATUS_RECORD_UPDATE_INTERVAL`. Validation criteria are defined - * in the function body. The generator stops yielding snapshots when the - * worker is stopped. + * `INDEXING_STATUS_RECORD_UPDATE_INTERVAL`. The generator stops yielding + * snapshots when the worker is stopped. * * Note: failure to retrieve the Indexing Status from Indexing Status Builder * or failure to validate the retrieved Indexing Status Snapshot does not @@ -233,11 +253,40 @@ export class EnsDbWriterWorker { // Instead, continue with the next attempt after the delay. } finally { // Regardless of success or failure of the attempt to retrieve the Indexing Status, - // wait for the specified interval before the next attempt. - await new Promise((resolve) => - setTimeout(resolve, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL)), - ); + // wait for the configured delay before the next attempt. + await this.nextTryDelay(); } } } + + /** + * Resolves after the configured interval, or immediately if the worker + * has been stopped. Prevents hanging on shutdown mid-wait. + */ + private nextTryDelay(): Promise { + return new Promise((resolve) => { + // Do not delay if the worker is already stopped, to allow for prompt shutdown. + if (this.isStopped) { + return resolve(); + } + + // When abort signal is received, + // clear the timeout and resolve immediately to allow for prompt shutdown. + const onAbort = (): void => { + clearTimeout(timeout); + resolve(); + }; + + const timeout = setTimeout(() => { + // Clear the abort event listener after the timeout completes, to prevent memory leaks. + this.abortController.signal.removeEventListener("abort", onAbort); + + resolve(); + }, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL)); + + // If the worker is stopped while waiting, + // resolve immediately to allow for prompt shutdown. + this.abortController.signal.addEventListener("abort", onAbort, { once: true }); + }); + } } diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts index 20f640197..6d504af2b 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -4,8 +4,37 @@ import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton"; import { EnsDbWriterWorker } from "./ensdb-writer-worker"; -export const ensDbWriterWorker = new EnsDbWriterWorker( - ensDbClient, - ensIndexerClient, - indexingStatusBuilder, -); +let ensDbWriterWorker: EnsDbWriterWorker; + +/** + * Starts the EnsDbWriterWorker in a new asynchronous context. + * + * The worker will run indefinitely until its internal AbortSignal is triggered, + * for example due to a process termination signal or an internal error, at + * which point it will attempt to gracefully shut down. + */ +export function startEnsDbWriterWorker() { + if (typeof ensDbWriterWorker !== "undefined") { + throw new Error("EnsDbWriterWorker has already been started"); + } + + ensDbWriterWorker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + const ensDbWriterWorkerRun = ensDbWriterWorker.runWithRetries({ maxRetries: 3 }); + + // Handle any uncaught errors from the worker + ensDbWriterWorkerRun.catch((error) => { + // Abort the worker on error to trigger cleanup + ensDbWriterWorker.stop(); + + console.error("EnsDbWriterWorker encountered an error:", error); + + // Re-throw the error to ensure the application shuts down with a non-zero exit code. + process.exitCode = 1; + throw error; + }); + + // Handle graceful shutdown on process termination signals + process.on("SIGINT", () => ensDbWriterWorker.stop()); + process.on("SIGTERM", () => ensDbWriterWorker.stop()); +} From e8e5d018c8536a77952f605ad0af7fef3620dd4e Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Mon, 2 Mar 2026 14:45:49 +0100 Subject: [PATCH 09/24] docs(changeset): Introduced `EnsDbClient` and `EnsDbWriterWorker` to enable storing ENSNode metadata in ENSDb. --- .changeset/afraid-ducks-strive.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/afraid-ducks-strive.md diff --git a/.changeset/afraid-ducks-strive.md b/.changeset/afraid-ducks-strive.md new file mode 100644 index 000000000..fad2cf8b8 --- /dev/null +++ b/.changeset/afraid-ducks-strive.md @@ -0,0 +1,5 @@ +--- +"ensindexer": minor +--- + +Introduced `EnsDbClient` and `EnsDbWriterWorker` to enable storing ENSNode metadata in ENSDb. From bdb9da303e5aaabeafbc4d62cc1e3dd8a0fe4472 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Mon, 2 Mar 2026 15:59:37 +0100 Subject: [PATCH 10/24] Apply AI PR feedback --- .../src/lib/ensdb-writer-worker/ensdb-writer-worker.ts | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts index 1f1ba7741..e7bf05855 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts @@ -106,7 +106,7 @@ export class EnsDbWriterWorker { * waiting the same interval used for indexing status updates between attempts. * * @param options.maxRetries Maximum number of attempts before throwing. - * @throws Error if the number of attempts exceeds `maxRetries` or if the worker is stopped before a successful run. + * @throws Error if the number of attempts exceeds `maxRetries`. */ public async runWithRetries(options: { maxRetries: number }): Promise { let attempt = 0; @@ -136,12 +136,6 @@ export class EnsDbWriterWorker { await this.nextTryDelay(); } } - - // If the loop exits due to the worker being stopped, - // ensure that we do not treat it as an error case. - if (!this.isStopped) { - throw new Error("ENSDb Writer Worker could not process all tasks successfully."); - } } /** From cfd0508b3d0c09809051979562c9175b360a826a Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 07:07:41 +0100 Subject: [PATCH 11/24] Update code docs as per PR feedback --- .changeset/afraid-ducks-strive.md | 2 +- apps/ensindexer/src/lib/ensdb-client/drizzle.ts | 6 ++++-- apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.changeset/afraid-ducks-strive.md b/.changeset/afraid-ducks-strive.md index fad2cf8b8..134592ec8 100644 --- a/.changeset/afraid-ducks-strive.md +++ b/.changeset/afraid-ducks-strive.md @@ -2,4 +2,4 @@ "ensindexer": minor --- -Introduced `EnsDbClient` and `EnsDbWriterWorker` to enable storing ENSNode metadata in ENSDb. +Introduced `EnsDbClient` and `EnsDbWriterWorker` to enable storing metadata in ENSDb. diff --git a/apps/ensindexer/src/lib/ensdb-client/drizzle.ts b/apps/ensindexer/src/lib/ensdb-client/drizzle.ts index cee05366b..b5cc9b5c7 100644 --- a/apps/ensindexer/src/lib/ensdb-client/drizzle.ts +++ b/apps/ensindexer/src/lib/ensdb-client/drizzle.ts @@ -1,5 +1,7 @@ -// This file was copied 1-to-1 from ENSApi. - +// This file is based on `packages/ponder-subgraph/src/drizzle.ts` file. +// We currently duplicate the makeDrizzle function, as we don't have +// a shared package for backend code yet. When we do, we can move +// this function to the shared package and import it in both places. import { setDatabaseSchema } from "@ponder/client"; import { drizzle } from "drizzle-orm/node-postgres"; diff --git a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts index c23161d5b..25dddb806 100644 --- a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts +++ b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts @@ -41,7 +41,7 @@ interface DrizzleDb extends NodePgDatabase {} * * This client exists to provide an abstraction layer for interacting with ENSDb. * It enables ENSIndexer and ENSApi to decouple from each other, and use - * the ENSDb Client as the integration point between the two. + * ENSDb as the integration point between the two (via ENSDb Client). * * Enables querying and mutating ENSDb data, such as: * - ENSDb version From e1aa937518d2298d004375dcb36264b8c602c9df Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 08:32:43 +0100 Subject: [PATCH 12/24] Create a separate entrypoint file for ENSDb Writer Worker --- .../ponder/src/api/ensdb-writer-worker-entrypoint.ts | 9 +++++++++ apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts | 3 --- apps/ensindexer/ponder/src/api/index.ts | 1 + 3 files changed, 10 insertions(+), 3 deletions(-) create mode 100644 apps/ensindexer/ponder/src/api/ensdb-writer-worker-entrypoint.ts diff --git a/apps/ensindexer/ponder/src/api/ensdb-writer-worker-entrypoint.ts b/apps/ensindexer/ponder/src/api/ensdb-writer-worker-entrypoint.ts new file mode 100644 index 000000000..3b831f80f --- /dev/null +++ b/apps/ensindexer/ponder/src/api/ensdb-writer-worker-entrypoint.ts @@ -0,0 +1,9 @@ +// This file is the entry point for the ENSDb Writer Worker. +// It must be placed inside the `api` directory of the Ponder app to avoid +// the following build issue: +// Error: Invalid dependency graph. Config, schema, and indexing function files +// cannot import objects from the API function file "src/api/index.ts". + +import { startEnsDbWriterWorker } from "@/lib/ensdb-writer-worker/singleton"; + +startEnsDbWriterWorker(); diff --git a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts index 2fd0d7f27..58c4b015d 100644 --- a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts +++ b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts @@ -14,13 +14,10 @@ import { } from "@ensnode/ensnode-sdk"; import { buildENSIndexerPublicConfig } from "@/config/public"; -import { startEnsDbWriterWorker } from "@/lib/ensdb-writer-worker/singleton"; import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton"; const app = new Hono(); -startEnsDbWriterWorker(); - // include ENSIndexer Public Config endpoint app.get("/config", async (c) => { // prepare the public config object, including dependency info diff --git a/apps/ensindexer/ponder/src/api/index.ts b/apps/ensindexer/ponder/src/api/index.ts index 70f0703f4..7dc40bd71 100644 --- a/apps/ensindexer/ponder/src/api/index.ts +++ b/apps/ensindexer/ponder/src/api/index.ts @@ -6,6 +6,7 @@ import { cors } from "hono/cors"; import type { ErrorResponse } from "@ensnode/ensnode-sdk"; import ensNodeApi from "./handlers/ensnode-api"; +import "./ensdb-writer-worker-entrypoint"; const app = new Hono(); From 8ebe95f76c9b54e0d07e8a83d90aefe7b498854c Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 08:33:36 +0100 Subject: [PATCH 13/24] Simplify ENSDb Writer Worker Applies background tasks approach known from `SWRCache` implementation. --- .../ensdb-writer-worker.ts | 183 +++++------------- 1 file changed, 52 insertions(+), 131 deletions(-) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts index e7bf05855..00ad1aa71 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts @@ -7,6 +7,7 @@ import { type EnsIndexerClient, type EnsIndexerPublicConfig, OmnichainIndexingStatusIds, + type OmnichainIndexingStatusSnapshot, validateEnsIndexerPublicConfigCompatibility, } from "@ensnode/ensnode-sdk"; @@ -17,21 +18,21 @@ import type { IndexingStatusBuilder } from "@/lib/indexing-status-builder/indexi * Interval in seconds between two consecutive attempts to upsert * the Indexing Status Snapshot record into ENSDb. */ -const INDEXING_STATUS_RECORD_UPDATE_INTERVAL: Duration = 5; +const INDEXING_STATUS_RECORD_UPDATE_INTERVAL: Duration = 1; /** * ENSDb Writer Worker * - * A worker responsible for writing ENSIndexer-related data into ENSDb, including: + * A worker responsible for writing ENSIndexer-related metadata into ENSDb, including: * - ENSDb version * - ENSIndexer Public Config * - ENSIndexer Indexing Status Snapshots */ export class EnsDbWriterWorker { /** - * AbortController instance used to signal the worker to stop its recurring tasks. + * Interval for recurring upserts of Indexing Status Snapshots into ENSDb. */ - private abortController: AbortController = new AbortController(); + private indexingStatusInterval: ReturnType | null = null; /** * ENSDb Client instance used by the worker to interact with ENSDb. @@ -72,6 +73,9 @@ export class EnsDbWriterWorker { * {@link EnsIndexerPublicConfig} into ENSDb. * 3) A recurring attempt to upsert serialized representation of * {@link CrossChainIndexingStatusSnapshot} into ENSDb. + * + * @throws Error if the in-memory ENSIndexer Public Config is incompatible + * with the stored one in ENSDb (if available). */ public async run(): Promise { // Fetch data required for task 1 and task 2. @@ -90,52 +94,10 @@ export class EnsDbWriterWorker { console.log(`[EnsDbWriterWorker]: ENSIndexer Public Config upserted successfully`); // Task 3: recurring upsert of Indexing Status Snapshot into ENSDb. - const indexingStatusSnapshotStream = this.getIndexingStatusSnapshotStream(); - - for await (const snapshot of indexingStatusSnapshotStream) { - console.log(`[EnsDbWriterWorker]: Upserting Indexing Status Snapshot into ENSDb...`); - await this.ensDbClient.upsertIndexingStatusSnapshot(snapshot); - console.log(`[EnsDbWriterWorker]: Indexing Status Snapshot upserted successfully`); - } - } - - /** - * Run the ENSDb Writer Worker with retry behavior. - * - * Retries the {@link run} method up to `maxRetries` times if it throws, - * waiting the same interval used for indexing status updates between attempts. - * - * @param options.maxRetries Maximum number of attempts before throwing. - * @throws Error if the number of attempts exceeds `maxRetries`. - */ - public async runWithRetries(options: { maxRetries: number }): Promise { - let attempt = 0; - - while (!this.isStopped) { - attempt += 1; - - try { - await this.run(); - return; - } catch (error) { - if (this.isStopped) { - return; - } - - console.error(`[EnsDbWriterWorker]: Error in run attempt #${attempt}:`, error); - - if (attempt >= options.maxRetries) { - throw new Error(`ENSDb Writer Worker failed after ${attempt} attempts.`, { - cause: error, - }); - } - - // Wait for the configured delay before the next attempt. This also - // ensures that if the worker is stopped while waiting, it will - // not start a new attempt. - await this.nextTryDelay(); - } - } + this.indexingStatusInterval = setInterval( + () => this.upsertIndexingStatusSnapshot(), + secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL), + ); } /** @@ -144,18 +106,12 @@ export class EnsDbWriterWorker { * Stops all recurring tasks in the worker. */ public stop(): void { - if (!this.isStopped) { - this.abortController.abort(); + if (this.indexingStatusInterval) { + clearInterval(this.indexingStatusInterval); + this.indexingStatusInterval = null; } } - /** - * Indicates whether the ENSDb Writer Worker is stopped. - */ - get isStopped(): boolean { - return this.abortController.signal.aborted; - } - /** * Get validated ENSIndexer Public Config object for the ENSDb Writer Worker. * @@ -184,9 +140,11 @@ export class EnsDbWriterWorker { try { validateEnsIndexerPublicConfigCompatibility(storedConfig, inMemoryConfig); } catch (error) { - const errorMessage = `In-memory ENSIndexer Public Config object is not compatible with its counterpart stored in ENSDb.`; + const errorMessage = error instanceof Error ? error.message : "Unknown error"; - console.error(`[EnsDbWriterWorker]: ${errorMessage}`); + console.error( + `[EnsDbWriterWorker]: In-memory ENSIndexer Public Config object is not compatible with its counterpart stored in ENSDb. Cause: ${errorMessage}`, + ); // Throw the error to terminate the ENSIndexer process due to // found config incompatibility @@ -200,87 +158,50 @@ export class EnsDbWriterWorker { } /** - * Get Indexing Status Snapshot Stream - * - * An async generator function that yields validated Indexing Status Snapshots - * retrieved from Indexing Status Builder at a regular interval defined by - * `INDEXING_STATUS_RECORD_UPDATE_INTERVAL`. The generator stops yielding - * snapshots when the worker is stopped. + * Upsert the current Indexing Status Snapshot into ENSDb. * - * Note: failure to retrieve the Indexing Status from Indexing Status Builder - * or failure to validate the retrieved Indexing Status Snapshot does not - * cause the generator to throw an error. Instead, the generator continues - * with the next attempt after the specified delay. - * - * @yields validated Indexing Status Snapshots retrieved from Indexing Status Builder. - * Validation criteria are defined in the function body. - * @returns void when the worker is stopped. + * This method is called by the scheduler at regular intervals. + * Errors are logged but not thrown, to keep the worker running. */ - private async *getIndexingStatusSnapshotStream(): AsyncGenerator { - while (!this.isStopped) { - try { - // get system timestamp for the current iteration of the loop - const snapshotTime = getUnixTime(new Date()); + private async upsertIndexingStatusSnapshot(): Promise { + try { + // get system timestamp for the current iteration + const snapshotTime = getUnixTime(new Date()); - const omnichainSnapshot = - await this.indexingStatusBuilder.getOmnichainIndexingStatusSnapshot(); + const omnichainSnapshot = await this.getValidatedIndexingStatusSnapshot(); - // Invariant: the Omnichain Status must indicate that indexing has started already. - if (omnichainSnapshot.omnichainStatus === OmnichainIndexingStatusIds.Unstarted) { - throw new Error("Omnichain Status must not be 'Unstarted'."); - } + const crossChainSnapshot = buildCrossChainIndexingStatusSnapshotOmnichain( + omnichainSnapshot, + snapshotTime, + ); - const crossChainSnapshot = buildCrossChainIndexingStatusSnapshotOmnichain( - omnichainSnapshot, - snapshotTime, - ); - - // Yield the validated indexing status snapshot - yield crossChainSnapshot; - } catch (error) { - console.error( - `[EnsDbWriterWorker]: Error retrieving or validating Indexing Status Snapshot:`, - error, - ); - // Do not throw the error, as failure to retrieve the Indexing Status - // should not cause the ENSDb Writer Worker to stop functioning. - // Instead, continue with the next attempt after the delay. - } finally { - // Regardless of success or failure of the attempt to retrieve the Indexing Status, - // wait for the configured delay before the next attempt. - await this.nextTryDelay(); - } + console.log(`[EnsDbWriterWorker]: Upserting Indexing Status Snapshot into ENSDb...`); + await this.ensDbClient.upsertIndexingStatusSnapshot(crossChainSnapshot); + console.log(`[EnsDbWriterWorker]: Indexing Status Snapshot upserted successfully`); + } catch (error) { + console.error( + `[EnsDbWriterWorker]: Error retrieving or validating Indexing Status Snapshot:`, + error, + ); + // Do not throw the error, as failure to retrieve the Indexing Status + // should not cause the ENSDb Writer Worker to stop functioning. } } /** - * Resolves after the configured interval, or immediately if the worker - * has been stopped. Prevents hanging on shutdown mid-wait. + * Get validated Omnichain Indexing Status Snapshot + * + * @returns Validated Omnichain Indexing Status Snapshot. + * @throws Error if the Omnichain Indexing Status is not in expected status yet. */ - private nextTryDelay(): Promise { - return new Promise((resolve) => { - // Do not delay if the worker is already stopped, to allow for prompt shutdown. - if (this.isStopped) { - return resolve(); - } - - // When abort signal is received, - // clear the timeout and resolve immediately to allow for prompt shutdown. - const onAbort = (): void => { - clearTimeout(timeout); - resolve(); - }; + private async getValidatedIndexingStatusSnapshot(): Promise { + const omnichainSnapshot = await this.indexingStatusBuilder.getOmnichainIndexingStatusSnapshot(); - const timeout = setTimeout(() => { - // Clear the abort event listener after the timeout completes, to prevent memory leaks. - this.abortController.signal.removeEventListener("abort", onAbort); - - resolve(); - }, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL)); + // Invariant: the Omnichain Status must indicate that indexing has started already. + if (omnichainSnapshot.omnichainStatus === OmnichainIndexingStatusIds.Unstarted) { + throw new Error("Omnichain Status must not be 'Unstarted'."); + } - // If the worker is stopped while waiting, - // resolve immediately to allow for prompt shutdown. - this.abortController.signal.addEventListener("abort", onAbort, { once: true }); - }); + return omnichainSnapshot; } } From abe2d6c302db755534b72eeefa89f7b48f9c2f58 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 08:33:59 +0100 Subject: [PATCH 14/24] Update tests for ENSDb Writer Worker to follow simplified implementation --- .../ensdb-writer-worker.test.ts | 160 ++++++++++++------ 1 file changed, 106 insertions(+), 54 deletions(-) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts index df2dff1e7..b1ff45ced 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts @@ -36,7 +36,7 @@ describe("EnsDbWriterWorker", () => { vi.clearAllMocks(); }); - it("upserts version, config, and indexing status snapshots", async () => { + it("upserts version, config, and starts interval for indexing status snapshots", async () => { // arrange const omnichainSnapshot = { omnichainStatus: OmnichainIndexingStatusIds.Following, @@ -58,7 +58,7 @@ describe("EnsDbWriterWorker", () => { getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), - upsertIndexingStatusSnapshot: vi.fn(), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), } as unknown as EnsDbClient; const ensIndexerClient = { @@ -71,21 +71,22 @@ describe("EnsDbWriterWorker", () => { const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); - vi.mocked(ensDbClient.upsertIndexingStatusSnapshot).mockImplementation(async () => { - worker.stop(); - }); - - // act - const runPromise = worker.run(); - - await vi.runAllTimersAsync(); - await runPromise; + // act - run() returns immediately after setting up interval + await worker.run(); - // assert + // assert - verify initial upserts happened expect(ensDbClient.upsertEnsDbVersion).toHaveBeenCalledWith(publicConfig.versionInfo.ensDb); expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(publicConfig); + + // advance time to trigger interval + await vi.advanceTimersByTimeAsync(1000); + + // assert - snapshot should be upserted expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(snapshot); expect(buildSnapshot).toHaveBeenCalledWith(omnichainSnapshot, expect.any(Number)); + + // cleanup + worker.stop(); }); it("throws when stored config is incompatible", async () => { @@ -114,15 +115,13 @@ describe("EnsDbWriterWorker", () => { const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); // act - await expect(worker.run()).rejects.toThrow( - "In-memory ENSIndexer Public Config object is not compatible with its counterpart stored in ENSDb.", - ); + await expect(worker.run()).rejects.toThrow("incompatible"); // assert expect(ensDbClient.upsertEnsDbVersion).not.toHaveBeenCalled(); }); - it("continues after snapshot validation errors", async () => { + it("continues upserting after snapshot validation errors", async () => { // arrange const unstartedSnapshot = { omnichainStatus: OmnichainIndexingStatusIds.Unstarted, @@ -134,20 +133,20 @@ describe("EnsDbWriterWorker", () => { chains: {}, } as OmnichainIndexingStatusSnapshot; - const snapshot = { + const crossChainSnapshot = { strategy: CrossChainIndexingStrategyIds.Omnichain, slowestChainIndexingCursor: 200, snapshotTime: 300, omnichainSnapshot: validSnapshot, } as CrossChainIndexingStatusSnapshot; - vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(snapshot); + vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(crossChainSnapshot); const ensDbClient = { getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), - upsertIndexingStatusSnapshot: vi.fn(), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), } as unknown as EnsDbClient; const ensIndexerClient = { @@ -160,31 +159,42 @@ describe("EnsDbWriterWorker", () => { .mockResolvedValueOnce(unstartedSnapshot) .mockResolvedValueOnce(validSnapshot), } as unknown as IndexingStatusBuilder; + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); - vi.mocked(ensDbClient.upsertIndexingStatusSnapshot).mockImplementation(async () => { - worker.stop(); - }); + // act - run returns immediately + await worker.run(); - // act - const runPromise = worker.run(); + // first interval tick - should error but not throw + await vi.advanceTimersByTimeAsync(1000); - await vi.runOnlyPendingTimersAsync(); - await vi.runOnlyPendingTimersAsync(); - await runPromise; + // second interval tick - should succeed + await vi.advanceTimersByTimeAsync(1000); // assert expect(indexingStatusBuilder.getOmnichainIndexingStatusSnapshot).toHaveBeenCalledTimes(2); - expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(snapshot); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(1); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(crossChainSnapshot); + + // cleanup + worker.stop(); }); - it("retries run until success", async () => { + it("stops the interval when stop() is called", async () => { // arrange + const omnichainSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 100, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const upsertIndexingStatusSnapshot = vi.fn().mockResolvedValue(undefined); + const ensDbClient = { getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), - upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot, } as unknown as EnsDbClient; const ensIndexerClient = { @@ -192,33 +202,67 @@ describe("EnsDbWriterWorker", () => { } as unknown as EnsIndexerClient; const indexingStatusBuilder = { - getOmnichainIndexingStatusSnapshot: vi.fn(), + getOmnichainIndexingStatusSnapshot: vi.fn().mockResolvedValue(omnichainSnapshot), } as unknown as IndexingStatusBuilder; const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); - const runSpy = vi - .spyOn(worker, "run") - .mockRejectedValueOnce(new Error("boom")) - .mockResolvedValueOnce(undefined); - // act - const runPromise = worker.runWithRetries({ maxRetries: 2 }); + await worker.run(); + await vi.advanceTimersByTimeAsync(1000); - await vi.runOnlyPendingTimersAsync(); - await runPromise; + const callCountBeforeStop = upsertIndexingStatusSnapshot.mock.calls.length; - // assert - expect(runSpy).toHaveBeenCalledTimes(2); + worker.stop(); + + // advance time after stop + await vi.advanceTimersByTimeAsync(2000); + + // assert - no more calls after stop + expect(upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(callCountBeforeStop); }); - it("throws after exceeding max retries", async () => { + it("recovers from errors and continues upserting snapshots", async () => { // arrange + const snapshot1 = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 100, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const snapshot2 = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 200, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const crossChainSnapshot1 = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 100, + snapshotTime: 1000, + omnichainSnapshot: snapshot1, + } as CrossChainIndexingStatusSnapshot; + + const crossChainSnapshot2 = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 200, + snapshotTime: 2000, + omnichainSnapshot: snapshot2, + } as CrossChainIndexingStatusSnapshot; + + vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain) + .mockReturnValueOnce(crossChainSnapshot1) + .mockReturnValueOnce(crossChainSnapshot2); + const ensDbClient = { getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), - upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi + .fn() + .mockResolvedValueOnce(undefined) + .mockRejectedValueOnce(new Error("DB error")) + .mockResolvedValueOnce(undefined), } as unknown as EnsDbClient; const ensIndexerClient = { @@ -226,23 +270,31 @@ describe("EnsDbWriterWorker", () => { } as unknown as EnsIndexerClient; const indexingStatusBuilder = { - getOmnichainIndexingStatusSnapshot: vi.fn(), + getOmnichainIndexingStatusSnapshot: vi + .fn() + .mockResolvedValueOnce(snapshot1) + .mockResolvedValueOnce(snapshot2) + .mockResolvedValueOnce(snapshot2), } as unknown as IndexingStatusBuilder; const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); - const error = new Error("still failing"); - vi.spyOn(worker, "run").mockRejectedValue(error); - // act - const runPromise = worker.runWithRetries({ maxRetries: 2 }); - const rejection = expect(runPromise).rejects.toThrow( - "ENSDb Writer Worker failed after 2 attempts.", - ); + await worker.run(); - await vi.runAllTimersAsync(); + // first tick - succeeds + await vi.advanceTimersByTimeAsync(1000); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(crossChainSnapshot1); - // assert - await rejection; + // second tick - fails with DB error, but continues + await vi.advanceTimersByTimeAsync(1000); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenLastCalledWith(crossChainSnapshot2); + + // third tick - succeeds again + await vi.advanceTimersByTimeAsync(1000); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(3); + + // cleanup + worker.stop(); }); }); From cfbb6d06bdbd8d01a739014f13d65a5cd4dbe104 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 08:35:36 +0100 Subject: [PATCH 15/24] Include `p-retry` tooling in ENSIndexer Initially, I wanted to implement some helpers myself, but then, after seeing AI PR feedback, I figured I was rebuilding `p-retry` in a way... Here is the (now closed) PR: https://github.com/namehash/ensnode/pull/1709 --- apps/ensapi/package.json | 2 +- apps/ensindexer/package.json | 1 + pnpm-lock.yaml | 16 +++++++++++----- pnpm-workspace.yaml | 1 + 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/apps/ensapi/package.json b/apps/ensapi/package.json index 8aefb096f..96453e26a 100644 --- a/apps/ensapi/package.json +++ b/apps/ensapi/package.json @@ -53,7 +53,7 @@ "graphql-yoga": "^5.16.0", "hono": "catalog:", "p-memoize": "^8.0.0", - "p-retry": "^7.1.0", + "p-retry": "catalog:", "pg-connection-string": "catalog:", "pino": "catalog:", "ponder-enrich-gql-docs-middleware": "^0.1.3", diff --git a/apps/ensindexer/package.json b/apps/ensindexer/package.json index 1ee36aa53..0afe00f24 100644 --- a/apps/ensindexer/package.json +++ b/apps/ensindexer/package.json @@ -36,6 +36,7 @@ "dns-packet": "^5.6.1", "drizzle-orm": "catalog:", "pg-connection-string": "catalog:", + "p-retry": "catalog:", "hono": "catalog:", "ponder": "catalog:", "viem": "catalog:", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cf1215722..fd89bd1ae 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -51,6 +51,9 @@ catalogs: lucide-react: specifier: ^0.548.0 version: 0.548.0 + p-retry: + specifier: 7.1.1 + version: 7.1.1 pg-connection-string: specifier: ^2.9.1 version: 2.9.1 @@ -397,8 +400,8 @@ importers: specifier: ^8.0.0 version: 8.0.0 p-retry: - specifier: ^7.1.0 - version: 7.1.0 + specifier: 'catalog:' + version: 7.1.1 pg-connection-string: specifier: 'catalog:' version: 2.9.1 @@ -490,6 +493,9 @@ importers: hono: specifier: 'catalog:' version: 4.12.2 + p-retry: + specifier: 'catalog:' + version: 7.1.1 pg-connection-string: specifier: 'catalog:' version: 2.9.1 @@ -6961,8 +6967,8 @@ packages: resolution: {integrity: sha512-aNZ+VfjobsWryoiPnEApGGmf5WmNsCo9xu8dfaYamG5qaLP7ClhLN6NgsFe6SwJ2UbLEBK5dv9x8Mn5+RVhMWQ==} engines: {node: '>=18'} - p-retry@7.1.0: - resolution: {integrity: sha512-xL4PiFRQa/f9L9ZvR4/gUCRNus4N8YX80ku8kv9Jqz+ZokkiZLM0bcvX0gm1F3PDi9SPRsww1BDsTWgE6Y1GLQ==} + p-retry@7.1.1: + resolution: {integrity: sha512-J5ApzjyRkkf601HpEeykoiCvzHQjWxPAHhyjFcEUP2SWq0+35NKh8TLhpLw+Dkq5TZBFvUM6UigdE9hIVYTl5w==} engines: {node: '>=20'} p-timeout@3.2.0: @@ -15925,7 +15931,7 @@ snapshots: eventemitter3: 5.0.1 p-timeout: 6.1.4 - p-retry@7.1.0: + p-retry@7.1.1: dependencies: is-network-error: 1.3.0 diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index b1921e920..cad9881e8 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -23,6 +23,7 @@ catalog: pg-connection-string: ^2.9.1 pino: 10.1.0 ponder: 0.16.2 + p-retry: 7.1.1 tailwindcss: ^4.1.18 tailwindcss-animate: ^1.0.7 tailwind-merge: ^3.4.0 From 5d720c3cee9068d1083dc837a3aa9780d23e511f Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 11:18:28 +0100 Subject: [PATCH 16/24] Apply retries mechanism directly on the ENSIndexer Public Config fetch call Uses `p-retry` to save ourselves from re-implementing it from scratch (like we tried in PR 1709). --- .../src/lib/ensdb-client/ensdb-client.ts | 3 +- .../ensdb-writer-worker.ts | 61 ++++++++++++++++--- .../src/lib/ensdb-writer-worker/singleton.ts | 28 ++++----- 3 files changed, 68 insertions(+), 24 deletions(-) diff --git a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts index 25dddb806..128cb87b2 100644 --- a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts +++ b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts @@ -146,7 +146,8 @@ export class EnsDbClient implements EnsDbClientQuery, EnsDbClientMutation { * Get ENSNode metadata record * * @returns selected record in ENSDb. - * @throws when exactly one matching metadata record was not found + * @throws when more than one matching metadata record is found + * (should be impossible given the PK constraint on 'key') */ private async getEnsNodeMetadata( metadata: Pick, diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts index 00ad1aa71..db05af1b4 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts @@ -1,4 +1,5 @@ import { getUnixTime, secondsToMilliseconds } from "date-fns"; +import pRetry from "p-retry"; import { buildCrossChainIndexingStatusSnapshotOmnichain, @@ -74,10 +75,16 @@ export class EnsDbWriterWorker { * 3) A recurring attempt to upsert serialized representation of * {@link CrossChainIndexingStatusSnapshot} into ENSDb. * - * @throws Error if the in-memory ENSIndexer Public Config is incompatible - * with the stored one in ENSDb (if available). + * @throws Error if the worker is already running, or + * if the in-memory ENSIndexer Public Config could not be fetched, or + * if the in-memory ENSIndexer Public Config is incompatible with the stored config in ENSDb. */ public async run(): Promise { + // Do not allow multiple concurrent runs of the worker + if (this.isRunning) { + throw new Error("EnsDbWriterWorker is already running"); + } + // Fetch data required for task 1 and task 2. const inMemoryConfig = await this.getValidatedEnsIndexerPublicConfig(); @@ -100,6 +107,13 @@ export class EnsDbWriterWorker { ); } + /** + * Indicates whether the ENSDb Writer Worker is currently running. + */ + get isRunning(): boolean { + return this.indexingStatusInterval !== null; + } + /** * Stop the ENSDb Writer Worker * @@ -125,14 +139,45 @@ export class EnsDbWriterWorker { * * @returns In-memory config object, if the validation is successful or * if there is no stored config. - * @throws Error if the in-memory config object is incompatible with - * the stored one. + * @throws Error if the in-memory config object cannot be fetched or, + * got fetched and is incompatible with the stored config object. */ private async getValidatedEnsIndexerPublicConfig(): Promise { - const [storedConfig, inMemoryConfig] = await Promise.all([ - this.ensDbClient.getEnsIndexerPublicConfig(), - this.ensIndexerClient.config(), - ]); + /** + * Fetch the in-memory config with retries, to handle potential transient errors + * in the ENSIndexer Client (e.g. due to network issues). If the fetch fails after + * the defined number of retries, the error will be thrown and the worker will not start, + * as the ENSIndexer Public Config is a critical dependency for the worker's tasks. + */ + const inMemoryConfigPromise = pRetry(() => this.ensIndexerClient.config(), { + retries: 3, + onFailedAttempt: ({ error, attemptNumber, retriesLeft }) => { + console.warn( + `ENSIndexer Config fetch attempt ${attemptNumber} failed (${error.message}). ${retriesLeft} retries left.`, + ); + }, + }); + + let storedConfig: EnsIndexerPublicConfig | undefined; + let inMemoryConfig: EnsIndexerPublicConfig; + + try { + [storedConfig, inMemoryConfig] = await Promise.all([ + this.ensDbClient.getEnsIndexerPublicConfig(), + inMemoryConfigPromise, + ]); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : "Unknown error"; + + console.error( + `[EnsDbWriterWorker]: Failed to fetch ENSIndexer Public Config: ${errorMessage}`, + ); + + // Throw the error to terminate the ENSIndexer process due to failed fetch of critical dependency + throw new Error(errorMessage, { + cause: error, + }); + } // Validate in-memory config object compatibility with the stored one, // if the stored one is available diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts index 6d504af2b..8cfc6a5bc 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -12,27 +12,25 @@ let ensDbWriterWorker: EnsDbWriterWorker; * The worker will run indefinitely until its internal AbortSignal is triggered, * for example due to a process termination signal or an internal error, at * which point it will attempt to gracefully shut down. + * + * @throws Error if the worker run method throws an error during execution. */ export function startEnsDbWriterWorker() { - if (typeof ensDbWriterWorker !== "undefined") { - throw new Error("EnsDbWriterWorker has already been started"); - } - ensDbWriterWorker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); - const ensDbWriterWorkerRun = ensDbWriterWorker.runWithRetries({ maxRetries: 3 }); - - // Handle any uncaught errors from the worker - ensDbWriterWorkerRun.catch((error) => { - // Abort the worker on error to trigger cleanup - ensDbWriterWorker.stop(); + ensDbWriterWorker + .run() + // Handle any uncaught errors from the worker + .catch((error) => { + // Abort the worker on error to trigger cleanup + ensDbWriterWorker.stop(); - console.error("EnsDbWriterWorker encountered an error:", error); + console.error("EnsDbWriterWorker encountered an error:", error); - // Re-throw the error to ensure the application shuts down with a non-zero exit code. - process.exitCode = 1; - throw error; - }); + // Re-throw the error to ensure the application shuts down with a non-zero exit code. + process.exitCode = 1; + throw error; + }); // Handle graceful shutdown on process termination signals process.on("SIGINT", () => ensDbWriterWorker.stop()); From 548b0f7447e3d776e65b634a39dd337eb8aab5db Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 11:18:34 +0100 Subject: [PATCH 17/24] Update tests for ENSDb Writer Worker to follow simplified implementation --- .../ensdb-writer-worker.test.ts | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts index b1ff45ced..c97296eb7 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts @@ -26,6 +26,10 @@ vi.mock("@ensnode/ensnode-sdk", async () => { }; }); +vi.mock("p-retry", () => ({ + default: vi.fn((fn) => fn()), +})); + describe("EnsDbWriterWorker", () => { beforeEach(() => { vi.useFakeTimers(); @@ -222,6 +226,144 @@ describe("EnsDbWriterWorker", () => { expect(upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(callCountBeforeStop); }); + it("calls pRetry for config fetch with retry logic", async () => { + // arrange - pRetry is mocked to call fn directly + const omnichainSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 100, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const snapshot = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 100, + snapshotTime: 200, + omnichainSnapshot, + } as CrossChainIndexingStatusSnapshot; + + vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(snapshot); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn().mockResolvedValue(omnichainSnapshot), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act + await worker.run(); + + // assert - config should be called once (pRetry is mocked) + expect(ensIndexerClient.config).toHaveBeenCalledTimes(1); + expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(publicConfig); + + // cleanup + worker.stop(); + }); + + it("fetches stored and in-memory configs concurrently", async () => { + // arrange + const storedConfig = { ...publicConfig, versionInfo: { ...publicConfig.versionInfo } }; + const inMemoryConfig = publicConfig; + + // Ensure validation passes for this test + vi.mocked(validateEnsIndexerPublicConfigCompatibility).mockImplementation(() => { + // validation passes + }); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(storedConfig), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(inMemoryConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act + await worker.run(); + + // assert - both should have been called (concurrent execution via Promise.all) + expect(ensDbClient.getEnsIndexerPublicConfig).toHaveBeenCalledTimes(1); + expect(ensIndexerClient.config).toHaveBeenCalledTimes(1); + + // cleanup + worker.stop(); + }); + + it("throws error when config fetch fails", async () => { + // arrange + const networkError = new Error("Network failure"); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockRejectedValue(networkError), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act & assert + await expect(worker.run()).rejects.toThrow("Network failure"); + + // should be called once (pRetry is mocked to call once) + expect(ensIndexerClient.config).toHaveBeenCalledTimes(1); + expect(ensDbClient.upsertEnsDbVersion).not.toHaveBeenCalled(); + }); + + it("throws error when stored config fetch fails", async () => { + // arrange + const dbError = new Error("Database connection lost"); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockRejectedValue(dbError), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act & assert + await expect(worker.run()).rejects.toThrow("Database connection lost"); + expect(ensDbClient.upsertEnsDbVersion).not.toHaveBeenCalled(); + }); + it("recovers from errors and continues upserting snapshots", async () => { // arrange const snapshot1 = { From 5a816732f07d3d31980b7a35dead0aad87277d6e Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 11:19:52 +0100 Subject: [PATCH 18/24] Update code docs as per PR feedback --- apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts index 8cfc6a5bc..cd29dfab6 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -9,8 +9,8 @@ let ensDbWriterWorker: EnsDbWriterWorker; /** * Starts the EnsDbWriterWorker in a new asynchronous context. * - * The worker will run indefinitely until its internal AbortSignal is triggered, - * for example due to a process termination signal or an internal error, at + * The worker will run indefinitely until it is stopped via {@link EnsDbWriterWorker.stop}, + * for example in response to a process termination signal or an internal error, at * which point it will attempt to gracefully shut down. * * @throws Error if the worker run method throws an error during execution. From 9dced073db64b2b899f60569bdb682c30c35d018 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 11:43:01 +0100 Subject: [PATCH 19/24] Apply AI PR feedback --- .../src/lib/ensdb-writer-worker/singleton.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts index cd29dfab6..94189b959 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -20,6 +20,12 @@ export function startEnsDbWriterWorker() { ensDbWriterWorker .run() + .then(() => { + // Once the worker has successfully kicked off, + // we can listen for termination signals to trigger a graceful shutdown + process.on("SIGINT", () => ensDbWriterWorker.stop()); + process.on("SIGTERM", () => ensDbWriterWorker.stop()); + }) // Handle any uncaught errors from the worker .catch((error) => { // Abort the worker on error to trigger cleanup @@ -31,8 +37,4 @@ export function startEnsDbWriterWorker() { process.exitCode = 1; throw error; }); - - // Handle graceful shutdown on process termination signals - process.on("SIGINT", () => ensDbWriterWorker.stop()); - process.on("SIGTERM", () => ensDbWriterWorker.stop()); } From fef06d764d064b2b8a3f11bf1da3e96c6ecd766f Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 11:57:32 +0100 Subject: [PATCH 20/24] Apply AI PR feedback --- .../src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts index c97296eb7..9680c75f1 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts @@ -394,6 +394,7 @@ describe("EnsDbWriterWorker", () => { vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain) .mockReturnValueOnce(crossChainSnapshot1) + .mockReturnValueOnce(crossChainSnapshot2) .mockReturnValueOnce(crossChainSnapshot2); const ensDbClient = { From cacea45d566cdd3fbd45257952b50fccef30d552 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 12:04:30 +0100 Subject: [PATCH 21/24] Apply AI PR feedback --- .../ensdb-writer-worker/ensdb-writer-worker.test.ts | 12 ++++++------ .../src/lib/ensdb-writer-worker/singleton.ts | 12 +++++++++--- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts index 9680c75f1..27f3f2ff3 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts @@ -59,7 +59,7 @@ describe("EnsDbWriterWorker", () => { buildSnapshot.mockReturnValue(snapshot); const ensDbClient = { - getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), @@ -147,7 +147,7 @@ describe("EnsDbWriterWorker", () => { vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(crossChainSnapshot); const ensDbClient = { - getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), @@ -195,7 +195,7 @@ describe("EnsDbWriterWorker", () => { const upsertIndexingStatusSnapshot = vi.fn().mockResolvedValue(undefined); const ensDbClient = { - getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertIndexingStatusSnapshot, @@ -244,7 +244,7 @@ describe("EnsDbWriterWorker", () => { vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(snapshot); const ensDbClient = { - getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), @@ -314,7 +314,7 @@ describe("EnsDbWriterWorker", () => { const networkError = new Error("Network failure"); const ensDbClient = { - getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), @@ -398,7 +398,7 @@ describe("EnsDbWriterWorker", () => { .mockReturnValueOnce(crossChainSnapshot2); const ensDbClient = { - getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(null), + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), upsertIndexingStatusSnapshot: vi diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts index 94189b959..d764db7bf 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -13,18 +13,24 @@ let ensDbWriterWorker: EnsDbWriterWorker; * for example in response to a process termination signal or an internal error, at * which point it will attempt to gracefully shut down. * - * @throws Error if the worker run method throws an error during execution. + * @throws Error if the worker is already running when this function is called. */ export function startEnsDbWriterWorker() { + if (typeof ensDbWriterWorker !== "undefined") { + throw new Error("EnsDbWriterWorker is already running"); + } + ensDbWriterWorker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + const stopWorker = ensDbWriterWorker.stop.bind(ensDbWriterWorker); + ensDbWriterWorker .run() .then(() => { // Once the worker has successfully kicked off, // we can listen for termination signals to trigger a graceful shutdown - process.on("SIGINT", () => ensDbWriterWorker.stop()); - process.on("SIGTERM", () => ensDbWriterWorker.stop()); + process.once("SIGINT", stopWorker); + process.once("SIGTERM", stopWorker); }) // Handle any uncaught errors from the worker .catch((error) => { From 09d2b8e018ec6e1d7528042e787f581fdfbe2f69 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 12:15:12 +0100 Subject: [PATCH 22/24] Simplify startEnsDbWriterWorker --- apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts index d764db7bf..95a6e0aca 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -22,16 +22,8 @@ export function startEnsDbWriterWorker() { ensDbWriterWorker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); - const stopWorker = ensDbWriterWorker.stop.bind(ensDbWriterWorker); - ensDbWriterWorker .run() - .then(() => { - // Once the worker has successfully kicked off, - // we can listen for termination signals to trigger a graceful shutdown - process.once("SIGINT", stopWorker); - process.once("SIGTERM", stopWorker); - }) // Handle any uncaught errors from the worker .catch((error) => { // Abort the worker on error to trigger cleanup From cb70797d4da58a1f5bf4656e5cc60c71d129466b Mon Sep 17 00:00:00 2001 From: Tomek Kopacki Date: Tue, 3 Mar 2026 12:24:08 +0100 Subject: [PATCH 23/24] Update apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts index 95a6e0aca..e61e89dee 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -17,7 +17,7 @@ let ensDbWriterWorker: EnsDbWriterWorker; */ export function startEnsDbWriterWorker() { if (typeof ensDbWriterWorker !== "undefined") { - throw new Error("EnsDbWriterWorker is already running"); + throw new Error("EnsDbWriterWorker has already been initialized"); } ensDbWriterWorker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); From f608c312e62829aab3c0ec2e704986adea260082 Mon Sep 17 00:00:00 2001 From: Tomasz Kopacki Date: Tue, 3 Mar 2026 17:31:40 +0100 Subject: [PATCH 24/24] Apply PR feedback --- .../ponder/src/api/ensdb-writer-worker-entrypoint.ts | 9 --------- apps/ensindexer/ponder/src/api/index.ts | 9 ++++++++- .../src/lib/ensdb-writer-worker/ensdb-writer-worker.ts | 5 +++-- 3 files changed, 11 insertions(+), 12 deletions(-) delete mode 100644 apps/ensindexer/ponder/src/api/ensdb-writer-worker-entrypoint.ts diff --git a/apps/ensindexer/ponder/src/api/ensdb-writer-worker-entrypoint.ts b/apps/ensindexer/ponder/src/api/ensdb-writer-worker-entrypoint.ts deleted file mode 100644 index 3b831f80f..000000000 --- a/apps/ensindexer/ponder/src/api/ensdb-writer-worker-entrypoint.ts +++ /dev/null @@ -1,9 +0,0 @@ -// This file is the entry point for the ENSDb Writer Worker. -// It must be placed inside the `api` directory of the Ponder app to avoid -// the following build issue: -// Error: Invalid dependency graph. Config, schema, and indexing function files -// cannot import objects from the API function file "src/api/index.ts". - -import { startEnsDbWriterWorker } from "@/lib/ensdb-writer-worker/singleton"; - -startEnsDbWriterWorker(); diff --git a/apps/ensindexer/ponder/src/api/index.ts b/apps/ensindexer/ponder/src/api/index.ts index 7dc40bd71..68f28820f 100644 --- a/apps/ensindexer/ponder/src/api/index.ts +++ b/apps/ensindexer/ponder/src/api/index.ts @@ -5,8 +5,15 @@ import { cors } from "hono/cors"; import type { ErrorResponse } from "@ensnode/ensnode-sdk"; +import { startEnsDbWriterWorker } from "@/lib/ensdb-writer-worker/singleton"; + import ensNodeApi from "./handlers/ensnode-api"; -import "./ensdb-writer-worker-entrypoint"; + +// The entry point for the ENSDb Writer Worker. It must be placed inside +// the `api` directory of the Ponder app to avoid the following build issue: +// Error: Invalid dependency graph. Config, schema, and indexing function files +// cannot import objects from the API function file "src/api/index.ts". +startEnsDbWriterWorker(); const app = new Hono(); diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts index db05af1b4..2a29b3c0a 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts @@ -220,9 +220,7 @@ export class EnsDbWriterWorker { snapshotTime, ); - console.log(`[EnsDbWriterWorker]: Upserting Indexing Status Snapshot into ENSDb...`); await this.ensDbClient.upsertIndexingStatusSnapshot(crossChainSnapshot); - console.log(`[EnsDbWriterWorker]: Indexing Status Snapshot upserted successfully`); } catch (error) { console.error( `[EnsDbWriterWorker]: Error retrieving or validating Indexing Status Snapshot:`, @@ -242,6 +240,9 @@ export class EnsDbWriterWorker { private async getValidatedIndexingStatusSnapshot(): Promise { const omnichainSnapshot = await this.indexingStatusBuilder.getOmnichainIndexingStatusSnapshot(); + // It only makes sense to write Indexing Status Snapshots into ENSDb once + // the indexing process has started, as before that there is no meaningful + // status to record. // Invariant: the Omnichain Status must indicate that indexing has started already. if (omnichainSnapshot.omnichainStatus === OmnichainIndexingStatusIds.Unstarted) { throw new Error("Omnichain Status must not be 'Unstarted'.");