diff --git a/.changeset/afraid-ducks-strive.md b/.changeset/afraid-ducks-strive.md new file mode 100644 index 000000000..134592ec8 --- /dev/null +++ b/.changeset/afraid-ducks-strive.md @@ -0,0 +1,5 @@ +--- +"ensindexer": minor +--- + +Introduced `EnsDbClient` and `EnsDbWriterWorker` to enable storing metadata in ENSDb. diff --git a/apps/ensapi/package.json b/apps/ensapi/package.json index 8aefb096f..96453e26a 100644 --- a/apps/ensapi/package.json +++ b/apps/ensapi/package.json @@ -53,7 +53,7 @@ "graphql-yoga": "^5.16.0", "hono": "catalog:", "p-memoize": "^8.0.0", - "p-retry": "^7.1.0", + "p-retry": "catalog:", "pg-connection-string": "catalog:", "pino": "catalog:", "ponder-enrich-gql-docs-middleware": "^0.1.3", diff --git a/apps/ensindexer/package.json b/apps/ensindexer/package.json index f4d69e032..0afe00f24 100644 --- a/apps/ensindexer/package.json +++ b/apps/ensindexer/package.json @@ -29,11 +29,14 @@ "@ensnode/ensnode-sdk": "workspace:*", "@ensnode/ensrainbow-sdk": "workspace:*", "@ensnode/ponder-sdk": "workspace:*", + "@ponder/client": "catalog:", "caip": "catalog:", "date-fns": "catalog:", "deepmerge-ts": "^7.1.5", "dns-packet": "^5.6.1", + "drizzle-orm": "catalog:", "pg-connection-string": "catalog:", + "p-retry": "catalog:", "hono": "catalog:", "ponder": "catalog:", "viem": "catalog:", @@ -43,6 +46,7 @@ "@ensnode/shared-configs": "workspace:*", "@types/dns-packet": "^5.6.5", "@types/node": "catalog:", + "@types/pg": "8.16.0", "typescript": "catalog:", "vitest": "catalog:" } diff --git a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts index 0cb27f996..58c4b015d 100644 --- a/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts +++ b/apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts @@ -14,11 +14,9 @@ import { } from "@ensnode/ensnode-sdk"; import { buildENSIndexerPublicConfig } from "@/config/public"; -import { IndexingStatusBuilder } from "@/lib/indexing-status-builder"; -import { localPonderClient } from "@/lib/local-ponder-client"; +import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton"; const app = new Hono(); -const indexingStatusBuilder = new IndexingStatusBuilder(localPonderClient); // include ENSIndexer Public Config endpoint app.get("/config", async (c) => { diff --git a/apps/ensindexer/ponder/src/api/index.ts b/apps/ensindexer/ponder/src/api/index.ts index 70f0703f4..68f28820f 100644 --- a/apps/ensindexer/ponder/src/api/index.ts +++ b/apps/ensindexer/ponder/src/api/index.ts @@ -5,8 +5,16 @@ import { cors } from "hono/cors"; import type { ErrorResponse } from "@ensnode/ensnode-sdk"; +import { startEnsDbWriterWorker } from "@/lib/ensdb-writer-worker/singleton"; + import ensNodeApi from "./handlers/ensnode-api"; +// The entry point for the ENSDb Writer Worker. It must be placed inside +// the `api` directory of the Ponder app to avoid the following build issue: +// Error: Invalid dependency graph. Config, schema, and indexing function files +// cannot import objects from the API function file "src/api/index.ts". +startEnsDbWriterWorker(); + const app = new Hono(); // set the X-ENSIndexer-Version header to the current version diff --git a/apps/ensindexer/src/lib/ensdb-client/drizzle.ts b/apps/ensindexer/src/lib/ensdb-client/drizzle.ts new file mode 100644 index 000000000..b5cc9b5c7 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/drizzle.ts @@ -0,0 +1,26 @@ +// This file is based on `packages/ponder-subgraph/src/drizzle.ts` file. +// We currently duplicate the makeDrizzle function, as we don't have +// a shared package for backend code yet. When we do, we can move +// this function to the shared package and import it in both places. +import { setDatabaseSchema } from "@ponder/client"; +import { drizzle } from "drizzle-orm/node-postgres"; + +type Schema = { [name: string]: unknown }; + +/** + * Makes a Drizzle DB object. + */ +export const makeDrizzle = ({ + schema, + databaseUrl, + databaseSchema, +}: { + schema: SCHEMA; + databaseUrl: string; + databaseSchema: string; +}) => { + // monkeypatch schema onto tables + setDatabaseSchema(schema, databaseSchema); + + return drizzle(databaseUrl, { schema, casing: "snake_case" }); +}; diff --git a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.mock.ts b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.mock.ts new file mode 100644 index 000000000..4b7054aa6 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.mock.ts @@ -0,0 +1,66 @@ +import { + type BlockRef, + ChainIndexingStatusIds, + CrossChainIndexingStrategyIds, + type EnsIndexerPublicConfig, + OmnichainIndexingStatusIds, + PluginName, + RangeTypeIds, + type SerializedCrossChainIndexingStatusSnapshot, +} from "@ensnode/ensnode-sdk"; + +export const earlierBlockRef = { + timestamp: 1672531199, + number: 1024, +} as const satisfies BlockRef; + +export const laterBlockRef = { + timestamp: 1672531200, + number: 1025, +} as const satisfies BlockRef; + +export const databaseUrl = "postgres://user:pass@localhost:5432/ensdb"; + +export const databaseSchemaName = "public"; + +export const publicConfig = { + databaseSchemaName, + labelSet: { + labelSetId: "subgraph", + labelSetVersion: 0, + }, + indexedChainIds: new Set([1]), + isSubgraphCompatible: true, + namespace: "mainnet", + plugins: [PluginName.Subgraph], + versionInfo: { + nodejs: "v22.10.12", + ponder: "0.11.25", + ensDb: "0.32.0", + ensIndexer: "0.32.0", + ensNormalize: "1.11.1", + ensRainbow: "0.32.0", + ensRainbowSchema: 2, + }, +} satisfies EnsIndexerPublicConfig; + +export const serializedSnapshot = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: earlierBlockRef.timestamp, + snapshotTime: earlierBlockRef.timestamp + 20, + omnichainSnapshot: { + omnichainStatus: OmnichainIndexingStatusIds.Following, + chains: { + "1": { + chainStatus: ChainIndexingStatusIds.Following, + config: { + rangeType: RangeTypeIds.LeftBounded, + startBlock: earlierBlockRef, + }, + latestIndexedBlock: earlierBlockRef, + latestKnownBlock: laterBlockRef, + }, + }, + omnichainIndexingCursor: earlierBlockRef.timestamp, + }, +} satisfies SerializedCrossChainIndexingStatusSnapshot; diff --git a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.test.ts b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.test.ts new file mode 100644 index 000000000..d67bca8a9 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.test.ts @@ -0,0 +1,200 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { ensNodeMetadata } from "@ensnode/ensnode-schema"; +import { + deserializeCrossChainIndexingStatusSnapshot, + EnsNodeMetadataKeys, + serializeCrossChainIndexingStatusSnapshot, + serializeEnsIndexerPublicConfig, +} from "@ensnode/ensnode-sdk"; + +import { makeDrizzle } from "./drizzle"; +import { EnsDbClient } from "./ensdb-client"; +import * as ensDbClientMock from "./ensdb-client.mock"; + +// Mock the config module to prevent it from trying to load actual environment variables during tests +vi.mock("@/config", () => ({ default: {} })); + +// Mock the makeDrizzle function to return a mock database instance +vi.mock("./drizzle", () => ({ makeDrizzle: vi.fn() })); + +describe("EnsDbClient", () => { + // Mock database query results and methods + const selectResult = { current: [] as Array<{ value: unknown }> }; + const whereMock = vi.fn(async () => selectResult.current); + const fromMock = vi.fn(() => ({ where: whereMock })); + const selectMock = vi.fn(() => ({ from: fromMock })); + const onConflictDoUpdateMock = vi.fn(async () => undefined); + const valuesMock = vi.fn(() => ({ onConflictDoUpdate: onConflictDoUpdateMock })); + const insertMock = vi.fn(() => ({ values: valuesMock })); + const dbMock = { select: selectMock, insert: insertMock }; + + beforeEach(() => { + selectResult.current = []; + whereMock.mockClear(); + fromMock.mockClear(); + selectMock.mockClear(); + onConflictDoUpdateMock.mockClear(); + valuesMock.mockClear(); + insertMock.mockClear(); + vi.mocked(makeDrizzle).mockReturnValue(dbMock as unknown as ReturnType); + }); + + describe("getEnsDbVersion", () => { + it("returns undefined when no record exists", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsDbVersion()).resolves.toBeUndefined(); + + expect(selectMock).toHaveBeenCalledTimes(1); + expect(fromMock).toHaveBeenCalledWith(ensNodeMetadata); + }); + + it("returns value when one record exists", async () => { + // arrange + selectResult.current = [{ value: "0.1.0" }]; + + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsDbVersion()).resolves.toBe("0.1.0"); + }); + + // This scenario should be impossible due to the primary key constraint on + // the 'key' column of 'ensnode_metadata' table. + it("throws when multiple records exist", async () => { + // arrange + selectResult.current = [{ value: "0.1.0" }, { value: "0.1.1" }]; + + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsDbVersion()).rejects.toThrowError(/ensdb_version/i); + }); + }); + + describe("getEnsIndexerPublicConfig", () => { + it("returns undefined when no record exists", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsIndexerPublicConfig()).resolves.toBeUndefined(); + }); + + it("deserializes the stored config", async () => { + // arrange + const serializedConfig = serializeEnsIndexerPublicConfig(ensDbClientMock.publicConfig); + selectResult.current = [{ value: serializedConfig }]; + + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act & assert + await expect(client.getEnsIndexerPublicConfig()).resolves.toStrictEqual( + ensDbClientMock.publicConfig, + ); + }); + }); + + describe("getIndexingStatusSnapshot", () => { + it("deserializes the stored indexing status snapshot", async () => { + // arrange + selectResult.current = [{ value: ensDbClientMock.serializedSnapshot }]; + + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + const expected = deserializeCrossChainIndexingStatusSnapshot( + ensDbClientMock.serializedSnapshot, + ); + + // act & assert + await expect(client.getIndexingStatusSnapshot()).resolves.toStrictEqual(expected); + }); + }); + + describe("upsertEnsDbVersion", () => { + it("writes the database version metadata", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + + // act + await client.upsertEnsDbVersion("0.2.0"); + + // assert + expect(insertMock).toHaveBeenCalledWith(ensNodeMetadata); + expect(valuesMock).toHaveBeenCalledWith({ + key: EnsNodeMetadataKeys.EnsDbVersion, + value: "0.2.0", + }); + expect(onConflictDoUpdateMock).toHaveBeenCalledWith({ + target: ensNodeMetadata.key, + set: { value: "0.2.0" }, + }); + }); + }); + + describe("upsertEnsIndexerPublicConfig", () => { + it("serializes and writes the public config", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + const expectedValue = serializeEnsIndexerPublicConfig(ensDbClientMock.publicConfig); + + // act + await client.upsertEnsIndexerPublicConfig(ensDbClientMock.publicConfig); + + // assert + expect(valuesMock).toHaveBeenCalledWith({ + key: EnsNodeMetadataKeys.EnsIndexerPublicConfig, + value: expectedValue, + }); + }); + }); + + describe("upsertIndexingStatusSnapshot", () => { + it("serializes and writes the indexing status snapshot", async () => { + // arrange + const client = new EnsDbClient( + ensDbClientMock.databaseUrl, + ensDbClientMock.databaseSchemaName, + ); + const snapshot = deserializeCrossChainIndexingStatusSnapshot( + ensDbClientMock.serializedSnapshot, + ); + const expectedValue = serializeCrossChainIndexingStatusSnapshot(snapshot); + + // act + await client.upsertIndexingStatusSnapshot(snapshot); + + // assert + expect(valuesMock).toHaveBeenCalledWith({ + key: EnsNodeMetadataKeys.EnsIndexerIndexingStatus, + value: expectedValue, + }); + }); + }); +}); diff --git a/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts new file mode 100644 index 000000000..128cb87b2 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/ensdb-client.ts @@ -0,0 +1,190 @@ +import type { NodePgDatabase } from "drizzle-orm/node-postgres"; +import { eq } from "drizzle-orm/sql"; + +import { ensNodeMetadata } from "@ensnode/ensnode-schema"; +import { + type CrossChainIndexingStatusSnapshot, + deserializeCrossChainIndexingStatusSnapshot, + deserializeEnsIndexerPublicConfig, + type EnsDbClientMutation, + type EnsDbClientQuery, + type EnsIndexerPublicConfig, + EnsNodeMetadataKeys, + type SerializedEnsNodeMetadata, + type SerializedEnsNodeMetadataEnsDbVersion, + type SerializedEnsNodeMetadataEnsIndexerIndexingStatus, + type SerializedEnsNodeMetadataEnsIndexerPublicConfig, + serializeCrossChainIndexingStatusSnapshot, + serializeEnsIndexerPublicConfig, +} from "@ensnode/ensnode-sdk"; + +import { makeDrizzle } from "./drizzle"; + +/** + * ENSDb Client Schema + * + * Includes schema definitions for {@link EnsDbClient} queries and mutations. + */ +const schema = { + ensNodeMetadata, +}; + +/** + * Drizzle database + * + * Allows interacting with Postgres database for ENSDb, using Drizzle ORM. + */ +interface DrizzleDb extends NodePgDatabase {} + +/** + * ENSDb Client + * + * This client exists to provide an abstraction layer for interacting with ENSDb. + * It enables ENSIndexer and ENSApi to decouple from each other, and use + * ENSDb as the integration point between the two (via ENSDb Client). + * + * Enables querying and mutating ENSDb data, such as: + * - ENSDb version + * - ENSIndexer Public Config, and Indexing Status Snapshot and CrossChainIndexingStatusSnapshot. + */ +export class EnsDbClient implements EnsDbClientQuery, EnsDbClientMutation { + /** + * Drizzle database instance for ENSDb. + */ + private db: DrizzleDb; + + /** + * @param databaseUrl connection string for ENSDb Postgres database + * @param databaseSchemaName Postgres schema name for ENSDb tables + */ + constructor(databaseUrl: string, databaseSchemaName: string) { + this.db = makeDrizzle({ + databaseSchema: databaseSchemaName, + databaseUrl, + schema, + }); + } + + /** + * @inheritdoc + */ + async getEnsDbVersion(): Promise { + const record = await this.getEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsDbVersion, + }); + + return record; + } + + /** + * @inheritdoc + */ + async getEnsIndexerPublicConfig(): Promise { + const record = await this.getEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsIndexerPublicConfig, + }); + + if (!record) { + return undefined; + } + + return deserializeEnsIndexerPublicConfig(record); + } + + /** + * @inheritdoc + */ + async getIndexingStatusSnapshot(): Promise { + const record = await this.getEnsNodeMetadata( + { + key: EnsNodeMetadataKeys.EnsIndexerIndexingStatus, + }, + ); + + if (!record) { + return undefined; + } + + return deserializeCrossChainIndexingStatusSnapshot(record); + } + + /** + * @inheritdoc + */ + async upsertEnsDbVersion(ensDbVersion: string): Promise { + await this.upsertEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsDbVersion, + value: ensDbVersion, + }); + } + + /** + * @inheritdoc + */ + async upsertEnsIndexerPublicConfig( + ensIndexerPublicConfig: EnsIndexerPublicConfig, + ): Promise { + await this.upsertEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsIndexerPublicConfig, + value: serializeEnsIndexerPublicConfig(ensIndexerPublicConfig), + }); + } + + /** + * @inheritdoc + */ + async upsertIndexingStatusSnapshot( + indexingStatus: CrossChainIndexingStatusSnapshot, + ): Promise { + await this.upsertEnsNodeMetadata({ + key: EnsNodeMetadataKeys.EnsIndexerIndexingStatus, + value: serializeCrossChainIndexingStatusSnapshot(indexingStatus), + }); + } + + /** + * Get ENSNode metadata record + * + * @returns selected record in ENSDb. + * @throws when more than one matching metadata record is found + * (should be impossible given the PK constraint on 'key') + */ + private async getEnsNodeMetadata( + metadata: Pick, + ): Promise { + const result = await this.db + .select() + .from(ensNodeMetadata) + .where(eq(ensNodeMetadata.key, metadata.key)); + + if (result.length === 0) { + return undefined; + } + + if (result.length === 1 && result[0]) { + return result[0].value as EnsNodeMetadataType["value"]; + } + + throw new Error(`There must be exactly one ENSNodeMetadata record for '${metadata.key}' key`); + } + + /** + * Upsert ENSNode metadata + * + * @throws when upsert operation failed. + */ + private async upsertEnsNodeMetadata< + EnsNodeMetadataType extends SerializedEnsNodeMetadata = SerializedEnsNodeMetadata, + >(metadata: EnsNodeMetadataType): Promise { + await this.db + .insert(ensNodeMetadata) + .values({ + key: metadata.key, + value: metadata.value, + }) + .onConflictDoUpdate({ + target: ensNodeMetadata.key, + set: { value: metadata.value }, + }); + } +} diff --git a/apps/ensindexer/src/lib/ensdb-client/singleton.ts b/apps/ensindexer/src/lib/ensdb-client/singleton.ts new file mode 100644 index 000000000..3ab2225fd --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-client/singleton.ts @@ -0,0 +1,8 @@ +import config from "@/config"; + +import { EnsDbClient } from "./ensdb-client"; + +/** + * Singleton instance of {@link EnsDbClient} for use in ENSIndexer. + */ +export const ensDbClient = new EnsDbClient(config.databaseUrl, config.databaseSchemaName); diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts new file mode 100644 index 000000000..27f3f2ff3 --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts @@ -0,0 +1,443 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { + buildCrossChainIndexingStatusSnapshotOmnichain, + type CrossChainIndexingStatusSnapshot, + CrossChainIndexingStrategyIds, + type EnsIndexerClient, + type EnsIndexerPublicConfig, + OmnichainIndexingStatusIds, + type OmnichainIndexingStatusSnapshot, + validateEnsIndexerPublicConfigCompatibility, +} from "@ensnode/ensnode-sdk"; + +import type { EnsDbClient } from "@/lib/ensdb-client/ensdb-client"; +import { publicConfig } from "@/lib/ensdb-client/ensdb-client.mock"; +import { EnsDbWriterWorker } from "@/lib/ensdb-writer-worker/ensdb-writer-worker"; +import type { IndexingStatusBuilder } from "@/lib/indexing-status-builder/indexing-status-builder"; + +vi.mock("@ensnode/ensnode-sdk", async () => { + const actual = await vi.importActual("@ensnode/ensnode-sdk"); + + return { + ...actual, + validateEnsIndexerPublicConfigCompatibility: vi.fn(), + buildCrossChainIndexingStatusSnapshotOmnichain: vi.fn(), + }; +}); + +vi.mock("p-retry", () => ({ + default: vi.fn((fn) => fn()), +})); + +describe("EnsDbWriterWorker", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + it("upserts version, config, and starts interval for indexing status snapshots", async () => { + // arrange + const omnichainSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 100, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const snapshot = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 100, + snapshotTime: 200, + omnichainSnapshot, + } as CrossChainIndexingStatusSnapshot; + + const buildSnapshot = vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain); + buildSnapshot.mockReturnValue(snapshot); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn().mockResolvedValue(omnichainSnapshot), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act - run() returns immediately after setting up interval + await worker.run(); + + // assert - verify initial upserts happened + expect(ensDbClient.upsertEnsDbVersion).toHaveBeenCalledWith(publicConfig.versionInfo.ensDb); + expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(publicConfig); + + // advance time to trigger interval + await vi.advanceTimersByTimeAsync(1000); + + // assert - snapshot should be upserted + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(snapshot); + expect(buildSnapshot).toHaveBeenCalledWith(omnichainSnapshot, expect.any(Number)); + + // cleanup + worker.stop(); + }); + + it("throws when stored config is incompatible", async () => { + // arrange + const incompatibleError = new Error("incompatible"); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(publicConfig), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig as EnsIndexerPublicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + vi.mocked(validateEnsIndexerPublicConfigCompatibility).mockImplementation(() => { + throw incompatibleError; + }); + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act + await expect(worker.run()).rejects.toThrow("incompatible"); + + // assert + expect(ensDbClient.upsertEnsDbVersion).not.toHaveBeenCalled(); + }); + + it("continues upserting after snapshot validation errors", async () => { + // arrange + const unstartedSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Unstarted, + } as OmnichainIndexingStatusSnapshot; + + const validSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 200, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const crossChainSnapshot = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 200, + snapshotTime: 300, + omnichainSnapshot: validSnapshot, + } as CrossChainIndexingStatusSnapshot; + + vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(crossChainSnapshot); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi + .fn() + .mockResolvedValueOnce(unstartedSnapshot) + .mockResolvedValueOnce(validSnapshot), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act - run returns immediately + await worker.run(); + + // first interval tick - should error but not throw + await vi.advanceTimersByTimeAsync(1000); + + // second interval tick - should succeed + await vi.advanceTimersByTimeAsync(1000); + + // assert + expect(indexingStatusBuilder.getOmnichainIndexingStatusSnapshot).toHaveBeenCalledTimes(2); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(1); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(crossChainSnapshot); + + // cleanup + worker.stop(); + }); + + it("stops the interval when stop() is called", async () => { + // arrange + const omnichainSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 100, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const upsertIndexingStatusSnapshot = vi.fn().mockResolvedValue(undefined); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot, + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn().mockResolvedValue(omnichainSnapshot), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act + await worker.run(); + await vi.advanceTimersByTimeAsync(1000); + + const callCountBeforeStop = upsertIndexingStatusSnapshot.mock.calls.length; + + worker.stop(); + + // advance time after stop + await vi.advanceTimersByTimeAsync(2000); + + // assert - no more calls after stop + expect(upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(callCountBeforeStop); + }); + + it("calls pRetry for config fetch with retry logic", async () => { + // arrange - pRetry is mocked to call fn directly + const omnichainSnapshot = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 100, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const snapshot = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 100, + snapshotTime: 200, + omnichainSnapshot, + } as CrossChainIndexingStatusSnapshot; + + vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(snapshot); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn().mockResolvedValue(omnichainSnapshot), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act + await worker.run(); + + // assert - config should be called once (pRetry is mocked) + expect(ensIndexerClient.config).toHaveBeenCalledTimes(1); + expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(publicConfig); + + // cleanup + worker.stop(); + }); + + it("fetches stored and in-memory configs concurrently", async () => { + // arrange + const storedConfig = { ...publicConfig, versionInfo: { ...publicConfig.versionInfo } }; + const inMemoryConfig = publicConfig; + + // Ensure validation passes for this test + vi.mocked(validateEnsIndexerPublicConfigCompatibility).mockImplementation(() => { + // validation passes + }); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(storedConfig), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(inMemoryConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act + await worker.run(); + + // assert - both should have been called (concurrent execution via Promise.all) + expect(ensDbClient.getEnsIndexerPublicConfig).toHaveBeenCalledTimes(1); + expect(ensIndexerClient.config).toHaveBeenCalledTimes(1); + + // cleanup + worker.stop(); + }); + + it("throws error when config fetch fails", async () => { + // arrange + const networkError = new Error("Network failure"); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockRejectedValue(networkError), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act & assert + await expect(worker.run()).rejects.toThrow("Network failure"); + + // should be called once (pRetry is mocked to call once) + expect(ensIndexerClient.config).toHaveBeenCalledTimes(1); + expect(ensDbClient.upsertEnsDbVersion).not.toHaveBeenCalled(); + }); + + it("throws error when stored config fetch fails", async () => { + // arrange + const dbError = new Error("Database connection lost"); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockRejectedValue(dbError), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi.fn().mockResolvedValue(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi.fn(), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act & assert + await expect(worker.run()).rejects.toThrow("Database connection lost"); + expect(ensDbClient.upsertEnsDbVersion).not.toHaveBeenCalled(); + }); + + it("recovers from errors and continues upserting snapshots", async () => { + // arrange + const snapshot1 = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 100, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const snapshot2 = { + omnichainStatus: OmnichainIndexingStatusIds.Following, + omnichainIndexingCursor: 200, + chains: {}, + } as OmnichainIndexingStatusSnapshot; + + const crossChainSnapshot1 = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 100, + snapshotTime: 1000, + omnichainSnapshot: snapshot1, + } as CrossChainIndexingStatusSnapshot; + + const crossChainSnapshot2 = { + strategy: CrossChainIndexingStrategyIds.Omnichain, + slowestChainIndexingCursor: 200, + snapshotTime: 2000, + omnichainSnapshot: snapshot2, + } as CrossChainIndexingStatusSnapshot; + + vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain) + .mockReturnValueOnce(crossChainSnapshot1) + .mockReturnValueOnce(crossChainSnapshot2) + .mockReturnValueOnce(crossChainSnapshot2); + + const ensDbClient = { + getEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertEnsDbVersion: vi.fn().mockResolvedValue(undefined), + upsertEnsIndexerPublicConfig: vi.fn().mockResolvedValue(undefined), + upsertIndexingStatusSnapshot: vi + .fn() + .mockResolvedValueOnce(undefined) + .mockRejectedValueOnce(new Error("DB error")) + .mockResolvedValueOnce(undefined), + } as unknown as EnsDbClient; + + const ensIndexerClient = { + config: vi.fn().mockResolvedValue(publicConfig), + } as unknown as EnsIndexerClient; + + const indexingStatusBuilder = { + getOmnichainIndexingStatusSnapshot: vi + .fn() + .mockResolvedValueOnce(snapshot1) + .mockResolvedValueOnce(snapshot2) + .mockResolvedValueOnce(snapshot2), + } as unknown as IndexingStatusBuilder; + + const worker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + // act + await worker.run(); + + // first tick - succeeds + await vi.advanceTimersByTimeAsync(1000); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(crossChainSnapshot1); + + // second tick - fails with DB error, but continues + await vi.advanceTimersByTimeAsync(1000); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenLastCalledWith(crossChainSnapshot2); + + // third tick - succeeds again + await vi.advanceTimersByTimeAsync(1000); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(3); + + // cleanup + worker.stop(); + }); +}); diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts new file mode 100644 index 000000000..2a29b3c0a --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts @@ -0,0 +1,253 @@ +import { getUnixTime, secondsToMilliseconds } from "date-fns"; +import pRetry from "p-retry"; + +import { + buildCrossChainIndexingStatusSnapshotOmnichain, + type CrossChainIndexingStatusSnapshot, + type Duration, + type EnsIndexerClient, + type EnsIndexerPublicConfig, + OmnichainIndexingStatusIds, + type OmnichainIndexingStatusSnapshot, + validateEnsIndexerPublicConfigCompatibility, +} from "@ensnode/ensnode-sdk"; + +import type { EnsDbClient } from "@/lib/ensdb-client/ensdb-client"; +import type { IndexingStatusBuilder } from "@/lib/indexing-status-builder/indexing-status-builder"; + +/** + * Interval in seconds between two consecutive attempts to upsert + * the Indexing Status Snapshot record into ENSDb. + */ +const INDEXING_STATUS_RECORD_UPDATE_INTERVAL: Duration = 1; + +/** + * ENSDb Writer Worker + * + * A worker responsible for writing ENSIndexer-related metadata into ENSDb, including: + * - ENSDb version + * - ENSIndexer Public Config + * - ENSIndexer Indexing Status Snapshots + */ +export class EnsDbWriterWorker { + /** + * Interval for recurring upserts of Indexing Status Snapshots into ENSDb. + */ + private indexingStatusInterval: ReturnType | null = null; + + /** + * ENSDb Client instance used by the worker to interact with ENSDb. + */ + private ensDbClient: EnsDbClient; + + /** + * ENSIndexer Client instance used by the worker to read ENSIndexer Public Config. + */ + private ensIndexerClient: EnsIndexerClient; + + /** + * Indexing Status Builder instance used by the worker to read ENSIndexer Indexing Status. + */ + private indexingStatusBuilder: IndexingStatusBuilder; + + /** + * @param ensDbClient ENSDb Client instance used by the worker to interact with ENSDb. + * @param ensIndexerClient ENSIndexer Client instance used by the worker to read ENSIndexer Public Config. + * @param indexingStatusBuilder Indexing Status Builder instance used by the worker to read ENSIndexer Indexing Status. + */ + constructor( + ensDbClient: EnsDbClient, + ensIndexerClient: EnsIndexerClient, + indexingStatusBuilder: IndexingStatusBuilder, + ) { + this.ensDbClient = ensDbClient; + this.ensIndexerClient = ensIndexerClient; + this.indexingStatusBuilder = indexingStatusBuilder; + } + + /** + * Run the ENSDb Writer Worker + * + * The worker performs the following tasks: + * 1) A single attempt to upsert ENSDb version into ENSDb. + * 2) A single attempt to upsert serialized representation of + * {@link EnsIndexerPublicConfig} into ENSDb. + * 3) A recurring attempt to upsert serialized representation of + * {@link CrossChainIndexingStatusSnapshot} into ENSDb. + * + * @throws Error if the worker is already running, or + * if the in-memory ENSIndexer Public Config could not be fetched, or + * if the in-memory ENSIndexer Public Config is incompatible with the stored config in ENSDb. + */ + public async run(): Promise { + // Do not allow multiple concurrent runs of the worker + if (this.isRunning) { + throw new Error("EnsDbWriterWorker is already running"); + } + + // Fetch data required for task 1 and task 2. + const inMemoryConfig = await this.getValidatedEnsIndexerPublicConfig(); + + // Task 1: upsert ENSDb version into ENSDb. + console.log(`[EnsDbWriterWorker]: Upserting ENSDb version into ENSDb...`); + await this.ensDbClient.upsertEnsDbVersion(inMemoryConfig.versionInfo.ensDb); + console.log( + `[EnsDbWriterWorker]: ENSDb version upserted successfully: ${inMemoryConfig.versionInfo.ensDb}`, + ); + + // Task 2: upsert of EnsIndexerPublicConfig into ENSDb. + console.log(`[EnsDbWriterWorker]: Upserting ENSIndexer Public Config into ENSDb...`); + await this.ensDbClient.upsertEnsIndexerPublicConfig(inMemoryConfig); + console.log(`[EnsDbWriterWorker]: ENSIndexer Public Config upserted successfully`); + + // Task 3: recurring upsert of Indexing Status Snapshot into ENSDb. + this.indexingStatusInterval = setInterval( + () => this.upsertIndexingStatusSnapshot(), + secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL), + ); + } + + /** + * Indicates whether the ENSDb Writer Worker is currently running. + */ + get isRunning(): boolean { + return this.indexingStatusInterval !== null; + } + + /** + * Stop the ENSDb Writer Worker + * + * Stops all recurring tasks in the worker. + */ + public stop(): void { + if (this.indexingStatusInterval) { + clearInterval(this.indexingStatusInterval); + this.indexingStatusInterval = null; + } + } + + /** + * Get validated ENSIndexer Public Config object for the ENSDb Writer Worker. + * + * The function retrieves the ENSIndexer Public Config object from both: + * - stored config in ENSDb, if available, and + * - in-memory config from ENSIndexer Client. + * + * If, and only if, a stored config is available in ENSDb, then the function + * validates the compatibility of the in-memory config object against + * the stored one. Validation criteria are defined in the function body. + * + * @returns In-memory config object, if the validation is successful or + * if there is no stored config. + * @throws Error if the in-memory config object cannot be fetched or, + * got fetched and is incompatible with the stored config object. + */ + private async getValidatedEnsIndexerPublicConfig(): Promise { + /** + * Fetch the in-memory config with retries, to handle potential transient errors + * in the ENSIndexer Client (e.g. due to network issues). If the fetch fails after + * the defined number of retries, the error will be thrown and the worker will not start, + * as the ENSIndexer Public Config is a critical dependency for the worker's tasks. + */ + const inMemoryConfigPromise = pRetry(() => this.ensIndexerClient.config(), { + retries: 3, + onFailedAttempt: ({ error, attemptNumber, retriesLeft }) => { + console.warn( + `ENSIndexer Config fetch attempt ${attemptNumber} failed (${error.message}). ${retriesLeft} retries left.`, + ); + }, + }); + + let storedConfig: EnsIndexerPublicConfig | undefined; + let inMemoryConfig: EnsIndexerPublicConfig; + + try { + [storedConfig, inMemoryConfig] = await Promise.all([ + this.ensDbClient.getEnsIndexerPublicConfig(), + inMemoryConfigPromise, + ]); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : "Unknown error"; + + console.error( + `[EnsDbWriterWorker]: Failed to fetch ENSIndexer Public Config: ${errorMessage}`, + ); + + // Throw the error to terminate the ENSIndexer process due to failed fetch of critical dependency + throw new Error(errorMessage, { + cause: error, + }); + } + + // Validate in-memory config object compatibility with the stored one, + // if the stored one is available + if (storedConfig) { + try { + validateEnsIndexerPublicConfigCompatibility(storedConfig, inMemoryConfig); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : "Unknown error"; + + console.error( + `[EnsDbWriterWorker]: In-memory ENSIndexer Public Config object is not compatible with its counterpart stored in ENSDb. Cause: ${errorMessage}`, + ); + + // Throw the error to terminate the ENSIndexer process due to + // found config incompatibility + throw new Error(errorMessage, { + cause: error, + }); + } + } + + return inMemoryConfig; + } + + /** + * Upsert the current Indexing Status Snapshot into ENSDb. + * + * This method is called by the scheduler at regular intervals. + * Errors are logged but not thrown, to keep the worker running. + */ + private async upsertIndexingStatusSnapshot(): Promise { + try { + // get system timestamp for the current iteration + const snapshotTime = getUnixTime(new Date()); + + const omnichainSnapshot = await this.getValidatedIndexingStatusSnapshot(); + + const crossChainSnapshot = buildCrossChainIndexingStatusSnapshotOmnichain( + omnichainSnapshot, + snapshotTime, + ); + + await this.ensDbClient.upsertIndexingStatusSnapshot(crossChainSnapshot); + } catch (error) { + console.error( + `[EnsDbWriterWorker]: Error retrieving or validating Indexing Status Snapshot:`, + error, + ); + // Do not throw the error, as failure to retrieve the Indexing Status + // should not cause the ENSDb Writer Worker to stop functioning. + } + } + + /** + * Get validated Omnichain Indexing Status Snapshot + * + * @returns Validated Omnichain Indexing Status Snapshot. + * @throws Error if the Omnichain Indexing Status is not in expected status yet. + */ + private async getValidatedIndexingStatusSnapshot(): Promise { + const omnichainSnapshot = await this.indexingStatusBuilder.getOmnichainIndexingStatusSnapshot(); + + // It only makes sense to write Indexing Status Snapshots into ENSDb once + // the indexing process has started, as before that there is no meaningful + // status to record. + // Invariant: the Omnichain Status must indicate that indexing has started already. + if (omnichainSnapshot.omnichainStatus === OmnichainIndexingStatusIds.Unstarted) { + throw new Error("Omnichain Status must not be 'Unstarted'."); + } + + return omnichainSnapshot; + } +} diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts new file mode 100644 index 000000000..e61e89dee --- /dev/null +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -0,0 +1,38 @@ +import { ensDbClient } from "@/lib/ensdb-client/singleton"; +import { ensIndexerClient } from "@/lib/ensindexer-client/singleton"; +import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton"; + +import { EnsDbWriterWorker } from "./ensdb-writer-worker"; + +let ensDbWriterWorker: EnsDbWriterWorker; + +/** + * Starts the EnsDbWriterWorker in a new asynchronous context. + * + * The worker will run indefinitely until it is stopped via {@link EnsDbWriterWorker.stop}, + * for example in response to a process termination signal or an internal error, at + * which point it will attempt to gracefully shut down. + * + * @throws Error if the worker is already running when this function is called. + */ +export function startEnsDbWriterWorker() { + if (typeof ensDbWriterWorker !== "undefined") { + throw new Error("EnsDbWriterWorker has already been initialized"); + } + + ensDbWriterWorker = new EnsDbWriterWorker(ensDbClient, ensIndexerClient, indexingStatusBuilder); + + ensDbWriterWorker + .run() + // Handle any uncaught errors from the worker + .catch((error) => { + // Abort the worker on error to trigger cleanup + ensDbWriterWorker.stop(); + + console.error("EnsDbWriterWorker encountered an error:", error); + + // Re-throw the error to ensure the application shuts down with a non-zero exit code. + process.exitCode = 1; + throw error; + }); +} diff --git a/apps/ensindexer/src/lib/ensindexer-client/singleton.ts b/apps/ensindexer/src/lib/ensindexer-client/singleton.ts new file mode 100644 index 000000000..47c09b9ff --- /dev/null +++ b/apps/ensindexer/src/lib/ensindexer-client/singleton.ts @@ -0,0 +1,5 @@ +import config from "@/config"; + +import { EnsIndexerClient } from "@ensnode/ensnode-sdk"; + +export const ensIndexerClient = new EnsIndexerClient({ url: config.ensIndexerUrl }); diff --git a/apps/ensindexer/src/lib/indexing-status-builder/singleton.ts b/apps/ensindexer/src/lib/indexing-status-builder/singleton.ts new file mode 100644 index 000000000..d5d1f26dd --- /dev/null +++ b/apps/ensindexer/src/lib/indexing-status-builder/singleton.ts @@ -0,0 +1,5 @@ +import { localPonderClient } from "@/lib/local-ponder-client"; + +import { IndexingStatusBuilder } from "./indexing-status-builder"; + +export const indexingStatusBuilder = new IndexingStatusBuilder(localPonderClient); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e00ed1c57..fd89bd1ae 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -51,6 +51,9 @@ catalogs: lucide-react: specifier: ^0.548.0 version: 0.548.0 + p-retry: + specifier: 7.1.1 + version: 7.1.1 pg-connection-string: specifier: ^2.9.1 version: 2.9.1 @@ -359,7 +362,7 @@ importers: version: 1.37.0 '@ponder/client': specifier: 'catalog:' - version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) + version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) '@ponder/utils': specifier: 'catalog:' version: 0.2.16(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6)) @@ -383,7 +386,7 @@ importers: version: 4.1.0 drizzle-orm: specifier: 'catalog:' - version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) graphql: specifier: ^16.11.0 version: 16.11.0 @@ -397,8 +400,8 @@ importers: specifier: ^8.0.0 version: 8.0.0 p-retry: - specifier: ^7.1.0 - version: 7.1.0 + specifier: 'catalog:' + version: 7.1.1 pg-connection-string: specifier: 'catalog:' version: 2.9.1 @@ -469,6 +472,9 @@ importers: '@ensnode/ponder-sdk': specifier: workspace:* version: link:../../packages/ponder-sdk + '@ponder/client': + specifier: 'catalog:' + version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) caip: specifier: 'catalog:' version: 1.1.1 @@ -481,15 +487,21 @@ importers: dns-packet: specifier: ^5.6.1 version: 5.6.1 + drizzle-orm: + specifier: 'catalog:' + version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) hono: specifier: 'catalog:' version: 4.12.2 + p-retry: + specifier: 'catalog:' + version: 7.1.1 pg-connection-string: specifier: 'catalog:' version: 2.9.1 ponder: specifier: 'catalog:' - version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6))(zod@4.3.6) + version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(@types/pg@8.16.0)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6))(zod@4.3.6) viem: specifier: 'catalog:' version: 2.38.5(typescript@5.9.3)(zod@4.3.6) @@ -506,6 +518,9 @@ importers: '@types/node': specifier: 'catalog:' version: 24.10.9 + '@types/pg': + specifier: 8.16.0 + version: 8.16.0 typescript: specifier: 'catalog:' version: 5.9.3 @@ -850,7 +865,7 @@ importers: dependencies: ponder: specifier: 'catalog:' - version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76) + version: 0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(@types/pg@8.16.0)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76) viem: specifier: 'catalog:' version: 2.38.5(typescript@5.9.3)(zod@3.25.76) @@ -1065,13 +1080,13 @@ importers: version: 2.5.1 '@ponder/client': specifier: 'catalog:' - version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) + version: 0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3) dataloader: specifier: ^2.2.3 version: 2.2.3 drizzle-orm: specifier: 0.41.0 - version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + version: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) graphql: specifier: ^16.10.0 version: 16.11.0 @@ -4220,6 +4235,9 @@ packages: '@types/node@24.10.9': resolution: {integrity: sha512-ne4A0IpG3+2ETuREInjPNhUGis1SFjv1d5asp8MzEAGtOZeTeHVDOYqOgqfhvseqg/iXty2hjBf1zAOb7RNiNw==} + '@types/pg@8.16.0': + resolution: {integrity: sha512-RmhMd/wD+CF8Dfo+cVIy3RR5cl8CyfXQ0tGgW6XBL8L4LM/UTEbNXYRbLwU6w+CgrKBNbrQWt4FUtTfaU5jSYQ==} + '@types/prismjs@1.26.6': resolution: {integrity: sha512-vqlvI7qlMvcCBbVe0AKAb4f97//Hy0EBTaiW8AalRnG/xAN5zOiWWyrNqNXeq8+KAuvRewjCVY1+IPxk4RdNYw==} @@ -6949,8 +6967,8 @@ packages: resolution: {integrity: sha512-aNZ+VfjobsWryoiPnEApGGmf5WmNsCo9xu8dfaYamG5qaLP7ClhLN6NgsFe6SwJ2UbLEBK5dv9x8Mn5+RVhMWQ==} engines: {node: '>=18'} - p-retry@7.1.0: - resolution: {integrity: sha512-xL4PiFRQa/f9L9ZvR4/gUCRNus4N8YX80ku8kv9Jqz+ZokkiZLM0bcvX0gm1F3PDi9SPRsww1BDsTWgE6Y1GLQ==} + p-retry@7.1.1: + resolution: {integrity: sha512-J5ApzjyRkkf601HpEeykoiCvzHQjWxPAHhyjFcEUP2SWq0+35NKh8TLhpLw+Dkq5TZBFvUM6UigdE9hIVYTl5w==} engines: {node: '>=20'} p-timeout@3.2.0: @@ -11238,9 +11256,9 @@ snapshots: '@pkgjs/parseargs@0.11.0': optional: true - '@ponder/client@0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3)': + '@ponder/client@0.16.1(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3)(typescript@5.9.3)': dependencies: - drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) eventsource: 3.0.7 superjson: 2.2.6 optionalDependencies: @@ -12680,6 +12698,12 @@ snapshots: dependencies: undici-types: 7.16.0 + '@types/pg@8.16.0': + dependencies: + '@types/node': 24.10.9 + pg-protocol: 1.10.3 + pg-types: 2.2.0 + '@types/prismjs@1.26.6': {} '@types/progress@2.0.7': @@ -13971,10 +13995,11 @@ snapshots: dotenv@8.6.0: {} - drizzle-orm@0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3): + drizzle-orm@0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3): optionalDependencies: '@electric-sql/pglite': 0.2.13 '@opentelemetry/api': 1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a) + '@types/pg': 8.16.0 kysely: 0.26.3 pg: 8.16.3 @@ -15906,7 +15931,7 @@ snapshots: eventemitter3: 5.0.1 p-timeout: 6.1.4 - p-retry@7.1.0: + p-retry@7.1.1: dependencies: is-network-error: 1.3.0 @@ -16150,7 +16175,7 @@ snapshots: graphql: 16.11.0 hono: 4.12.2 - ponder@0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76): + ponder@0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(@types/pg@8.16.0)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@3.25.76))(zod@3.25.76): dependencies: '@babel/code-frame': 7.27.1 '@commander-js/extra-typings': 12.1.0(commander@12.1.0) @@ -16167,7 +16192,7 @@ snapshots: dataloader: 2.2.3 detect-package-manager: 3.0.2 dotenv: 16.6.1 - drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) glob: 10.5.0 graphql: 16.8.2 graphql-yoga: 5.17.1(graphql@16.8.2) @@ -16232,7 +16257,7 @@ snapshots: - utf-8-validate - zod - ponder@0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6))(zod@4.3.6): + ponder@0.16.2(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/node@24.10.9)(@types/pg@8.16.0)(hono@4.12.2)(lightningcss@1.30.2)(typescript@5.9.3)(viem@2.38.5(typescript@5.9.3)(zod@4.3.6))(zod@4.3.6): dependencies: '@babel/code-frame': 7.27.1 '@commander-js/extra-typings': 12.1.0(commander@12.1.0) @@ -16249,7 +16274,7 @@ snapshots: dataloader: 2.2.3 detect-package-manager: 3.0.2 dotenv: 16.6.1 - drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(kysely@0.26.3)(pg@8.16.3) + drizzle-orm: 0.41.0(@electric-sql/pglite@0.2.13)(@opentelemetry/api@1.9.0(patch_hash=4b2adeefaf7c22f9987d0a125d69cab900719bec7ed7636648bea6947107033a))(@types/pg@8.16.0)(kysely@0.26.3)(pg@8.16.3) glob: 10.5.0 graphql: 16.8.2 graphql-yoga: 5.17.1(graphql@16.8.2) diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index b1921e920..cad9881e8 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -23,6 +23,7 @@ catalog: pg-connection-string: ^2.9.1 pino: 10.1.0 ponder: 0.16.2 + p-retry: 7.1.1 tailwindcss: ^4.1.18 tailwindcss-animate: ^1.0.7 tailwind-merge: ^3.4.0