
feat(instrument): Importer hardening — port Agentforce-style robustness to langfuse + shared base (cross-poll #8)#114

Closed
mmercuri wants to merge 1 commit into feat/instrument-frameworks-langgraph from feat/instrument-importer-hardening


Conversation

@mmercuri

Contributor

Summary

Cross-pollination audit item #8 (A:/tmp/adapter-cross-pollination-audit.md §3.1).

Ports the Agentforce-style hardening patterns into a reusable
layerlens.instrument.adapters._base.importer module so the Langfuse adapter — and every future importer adapter (ServiceNow, Zendesk, HubSpot, …) — gets the same hardening by default rather than re-deriving it.

  • Shared BaseImporter ABC + helpers under _base/importer.py
  • Langfuse client.py / importer.py / exporter.py refactored onto the shared helpers
  • 68 new unit tests for the shared base + 7 new tests in the langfuse adapter suite
  • Reference doc at docs/adapters/importer-base.md defining the contract for future importers

Effort estimate: ~7h (matches audit §3.1 #8).

Shared base

| Symbol | Purpose |
| --- | --- |
| validate_id / require_valid_id | Regex-anchored ID validation BEFORE query interpolation |
| ID_PATTERN_* constants | Pre-compiled patterns: UUID, slug, integer, Salesforce, date, timestamp |
| parse_rate_limit_headers | Normalised view of X-RateLimit-* and RFC 7231 Retry-After |
| RateLimitInfo | .is_throttled, .sleep_seconds(), .usage_ratio |
| paginate / apaginate | Sync iterator + async generator over cursor-paged endpoints |
| batched_in | Chunk IDs for WHERE x IN (…) queries |
| retry_with_backoff | Decorrelated full-jitter; honours rate-limit reset deadlines |
| RetryableHTTPError | Carries optional RateLimitInfo so retry can sleep until reset |
| BaseImporter | Orchestrator: fetch → process with quarantine + per-record isolation |
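As an illustration of the first rows of the table, here is a minimal sketch of whole-string ID validation followed by chunking for `IN (…)` queries. The names `UUID_PATTERN`, `require_id`, and `chunked` are hypothetical stand-ins, not the signatures of `validate_id`, `ID_PATTERN_UUID`, or `batched_in` in this PR, and the regex is an assumption about what a UUID pattern might look like.

```python
from __future__ import annotations

import re
from typing import Iterator, Sequence

# Hypothetical pattern; the PR's ID_PATTERN_UUID may be stricter or looser.
UUID_PATTERN = re.compile(
    r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)


def require_id(value: object, pattern: re.Pattern[str]) -> str:
    """Return the ID unchanged, or raise before it can reach a query string."""
    # fullmatch anchors both ends, so trailing newlines or extra text are rejected.
    if not isinstance(value, str) or pattern.fullmatch(value) is None:
        raise ValueError(f"invalid ID: {value!r}")
    return value


def chunked(ids: Sequence[str], size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` IDs."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(ids), size):
        yield list(ids[start:start + size])
```

The sketch uses `fullmatch` rather than `match` with a trailing `$`, because `$` also matches just before a final newline and would let `"<uuid>\n"` through.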

Langfuse refactor

  • client.py retry loop replaced with retry_with_backoff + RetryableHTTPError; get_all_traces rewritten to drive paginate. New last_rate_limit property + 80 %-usage warning mirroring Agentforce _check_rate_limit.
  • importer.py validates trace IDs against ID_PATTERN_UUID before fetching; per-trace fetches retried so a brief network blip on one trace doesn't quarantine it.
  • exporter.py validates trace IDs and retries batch ingestion pushes on transient 429 / 5xx.

Backward compatible — LangfuseAPIError semantics are preserved (exhausted retries surface the same exception type callers always saw).
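For readers unfamiliar with the rate-limit handling mentioned for client.py, the following is a rough sketch of normalising `X-RateLimit-*` and `Retry-After` headers and deciding when an 80 % usage warning would fire. `RateLimitSnapshot`, `parse_headers`, and `should_warn` are hypothetical names used for illustration only; the actual `RateLimitInfo` and `parse_rate_limit_headers` in this PR may expose different fields and behaviour.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


@dataclass(frozen=True)
class RateLimitSnapshot:
    """Hypothetical stand-in for the PR's RateLimitInfo."""
    limit: Optional[int]
    remaining: Optional[int]
    retry_after: Optional[float]  # seconds to wait, if the server said so

    @property
    def usage_ratio(self) -> Optional[float]:
        if not self.limit or self.remaining is None:
            return None
        return 1.0 - self.remaining / self.limit


def _to_int(raw: Optional[str]) -> Optional[int]:
    try:
        return int(raw)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return None


def parse_retry_after(raw: Optional[str], now: datetime) -> Optional[float]:
    """Accept either delta-seconds or an HTTP-date; return seconds or None."""
    if raw is None:
        return None
    raw = raw.strip()
    if raw.isdigit():
        return float(raw)
    try:
        when = parsedate_to_datetime(raw)
    except (TypeError, ValueError):  # unparseable date
        return None
    if when.tzinfo is None:  # treat naive dates as UTC
        when = when.replace(tzinfo=timezone.utc)
    return max(0.0, (when - now).total_seconds())


def parse_headers(
    headers: Mapping[str, str], now: Optional[datetime] = None
) -> RateLimitSnapshot:
    now = now or datetime.now(timezone.utc)
    lowered = {k.lower(): v for k, v in headers.items()}  # header names are case-insensitive
    return RateLimitSnapshot(
        limit=_to_int(lowered.get("x-ratelimit-limit")),
        remaining=_to_int(lowered.get("x-ratelimit-remaining")),
        retry_after=parse_retry_after(lowered.get("retry-after"), now),
    )


def should_warn(snapshot: RateLimitSnapshot, threshold: float = 0.8) -> bool:
    """True once the consumed share of the quota reaches the threshold."""
    ratio = snapshot.usage_ratio
    return ratio is not None and ratio >= threshold
```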

Test plan

  • uv run pytest tests/instrument/adapters/_base/test_importer.py -x → 68 passed
  • uv run pytest tests/instrument/adapters/frameworks/test_langfuse_adapter.py -x → 19 passed (12 original + 7 new, no regression)
  • uv run mypy --strict src/layerlens/instrument/adapters/_base/importer.py → success
  • uv run ruff check src/layerlens/instrument/adapters/_base/importer.py src/layerlens/instrument/adapters/frameworks/langfuse/ tests/instrument/adapters/_base/ tests/instrument/adapters/frameworks/test_langfuse_adapter.py → All checks passed

Notes for reviewer (@m-peko)

  • No TODOs, no stubs, no escape hatches.
  • retry_with_backoff accepts a sleep callable so test fixtures override time.sleep rather than block.
  • Rate-limit-aware sleep is capped by max_delay so a misconfigured Retry-After can't block forever.
  • Kept the existing LangfuseAPIError/SyncResult/SyncState external surface unchanged — pure additive refactor.
  • Base branch is feat/instrument-frameworks-langgraph (the only branch in the repo that has the Langfuse adapter wired in). Once that branch lands on main, rebasing this PR is mechanical.
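To make the two retry notes above concrete, here is a minimal sketch of a retry helper with an injectable `sleep` and a `max_delay` cap on server-supplied waits. It uses plain full jitter (a uniform draw between zero and an exponentially growing bound), which may not match the exact formula in this PR. `TransientError` and `call_with_retry` are hypothetical names, not the PR's `RetryableHTTPError` or `retry_with_backoff`.

```python
from __future__ import annotations

import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical stand-in for RetryableHTTPError."""

    def __init__(self, message: str, retry_after: Optional[float] = None) -> None:
        super().__init__(message)
        self.retry_after = retry_after  # seconds requested by the server, if any


def call_with_retry(
    fn: Callable[[], T],
    *,
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    rng: Callable[[float, float], float] = random.uniform,
) -> T:
    """Call fn, retrying on TransientError with capped, jittered delays."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except TransientError as exc:
            if attempt == max_attempts - 1:
                raise  # retries exhausted: surface the last error
            if exc.retry_after is not None:
                delay = exc.retry_after  # honour the server's deadline
            else:
                delay = rng(0.0, base_delay * 2 ** attempt)  # full jitter
            # The cap applies to both branches, so a bogus Retry-After cannot stall the caller.
            sleep(min(max(delay, 0.0), max_delay))
    raise AssertionError("unreachable")
```

In a test, passing `sleep=recorded.append` (with `recorded` a list) captures the requested delays without blocking, and a deterministic `rng` makes the jitter reproducible.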

…r (cross-poll #8)

Ports the Agentforce-style robustness patterns into a reusable
``layerlens.instrument.adapters._base.importer`` module so the
Langfuse adapter (and every future importer adapter — ServiceNow,
Zendesk, HubSpot, …) gets the same hardening by default rather than
re-deriving it.

Shared helpers:

* Regex ID validation — ``validate_id`` / ``require_valid_id`` plus
  pre-compiled patterns for UUID, slug, integer, Salesforce, ISO
  date / timestamp. Blocks injection at the query-builder boundary.
* Rate-limit header parsing — ``parse_rate_limit_headers`` understands
  the ``X-RateLimit-*`` family plus ``Retry-After`` (delta-seconds OR
  RFC 7231 HTTP-date). Surfaces a ``RateLimitInfo`` whose
  ``sleep_seconds()`` honours explicit reset deadlines.
* Cursor pagination — ``paginate`` (sync iterator) + ``apaginate``
  (async generator) with cursor-loop guard, empty-data short-circuit,
  and ``max_pages`` ceiling.
* Retry with backoff — ``retry_with_backoff`` /
  ``aretry_with_backoff`` implementing decorrelated full-jitter
  (AWS Architecture Blog, 2015). Honours ``RetryableHTTPError`` carrying
  ``RateLimitInfo`` so a 429 sleeps until the explicit deadline rather
  than retrying blindly.
* ``BaseImporter`` ABC orchestrating fetch → process with quarantine
  promotion, per-record failure isolation, and dry-run support.
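
The cursor-pagination guards listed above (cursor-loop guard, empty-data short-circuit, ``max_pages`` ceiling) can be sketched as follows. This is an illustration under assumptions, not the PR's ``paginate``: the name ``iter_pages``, the page shape ``{"data": [...], "next_cursor": ...}``, and the choice to stop quietly at the ceiling are all hypothetical.

```python
from __future__ import annotations

from typing import Any, Callable, Dict, Iterator, Optional, Set


def iter_pages(
    fetch_page: Callable[[Optional[str]], Dict[str, Any]],
    *,
    max_pages: int = 1000,
) -> Iterator[Any]:
    """Yield records from a cursor-paged endpoint.

    fetch_page(cursor) is assumed to return a dict with a "data" list and
    an optional "next_cursor" string.
    """
    cursor: Optional[str] = None
    seen: Set[str] = set()
    for _ in range(max_pages):  # ceiling: never fetch more than max_pages pages
        page = fetch_page(cursor)
        records = page.get("data") or []
        if not records:
            return  # empty-data short-circuit
        yield from records
        next_cursor = page.get("next_cursor")
        if not next_cursor:
            return  # last page
        if next_cursor in seen:
            # Guard against a server that keeps handing back an old cursor.
            raise RuntimeError(f"cursor loop detected at {next_cursor!r}")
        seen.add(next_cursor)
        cursor = next_cursor
```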

Langfuse adapter ported to use the helpers:

* ``client.py`` retry loop replaced with ``retry_with_backoff`` +
  ``RetryableHTTPError``; ``get_all_traces`` rewritten to drive
  ``paginate``; new ``last_rate_limit`` property + 80%-usage warning
  mirroring Agentforce ``_check_rate_limit``.
* ``importer.py`` validates trace IDs against ``ID_PATTERN_UUID``
  before fetching; per-trace fetches retried via the shared helper so
  a brief network blip on one trace doesn't quarantine it.
* ``exporter.py`` validates trace IDs and retries batch ingestion
  pushes on transient 429 / 5xx.

Tests: 68 new tests for the shared base
(``tests/instrument/adapters/_base/test_importer.py``) covering helper
correctness, edge cases, retry exhaustion, rate-limit-aware backoff,
and end-to-end ``BaseImporter`` orchestration. 7 new tests in
``test_langfuse_adapter.py`` verify the refactored importer/exporter
use the shared helpers (invalid-ID rejection, transient retry, terminal
4xx no-retry, pagination iteration pattern, rate-limit warning).

Docs: ``docs/adapters/importer-base.md`` defines the contract for
future importer adapters, including the acceptance checklist.
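
As a rough picture of the fetch → process orchestration with per-record failure isolation and dry-run support, a future importer built on such a base might resemble the sketch below. ``SketchImporter``, ``RunResult``, and the ``run`` signature are hypothetical; the contract in ``docs/adapters/importer-base.md`` is authoritative, and quarantine promotion is deliberately left out here.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Iterable, List, Tuple


@dataclass
class RunResult:
    """Hypothetical summary of one import run."""
    imported: int = 0
    quarantined: List[Tuple[Any, str]] = field(default_factory=list)


class SketchImporter(ABC):
    """Hypothetical base class; not the PR's BaseImporter."""

    @abstractmethod
    def fetch(self) -> Iterable[Any]:
        """Return the records to import."""

    @abstractmethod
    def process(self, record: Any) -> None:
        """Import a single record; may raise on bad input."""

    def run(self, *, dry_run: bool = False) -> RunResult:
        result = RunResult()
        for record in self.fetch():
            try:
                if not dry_run:
                    self.process(record)
                result.imported += 1
            except Exception as exc:
                # One bad record is set aside; the rest of the batch continues.
                result.quarantined.append((record, repr(exc)))
        return result
```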

Acceptance:
* uv run pytest tests/instrument/adapters/_base/test_importer.py -x → 68 passed
* uv run pytest tests/instrument/adapters/frameworks/test_langfuse_adapter.py -x → 19 passed
* uv run mypy --strict src/layerlens/instrument/adapters/_base/importer.py → success
* uv run ruff check (shared base + langfuse + tests) → clean

Effort estimate: ~7h (cross-pollination audit §3.1 #8).
@mmercuri mmercuri requested a review from m-peko April 26, 2026 23:37
@m-peko m-peko closed this May 21, 2026
@m-peko m-peko deleted the feat/instrument-importer-hardening branch July 2, 2026 16:37