diff --git a/README.md b/README.md index 1761cdf..262f261 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ The goal is to mirror the functionality of the Python `dclimate-zarr-client` pac - **No blockchain dependencies** – all resolution through HTTP APIs and IPFS - **TypeScript support** with full type definitions - **Dual build targets** for Node.js and browser environments +- **OpenTelemetry hooks** for IPFS/Zarr open latency, status, and gateway attribution ## Installation @@ -238,6 +239,21 @@ const dataset = await client.loadDataset({ - **Gateway** – set `gatewayUrl` on the client constructor or per-call in `loadDataset` options. - **Direct CID access** – supply `cid` in options to skip catalog resolution and load directly from IPFS. +### OpenTelemetry + +The client emits OpenTelemetry API spans and metrics around IPFS/Zarr dataset opens. This is passive by default: no telemetry is exported unless the application configures an OpenTelemetry SDK/provider. + +Emitted names include: + +- Span `dclimate_client.ipfs.load_zarr_dataset` +- Span `dclimate_client.ipfs.open_jaxray_store` +- Counter `dclimate_client.ipfs.dataset_open.requests` +- Histogram `dclimate_client.ipfs.dataset_open.duration` +- Counter `dclimate_client.ipfs.store_open.requests` +- Histogram `dclimate_client.ipfs.store_open.duration` + +Metric attributes include the gateway URL, store type, and status. The dataset CID is only attached to the trace span to avoid high-cardinality metric labels. + ## API Reference ### DClimateClient diff --git a/package.json b/package.json index cb9b91b..7ec9c75 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@dclimate/dclimate-client-js", - "version": "0.5.5", + "version": "0.5.7", "description": "JavaScript client for dClimate datasets using jaxray and IPFS stores", "type": "module", "main": "./dist/node/index.js", @@ -49,7 +49,8 @@ }, "homepage": "https://github.com/dClimate/dclimate-client-js#readme", "dependencies": { - "@dclimate/jaxray": "^0.6.8" + "@dclimate/jaxray": "^0.6.8", + "@opentelemetry/api": "^1.9.1" }, "devDependencies": { "typescript": "^5.4.0", diff --git a/src/instrumentation.ts b/src/instrumentation.ts new file mode 100644 index 0000000..30e56d8 --- /dev/null +++ b/src/instrumentation.ts @@ -0,0 +1,146 @@ +import { + context, + metrics, + SpanStatusCode, + trace, + type Attributes, + type Span, +} from "@opentelemetry/api"; + +type AttributeValue = string | number | boolean; + +export type RetrievalStatus = "ok" | "error" | "connection_error"; + +const TRACER = trace.getTracer("dclimate_client_js.ipfs_retrieval"); +const METER = metrics.getMeter("dclimate_client_js.ipfs_retrieval"); + +const DATASET_OPEN_COUNTER = METER.createCounter( + "dclimate_client.ipfs.dataset_open.requests", + { + unit: "1", + description: "IPFS Zarr dataset open requests.", + } +); +const DATASET_OPEN_DURATION = METER.createHistogram( + "dclimate_client.ipfs.dataset_open.duration", + { + unit: "s", + description: "IPFS Zarr dataset open latency.", + } +); +const STORE_OPEN_COUNTER = METER.createCounter( + "dclimate_client.ipfs.store_open.requests", + { + unit: "1", + description: "IPFS Zarr store open attempts.", + } +); +const STORE_OPEN_DURATION = METER.createHistogram( + "dclimate_client.ipfs.store_open.duration", + { + unit: "s", + description: "IPFS Zarr store open attempt latency.", + } +); + +export function otelAttributes( + attributes: Record = {} +): Attributes { + const cleaned: Record = {}; + for (const [key, value] of Object.entries(attributes)) { + if (value === undefined || value === null) continue; + if ( + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" + ) { + cleaned[key] = value; + } else { + cleaned[key] = String(value); + } + } + return cleaned; +} + +export function recordSpanError(span: Span, error: unknown): void { + const message = error instanceof Error ? error.message : String(error); + span.recordException(error instanceof Error ? error : message); + span.setStatus({ code: SpanStatusCode.ERROR, message }); +} + +export async function withSpan( + name: string, + attributes: Record, + callback: (span: Span) => Promise +): Promise { + const span = TRACER.startSpan(name, { + attributes: otelAttributes(attributes), + }); + + return context.with(trace.setSpan(context.active(), span), async () => { + try { + return await callback(span); + } finally { + span.end(); + } + }); +} + +export function recordDatasetOpen({ + gatewayUrl, + storeType, + status, + seconds, +}: { + gatewayUrl: string; + storeType: string; + status: RetrievalStatus; + seconds: number; +}): void { + const attributes = otelAttributes({ + "dclimate_client.ipfs.gateway": gatewayUrl, + "dclimate_client.ipfs.store_type": storeType, + "dclimate_client.ipfs.status": status, + }); + DATASET_OPEN_COUNTER.add(1, attributes); + DATASET_OPEN_DURATION.record(seconds, attributes); +} + +export function recordStoreOpen({ + gatewayUrl, + storeType, + status, + seconds, +}: { + gatewayUrl: string; + storeType: string; + status: RetrievalStatus; + seconds: number; +}): void { + const attributes = otelAttributes({ + "dclimate_client.ipfs.gateway": gatewayUrl, + "dclimate_client.ipfs.store_type": storeType, + "dclimate_client.ipfs.status": status, + }); + STORE_OPEN_COUNTER.add(1, attributes); + STORE_OPEN_DURATION.record(seconds, attributes); +} + +export function secondsSince(startedAt: number): number { + return (performance.now() - startedAt) / 1000; +} + +export function classifyRetrievalError(error: unknown): RetrievalStatus { + const message = error instanceof Error ? error.message : String(error); + if ( + message.includes("Connection refused") || + message.includes("Max retries exceeded") || + message.includes("Timeout") || + message.includes("timed out") || + message.includes("ECONNREFUSED") || + message.includes("ETIMEDOUT") + ) { + return "connection_error"; + } + return "error"; +} diff --git a/src/ipfs/open-dataset.ts b/src/ipfs/open-dataset.ts index b4ae672..f4cdbc6 100644 --- a/src/ipfs/open-dataset.ts +++ b/src/ipfs/open-dataset.ts @@ -1,6 +1,15 @@ import { Dataset, openIpfsStore } from "@dclimate/jaxray"; import type { IPFSELEMENTS_INTERFACE } from "@dclimate/jaxray"; import { DEFAULT_IPFS_GATEWAY } from "../constants.js"; +import { + classifyRetrievalError, + recordDatasetOpen, + recordSpanError, + recordStoreOpen, + secondsSince, + withSpan, + type RetrievalStatus, +} from "../instrumentation.js"; export type IpfsElements = IPFSELEMENTS_INTERFACE; @@ -18,10 +27,69 @@ export async function openDatasetFromCid( } const gatewayUrl = options.gatewayUrl ?? DEFAULT_IPFS_GATEWAY; - const { store } = await openIpfsStore( - cid, - options.ipfsElements ?? { gatewayUrl } - ); + const storeType = "JaxrayIpfsStore"; + const datasetStartedAt = performance.now(); + let status: RetrievalStatus = "error"; + + return withSpan( + "dclimate_client.ipfs.load_zarr_dataset", + { + "dclimate_client.ipfs.cid": cid, + "dclimate_client.ipfs.gateway": gatewayUrl, + }, + async (datasetSpan) => { + try { + const { store } = await withSpan( + "dclimate_client.ipfs.open_jaxray_store", + { + "dclimate_client.ipfs.gateway": gatewayUrl, + "dclimate_client.ipfs.store_type": storeType, + }, + async (storeSpan) => { + const storeStartedAt = performance.now(); + try { + const openedStore = await openIpfsStore( + cid, + options.ipfsElements ?? { gatewayUrl } + ); + recordStoreOpen({ + gatewayUrl, + storeType, + status: "ok", + seconds: secondsSince(storeStartedAt), + }); + return openedStore; + } catch (error) { + const storeStatus = classifyRetrievalError(error); + recordStoreOpen({ + gatewayUrl, + storeType, + status: storeStatus, + seconds: secondsSince(storeStartedAt), + }); + recordSpanError(storeSpan, error); + throw error; + } + } + ); - return Dataset.open_zarr(store); + const dataset = await Dataset.open_zarr(store); + status = "ok"; + return dataset; + } catch (error) { + status = classifyRetrievalError(error); + recordSpanError(datasetSpan, error); + throw error; + } finally { + datasetSpan.setAttribute("dclimate_client.ipfs.store_type", storeType); + datasetSpan.setAttribute("dclimate_client.ipfs.status", status); + recordDatasetOpen({ + gatewayUrl, + storeType, + status, + seconds: secondsSince(datasetStartedAt), + }); + } + } + ); } diff --git a/tests/open-dataset.test.ts b/tests/open-dataset.test.ts new file mode 100644 index 0000000..67be562 --- /dev/null +++ b/tests/open-dataset.test.ts @@ -0,0 +1,85 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { DEFAULT_IPFS_GATEWAY } from "../src/constants.js"; +import { + classifyRetrievalError, + otelAttributes, +} from "../src/instrumentation.js"; +import { openDatasetFromCid } from "../src/ipfs/open-dataset.js"; + +const openIpfsStoreMock = vi.hoisted(() => vi.fn()); +const openZarrMock = vi.hoisted(() => vi.fn()); + +vi.mock("@dclimate/jaxray", () => ({ + Dataset: { + open_zarr: openZarrMock, + }, + openIpfsStore: openIpfsStoreMock, +})); + +describe("openDatasetFromCid", () => { + beforeEach(() => { + openIpfsStoreMock.mockReset(); + openZarrMock.mockReset(); + }); + + it("opens an IPFS store and Zarr dataset with default gateway telemetry enabled", async () => { + const store = { kind: "store" }; + const dataset = { kind: "dataset" }; + openIpfsStoreMock.mockResolvedValue({ store }); + openZarrMock.mockResolvedValue(dataset); + + await expect(openDatasetFromCid("bafytest")).resolves.toBe(dataset); + + expect(openIpfsStoreMock).toHaveBeenCalledWith("bafytest", { + gatewayUrl: DEFAULT_IPFS_GATEWAY, + }); + expect(openZarrMock).toHaveBeenCalledWith(store); + }); + + it("uses caller supplied IPFS elements", async () => { + const ipfsElements = { gatewayUrl: "https://example.invalid" }; + const store = { kind: "custom-store" }; + const dataset = { kind: "dataset" }; + openIpfsStoreMock.mockResolvedValue({ store }); + openZarrMock.mockResolvedValue(dataset); + + await openDatasetFromCid("bafycustom", { ipfsElements }); + + expect(openIpfsStoreMock).toHaveBeenCalledWith("bafycustom", ipfsElements); + }); + + it("preserves retrieval errors from the store opener", async () => { + const error = new Error("ETIMEDOUT while opening store"); + openIpfsStoreMock.mockRejectedValue(error); + + await expect(openDatasetFromCid("bafytimeout")).rejects.toThrow(error); + }); +}); + +describe("retrieval instrumentation helpers", () => { + it("keeps metric attributes primitive and bounded", () => { + expect( + otelAttributes({ + keep: "value", + count: 3, + enabled: true, + drop: undefined, + stringify: { nested: "value" }, + }) + ).toEqual({ + keep: "value", + count: 3, + enabled: true, + stringify: "[object Object]", + }); + }); + + it("classifies common gateway connection errors", () => { + expect(classifyRetrievalError(new Error("ETIMEDOUT"))).toBe( + "connection_error" + ); + expect(classifyRetrievalError(new Error("zarr metadata missing"))).toBe( + "error" + ); + }); +}); diff --git a/yarn.lock b/yarn.lock index 26deb5c..ad8c75a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -70,6 +70,11 @@ resolved "https://registry.yarnpkg.com/@noble/hashes/-/hashes-2.0.1.tgz#fc1a928061d1232b0a52bb754393c37a5216c89e" integrity sha512-XlOlEbQcE9fmuXxrVTXCTlG2nlRXa9Rj3rr5Ue/+tX+nmkgbX720YHh0VR3hBF9xDvwnb8D2shVGOwNx+ulArw== +"@opentelemetry/api@^1.9.1": + version "1.9.1" + resolved "https://registry.yarnpkg.com/@opentelemetry/api/-/api-1.9.1.tgz#c1b0346de336ba55af2d5a7970882037baedec05" + integrity sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q== + "@oxc-project/runtime@0.115.0": version "0.115.0" resolved "https://registry.npmjs.org/@oxc-project/runtime/-/runtime-0.115.0.tgz"