From b895c37301c7b9848dcced04fa06bad55c7f2cd5 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 13:11:35 +0000 Subject: [PATCH] docs: add CLAUDE.md proposals for 5 repos with recent merged PRs Adds CLAUDE.md files for 5 repositories with PRs merged to main by: SayaliPat, shrivastavakapil2000, JoeVsVolcano, mike-brant, nathan-resonate Repos covered: - step-function-workflow-orchestrator (162+ merged PRs from team) - identity-graph (SayaliPat: person-identity-graph port, 443 unit tests) - dos-data-pipeline (mike-brant: DistrictResolver, l2_party_confirmed, ToBitmap) - resonate-utils (SayaliPat: LiveRamp Playwright automation fixes) - batch-expression-modeling (update: CDP-118857/972 formatter+stitch changes) Each CLAUDE.md captures: project overview, tech stack, repo structure, common dev tasks, key invariants, and gotchas discovered in recent PRs. https://claude.ai/code/session_012jmYLE5WsWE6p9mJRBWNKn --- .../batch-expression-modeling/CLAUDE.md | 530 ++++++++++++++++++ .../dos-data-pipeline/CLAUDE.md | 110 ++++ claude-md-proposals/identity-graph/CLAUDE.md | 117 ++++ claude-md-proposals/resonate-utils/CLAUDE.md | 78 +++ .../CLAUDE.md | 157 ++++++ 5 files changed, 992 insertions(+) create mode 100644 claude-md-proposals/batch-expression-modeling/CLAUDE.md create mode 100644 claude-md-proposals/dos-data-pipeline/CLAUDE.md create mode 100644 claude-md-proposals/identity-graph/CLAUDE.md create mode 100644 claude-md-proposals/resonate-utils/CLAUDE.md create mode 100644 claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md diff --git a/claude-md-proposals/batch-expression-modeling/CLAUDE.md b/claude-md-proposals/batch-expression-modeling/CLAUDE.md new file mode 100644 index 0000000..3e6c9d8 --- /dev/null +++ b/claude-md-proposals/batch-expression-modeling/CLAUDE.md @@ -0,0 +1,530 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. 
+ +## Project Overview + +This repository contains the **Batch Expression Modeling (BEM)** system for Resonate's audience delivery platform. It evaluates audience expressions against user behavioral data at scale, processes the results through identity stitching, and formats the output for various downstream vendors (Meta, TikTok, TheTradeDesk, etc.). + +### High-Level Architecture + +The system consists of three main components orchestrated by AWS Step Functions: + +1. **BEM (Batch Expression Modeling)** - Scala/Spark job that evaluates ~350 audience expressions against 2TB+ of SuperTagAv data +2. **Stitch** - Scala/Spark job that joins evaluated expressions with vendor identity tables (cookies, HEMs, device IDs) +3. **Formatter** - Scala/Spark job that formats stitched data for specific vendor requirements + +Each component runs on AWS EMR clusters and is configured/orchestrated by Lambda functions and Step Functions state machines. + +## Repository Structure + +``` +├── src/main/scala/com/resonate/ +│ ├── bem/ # Batch Expression Modeling (evaluates expressions against SuperTagAv) +│ ├── stitch/ # Identity stitching (joins RCID with vendor IDs) +│ ├── delivery/ # Delivery formatting and vendor-specific logic +│ └── utils/ # Shared utilities +├── workflows/ +│ ├── lambdas/ # Lambda functions for pipeline configuration +│ │ ├── batch-audience-delivery-processing-config/ # Main config lambda +│ │ ├── batch-vendor-stitch/ # Stitch config lambda +│ │ ├── bem-gen-job-parameters/ # BEM job parameters +│ │ └── ... 
+│ └── step-functions/ # Step function state machines (ASL JSON) +│ ├── batch-audience-delivery-processing/ # Main pipeline +│ ├── batch-audience-delivery-daily-pipeline/ # Daily orchestrator +│ ├── bem-pipeline/ # BEM execution +│ ├── batch-stitch-pipeline/ # Stitch execution +│ └── batch-delivery-formatter-pipeline/ # Formatter execution +├── terraform/ # Infrastructure as code (one directory per Lambda/Step Function) +├── integration-tests/ # End-to-end integration tests +└── build.sbt # SBT build configuration +``` + +## Common Development Tasks + +### Scala/Spark Development + +**Build and test:** +```bash +# Compile Scala code +./compile.sh # or: sbt compile + +# Run tests +sbt test + +# Run specific test +sbt "testOnly com.resonate.bem.BatchExpressionModelJobTest" + +# Package JAR for EMR deployment +./package.sh # or: sbt assembly +# Output: target/batch-expression-modeling.jar +``` + +**AWS authentication:** +```bash +# Required before uploading JARs to S3 or accessing AWS resources +aws sso login +``` + +### Python Lambda Development + +**Lambda functions are in `workflows/lambdas/`**. Each has its own `requirements.txt`. + +**Run tests for a Lambda:** +```bash +cd workflows/lambdas/ +python -m pytest tests/ +``` + +**Common Lambda functions:** +- `batch-audience-delivery-processing-config` - Generates pipeline configuration from delivery schedule +- `bem-gen-job-parameters` - Generates EMR step parameters for BEM job +- `batch-vendor-stitch` - Generates stitch configuration per vendor + +### Integration Tests + +**Run full integration test:** +```bash +cd integration-tests + +# First time: install dependencies +pip install -r requirements.txt + +# Run setup + execution + validation +bash run_full_integration_test.sh +``` + +The integration test: +1. Sets up test data in S3 with datetime-based paths (e.g., `integration/data/20250914_143022/`) +2. Triggers the `aud-delivery-proc-integration` step function +3. 
Polls for completion and validates SUCCESS status + +**Cost:** ~$0.25-0.30 per run, ~15-25 minutes with minimal test data. + +### GitHub Actions Workflows + +**Workflow files (in `.github/workflows/`):** + +| Workflow File | Description | +|--------------|-------------| +| `bem-jar.yml` | Builds and publishes the Scala JAR to S3 | +| `bem-lambdas.yml` | Deploys Lambda functions | +| `bem-step-functions.yml` | Deploys Step Functions | +| `bem-unit-tests.yml` | Runs Scala unit tests | + +**Deploy to an environment:** + +⚠️ **CRITICAL: Always specify `--ref` when deploying from a feature branch!** + +Without `--ref`, `gh workflow run` deploys from the **default branch (main)**, NOT your current branch. This is a common mistake that results in deploying old code. + +```bash +# Deploy JAR (required input: environment) +# JAR goes to qa (integration environment uses qa JAR) +gh workflow run bem-jar.yml -f environment=qa --ref <branch> + +# Deploy Lambdas (required input: environment) +gh workflow run bem-lambdas.yml -f environment=integration --ref <branch> + +# Deploy Step Functions (required inputs: environment, deployment_type) +gh workflow run bem-step-functions.yml -f environment=integration -f deployment_type=All --ref <branch> +``` + +**Example deploying all components from a feature branch:** +```bash +BRANCH="feature/CDP-123456-my-feature" +gh workflow run bem-jar.yml -f environment=qa --ref "$BRANCH" +gh workflow run bem-lambdas.yml -f environment=integration --ref "$BRANCH" +gh workflow run bem-step-functions.yml -f environment=integration -f deployment_type=All --ref "$BRANCH" +``` + +**Step Functions deployment_type options:** +- `All` - Deploy all step functions +- `BEM Pipeline Step Function` +- `Batch Stitch Pipeline` +- `Batch Delivery Formatter Pipeline` +- `Audience Delivery Pipeline` +- `Audience Delivery Custom Delivery Pipeline` +- `Audience Delivery Processing Pipeline` +- `Audience Delivery Daily Pipeline` +- `Batch Delivery Formatter Metrics Lambda` *(added CDP-118857)* + 
+**Environment values:** +- `integration` - Integration testing environment (uses QA JAR) +- `dev`, `dev2`-`dev7` - Development environments +- `nonprod` - QA/staging environment +- `prod` - Production environment + +**Check deployment status:** +```bash +# List recent runs for a workflow +gh run list --workflow=bem-jar.yml --limit=3 +gh run list --workflow=bem-lambdas.yml --limit=3 +gh run list --workflow=bem-step-functions.yml --limit=3 + +# Watch a specific run +gh run watch +``` + +**Note:** The integration environment uses the QA JAR. When testing changes in integration: +1. Deploy JAR to `qa` +2. Deploy lambdas to `integration` +3. Deploy step functions to `integration` + +### Working with Step Functions + +Step functions are defined in `workflows/step-functions/*/statemachine/*.asl.json` and use **JSONata query language** (not JSONPath). + +**Key step functions:** +- `batch-audience-delivery-daily-pipeline` - Entry point triggered daily by EventBridge +- `batch-audience-delivery-processing` - Main pipeline orchestrating BEM → Stitch → Formatter +- `bem-pipeline` - EMR cluster creation → BEM job execution → cluster termination +- `batch-stitch-pipeline` - Parallel execution of stitch jobs per vendor +- `batch-delivery-formatter-pipeline` - Formatting for vendor-specific requirements + +## Key Concepts + +### Expression Evaluation (BEM) + +The BEM job (`BatchExpressionModelJob.scala`) evaluates audience expressions against SuperTagAv data: + +1. **Input:** + - `surveyexpressions.csv` - List of expressions to evaluate (e.g., `NOT (E999999998)`) + - SuperTagAv parquet data - User behavioral data with attribute bitmaps (evmap, avmap, cvmap) + - Models CSV - Expression evaluation models + +2. 
**Processing:** + - Reads SuperTagAv data (~2TB) with optimized schema (only needed bitmap columns) + - Evaluates each expression against each RCID's attribute bitmaps + - Uses `AudienceEvaluator` from `das-expression` library + - Outputs matches partitioned by `exprHash` + +3. **Output:** Parquet files with schema `(rcid, exprHash, score)` partitioned by `exprHash=` + +### Identity Stitching + +The Stitch job (`TableStitch.scala`) joins BEM output with vendor identity tables: + +1. **Input:** + - BEM cache (evaluated expressions with RCIDs) + - Vendor stitch tables (e.g., `tmid`, `hemsha2`, `viant_id` columns) + - Audience metadata (expression hash to audience ID mappings) + +2. **Processing:** + - Joins BEM cache with stitch table on `rcid` + - Maps `exprHash` to `audienceIds` and explodes + - Filters empty vendor keys + +3. **Output:** `(akey, vkey, score)` where `akey` = audience key, `vkey` = vendor key (e.g., cookie, email hash) + +### Delivery Configuration + +The `batch-audience-delivery-processing-config` Lambda generates pipeline configuration: + +- Queries RTP database for delivery schedules (via `RTP_PSQL_CONNECTION_STRING`) +- Determines which vendors need delivery based on schedule windows +- Generates S3 paths for tagav data, cookiejar, expressions, and output locations +- Sets EMR cluster configurations (instance types, counts, etc.) + +**Important fields:** +- `delivery_type` - "daily", "training", "custom" +- `tagav_date_folder` - Date folder for SuperTagAv input (format: YYYYMMDD) +- `cookie_jar_date` - Date folder for cookiejar data +- `start_time_of_delivery` / `end_time_of_delivery` - Time window for vendor deliveries (epoch milliseconds) + +### Formatter (CDP-118857) + +The Formatter Spark job formats stitched data per vendor. 
Key recent changes: + +**Output partition layout (changed in CDP-118857):** +- Old: `/date=*/method=av/vendor=*/akey=*/` +- **New:** `/date=*/vendor=*/method=*/akey=*/` + +Any consumer that concatenates a `source_prefix` must use the new `vendor=*/method=*/` order. Affected repos: `batch-audience-delivery-syndication`, `batch-audience-delivery-experian-syndication`, `batch-audience-delivery-yahoo-syndication`, `batch-expression-modeling` (custom-delivery ASL). + +**Metrics lambda (CDP-118857):** After EMR cluster termination, the formatter pipeline invokes `batch-delivery-formatter-publish-metrics`. Metrics are written to InfluxDB: +- `com.resonate.delivery.format.count` — per-audience rows +- `com.resonate.delivery.format.aggregate` — aggregate rows per vendor/method/delivery_type + +**File extension caveat:** Spark's `partitionBy("method")` changed codec suffix separators from `-c000.csv.gz` to `.c000.csv.gz`. Publisher lambdas (`experian-publish-data-files`, `openx-publish-data-files`, etc.) must hardcode `.csv.gz` as the output extension rather than parsing it from the source filename. + +**`delta_with_full_fallback` fix (CDP-118857):** `_annotate_previous_date_partitions` now accepts both `"delta"` and `"delta_with_full_fallback"` refresh types (mirrors Scala's `RefreshType.isDelta`). Meta deliveries use `delta_with_full_fallback`. + +### Batch Stitch Pipeline (CDP-118972) + +`AddJobFlowSteps` calls are staggered via index-based `Wait` states (iteration N waits N seconds before submission). This prevents AWS EMR `AddJobFlowSteps` throttling when N stitch steps are submitted in parallel. + +The non-functional `EMR.ThrottlingException` retry was removed — the actual throttle error is `EMR.AmazonElasticMapReduceException`, not `EMR.ThrottlingException`. + +### Prior-Stitch Partition Pruning (CDP-118857) + +`prunePartitions` takes `dates: Seq[String]` and applies `col("date").isin(dates: _*)` as a static Catalyst filter. 
This prevents OOM on Meta deliveries with `delta_with_full_fallback` by limiting the prior-stitch scan to only the dates referenced by current deliveries. + +## AWS Step Functions: Converting JSONPath to JSONata + +### Top-Level Changes + +Add `"QueryLanguage": "JSONata"` at the state machine or state level. + +### Field Replacements + +JSONPath's five fields are replaced with two in JSONata: +- **Arguments** - Replaces `InputPath` and `Parameters` for selecting and sending data to integrated actions +- **Output** - Replaces `ResultPath`, `ResultSelector`, and `OutputPath` for transforming state output + +### JSONPath Expression Syntax + +- **JSONPath**: `"field.$": "$.path.to.value"` +- **JSONata**: `"field": "{% $states.input.path.to.value %}"` + +Remove the `.$` suffix and wrap expressions in `{% %}` delimiters. + +### ResultPath Conversions + +**Merge result into input at a path:** +```json +// JSONPath +"ResultPath": "$.ClusterCreationResult" + +// JSONata +"Output": "{% $merge([$states.input, {'ClusterCreationResult': $states.result}]) %}" +``` + +**Discard result, pass through input only:** +```json +// JSONPath +"ResultPath": null + +// JSONata +// Task, Map, and Parallel states emit their result when Output is omitted, so pass the input through explicitly +"Output": "{% $states.input %}" +``` + +**Replace entire input with result (default):** +```json +// JSONPath +"ResultPath": "$" // or omit ResultPath + +// JSONata +// Omit Output field, or use: +"Output": "{% $states.result %}" +``` + +### OutputPath Conversions + +**Extract specific field from result:** +```json +// JSONPath +"OutputPath": "$.Payload" + +// JSONata +"Output": "{% $states.result.Payload %}" +``` + +### Catch Block Error Handling + +**Merge error into input:** +```json +// JSONPath +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "ResultPath": "$.Error", + "Next": "HandleError" + } +] + +// JSONata +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Output": "{% $merge([$states.input, {'Error': $states.errorOutput}]) %}", + "Next": "HandleError" + } +] +``` + +Note: `$states.errorOutput` is 
only available in Catch blocks. + +### Map State Conversions + +**ItemsPath:** +```json +// JSONPath +"ItemsPath": "$.step_configs" + +// JSONata +"Items": "{% $states.input.step_configs %}" +``` + +**ItemSelector:** +```json +// JSONPath +"ItemSelector": { + "step_config.$": "$$.Map.Item.Value", + "clusterId.$": "$.ClusterCreationResult.ClusterId" +} + +// JSONata +"ItemSelector": { + "step_config": "{% $states.context.Map.Item.Value %}", + "clusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" +} +``` + +**IMPORTANT:** Use `$states.context.Map.Item.Value` to access the current item in a Map state. There is **NO** `$states.item` variable - this is a common mistake that will cause "Field '$states.item' does not exist" errors. + +**⚠️ CRITICAL: Map States Must Preserve Input** + +By default, a Map state outputs an **array of results** from processing each item, which **loses the original input fields**. If subsequent states need access to fields from the original input (like a ClusterId from cluster creation), you MUST add an Output field to preserve the input. + +```json +// WRONG - This loses ClusterCreationResult from input +"Step Map": { + "Type": "Map", + "ItemSelector": { + "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" + }, + "Items": "{% $states.input.step_configs %}", + "Next": "TerminateCluster" // ❌ Will fail - ClusterCreationResult is lost! +} + +// CORRECT - Preserve input for next state +"Step Map": { + "Type": "Map", + "ItemSelector": { + "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" + }, + "Items": "{% $states.input.step_configs %}", + "Output": "{% $states.input %}", // ✅ Preserves all input fields + "Next": "TerminateCluster" +} +``` + +**Common Error:** +``` +The JSONata expression '$states.input.ClusterCreationResult.ClusterId' specified for the field 'Arguments/ClusterId' returned nothing (undefined). 
+``` + +This error means the Map state didn't preserve the input, so subsequent states can't access fields that were in the original input. + +### Array Construction + +**Intrinsic function to array literal:** +```json +// JSONPath +"Args.$": "States.Array('cmd', '--flag', $.input.value, '--opt', $.input.other)" + +// JSONata - Use regular JSON array with individual JSONata expressions +"Args": [ + "cmd", + "--flag", + "{% $states.input.value %}", + "--opt", + "{% $states.input.other %}" +] +``` + +### Reserved Variables in JSONata + +- `$states.input` - Original input to the current state +- `$states.result` - Result from API/sub-workflow (Task, Parallel, Map states) +- `$states.errorOutput` - Error output (only in Catch blocks) +- `$states.context` - Execution metadata (StartTime, task token, Map.Item.Value, etc.) + - `$states.context.Map.Item.Value` - Current item in Map state iteration + - `$states.context.Map.Item.Index` - Current iteration index in Map state + +### ⚠️ Common Pitfall: $states.item Does NOT Exist + +**Error:** `Field '$states.item' does not exist` + +**WRONG:** +```json +"ItemSelector": { + "item": "{% $states.item %}" // ❌ This will fail! +} +``` + +**CORRECT:** +```json +"ItemSelector": { + "item": "{% $states.context.Map.Item.Value %}" // ✅ Use this instead +} +``` + +Many online examples and AI suggestions incorrectly reference `$states.item`, but this variable does not exist in AWS Step Functions JSONata. Always use `$states.context.Map.Item.Value` to access the current Map iteration item. + +### Key Differences from JSONPath + +1. **No `.$` suffix** - Regular field names with JSONata expressions in `{% %}` +2. **No Path fields** - InputPath, ResultPath, OutputPath are replaced by Arguments and Output +3. **Assign vs Output** - `Assign` creates variables accessible in all future states; `Output` only affects the immediate next state +4. 
**Parallel processing** - Assign and Output are processed in parallel; variable assignments don't affect Output + +### Common Patterns + +**Pass through input unchanged:** +- For states that produce no result (e.g., Pass, Wait, Choice), omit the Output field; for Task, Map, and Parallel states, explicitly use `"Output": "{% $states.input %}"` + +**IMPORTANT:** Any Task state that needs to pass data to subsequent states should preserve the input explicitly. Without an Output field, the Task's result replaces the entire input, losing all previous data. + +**Common scenarios requiring input preservation:** + +1. **After Catch blocks** - When error info was merged into input +2. **Cleanup/termination tasks** - When subsequent states need original context (e.g., Lambda functions that publish metrics need delivery_config) +3. **Between pipeline stages** - When chaining multiple operations that all need access to the original request + +```json +// Example 1: Catch block merges error into input +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Output": "{% $merge([$states.input, {'Error': $states.errorOutput}]) %}", + "Next": "CleanupTask" + } +] + +// The CleanupTask MUST preserve input if Error is needed later +"CleanupTask": { + "Type": "Task", + "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster", + "Arguments": { "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" }, + "Output": "{% $states.input %}", // ✅ Preserves Error field + "Next": "NotifyFailure" +} + +// Example 2: EMR termination before Lambda that needs delivery config +"EMR TerminateCluster Success": { + "Type": "Task", + "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster", + "Arguments": { "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" }, + "Output": "{% $states.input %}", // ✅ Lambda needs delivery_config from input + "Next": "Publish Metrics" +} +``` + +**Add result as new field while preserving input:** +```jsonata +"Output": "{% $merge([$states.input, {'resultField': $states.result}]) %}" +``` + +**Transform and extract 
specific data:** +```jsonata +"Output": { + "field1": "{% $states.input.someValue %}", + "field2": "{% $states.result.someOtherValue %}" +} +``` + +**Create variables for use in later states:** +```jsonata +"Assign": { + "myVariable": "{% $states.input.someValue %}" +} +// Later states can reference {% $myVariable %} +``` + +### Reference Documentation + +- [AWS Step Functions - Transforming data with JSONata](https://docs.aws.amazon.com/step-functions/latest/dg/transforming-data.html) +- [AWS Step Functions - Passing data between states with variables](https://docs.aws.amazon.com/step-functions/latest/dg/workflow-variables.html) diff --git a/claude-md-proposals/dos-data-pipeline/CLAUDE.md b/claude-md-proposals/dos-data-pipeline/CLAUDE.md new file mode 100644 index 0000000..5f9d546 --- /dev/null +++ b/claude-md-proposals/dos-data-pipeline/CLAUDE.md @@ -0,0 +1,110 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +**dos-data-pipeline** contains the Scala/Spark data pipeline jobs for Resonate's DOS (Data Operations & Services) platform. The primary pipelines are `geo-location` (IP → postal/district mapping) and `segment-aggregator`. Other pipelines (ToBitmap, BitmapEnrichment, etc.) are triggered by the `datapipeline-trigger` Lambda in `lambda-modeling`; their configurations live in `services-api-configuration`. 
+ +## Tech Stack + +- **Scala** / **Spark** (EMR 6.x or 7.x depending on pipeline) +- **SBT** build tool +- **GitHub Actions** for CI/CD deployment +- Configs for geo-location and segment-aggregator live **in this repo** (deployed by GitHub Actions) + +## Repository Structure + +``` +├── src/main/scala/com/resonate/ +│ ├── geo/ # GeoLocationDaily, GeoLocationFullBackfill, GeoLocationFull +│ ├── segment/ # SegmentAggregator +│ ├── bitmap/ # ToBitmap (legislative-district marker bits) +│ └── utils/ # DistrictResolver, shared helpers +├── src/test/scala/com/resonate/ +│ └── ... # Unit tests +└── config/ # Geo-location and segment-aggregator configs (deployed in-repo) +``` + +## Common Development Tasks + +### Build and Test + +```bash +# Run all unit tests +sbt test + +# Run a specific test suite +sbt "testOnly com.resonate.geo.GeoLocationDailyTest" + +# Package the JAR +sbt assembly +``` + +### CI/CD Deploy + +Deployment is through GitHub Actions. Configuration files for geo-location and segment-aggregator in this repo are deployed by Actions — **do not apply config changes manually to S3 in dev/prod**. + +## Key Jobs + +### GeoLocationDaily + +Processes daily RcidEnriched output — appends `zip`, `congress`, `state_senate`, `state_house`, `proposed_congress`, `district_source`, and `l2_party_confirmed` columns. + +**DistrictResolver** (CDP-118946): centralises L2-vs-IP district coalescing: + +| `district_source` | Condition | +|---|---| +| `L2_CONFIRMED` | L2 has district AND `l2_party_confirmed = true` | +| `L2_UNCONFIRMED` | L2 has district AND `l2_party_confirmed != true` | +| `IP_INFERRED` | No L2 district, but IP zip lookup populated at least one district | +| `null` | No L2 and no IP match | + +The 4-file zip→district mapping (`zip-congress-mapping.csv`, `zip-proposed-congress-mapping.csv`, `zip-state-senate-mapping.csv`, `zip-state-house-mapping.csv`) is passed via a single base-path arg `zipDistrictMappingsBasePath`. 
Files are read by convention, not hardcoded, so do not rename the CSV files. + +### GeoLocationFullBackfill + +Re-derives all four districts for every RCID in the rolling-full. Reads `zip` from the existing fullDf (populated at Daily time); does NOT make a second NetAcuity lookup. + +**Key invariant**: `districtColumnList` arg was removed (CDP-118946). The Backfill always re-derives all four districts. The orchestrator's `geo_location.asl.json` must NOT pass `DistrictColumnList`. + +### ToBitmap (CDP-118944 / CDP-118947) + +Sets the `A200299998` (L2-confirmed) marker bit. The marker is: +- Set unconditionally (Phase A), so it is present on every RCID +- Will be narrowed to `district_source = L2_CONFIRMED` in Phase 2 Unit 5 + +**Reserved bit range**: `[100300001, 200300000]`. ToBitmap's `mergeGeoAttrsForRoaringBitmapUdf` wipes `[100300001, 200299999)` on every run; `200299998` is inside this range. Do not assign new geo bits outside `[200299998, 200300000]`. + +### l2_party_confirmed (CDP-118945) + +Added to `rcid-l2-enriched` output (via `core-data-applications/L2HemToRcid`): + +- `true`: cookie party null, L2 party non-major, or parties match +- `false`: both major (Dem/Rep) and disagree (Bucket 2, ~10% of cookie jar) + +This field gates the `district_source` classification downstream. + +## Coordinated Deploys + +The following units must deploy together in a single window: +- **Unit 3** (`core-data-applications`): `l2_party_confirmed` from `L2HemToRcid` +- **Unit 4** (`dos-data-pipeline`): `district_source` + IP fill in `GeoLocationDaily`/`GeoLocationFullBackfill` +- **Unit 5** (`dos-data-pipeline`): `ToBitmap` marker-bit gating on `L2_CONFIRMED` + +Merging any one without the others leaves the pipeline in a broken intermediate state. 
+ +## Testing Guidelines + +- `GeoLocationDailyTest`: rewires fixtures into `ZipDistrictMappings`; exercises DistrictResolver paths directly +- `GeoLocationFullBackfillTest`: cover all 7 `district_source` paths (L2_CONFIRMED, L2_UNCONFIRMED, IP_INFERRED, null, partial-L2, IP-only, no-match) +- `GeoLocationFullTest`: update tuple arities when new output columns are added +- `sbt test` must pass before any PR; 60 tests as of 2026-05-20 + +## Key Constraints + +- **Never pass `DistrictColumnList` to `GeoLocationFullBackfill`** — the arg was removed in CDP-118946 +- **Never make a second NetAcuity lookup in Backfill** — reuse `zip` from fullDf +- **4-file zip mapping must stay named exactly** as listed — read by convention +- **Marker bit `200299998` must stay inside the wipe range** `[100300001, 200299999)` +- Branch naming: `feature/{JIRA-ID}-{kebab-slug}` diff --git a/claude-md-proposals/identity-graph/CLAUDE.md b/claude-md-proposals/identity-graph/CLAUDE.md new file mode 100644 index 0000000..8a568d1 --- /dev/null +++ b/claude-md-proposals/identity-graph/CLAUDE.md @@ -0,0 +1,117 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +**identity-graph** contains the Scala/Spark jobs that build Resonate's person identity graph — linking RCIDs to hashed emails, phone numbers, voter records, Experian digital graph data, and Tapad device IDs. It runs on AWS EMR via step functions in `step-function-workflow-orchestrator`. 
+ +## Tech Stack + +- **Scala** 2.12.17 / **Spark** 3.5.0 +- **SBT** build tool +- **JDK 17** runtime (--add-opens flags required — already configured in `build.sbt`) +- **Scopt** for CLI argument parsing (all jobs use scopt, not `application.conf`) +- Deploy: JAR published to `s3://resonate-core-applications/identity-graph/jars/identity-graph-latest.jar` + +## Repository Structure + +``` +├── src/main/scala/com/resonate/ +│ ├── jobs/ # 11 pipeline jobs (see catalogue below) +│ └── utils/ # Shared utilities +│ ├── HashUtils # Deterministic SHA256-based person_id / household_id generation +│ ├── StagingWriter # cache → count → write → unpersist pattern +│ ├── AddressNormalizer # USPS Pub 28 suffix expansion, nickname resolution (transitive, max 5 hops) +│ ├── IpFilter # RFC1918 + datacenter/cloud IP blocklist +│ └── ScoringConfig # Centralized confidence scoring and recency decay +├── src/test/scala/com/resonate/ +│ └── ... # 443 unit tests across 20 test suites +└── build.sbt +``` + +## Common Development Tasks + +### Build and Test + +```bash +# Run all unit tests +sbt test + +# Run a specific test suite +sbt "testOnly com.resonate.jobs.PersonIdentityJobSpec" + +# Package the JAR for EMR deployment +sbt assembly +# Output: target/scala-2.12/identity-graph-assembly-*.jar + +# AWS authentication (needed before uploading JARs) +aws sso login +``` + +### CI/CD Deploy + +The JAR is automatically built and uploaded to S3 on push to `main`. The GitOps flow is documented at the Confluence link in the README. For feature branch testing, manually dispatch the CI publish workflow with `environment=dev`. 
+ +## Job Catalogue + +| Job | Purpose | +|---|---| +| `ExperianPreprocessJob` | Normalises Experian ConsumerView + digital graph (CSV/Parquet → normalised Parquet) | +| `L2ConsumerPreprocessJob` | Preprocesses L2 consumer tab-delimited files | +| `TapadIndexJob` | Maps Experian/Tapad identifiers to `identifier_index` (direct + staging) | +| `L2IndexJob` | Maps L2 voter records to `identifier_index` entries | +| `L2ConsumerIndexJob` | Maps L2 consumer records to `identifier_index` entries | +| `OnboardingMatchJob` | 4-phase matching: blocking → corroboration → scoring → fallback | +| `OnboardingHouseholdLinksJob` | Household CTV/TTD attribution via digital graph | +| `OnboardingPersonsJob` | Creates Tapad-only person records (deprecated, BL-106) | +| `MergeOnboardingLinksJob` | Multi-provider collapse + BL-4/BL-107 confidence scoring | +| `PersonIdentityJob` | Main orchestrator: persons + identifier_index from L2/LI/LR/Sovrn | +| `CustomerMatchJob` | Ephemeral customer file matching (7 fallback tiers) | + +## Key Invariants and Bug Fixes + +These are non-obvious correctness constraints — always preserve them: + +- **`L2IndexJob` HOUSEHOLD_ID**: `filterNulls = true` (not false) — prevents phantom household collapse from null household IDs. +- **`OnboardingMatchJob` Phase 3**: Window for `selectBestMatch` includes `person_id.asc` tie-breaker — prevents non-deterministic output. +- **`ExperianPreprocessJob` aggregation**: Use `min(id_value)` not `first(id_value)` — prevents non-deterministic aggregation. +- **`AddressNormalizer` nickname chains**: `resolveNicknameUDF` applies iterative lookup (max 5 hops) for transitive chains. +- **`MergeOnboardingLinksJob` / `PersonIdentityJob`**: null `identifier_value` filter before groupBy prevents null-key groupBy errors. + +## Shared Utilities + +### HashUtils +Generates deterministic `person_id` and `household_id` via SHA256. Do not change the hashing logic — it must be stable across runs. 
+ +### StagingWriter +Wraps the `cache → count → write → unpersist` pattern for staging partitions. Use this instead of ad-hoc caching. + +### AddressNormalizer +Implements USPS Pub 28 suffix expansion and nickname resolution. Nickname chains are resolved transitively (up to 5 hops). The `ip_blocklist.txt` should be refreshed periodically. + +### ScoringConfig +Centralises confidence scoring thresholds, recency decay parameters, and artifact thresholds. Do not hardcode scoring values in individual jobs. + +## Testing Guidelines + +- Each job MUST have a corresponding `*Spec.scala` test suite +- All Spark tests use `SparkSession` in local mode — no external dependencies +- JDK 17 requires the `--add-opens` flags already present in `build.sbt`; do NOT remove them +- Test fixtures use column-aligned parquet schemas — verify row counts match expected values +- `parallelExecution in Test := false` — tests run sequentially to avoid Spark session conflicts + +## Integration with step-function-workflow-orchestrator + +This repo is the Spark side only. The companion Step Function pipelines live in `step-function-workflow-orchestrator`: +- `pipelines/experian-data-processing/` — runs `ExperianPreprocessJob` + `ExperianDataProcessor` +- `pipelines/person-identity-graph/` — runs the full identity pipeline sequence + +Changing a job's CLI args requires a coordinated PR in both repos. 
+ +## Key Constraints + +- **Never change SHA256 hashing logic in `HashUtils`** — any change breaks historical linkage +- **Never use `first()`** for aggregations where order is not guaranteed — use `min()` or `max()` +- **Null guards before groupBy** — always filter null identifier_value/household_id before groupBy +- Branch naming: `feature/{JIRA-ID}-{kebab-slug}` diff --git a/claude-md-proposals/resonate-utils/CLAUDE.md b/claude-md-proposals/resonate-utils/CLAUDE.md new file mode 100644 index 0000000..5e3361a --- /dev/null +++ b/claude-md-proposals/resonate-utils/CLAUDE.md @@ -0,0 +1,78 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +**resonate-utils** is a shared repository for reusable engineering utilities, diagnostics, and automation scripts. It is NOT a production service — it contains standalone tools used by engineers for automation, data checks, and operational tasks. + +## Repository Structure + +``` +src/ +├── python/ +│ └── liveramp-automation/ # Playwright-based LiveRamp resend automation +│ ├── liveramp_resend.py # Main automation class +│ └── tests/ # pytest test suite +├── scala/ # Ad-hoc Spark/validation tools +└── shell/ # Shell scripts for ops automation +.github/ +└── workflows/ + ├── build-scala.yml # Builds Scala fat JARs + └── lint-python.yml # Lints Python scripts +``` + +## Python Development + +### LiveRamp Automation (`src/python/liveramp-automation/`) + +Playwright-based browser automation for the LiveRamp segment resend workflow. Runs on ECS via scheduled execution. 
+ +**Key design pattern — always use Locators, never ElementHandles:** + +```python +# CORRECT: Locators re-query the DOM on each interaction +more_btn = page.locator('[aria-label="More options"]') +more_btn.click() + +# WRONG: ElementHandle can go stale after React/Pendo re-renders +more_btn = page.query_selector('[aria-label="More options"]') +more_btn.click() # May raise "Element is not attached to the DOM" +``` + +The `_click_more_menu` method implements retry logic with: +1. Re-query of the element before each retry attempt (not reusing a stale handle) +2. 1000ms stabilization wait between retries +3. `force=True` fallback if Pendo still blocks after retries +4. Persistent CSS block (`display:none; pointer-events:none`) on all Pendo elements + `pendo.stopGuides()` + +**Running tests:** +```bash +cd src/python/liveramp-automation +pip install -r requirements.txt +python -m pytest tests/ -v +``` + +**Monitoring:** CloudWatch logs at `/ecs/liveramp-automation` + +### General Python Guidelines + +- Use `pytest` for all tests +- Keep each utility self-contained with its own `requirements.txt` +- Add a `README.md` in each utility directory + +## Scala Development + +```bash +cd +sbt assembly +# Submit via spark-submit +spark-submit --class com.resonate.SomeApp target/scala-2.12/your-tool.jar +``` + +## Key Constraints + +- This repo contains **one-off tools and utilities** — not production services +- Do not add production-critical logic here; it belongs in the appropriate service repo +- The LiveRamp automation must run on ECS; keep Playwright dependencies compatible with the ECS container image +- **Never reuse stale `ElementHandle` objects** — always use Playwright `Locator` API diff --git a/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md b/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md new file mode 100644 index 0000000..b5c6c80 --- /dev/null +++ b/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md @@ -0,0 
+1,157 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository is the **X-Men Step Function Workflow Orchestrator** — the central hub for all AWS Step Function–based data pipelines at Resonate. It contains pipeline ASL state machines, Python Lambdas, Terraform/Terragrunt infrastructure configs, and integration tests for pipelines across the CDP/data platform. + +## Repository Structure + +``` +├── pipelines/ # Per-pipeline directory +│ └── / +│ ├── statemachine/ # ASL JSON state machine +│ ├── config/ # EMR, params, steps configs (dev/prod/integration) +│ ├── events/ # Step Function input events (dev/prod/integration) +│ └── tests/ +│ └── integration/ # pytest integration test suite +│ ├── conftest.py # Fixtures: SFN execution, S3 golden data +│ ├── create_golden_dataset_synthetic.py # Synthetic S3 test data generator +│ └── test_.py # Smoke, content, schema, history tests +├── python_lambdas/ # Lambda functions (Python) +│ ├── check-source-freshness/ # Multi-strategy S3/SSM freshness gate +│ ├── dynamic-dates/ # Resolves / date-range tokens in event JSONs +│ ├── avmap-mapper/ # Maps rcid to avmap +│ └── ... +├── terraform/ +│ └── pipelines/ +│ └── / +│ └── /terragrunt.hcl # Per-env Terragrunt config +└── .github/workflows/ + └── step_function.yml # Deploy workflow (environment + step_function inputs) +``` + +## Common Development Tasks + +### Running Integration Tests + +Integration tests run against live AWS Step Functions in the dev account. Each pipeline under `pipelines//tests/integration/` follows the same pattern: + +```bash +# 1. Upload golden dataset (once per pipeline) +python3 pipelines//tests/integration/create_golden_dataset_synthetic.py + +# 2. 
Run the tests +pytest pipelines//tests/integration/ -v --timeout= +# Typical timeouts: 3600–7200 seconds depending on pipeline +``` + +Standard test classes (follow this pattern when adding new pipelines): +- `TestSmoke` — `SUCCEEDED` status, `_SUCCESS` marker in S3, non-empty output +- `TestContent` — schema validation, no-null checks, row count assertions +- `TestSchema` — column names and types +- `TestHistory` — Step Function execution history: expected states entered + +### Deploying via GitHub Actions + +**Always use `--ref ` when deploying from a feature branch.** Without it, the workflow deploys from `main`. + +```bash +# Deploy a Step Function to an environment +gh workflow run step_function.yml \ + --field environment=integration \ + --field step_function= \ + --ref feature/your-branch-name + +# Check status +gh run list --workflow=step_function.yml --limit=5 +``` + +**Environments:** `dev`, `dev2`–`dev7`, `integration`, `qa`, `prod` + +### Python Lambda Development + +Each Lambda lives in `python_lambdas//`. Tests use moto for S3/SSM mocking. + +```bash +cd python_lambdas/ +python -m pytest tests/ -v +``` + +### Terraform / Terragrunt + +Infrastructure configs at `terraform/pipelines///terragrunt.hcl`. Deploy via GitHub Actions — **do not run `terragrunt apply` locally against prod**. 
+ +## Pipeline Catalogue + +| Pipeline | EMR Version | Key Jira | +|---|---|---| +| sovrn-weekly | EMR 7.12.0 (Spark 3) | CDP-118269 | +| damlam-preparation | EMR 7.12.0 (Spark 3) | CDP-118269 | +| experian-data-processing | EMR 7.x | CDP-118890 | +| geo-location | EMR 6.x | CDP-118946 | +| l2-processing | EMR 6.x | CDP-118512 | +| thirdparty-enrichment | EMR 6.x | — | +| total-sketch-first-party | EMR 6.15.0 | CDP-118989 | +| segment-aggregator | EMR 6.15.0 | CDP-118989 | +| marketops-overlap | EMR 6.x | CDP-118989 | +| idsync-overlap | EMR 6.x | CDP-118989 | +| tagav-prep | EMR 6.x | CDP-118989 | +| topic-tag-metrics | EMR 6.x | CDP-118967 | +| behavior-stitch | EMR 6.x | — | + +## EMR Version Migration (CDP-118269) + +Migrating pipelines from `emr-5.34.0` to `emr-7.12.0` (Spark 3). For each migrated pipeline: + +1. Update `ReleaseLabel` in `config/emr.json` and `config/emr-on-demand.json` +2. Update the JAR filename in `config/prod/params.json`: `*-latest.jar` → `*-spark3-latest.jar` +3. Apply same changes to `config/integration/` files +4. Run the integration step function and confirm `SUCCEEDED` + +## check-source-freshness Lambda + +Supports multiple detection strategies (configured per pipeline in JSON): + +| Strategy | Use Case | +|---|---| +| `timestamp_compare` | Compare S3 `LastModified` timestamps | +| `latest_directory_vs_ssm` | Latest YYYYMMDD subdirectory vs SSM last-run date | +| `latest_file_prefix_vs_ssm` | Latest date in flat filenames vs SSM (e.g. Experian `YYYYMM_*.gz`) | + +When `has_new_data: false`, the pipeline short-circuits without launching an EMR cluster. + +## Integration Test Pattern + +Each new pipeline integration test suite MUST include: + +1. **`config/integration/emr.json`** — small on-demand cluster, `AutoTerminationPolicy.IdleTimeout: 1800` +2. **`config/integration/params.json`** — Test IAM roles (`CoreDataPipelineEC2ResourceRoleTest` / `CoreDataPipelineRoleTest`), `Environment: INTEGRATION`, fixed output date `20260101` +3. 
**`events/integration/event.json`** — `ExpiryTimeout` sized to pipeline (e.g. 3600–7200), empty preconditions +4. **`terraform/pipelines//integration/terragrunt.hcl`** — `schedule_expression = null` +5. **`tests/__init__.py`** and **`tests/integration/__init__.py`** — package markers for pytest discovery +6. **`create_golden_dataset_synthetic.py`** — generates synthetic parquet input, uploads to `resonate-core-datasets-dev` +7. **`conftest.py`** — `autouse=True` golden-data upload, SFN execution fixture with timeout, execution history fixture +8. **`test_.py`** — standard smoke/content/schema/history test classes + +## Dynamic Dates Lambda + +Resolves tokens in event JSON `configurations` values: + +- `` — last date-named S3 directory with `_SUCCESS` marker +- `` — flat-file mode: last date embedded in filename, gated by a sentinel file suffix +- `<@refName>` — reuse a captured date from a previous `` field +- `` / `` — date arithmetic + +## QA Environments + +`qa` state machines (e.g. `geo-location-qa`) share source data with prod but write to `*-qa` buckets. Cron is disabled — manual trigger only. Deploy via `step_function.yml` with `environment: qa`. + +## Key Constraints + +- **Never force-push to `main`** +- **Never run `terragrunt apply` against prod locally** +- **Specify `--ref` when triggering GHA workflows from a feature branch** +- Branch naming: `feature/{JIRA-ID}-{kebab-slug}` (e.g. `feature/CDP-118269-sovrn-weekly-emr7`) +- Fusion-behavior-preprocess pipeline was removed (PR #734) — do not re-add it
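
The branch naming rule above could be checked mechanically, for example in a pre-push hook. The sketch below is one assumed reading of `feature/{JIRA-ID}-{kebab-slug}`: an upper-case project key, a numeric issue number, then lower-case words joined by hyphens. The regex and the helper name `is_valid_branch` are hypothetical and not part of this repository.

```python
import re

# Assumed interpretation of `feature/{JIRA-ID}-{kebab-slug}`.
BRANCH_RE = re.compile(r"feature/[A-Z][A-Z0-9]*-\d+-[a-z0-9]+(?:-[a-z0-9]+)*")


def is_valid_branch(name: str) -> bool:
    """Return True if the whole branch name matches the assumed convention."""
    return BRANCH_RE.fullmatch(name) is not None


if __name__ == "__main__":
    # Example branch name taken from the constraint list above.
    print(is_valid_branch("feature/CDP-118269-sovrn-weekly-emr7"))
```

Branches that omit the slug or use a lower-case Jira key are rejected under this reading; loosen the pattern if the team accepts other forms.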