Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The goal is to mirror the functionality of the Python `dclimate-zarr-client` pac
- **No blockchain dependencies** – all resolution through HTTP APIs and IPFS
- **TypeScript support** with full type definitions
- **Dual build targets** for Node.js and browser environments
- **OpenTelemetry hooks** for IPFS/Zarr open latency, status, and gateway attribution

## Installation

Expand Down Expand Up @@ -238,6 +239,21 @@ const dataset = await client.loadDataset({
- **Gateway** – set `gatewayUrl` on the client constructor or per-call in `loadDataset` options.
- **Direct CID access** – supply `cid` in options to skip catalog resolution and load directly from IPFS.

### OpenTelemetry

The client emits OpenTelemetry API spans and metrics around IPFS/Zarr dataset opens. This is passive by default: no telemetry is exported unless the application configures an OpenTelemetry SDK/provider.

Emitted names include:

- Span `dclimate_client.ipfs.load_zarr_dataset`
- Span `dclimate_client.ipfs.open_jaxray_store`
- Counter `dclimate_client.ipfs.dataset_open.requests`
- Histogram `dclimate_client.ipfs.dataset_open.duration`
- Counter `dclimate_client.ipfs.store_open.requests`
- Histogram `dclimate_client.ipfs.store_open.duration`

Metric attributes include the gateway URL, store type, and status. The dataset CID is only attached to the trace span to avoid high-cardinality metric labels.

## API Reference

### DClimateClient
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@dclimate/dclimate-client-js",
"version": "0.5.5",
"version": "0.5.7",
"description": "JavaScript client for dClimate datasets using jaxray and IPFS stores",
"type": "module",
"main": "./dist/node/index.js",
Expand Down Expand Up @@ -49,7 +49,8 @@
},
"homepage": "https://github.com/dClimate/dclimate-client-js#readme",
"dependencies": {
"@dclimate/jaxray": "^0.6.8"
"@dclimate/jaxray": "^0.6.8",
"@opentelemetry/api": "^1.9.1"
},
"devDependencies": {
"typescript": "^5.4.0",
Expand Down
146 changes: 146 additions & 0 deletions src/instrumentation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import {
context,
metrics,
SpanStatusCode,
trace,
type Attributes,
type Span,
} from "@opentelemetry/api";

type AttributeValue = string | number | boolean;

export type RetrievalStatus = "ok" | "error" | "connection_error";

const TRACER = trace.getTracer("dclimate_client_js.ipfs_retrieval");
const METER = metrics.getMeter("dclimate_client_js.ipfs_retrieval");

const DATASET_OPEN_COUNTER = METER.createCounter(
"dclimate_client.ipfs.dataset_open.requests",
{
unit: "1",
description: "IPFS Zarr dataset open requests.",
}
);
const DATASET_OPEN_DURATION = METER.createHistogram(
"dclimate_client.ipfs.dataset_open.duration",
{
unit: "s",
description: "IPFS Zarr dataset open latency.",
}
);
const STORE_OPEN_COUNTER = METER.createCounter(
"dclimate_client.ipfs.store_open.requests",
{
unit: "1",
description: "IPFS Zarr store open attempts.",
}
);
const STORE_OPEN_DURATION = METER.createHistogram(
"dclimate_client.ipfs.store_open.duration",
{
unit: "s",
description: "IPFS Zarr store open attempt latency.",
}
);

export function otelAttributes(
attributes: Record<string, unknown> = {}
): Attributes {
const cleaned: Record<string, AttributeValue> = {};
for (const [key, value] of Object.entries(attributes)) {
if (value === undefined || value === null) continue;
if (
typeof value === "string" ||
typeof value === "number" ||
typeof value === "boolean"
) {
cleaned[key] = value;
} else {
cleaned[key] = String(value);
}
}
return cleaned;
}

export function recordSpanError(span: Span, error: unknown): void {
const message = error instanceof Error ? error.message : String(error);
span.recordException(error instanceof Error ? error : message);
span.setStatus({ code: SpanStatusCode.ERROR, message });
}

export async function withSpan<T>(
name: string,
attributes: Record<string, unknown>,
callback: (span: Span) => Promise<T>
): Promise<T> {
const span = TRACER.startSpan(name, {
attributes: otelAttributes(attributes),
});

return context.with(trace.setSpan(context.active(), span), async () => {
try {
return await callback(span);
} finally {
span.end();
}
});
}

export function recordDatasetOpen({
gatewayUrl,
storeType,
status,
seconds,
}: {
gatewayUrl: string;
storeType: string;
status: RetrievalStatus;
seconds: number;
}): void {
const attributes = otelAttributes({
"dclimate_client.ipfs.gateway": gatewayUrl,
"dclimate_client.ipfs.store_type": storeType,
"dclimate_client.ipfs.status": status,
});
DATASET_OPEN_COUNTER.add(1, attributes);
DATASET_OPEN_DURATION.record(seconds, attributes);
}

export function recordStoreOpen({
gatewayUrl,
storeType,
status,
seconds,
}: {
gatewayUrl: string;
storeType: string;
status: RetrievalStatus;
seconds: number;
}): void {
const attributes = otelAttributes({
"dclimate_client.ipfs.gateway": gatewayUrl,
"dclimate_client.ipfs.store_type": storeType,
"dclimate_client.ipfs.status": status,
});
STORE_OPEN_COUNTER.add(1, attributes);
STORE_OPEN_DURATION.record(seconds, attributes);
}

export function secondsSince(startedAt: number): number {
return (performance.now() - startedAt) / 1000;
}

export function classifyRetrievalError(error: unknown): RetrievalStatus {
const message = error instanceof Error ? error.message : String(error);
if (
message.includes("Connection refused") ||
message.includes("Max retries exceeded") ||
message.includes("Timeout") ||
message.includes("timed out") ||
message.includes("ECONNREFUSED") ||
message.includes("ETIMEDOUT")
) {
return "connection_error";
}
return "error";
}
78 changes: 73 additions & 5 deletions src/ipfs/open-dataset.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import { Dataset, openIpfsStore } from "@dclimate/jaxray";
import type { IPFSELEMENTS_INTERFACE } from "@dclimate/jaxray";
import { DEFAULT_IPFS_GATEWAY } from "../constants.js";
import {
classifyRetrievalError,
recordDatasetOpen,
recordSpanError,
recordStoreOpen,
secondsSince,
withSpan,
type RetrievalStatus,
} from "../instrumentation.js";

export type IpfsElements = IPFSELEMENTS_INTERFACE;

Expand All @@ -18,10 +27,69 @@ export async function openDatasetFromCid(
}

const gatewayUrl = options.gatewayUrl ?? DEFAULT_IPFS_GATEWAY;
const { store } = await openIpfsStore(
cid,
options.ipfsElements ?? { gatewayUrl }
);
const storeType = "JaxrayIpfsStore";
const datasetStartedAt = performance.now();
let status: RetrievalStatus = "error";

return withSpan(
"dclimate_client.ipfs.load_zarr_dataset",
{
"dclimate_client.ipfs.cid": cid,
"dclimate_client.ipfs.gateway": gatewayUrl,
},
async (datasetSpan) => {
try {
const { store } = await withSpan(
"dclimate_client.ipfs.open_jaxray_store",
{
"dclimate_client.ipfs.gateway": gatewayUrl,
"dclimate_client.ipfs.store_type": storeType,
},
async (storeSpan) => {
const storeStartedAt = performance.now();
try {
const openedStore = await openIpfsStore(
cid,
options.ipfsElements ?? { gatewayUrl }
);
recordStoreOpen({
gatewayUrl,
storeType,
status: "ok",
seconds: secondsSince(storeStartedAt),
});
return openedStore;
} catch (error) {
const storeStatus = classifyRetrievalError(error);
recordStoreOpen({
gatewayUrl,
storeType,
status: storeStatus,
seconds: secondsSince(storeStartedAt),
});
recordSpanError(storeSpan, error);
throw error;
}
}
);

return Dataset.open_zarr(store);
const dataset = await Dataset.open_zarr(store);
status = "ok";
return dataset;
} catch (error) {
status = classifyRetrievalError(error);
recordSpanError(datasetSpan, error);
throw error;
} finally {
datasetSpan.setAttribute("dclimate_client.ipfs.store_type", storeType);
datasetSpan.setAttribute("dclimate_client.ipfs.status", status);
recordDatasetOpen({
gatewayUrl,
storeType,
status,
seconds: secondsSince(datasetStartedAt),
});
}
}
);
}
85 changes: 85 additions & 0 deletions tests/open-dataset.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { DEFAULT_IPFS_GATEWAY } from "../src/constants.js";
import {
classifyRetrievalError,
otelAttributes,
} from "../src/instrumentation.js";
import { openDatasetFromCid } from "../src/ipfs/open-dataset.js";

const openIpfsStoreMock = vi.hoisted(() => vi.fn());
const openZarrMock = vi.hoisted(() => vi.fn());

vi.mock("@dclimate/jaxray", () => ({
Dataset: {
open_zarr: openZarrMock,
},
openIpfsStore: openIpfsStoreMock,
}));

describe("openDatasetFromCid", () => {
beforeEach(() => {
openIpfsStoreMock.mockReset();
openZarrMock.mockReset();
});

it("opens an IPFS store and Zarr dataset with default gateway telemetry enabled", async () => {
const store = { kind: "store" };
const dataset = { kind: "dataset" };
openIpfsStoreMock.mockResolvedValue({ store });
openZarrMock.mockResolvedValue(dataset);

await expect(openDatasetFromCid("bafytest")).resolves.toBe(dataset);

expect(openIpfsStoreMock).toHaveBeenCalledWith("bafytest", {
gatewayUrl: DEFAULT_IPFS_GATEWAY,
});
expect(openZarrMock).toHaveBeenCalledWith(store);
});

it("uses caller supplied IPFS elements", async () => {
const ipfsElements = { gatewayUrl: "https://example.invalid" };
const store = { kind: "custom-store" };
const dataset = { kind: "dataset" };
openIpfsStoreMock.mockResolvedValue({ store });
openZarrMock.mockResolvedValue(dataset);

await openDatasetFromCid("bafycustom", { ipfsElements });

expect(openIpfsStoreMock).toHaveBeenCalledWith("bafycustom", ipfsElements);
});

it("preserves retrieval errors from the store opener", async () => {
const error = new Error("ETIMEDOUT while opening store");
openIpfsStoreMock.mockRejectedValue(error);

await expect(openDatasetFromCid("bafytimeout")).rejects.toThrow(error);
});
});

describe("retrieval instrumentation helpers", () => {
it("keeps metric attributes primitive and bounded", () => {
expect(
otelAttributes({
keep: "value",
count: 3,
enabled: true,
drop: undefined,
stringify: { nested: "value" },
})
).toEqual({
keep: "value",
count: 3,
enabled: true,
stringify: "[object Object]",
});
});

it("classifies common gateway connection errors", () => {
expect(classifyRetrievalError(new Error("ETIMEDOUT"))).toBe(
"connection_error"
);
expect(classifyRetrievalError(new Error("zarr metadata missing"))).toBe(
"error"
);
});
});
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
resolved "https://registry.yarnpkg.com/@noble/hashes/-/hashes-2.0.1.tgz#fc1a928061d1232b0a52bb754393c37a5216c89e"
integrity sha512-XlOlEbQcE9fmuXxrVTXCTlG2nlRXa9Rj3rr5Ue/+tX+nmkgbX720YHh0VR3hBF9xDvwnb8D2shVGOwNx+ulArw==

"@opentelemetry/api@^1.9.1":
version "1.9.1"
resolved "https://registry.yarnpkg.com/@opentelemetry/api/-/api-1.9.1.tgz#c1b0346de336ba55af2d5a7970882037baedec05"
integrity sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==

"@oxc-project/runtime@0.115.0":
version "0.115.0"
resolved "https://registry.npmjs.org/@oxc-project/runtime/-/runtime-0.115.0.tgz"
Expand Down
Loading