feat(instrument): Importer hardening — port Agentforce-style robustness to langfuse + shared base (cross-poll #8)#114
Closed
mmercuri wants to merge 1 commit into
Conversation
…r (cross-poll #8) Ports the Agentforce-style robustness patterns into a reusable ``layerlens.instrument.adapters._base.importer`` module so the Langfuse adapter (and every future importer adapter — ServiceNow, Zendesk, HubSpot, …) gets the same hardening by default rather than re-deriving it. Shared helpers: * Regex ID validation — ``validate_id`` / ``require_valid_id`` plus pre-compiled patterns for UUID, slug, integer, Salesforce, ISO date / timestamp. Blocks injection at the query-builder boundary. * Rate-limit header parsing — ``parse_rate_limit_headers`` understands the ``X-RateLimit-*`` family plus ``Retry-After`` (delta-seconds OR RFC 7231 HTTP-date). Surfaces a ``RateLimitInfo`` whose ``sleep_seconds()`` honours explicit reset deadlines. * Cursor pagination — ``paginate`` (sync iterator) + ``apaginate`` (async generator) with cursor-loop guard, empty-data short-circuit, and ``max_pages`` ceiling. * Retry with backoff — ``retry_with_backoff`` / ``aretry_with_backoff`` implementing decorrelated full-jitter (AWS Architecture Blog, 2015). Honors ``RetryableHTTPError`` carrying ``RateLimitInfo`` so a 429 sleeps until the explicit deadline rather than retrying blindly. * ``BaseImporter`` ABC orchestrating fetch → process with quarantine promotion, per-record failure isolation, and dry-run support. Langfuse adapter ported to use the helpers: * ``client.py`` retry loop replaced with ``retry_with_backoff`` + ``RetryableHTTPError``; ``get_all_traces`` rewritten to drive ``paginate``; new ``last_rate_limit`` property + 80%-usage warning mirroring Agentforce ``_check_rate_limit``. * ``importer.py`` validates trace IDs against ``ID_PATTERN_UUID`` before fetching; per-trace fetches retried via the shared helper so a brief network blip on one trace doesn't quarantine it. * ``exporter.py`` validates trace IDs and retries batch ingestion pushes on transient 429 / 5xx. Tests: 68 new tests for the shared base (``tests/instrument/adapters/_base/test_importer.py``) covering helper correctness, edge cases, retry exhaustion, rate-limit-aware backoff, and end-to-end ``BaseImporter`` orchestration. 7 new tests in ``test_langfuse_adapter.py`` verify the refactored importer/exporter use the shared helpers (invalid-ID rejection, transient retry, terminal 4xx no-retry, pagination iteration pattern, rate-limit warning). Docs: ``docs/adapters/importer-base.md`` defines the contract for future importer adapters, including the acceptance checklist. Acceptance: * uv run pytest tests/instrument/adapters/_base/test_importer.py -x → 68 passed * uv run pytest tests/instrument/adapters/frameworks/test_langfuse_adapter.py -x → 19 passed * uv run mypy --strict src/layerlens/instrument/adapters/_base/importer.py → success * uv run ruff check (shared base + langfuse + tests) → clean Effort estimate: ~7h (cross-pollination audit §3.1 #8).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Cross-pollination audit item #8 (
A:/tmp/adapter-cross-pollination-audit.md§3.1).Ports the Agentforce-style hardening patterns into a reusable
layerlens.instrument.adapters._base.importermodule so the Langfuse adapter — and every future importer adapter (ServiceNow, Zendesk, HubSpot, …) — gets the same hardening by default rather than re-deriving it.BaseImporterABC + helpers under_base/importer.pyclient.py/importer.py/exporter.pyrefactored onto the shared helpersdocs/adapters/importer-base.mddefining the contract for future importersEffort estimate: ~7h (matches audit §3.1 #8).
Shared base
validate_id/require_valid_idID_PATTERN_*constantsparse_rate_limit_headersX-RateLimit-*and RFC 7231Retry-AfterRateLimitInfo.is_throttled,.sleep_seconds(),.usage_ratiopaginate/apaginatebatched_inWHERE x IN (…)queriesretry_with_backoffRetryableHTTPErrorRateLimitInfoso retry can sleep until resetBaseImporterLangfuse refactor
client.pyretry loop replaced withretry_with_backoff+RetryableHTTPError;get_all_tracesrewritten to drivepaginate. Newlast_rate_limitproperty + 80 %-usage warning mirroring Agentforce_check_rate_limit.importer.pyvalidates trace IDs againstID_PATTERN_UUIDbefore fetching; per-trace fetches retried so a brief network blip on one trace doesn't quarantine it.exporter.pyvalidates trace IDs and retries batch ingestion pushes on transient 429 / 5xx.Backward compatible —
LangfuseAPIErrorsemantics are preserved (exhausted retries surface the same exception type callers always saw).Test plan
uv run pytest tests/instrument/adapters/_base/test_importer.py -x→ 68 passeduv run pytest tests/instrument/adapters/frameworks/test_langfuse_adapter.py -x→ 19 passed (12 original + 7 new — no regression)uv run mypy --strict src/layerlens/instrument/adapters/_base/importer.py→ successuv run ruff check src/layerlens/instrument/adapters/_base/importer.py src/layerlens/instrument/adapters/frameworks/langfuse/ tests/instrument/adapters/_base/ tests/instrument/adapters/frameworks/test_langfuse_adapter.py→ All checks passedNotes for reviewer (@m-peko)
retry_with_backoffaccepts asleepcallable so test fixtures overridetime.sleeprather than block.max_delayso a misconfiguredRetry-Aftercan't block forever.LangfuseAPIError/SyncResult/SyncStateexternal surface unchanged — pure additive refactor.feat/instrument-frameworks-langgraph(the only branch in the repo that has the Langfuse adapter wired in). Once that branch lands on main, rebasing this PR is mechanical.