From c85ca7da885579cf378a4f58d5e8aea2dc2e1aaf Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 01:56:49 +0000 Subject: [PATCH 1/2] docs: add CLAUDE.md proposals for 5 active Resonate repos Generated from review of recent merged PRs (SayaliPat, shrivastavakapil2000, JoeVsVolcano, mike-brant, nathan-resonate) on: - step-function-workflow-orchestrator (EMR 7 migration, integration tests, pipeline removal) - identity-graph (11 person identity graph Scala jobs, ExperianDataProcessor) - dos-data-pipeline (IP-inferred district fallback, district_source provenance, ToBitmap gating) - batch-expression-modeling (updated: batch-stitch throttle fix, formatter path layout) - batch-audience-delivery-syndication (OpenX extension fix, PagerDuty alerting) https://claude.ai/code/session_01DC8F2D1FYKS5qiBsKCRMmk --- .../CLAUDE.md | 94 ++++ .../batch-expression-modeling/CLAUDE.md | 524 ++++++++++++++++++ .../dos-data-pipeline/CLAUDE.md | 128 +++++ claude-md-proposals/identity-graph/CLAUDE.md | 136 +++++ .../CLAUDE.md | 141 +++++ 5 files changed, 1023 insertions(+) create mode 100644 claude-md-proposals/batch-audience-delivery-syndication/CLAUDE.md create mode 100644 claude-md-proposals/batch-expression-modeling/CLAUDE.md create mode 100644 claude-md-proposals/dos-data-pipeline/CLAUDE.md create mode 100644 claude-md-proposals/identity-graph/CLAUDE.md create mode 100644 claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md diff --git a/claude-md-proposals/batch-audience-delivery-syndication/CLAUDE.md b/claude-md-proposals/batch-audience-delivery-syndication/CLAUDE.md new file mode 100644 index 0000000..6c28ee5 --- /dev/null +++ b/claude-md-proposals/batch-audience-delivery-syndication/CLAUDE.md @@ -0,0 +1,94 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. 
+ +## Project Overview + +This repository contains the **Batch Audience Delivery Syndication** workflows — Python Lambda functions and AWS Step Function state machines that handle syndicated delivery of audience data to external partners (OpenX and Viant). + +## Repository Structure + +``` +├── workflows/ +│ ├── lambdas/ +│ │ ├── openx-publish-data-files/ # Renames + delivers CSV files to OpenX S3 bucket +│ │ │ ├── openx_publish_data_files.py +│ │ │ └── test/ +│ │ ├── viant-publish-files/ # Similar delivery for Viant +│ │ └── ... +│ └── step-functions/ +│ ├── openx-syndication-workflow/ # ASL JSON for OpenX delivery state machine +│ └── viant-syndication-workflow/ # ASL JSON for Viant delivery state machine +├── terraform/ # Terragrunt infrastructure configs +└── .github/workflows/ # GitHub Actions deployment workflows +``` + +## Common Development Tasks + +### Running Lambda Tests + +```bash +# openx-publish-data-files tests +cd workflows/lambdas/openx-publish-data-files +python -m pytest test/ -v + +# viant-publish-files tests +cd workflows/lambdas/viant-publish-files +python -m pytest test/ -v +``` + +### Deploying + +Lambda and Step Function deployments are managed via GitHub Actions. Always specify `--ref` when deploying from a feature branch: + +```bash +# Deploy Lambdas +gh workflow run .yml -f environment=dev --ref + +# Deploy Step Functions +gh workflow run .yml -f environment=dev --ref +``` + +## Key Concepts + +### openx-publish-data-files Lambda + +Lists CSV files from S3 (filtered to `.csv.gz`), renames them to the expected OpenX naming convention (`AM_ResonateDataAlliance___Data_.csv.gz`), and copies to the OpenX destination bucket. 
+ +**File extension handling (post-CDP-118857):** +- Spark's `partitionBy("method")` changed the codec suffix separator from `-c000.csv.gz` to `.c000.csv.gz` +- The extension is now **hardcoded** to `.csv.gz` (not parsed from the source filename) +- `PART_PATTERN` matches both `-c000` and `.c000` separators for part number extraction +- Do NOT revert to dynamic extension parsing via `split('.', 1)` — this broke OpenX delivery (CDP-118955) + +### Source Prefix Path Layout (post-CDP-118857) + +The batch-expression-modeling formatter flipped its output partition order: +- **Old**: `/date=*/method=av/vendor=*/akey=*/` +- **New**: `/date=*/vendor=*/method=*/akey=*/` + +The `source_prefix` concatenation in both syndication workflows was updated to match the new layout (CDP-118937). `method=av` remains hardcoded since these state machines handle AV-only delivery. + +### PagerDuty Alerting (CDP-118463) + +Both `openx-syndication-workflow` and `viant-syndication-workflow` send PagerDuty alerts on failure. The EventBridge rule uses a single `DetailType` for compatibility. Do not add multiple `DetailType` values. + +### Delivery Flow + +``` +formatter output (S3) → list CSV files → rename → copy to partner bucket +``` + +The formatter output uses the new path layout (see above). The `list_csv_files` step already filters to `.csv.gz`, so extension logic downstream only needs to handle renaming, not filtering. 
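
The extension and part-number rules above can be illustrated with a minimal Python sketch. It is not the Lambda's actual code: the regex below is a hypothetical stand-in for `PART_PATTERN`, the sample filenames are made up, and `extract_part_number` and `dynamic_extension` are illustrative names. The sketch only shows why parsing the extension with `split('.', 1)` yields a different result once the separator becomes `.c000`.

```python
import re
from typing import Optional

# Hypothetical stand-in for the Lambda's PART_PATTERN; the real regex may differ.
# It accepts both the old "-c000" and the new ".c000" codec suffix separators.
PART_PATTERN = re.compile(r"part-(\d+).*[-.]c\d+\.csv\.gz$")

# The output extension is a fixed constant, never parsed from the source key.
OUTPUT_EXTENSION = ".csv.gz"


def extract_part_number(source_key: str) -> Optional[int]:
    """Return the Spark part number from an S3 key, or None if the key does not match."""
    filename = source_key.rsplit("/", 1)[-1]
    match = PART_PATTERN.search(filename)
    return int(match.group(1)) if match else None


def dynamic_extension(filename: str) -> str:
    """The discouraged approach: treat everything after the first dot as the extension."""
    return "." + filename.split(".", 1)[1]


if __name__ == "__main__":
    old_name = "part-00003-abc123-c000.csv.gz"  # made-up example filename
    new_name = "part-00003-abc123.c000.csv.gz"  # made-up example filename
    print(extract_part_number(old_name), extract_part_number(new_name))
    print(dynamic_extension(old_name), dynamic_extension(new_name))
```

With the old separator the dynamic approach happens to return `.csv.gz`; with the new separator it returns `.c000.csv.gz`, which is why a hardcoded `OUTPUT_EXTENSION` is the safer choice here.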
+ +## Recent Changes (as of 2026-06) + +- **CDP-118955 (PR #46)**: Hardcoded `.csv.gz` extension in `openx-publish-data-files` — fixed silent delivery failure after Spark `partitionBy` changed codec suffix separator +- **CDP-118937 (PR #42)**: Swapped `source_prefix` concatenation order to match new formatter path layout (`vendor=*/method=*/` instead of `method=av/vendor=*/`) +- **CDP-118463 (PR #40)**: Added PagerDuty failure alerting to OpenX and Viant syndication workflows + +## Testing Guidelines + +- Parametrize tests for **both** the old (`-c000.csv.gz`) and new (`.c000.csv.gz`) source filename suffixes — the `PART_PATTERN` regex must handle both +- When adding new syndication partners, follow the `openx-publish-data-files` pattern: hardcode the output extension, use the new vendor/method partition order +- Ensure PagerDuty `DetailType` uses a single string value in EventBridge rules diff --git a/claude-md-proposals/batch-expression-modeling/CLAUDE.md b/claude-md-proposals/batch-expression-modeling/CLAUDE.md new file mode 100644 index 0000000..bc7c6df --- /dev/null +++ b/claude-md-proposals/batch-expression-modeling/CLAUDE.md @@ -0,0 +1,524 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository contains the **Batch Expression Modeling (BEM)** system for Resonate's audience delivery platform. It evaluates audience expressions against user behavioral data at scale, processes the results through identity stitching, and formats the output for various downstream vendors (Meta, TikTok, TheTradeDesk, etc.). + +### High-Level Architecture + +The system consists of three main components orchestrated by AWS Step Functions: + +1. **BEM (Batch Expression Modeling)** - Scala/Spark job that evaluates ~350 audience expressions against 2TB+ of SuperTagAv data +2. 
**Stitch** - Scala/Spark job that joins evaluated expressions with vendor identity tables (cookies, HEMs, device IDs) +3. **Formatter** - Scala/Spark job that formats stitched data for specific vendor requirements + +Each component runs on AWS EMR clusters and is configured/orchestrated by Lambda functions and Step Functions state machines. + +## Repository Structure + +``` +├── src/main/scala/com/resonate/ +│ ├── bem/ # Batch Expression Modeling (evaluates expressions against SuperTagAv) +│ ├── stitch/ # Identity stitching (joins RCID with vendor IDs) +│ ├── delivery/ # Delivery formatting and vendor-specific logic +│ └── utils/ # Shared utilities +├── workflows/ +│ ├── lambdas/ # Lambda functions for pipeline configuration +│ │ ├── batch-audience-delivery-processing-config/ # Main config lambda +│ │ ├── batch-vendor-stitch/ # Stitch config lambda +│ │ ├── bem-gen-job-parameters/ # BEM job parameters +│ │ └── ... +│ └── step-functions/ # Step function state machines (ASL JSON) +│ ├── batch-audience-delivery-processing/ # Main pipeline +│ ├── batch-audience-delivery-daily-pipeline/ # Daily orchestrator +│ ├── bem-pipeline/ # BEM execution +│ ├── batch-stitch-pipeline/ # Stitch execution +│ └── batch-delivery-formatter-pipeline/ # Formatter execution +├── terraform/ # Infrastructure as code (one directory per Lambda/Step Function) +├── integration-tests/ # End-to-end integration tests +└── build.sbt # SBT build configuration +``` + +## Common Development Tasks + +### Scala/Spark Development + +**Build and test:** +```bash +# Compile Scala code +./compile.sh # or: sbt compile + +# Run tests +sbt test + +# Run specific test +sbt "testOnly com.resonate.bem.BatchExpressionModelJobTest" + +# Package JAR for EMR deployment +./package.sh # or: sbt assembly +# Output: target/batch-expression-modeling.jar +``` + +**AWS authentication:** +```bash +# Required before uploading JARs to S3 or accessing AWS resources +aws sso login +``` + +### Python Lambda Development + +**Lambda 
functions are in `workflows/lambdas/`**. Each has its own `requirements.txt`. + +**Run tests for a Lambda:** +```bash +cd workflows/lambdas/ +python -m pytest tests/ +``` + +**Common Lambda functions:** +- `batch-audience-delivery-processing-config` - Generates pipeline configuration from delivery schedule +- `bem-gen-job-parameters` - Generates EMR step parameters for BEM job +- `batch-vendor-stitch` - Generates stitch configuration per vendor + +### Integration Tests + +**Run full integration test:** +```bash +cd integration-tests + +# First time: install dependencies +pip install -r requirements.txt + +# Run setup + execution + validation +bash run_full_integration_test.sh +``` + +The integration test: +1. Sets up test data in S3 with datetime-based paths (e.g., `integration/data/20250914_143022/`) +2. Triggers the `aud-delivery-proc-integration` step function +3. Polls for completion and validates SUCCESS status + +**Cost:** ~$0.25-0.30 per run, ~15-25 minutes with minimal test data. + +### GitHub Actions Workflows + +**Workflow files (in `.github/workflows/`):** + +| Workflow File | Description | +|--------------|-------------| +| `bem-jar.yml` | Builds and publishes the Scala JAR to S3 | +| `bem-lambdas.yml` | Deploys Lambda functions | +| `bem-step-functions.yml` | Deploys Step Functions | +| `bem-unit-tests.yml` | Runs Scala unit tests | + +**Deploy to an environment:** + +⚠️ **CRITICAL: Always specify `--ref` when deploying from a feature branch!** + +Without `--ref`, `gh workflow run` deploys from the **default branch (main)**, NOT your current branch. This is a common mistake that results in deploying old code. 
+ +```bash +# Deploy JAR (required input: environment) +# JAR goes to qa (integration environment uses qa JAR) +gh workflow run bem-jar.yml -f environment=qa --ref + +# Deploy Lambdas (required input: environment) +gh workflow run bem-lambdas.yml -f environment=integration --ref + +# Deploy Step Functions (required inputs: environment, deployment_type) +gh workflow run bem-step-functions.yml -f environment=integration -f deployment_type=All --ref +``` + +**Example deploying all components from a feature branch:** +```bash +BRANCH="feature/CDP-123456-my-feature" +gh workflow run bem-jar.yml -f environment=qa --ref $BRANCH +gh workflow run bem-lambdas.yml -f environment=integration --ref $BRANCH +gh workflow run bem-step-functions.yml -f environment=integration -f deployment_type=All --ref $BRANCH +``` + +**Step Functions deployment_type options:** +- `All` - Deploy all step functions +- `BEM Pipeline Step Function` +- `Batch Stitch Pipeline` +- `Batch Delivery Formatter Pipeline` +- `Audience Delivery Pipeline` +- `Audience Delivery Custom Delivery Pipeline` +- `Audience Delivery Processing Pipeline` +- `Audience Delivery Daily Pipeline` + +**Environment values:** +- `integration` - Integration testing environment (uses QA JAR) +- `dev`, `dev2`-`dev7` - Development environments +- `nonprod` - QA/staging environment +- `prod` - Production environment + +**Check deployment status:** +```bash +# List recent runs for a workflow +gh run list --workflow=bem-jar.yml --limit=3 +gh run list --workflow=bem-lambdas.yml --limit=3 +gh run list --workflow=bem-step-functions.yml --limit=3 + +# Watch a specific run +gh run watch +``` + +**Note:** The integration environment uses the QA JAR. When testing changes in integration: +1. Deploy JAR to `qa` +2. Deploy lambdas to `integration` +3. 
Deploy step functions to `integration` + +### Working with Step Functions + +Step functions are defined in `workflows/step-functions/*/statemachine/*.asl.json` and use **JSONata query language** (not JSONPath). + +**Key step functions:** +- `batch-audience-delivery-daily-pipeline` - Entry point triggered daily by EventBridge +- `batch-audience-delivery-processing` - Main pipeline orchestrating BEM → Stitch → Formatter +- `bem-pipeline` - EMR cluster creation → BEM job execution → cluster termination +- `batch-stitch-pipeline` - Parallel execution of stitch jobs per vendor +- `batch-delivery-formatter-pipeline` - Formatting for vendor-specific requirements + +## Key Concepts + +### Expression Evaluation (BEM) + +The BEM job (`BatchExpressionModelJob.scala`) evaluates audience expressions against SuperTagAv data: + +1. **Input:** + - `surveyexpressions.csv` - List of expressions to evaluate (e.g., `NOT (E999999998)`) + - SuperTagAv parquet data - User behavioral data with attribute bitmaps (evmap, avmap, cvmap) + - Models CSV - Expression evaluation models + +2. **Processing:** + - Reads SuperTagAv data (~2TB) with optimized schema (only needed bitmap columns) + - Evaluates each expression against each RCID's attribute bitmaps + - Uses `AudienceEvaluator` from `das-expression` library + - Outputs matches partitioned by `exprHash` + +3. **Output:** Parquet files with schema `(rcid, exprHash, score)` partitioned by `exprHash=` + +### Identity Stitching + +The Stitch job (`TableStitch.scala`) joins BEM output with vendor identity tables: + +1. **Input:** + - BEM cache (evaluated expressions with RCIDs) + - Vendor stitch tables (e.g., `tmid`, `hemsha2`, `viant_id` columns) + - Audience metadata (expression hash to audience ID mappings) + +2. **Processing:** + - Joins BEM cache with stitch table on `rcid` + - Maps `exprHash` to `audienceIds` and explodes + - Filters empty vendor keys + +3. 
**Output:** `(akey, vkey, score)` where `akey` = audience key, `vkey` = vendor key (e.g., cookie, email hash) + +### Formatter Output Path Layout (post-CDP-118857) + +The formatter uses `.partitionBy("method")` which changed the Spark output codec suffix separator: +- **Old**: `/date=*/method=av/vendor=*/akey=*/` with `-c000.csv.gz` suffix +- **New**: `/date=*/vendor=*/method=*/akey=*/` with `.c000.csv.gz` suffix + +Consumer ASLs (`batch-audience-delivery-syndication`, `batch-expression-modeling`) must use the **new** vendor/method partition order. The `method=av` part stays hardcoded in syndication-only state machines. + +### Batch Stitch Throttle Mitigation (CDP-118972) + +The `batch-stitch-pipeline` Map state staggers `AddJobFlowSteps` submissions by Map iteration index: +- `ItemSelector` passes `Map.Item.Index` through +- A `Wait` state before each `EMR Add Step` waits `Index` seconds (iteration 0 = immediate, 1 = 1s, 2 = 2s, etc.) +- This prevents `AmazonElasticMapReduceException: ThrottlingException` when submitting many steps concurrently +- The old `EMR.ThrottlingException` retry was **removed** — that error name was wrong; the actual error is `EMR.AmazonElasticMapReduceException` + +### Delivery Configuration + +The `batch-audience-delivery-processing-config` Lambda generates pipeline configuration: + +- Queries RTP database for delivery schedules (via `RTP_PSQL_CONNECTION_STRING`) +- Determines which vendors need delivery based on schedule windows +- Generates S3 paths for tagav data, cookiejar, expressions, and output locations +- Sets EMR cluster configurations (instance types, counts, etc.) 
+ +**Important fields:** +- `delivery_type` - "daily", "training", "custom" +- `tagav_date_folder` - Date folder for SuperTagAv input (format: YYYYMMDD) +- `cookie_jar_date` - Date folder for cookiejar data +- `start_time_of_delivery` / `end_time_of_delivery` - Time window for vendor deliveries (epoch milliseconds) + +## AWS Step Functions: Converting JSONPath to JSONata + +### Top-Level Changes + +Add `"QueryLanguage": "JSONata"` at the state machine or state level. + +### Field Replacements + +JSONPath's five fields are replaced with two in JSONata: +- **Arguments** - Replaces `InputPath` and `Parameters` for sending data to integrated actions +- **Output** - Replaces `ResultPath`, `ResultSelector`, and `OutputPath` for transforming state output + +### JSONPath Expression Syntax + +- **JSONPath**: `"field.$": "$.path.to.value"` +- **JSONata**: `"field": "{% $states.input.path.to.value %}"` + +Remove the `.$` suffix and wrap expressions in `{% %}` delimiters. + +### ResultPath Conversions + +**Merge result into input at a path:** +```json +// JSONPath +"ResultPath": "$.ClusterCreationResult" + +// JSONata +"Output": "{% $merge([$states.input, {'ClusterCreationResult': $states.result}]) %}" +``` + +**Discard result, pass through input only:** +```json +// JSONPath +"ResultPath": null + +// JSONata +// Set Output explicitly - omitting it on a Task state returns the result, not the input +"Output": "{% $states.input %}" +``` + +**Replace entire input with result (default):** +```json +// JSONPath +"ResultPath": "$" // or omit ResultPath + +// JSONata +// Omit Output field, or use: +"Output": "{% $states.result %}" +``` + +### OutputPath Conversions + +**Extract specific field from result:** +```json +// JSONPath +"OutputPath": "$.Payload" + +// JSONata +"Output": "{% $states.result.Payload %}" +``` + +### Catch Block Error Handling + +**Merge error into input:** +```json +// JSONPath +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "ResultPath": "$.Error", + "Next": "HandleError" + } +] + +// JSONata +"Catch": [ + { + "ErrorEquals": 
["States.ALL"], + "Output": "{% $merge([$states.input, {'Error': $states.errorOutput}]) %}", + "Next": "HandleError" + } +] +``` + +Note: `$states.errorOutput` is only available in Catch blocks. + +### Map State Conversions + +**ItemsPath:** +```json +// JSONPath +"ItemsPath": "$.step_configs" + +// JSONata +"Items": "{% $states.input.step_configs %}" +``` + +**ItemSelector:** +```json +// JSONPath +"ItemSelector": { + "step_config.$": "$$.Map.Item.Value", + "clusterId.$": "$.ClusterCreationResult.ClusterId" +} + +// JSONata +"ItemSelector": { + "step_config": "{% $states.context.Map.Item.Value %}", + "clusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" +} +``` + +**IMPORTANT:** Use `$states.context.Map.Item.Value` to access the current item in a Map state. There is **NO** `$states.item` variable - this is a common mistake that will cause "Field '$states.item' does not exist" errors. + +**⚠️ CRITICAL: Map States Must Preserve Input** + +By default, a Map state outputs an **array of results** from processing each item, which **loses the original input fields**. If subsequent states need access to fields from the original input (like a ClusterId from cluster creation), you MUST add an Output field to preserve the input. + +```json +// WRONG - This loses ClusterCreationResult from input +"Step Map": { + "Type": "Map", + "ItemSelector": { + "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" + }, + "Items": "{% $states.input.step_configs %}", + "Next": "TerminateCluster" // ❌ Will fail - ClusterCreationResult is lost! 
+} + +// CORRECT - Preserve input for next state +"Step Map": { + "Type": "Map", + "ItemSelector": { + "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" + }, + "Items": "{% $states.input.step_configs %}", + "Output": "{% $states.input %}", // ✅ Preserves all input fields + "Next": "TerminateCluster" +} +``` + +**Common Error:** +``` +The JSONata expression '$states.input.ClusterCreationResult.ClusterId' specified for the field 'Arguments/ClusterId' returned nothing (undefined). +``` + +This error means the Map state didn't preserve the input, so subsequent states can't access fields that were in the original input. + +### Array Construction + +**Intrinsic function to array literal:** +```json +// JSONPath +"Args.$": "States.Array('cmd', '--flag', $.input.value, '--opt', $.input.other)" + +// JSONata - Use regular JSON array with individual JSONata expressions +"Args": [ + "cmd", + "--flag", + "{% $states.input.value %}", + "--opt", + "{% $states.input.other %}" +] +``` + +### Reserved Variables in JSONata + +- `$states.input` - Original input to the current state +- `$states.result` - Result from API/sub-workflow (Task, Parallel, Map states) +- `$states.errorOutput` - Error output (only in Catch blocks) +- `$states.context` - Execution metadata (StartTime, task token, Map.Item.Value, etc.) + - `$states.context.Map.Item.Value` - Current item in Map state iteration + - `$states.context.Map.Item.Index` - Current iteration index in Map state + +### ⚠️ Common Pitfall: $states.item Does NOT Exist + +**Error:** `Field '$states.item' does not exist` + +**WRONG:** +```json +"ItemSelector": { + "item": "{% $states.item %}" // ❌ This will fail! +} +``` + +**CORRECT:** +```json +"ItemSelector": { + "item": "{% $states.context.Map.Item.Value %}" // ✅ Use this instead +} +``` + +Many online examples and AI suggestions incorrectly reference `$states.item`, but this variable does not exist in AWS Step Functions JSONata. 
Always use `$states.context.Map.Item.Value` to access the current Map iteration item. + +### Key Differences from JSONPath + +1. **No `.$` suffix** - Regular field names with JSONata expressions in `{% %}` +2. **No Path fields** - InputPath, ResultPath, OutputPath are replaced by Arguments and Output +3. **Assign vs Output** - `Assign` creates variables accessible in all future states; `Output` only affects the immediate next state +4. **Parallel processing** - Assign and Output are processed in parallel; variable assignments don't affect Output + +### Common Patterns + +**Pass through input unchanged:** +- For Task, Map, and Parallel states, explicitly use `"Output": "{% $states.input %}"`; omitting the Output field passes the input through only for states that produce no result (e.g., Pass, Wait, Choice) + +**IMPORTANT:** Any Task state that needs to pass data to subsequent states should preserve the input explicitly. Without an Output field, the Task's result replaces the entire input, losing all previous data. + +**Common scenarios requiring input preservation:** + +1. **After Catch blocks** - When error info was merged into input +2. **Cleanup/termination tasks** - When subsequent states need original context (e.g., Lambda functions that publish metrics need delivery_config) +3. 
**Between pipeline stages** - When chaining multiple operations that all need access to the original request + +```json +// Example 1: Catch block merges error into input +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Output": "{% $merge([$states.input, {'Error': $states.errorOutput}]) %}", + "Next": "CleanupTask" + } +] + +// The CleanupTask MUST preserve input if Error is needed later +"CleanupTask": { + "Type": "Task", + "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster", + "Arguments": { "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" }, + "Output": "{% $states.input %}", // ✅ Preserves Error field + "Next": "NotifyFailure" +} + +// Example 2: EMR termination before Lambda that needs delivery config +"EMR TerminateCluster Success": { + "Type": "Task", + "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster", + "Arguments": { "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" }, + "Output": "{% $states.input %}", // ✅ Lambda needs delivery_config from input + "Next": "Publish Metrics" +} +``` + +**Add result as new field while preserving input:** +```jsonata +"Output": "{% $merge([$states.input, {'resultField': $states.result}]) %}" +``` + +**Transform and extract specific data:** +```jsonata +"Output": { + "field1": "{% $states.input.someValue %}", + "field2": "{% $states.result.someOtherValue %}" +} +``` + +**Create variables for use in later states:** +```jsonata +"Assign": { + "myVariable": "{% $states.input.someValue %}" +} +// Later states can reference {% $myVariable %} +``` + +### Reference Documentation + +- [AWS Step Functions - Transforming data with JSONata](https://docs.aws.amazon.com/step-functions/latest/dg/transforming-data.html) +- [AWS Step Functions - Passing data between states with variables](https://docs.aws.amazon.com/step-functions/latest/dg/workflow-variables.html) + +## Recent Changes (as of 2026-05) + +- **CDP-118972 (PR #263)**: Stagger batch-stitch `AddJobFlowSteps` 
submissions by Map iteration index to prevent EMR throttling; removed incorrect `EMR.ThrottlingException` retry +- **CDP-118937 (PR #256)**: Swapped `source_prefix` concatenation order in `batch-audience-delivery-processing` to match new formatter path layout (`vendor=*/method=*/`) +- **CDP-118857 (PR #253)**: Added batch payload formatters with `partitionBy("method")` — changed Spark output path layout and codec suffix separator +- **DE-15134**: Opted `vendor-delivery-metrics` Lambda into failure alerts diff --git a/claude-md-proposals/dos-data-pipeline/CLAUDE.md b/claude-md-proposals/dos-data-pipeline/CLAUDE.md new file mode 100644 index 0000000..a69cda7 --- /dev/null +++ b/claude-md-proposals/dos-data-pipeline/CLAUDE.md @@ -0,0 +1,128 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository contains the **DOS Data Pipeline** — Scala/Spark jobs that process data for the Data Operations/Science (DOS) team. 
The primary pipelines are: + +- **GeoLocationDaily** — Enriches daily RcidEnriched with geo-location data (congressional districts, state legislative districts) using L2 data and IP-inferred fallback +- **GeoLocationFull** — Maintains the rolling geo-location full dataset +- **GeoLocationFullBackfill** — Backfills geo-location data across a date range using daily-on-multi-day-pixels approach +- **ToBitmap** — Converts geo-location data to bitmap format, gating the L2-confirmed marker bit on `district_source` + +## Tech Stack + +- **Scala** 2.11.8 +- **Spark** 2.4.3 (deployed on EMR via `dos-data-pipeline-latest.jar`) +- **SBT** build tool +- **GitHub Actions CI** with SonarCloud integration + +## Repository Structure + +``` +├── src/main/scala/com/resonate/ +│ ├── geolocation/ +│ │ ├── GeoLocationDaily # Daily enrichment job +│ │ ├── GeoLocationFull # Rolling full dataset maintenance +│ │ ├── GeoLocationFullBackfill # Date-range backfill (daily-on-multi-day-pixels) +│ │ ├── DistrictResolver # L2-vs-IP district coalescing + district_source derivation +│ │ └── ToBitmap # Geo → bitmap conversion (gates L2-confirmed bit) +│ └── ... +├── src/test/scala/com/resonate/ +│ └── ... # Unit test suites (60 tests) +├── build.sbt +└── README.md +``` + +## Common Development Tasks + +### Build and Test + +```bash +# Compile +sbt compile + +# Run all tests +sbt test + +# Run specific test +sbt "testOnly com.resonate.geolocation.GeoLocationDailyTest" + +# Package JAR for EMR deployment +sbt assembly + +# AWS auth (required for S3 access) +aws sso login +``` + +### Deploying + +The JAR is published to S3 and consumed by the `geo-location` Step Function in `step-function-workflow-orchestrator`. Configuration (zip→district CSVs, params.json) is deployed via Terragrunt. + +For geo-location and segment-aggregator, configuration is sourced from this repository. For other pipelines, configuration comes from services-api-configuration via the datapipeline-trigger Lambda. 
+ +## Key Concepts + +### DistrictResolver + +Central helper for deriving district data from two sources: L2 voter file and IP-inferred (NetAcuity) zip→district mappings. + +**district_source values:** +| Value | Condition | +|-------|-----------| +| `L2_CONFIRMED` | L2 has any district AND `l2_party_confirmed = true` | +| `L2_UNCONFIRMED` | L2 has any district AND `l2_party_confirmed != true` | +| `IP_INFERRED` | No L2 district but zip mapping produced a result | +| `null` | No district data available | + +**Zip→District Mappings (4-file snapshot):** +Deployed to `s3://resonate-core-applications{-env}/dos/geo-location/configs/zip-district-mappings/`: +- `zip-congress-mapping.csv` — L2 mode-per-zip, all 51 states +- `zip-proposed-congress-mapping.csv` — 2026 redistricted file (CA/MO/NC/OH/TX/UT/VA) +- `zip-state-senate-mapping.csv` — L2 mode-per-zip +- `zip-state-house-mapping.csv` — L2 mode-per-zip + +Schema: `zip,state,`. Files version together as a snapshot passed via `myZipDistrictMappingsBasePath`. + +### ToBitmap + +Gates the L2-confirmed marker bit on `district_source = "L2_CONFIRMED"`. Rows with `L2_UNCONFIRMED` or `IP_INFERRED` are enriched with district data but do not set the confirmed bit. + +### GeoLocationFullBackfill + +Rewritten to use a **daily-on-multi-day-pixels** approach: +- Takes a date-range glob from the orchestrator (rendered `$.ParamsConfig.Rendered.myZipDistrictMappingsBasePath`) +- Always re-derives all four districts (does not take a `DistrictColumnList` arg) +- Reads existing `zip` from fullDf for IP-fallback path (no second NetAcuity lookup) + +### GeoLocation Should Run Full (Orchestrator Fix) + +The `Should Run Full` choice state in `geo_location.asl.json` checks `IsPresent` on `$.FullInputPath` before the path-equality fallback. This ensures backfill output is preserved when today's full already exists (fixes CDP-118512). 
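
The `district_source` table under DistrictResolver, together with the ToBitmap gating rule, reads as a small decision function. The following is a minimal Python sketch of that logic only; the real helper is Scala/Spark, and the function and parameter names (`derive_district_source`, `sets_confirmed_bit`, `l2_has_district`, `ip_has_district`) are hypothetical.

```python
from typing import Optional


def derive_district_source(
    l2_has_district: bool,
    l2_party_confirmed: Optional[bool],
    ip_has_district: bool,
) -> Optional[str]:
    """Mirror the district_source table: L2 wins over IP, None when nothing matched."""
    if l2_has_district:
        # Anything other than an explicit True (False or missing) counts as unconfirmed.
        return "L2_CONFIRMED" if l2_party_confirmed is True else "L2_UNCONFIRMED"
    if ip_has_district:
        return "IP_INFERRED"
    return None


def sets_confirmed_bit(district_source: Optional[str]) -> bool:
    """ToBitmap gating: only L2_CONFIRMED rows set the L2-confirmed marker bit."""
    return district_source == "L2_CONFIRMED"


if __name__ == "__main__":
    for args in [(True, True, False), (True, None, True), (False, None, True), (False, None, False)]:
        source = derive_district_source(*args)
        print(args, source, sets_confirmed_bit(source))
```

Treating a missing `l2_party_confirmed` as unconfirmed matches the `!= true` wording in the table, under the assumption that null is meant to fall into that branch.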
+ +## Recent Changes (as of 2026-05) + +- **CDP-118946 (PR #103)**: Added IP-inferred district fallback and `district_source` provenance + - New `DistrictResolver` helper centralizes L2-vs-IP coalescing + - 4-file zip→district mappings split by district type + - `GeoLocationFullBackfill` rewritten to daily-on-multi-day-pixels approach + - `GeoLocationDaily.extract` and `GeoLocationFullBackfill.fullBackfill` both use DistrictResolver + - `GeoLocationFull.expectedCols` extended with `zip` and `district_source` +- **CDP-118947 (PR #103)**: `ToBitmap` gates L2-confirmed marker bit on `district_source = "L2_CONFIRMED"` +- **CDP-118944**: Added `ToBitmap` unit tests for district_source gating + +## Testing Guidelines + +- `GeoLocationDailyTest`: test fixtures use `ZipDistrictMappings`; DistrictResolver invoked directly +- `GeoLocationFullBackfillTest`: 7 coverage cases for each `district_source` path (L2_CONFIRMED, L2_UNCONFIRMED, IP_INFERRED, null, partial-L2 coalesce, IP-only, no-match) +- `GeoLocationFullTest`: tuple arities updated for `zip` and `district_source` columns +- When adding a new district type, add entries to all 4 mapping files and update `DistrictResolver` coalescing logic +- `sort_array` must be applied to state-leg candidate arrays before hash-based pick to ensure determinism + +## Configuration + +**Configuration is deployed from this repo** (unlike other pipelines that use services-api-configuration). The `geo-location` pipeline reads its config from `s3://resonate-core-applications{-env}/dos/geo-location/configs/`. + +When modifying configuration: +1. Update files in `src/main/resources/` or the appropriate config directory +2. 
The deploy pipeline (in `step-function-workflow-orchestrator`) uploads them to S3 via Terragrunt diff --git a/claude-md-proposals/identity-graph/CLAUDE.md b/claude-md-proposals/identity-graph/CLAUDE.md new file mode 100644 index 0000000..c67893b --- /dev/null +++ b/claude-md-proposals/identity-graph/CLAUDE.md @@ -0,0 +1,136 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository contains the **Identity Graph** Scala/Spark pipeline — a collection of 11 Spark jobs that build Resonate's person identity graph from multiple data sources (Experian, L2, Tapad, LiveIntent, LiveRamp, Sovrn). It also contains the **ExperianDataProcessor** which converts Experian offline delivery files from gzip CSV to Parquet. + +Deployment docs: https://resonate-jira.atlassian.net/wiki/spaces/ASD/pages/3683057840/Pipeline+Scala+SBT+GitOps + +**Production JAR:** `s3://resonate-core-applications/identity-graph/jars/identity-graph-latest.jar` + +## Tech Stack + +- **Scala** 2.12.17 +- **Spark** 3.5.0 +- **SBT** build tool +- **AWS EMR** for execution + +## Repository Structure + +``` +├── src/main/scala/com/resonate/ +│ ├── identitygraph/ +│ │ ├── jobs/ # 11 Spark pipeline jobs +│ │ │ ├── ExperianPreprocessJob # Normalizes Experian ConsumerView + digital graph +│ │ │ ├── L2ConsumerPreprocessJob # Preprocesses L2 consumer tab-delimited files +│ │ │ ├── TapadIndexJob # Experian/Tapad → identifier_index +│ │ │ ├── L2IndexJob # L2 voter → identifier_index +│ │ │ ├── L2ConsumerIndexJob # L2 consumer → identifier_index +│ │ │ ├── OnboardingMatchJob # 4-phase matching (blocking, corroboration, scoring, fallback) +│ │ │ ├── OnboardingHouseholdLinksJob # Household CTV/TTD attribution via digital graph +│ │ │ ├── OnboardingPersonsJob # Creates Tapad-only person records (deprecated BL-106) +│ │ │ ├── MergeOnboardingLinksJob # BL-30 multi-provider collapse + confidence scoring +│ │ │ ├── 
PersonIdentityJob # Main orchestrator: persons + identifier_index +│ │ │ └── CustomerMatchJob # Ephemeral customer file matching (7 fallback tiers) +│ │ └── utils/ # Shared utilities +│ │ ├── HashUtils # SHA256-based person_id and household_id generation +│ │ ├── StagingWriter # Cache, count, write, unpersist pattern +│ │ ├── AddressNormalizer # USPS Pub 28 suffix expansion, nickname resolution +│ │ ├── IpFilter # RFC1918 + datacenter/cloud IP blocklist +│ │ └── ScoringConfig # Centralized confidence scoring, recency decay +│ └── experian/ +│ └── ExperianDataProcessor # Converts Experian gzip CSV → Parquet +├── src/test/scala/com/resonate/ +│ └── identitygraph/ # 443 unit tests in 20 test suites +├── build.sbt +└── README.md +``` + +## Common Development Tasks + +### Build and Test + +```bash +# Compile +sbt compile + +# Run all tests (443 tests across 20 suites) +sbt test + +# Run a specific test suite +sbt "testOnly com.resonate.identitygraph.jobs.PersonIdentityJobSpec" + +# Package JAR for EMR +sbt assembly +# Output: target/scala-2.12/identity-graph-assembly-*.jar + +# AWS auth (required for S3 access) +aws sso login +``` + +### JVM Memory (CI) + +The SBT build is configured with 3GB heap and 1.5GB forked test JVM. On JDK 17 the `--add-opens` flags are automatically applied via `build.sbt`. Do not change these settings without testing on CI. + +### Deploying + +Deployment follows the GitOps process described in the Confluence page above. The JAR is built and uploaded to S3, then referenced by the `identity-graph` Step Function in `step-function-workflow-orchestrator`. 
+ +## Key Concepts + +### Person Identity Pipeline (11 Jobs) + +The pipeline processes multiple identity sources and produces `identifier_index` entries: + +| Job | Input | Purpose | +|-----|-------|---------| +| ExperianPreprocessJob | Experian CSV gzip | Normalize ConsumerView + digital graph | +| L2ConsumerPreprocessJob | L2 tab-delimited | Preprocess L2 consumer data | +| TapadIndexJob | Experian/Tapad parquet | Create identifier_index (direct + staging) | +| L2IndexJob | L2 voter | Create identifier_index; null guard on HOUSEHOLD_ID | +| L2ConsumerIndexJob | L2 consumer | Create identifier_index entries | +| OnboardingMatchJob | All sources | 4-phase match; Phase 3 uses person_id.asc tie-breaker | +| OnboardingHouseholdLinksJob | Digital graph | Household CTV/TTD attribution | +| OnboardingPersonsJob | Tapad-only | Person records (deprecated) | +| MergeOnboardingLinksJob | Onboarding links | Multi-provider collapse, BL-4/BL-107 confidence | +| PersonIdentityJob | All | Main orchestrator | +| CustomerMatchJob | Customer file | 7-tier fallback matching | + +### HashUtils + +- `personId(seed)` — deterministic SHA256-based person_id +- `householdId(seed)` — deterministic SHA256-based household_id + +Seeds are constructed from PII fields (name, address, etc.) after normalization. 
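The determinism property described for `HashUtils` can be illustrated with a minimal sketch. It is written in Python for brevity, while the real `HashUtils` is Scala. The normalization rules (trim, collapse whitespace, upper-case), the `|` delimiter, the hex output, and the `normalize_seed` and `person_id` names are all assumptions made for illustration, not the repository's actual implementation.

```python
import hashlib


def normalize_seed(*fields: str) -> str:
    """Hypothetical normalization: trim, collapse whitespace, upper-case, join with '|'."""
    cleaned = [" ".join(field.split()).upper() for field in fields]
    return "|".join(cleaned)


def person_id(seed: str) -> str:
    """Deterministic id: hex SHA-256 digest of the UTF-8 encoded seed (assumed format)."""
    return hashlib.sha256(seed.encode("utf-8")).hexdigest()


# Two differently formatted inputs for the same person normalize to one seed,
# so they hash to the same id.
seed_a = normalize_seed("Jane", "Doe", "12 Main  Street")
seed_b = normalize_seed(" jane ", "DOE", "12 main street")
same_id = person_id(seed_a) == person_id(seed_b)
```

Because the id depends only on the normalized seed, any change to the normalization step changes every generated id, which is presumably why seeds are built only after normalization.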
+ +### AddressNormalizer + +- Applies USPS Publication 28 suffix expansions (e.g., `ST → STREET`) +- Resolves nickname chains iteratively (max 5 hops, so a cycle such as `BOB → ROBERT → BOB` still terminates) + +### ExperianDataProcessor + +Converts Experian offline delivery gzip CSV files to Parquet: +- Subtables: `consumerview`, `email`, `phone` +- Supports glob patterns for input paths (e.g., `Resonate.ConsumerView_*.gz`) +- Digital graph is already Parquet, so no conversion is needed + +## Recent Changes (as of 2026-05) + +- **PR #22**: Ported all 11 Person Identity Graph Scala jobs from resonate-research; 443 unit tests all passing + - Fixed `L2IndexJob` HOUSEHOLD_ID null guard (`filterNulls = true`) + - Fixed `OnboardingMatchJob` Phase 3 non-determinism (added `person_id.asc` tie-breaker) + - Fixed `ExperianPreprocessJob` non-deterministic aggregation (now uses `min(id_value)` instead of `first`) + - Fixed `AddressNormalizer` nickname chains (iterative lookup, max 5 hops) + - Added null guards in `MergeOnboardingLinksJob` and `PersonIdentityJob` +- **PR #21**: Added `ExperianDataProcessor` Spark app for gzip CSV → Parquet conversion + +## Testing Guidelines + +All 11 jobs have dedicated test suites. 
Key invariants to maintain: +- `L2IndexJob`: `filterNulls = true` for HOUSEHOLD_ID to prevent phantom household collapse +- `OnboardingMatchJob` Phase 3: always include `person_id.asc` as a tie-breaker in `selectBestMatch` window +- `ExperianPreprocessJob`: use `min(id_value)` not `first()` for deterministic aggregation +- `MergeOnboardingLinksJob` / `PersonIdentityJob`: filter null `identifier_value` before groupBy diff --git a/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md b/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md new file mode 100644 index 0000000..f58c965 --- /dev/null +++ b/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md @@ -0,0 +1,141 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This is the **step-function-workflow-orchestrator** monorepo — the central deployment and orchestration hub for the X-Men data platform. It manages 50+ AWS Step Functions pipelines that run daily/weekly batch data workloads including identity graph construction, geo-location, audience segmentation, BEM delivery, and more. 
+ +## Repository Structure + +``` +├── pipelines/ # One directory per pipeline +│ └── / +│ ├── config/ # EMR configs (emr.json, params.json, steps.json) +│ │ ├── prod/ +│ │ ├── dev/ +│ │ ├── dev6/ +│ │ └── integration/ +│ ├── events/ # Step Function input events +│ │ ├── prod/event.json +│ │ └── integration/event.json +│ ├── statemachine/ # ASL JSON state machine definition +│ └── tests/ +│ └── integration/ # pytest integration test suite +│ ├── conftest.py +│ ├── create_golden_dataset_synthetic.py +│ └── test_.py +├── terraform/pipelines/ # Terragrunt configs (one per pipeline × env) +│ └── / +│ ├── dev/terragrunt.hcl +│ ├── dev6/terragrunt.hcl +│ ├── integration/terragrunt.hcl +│ └── prod/terragrunt.hcl +├── python_lambdas/ # Shared Python Lambda functions +│ └── check-source-freshness/ # Config-driven S3/SSM freshness checker +└── .github/workflows/ + └── step_function.yml # GitHub Actions deploy workflow +``` + +## Common Development Tasks + +### Deploying a Pipeline + +Use GitHub Actions (`X-Men Step Function Workflow`) to deploy any pipeline to any environment: + +```bash +# Via GitHub CLI (always specify --ref when on a feature branch!) +gh workflow run step_function.yml \ + -f environment=dev \ + -f step_function=geo-location \ + --ref + +# Check deployment status +gh run list --workflow=step_function.yml --limit=5 +gh run watch +``` + +**Environments:** `dev`, `dev2`–`dev9`, `integration`, `qa`, `prod` + +**Available pipelines (selection):** +`behavior-stitch`, `damlam-preparation`, `experian-data-processing`, `geo-location`, `identity-graph`, `idsync-overlap`, `l2-processing`, `liveramp-stitch/thetradedesk`, `marketops-overlap`, `segment-adobe`, `segment-aggregator`, `segment-onboard`, `segment-transformer`, `sovrn-overlap`, `sovrn-weekly`, `tagav-prep`, `thirdparty-enrichment`, `total-sketch-first-party`, `vendor-stitch` + +### Adding or Updating a Pipeline Config + +Each pipeline's config in `pipelines//config//` is deployed to S3 via Terragrunt. 
To add a new environment: + +1. Copy the `dev/` config to `/`, adjust paths/IAM roles. +2. Add a corresponding `terraform/pipelines///terragrunt.hcl`. +3. Run the GHA `step_function.yml` workflow against the new env. + +### Running Integration Tests + +```bash +# Install test dependencies +pip install boto3 pytest + +# Generate golden dataset +python3 pipelines//tests/integration/create_golden_dataset_synthetic.py + +# Run the tests (requires dev AWS credentials) +pytest pipelines//tests/integration/ -v --timeout=3600 +``` + +Integration tests follow this pattern: +1. Upload golden dataset to `resonate-core-datasets-dev`. +2. Start the `-integration` Step Function. +3. Poll until terminal state; assert SUCCEEDED or verify S3 output. + +### Modifying the CheckSourceFreshness Lambda + +The `python_lambdas/check-source-freshness/` Lambda supports multiple detection strategies: +- `timestamp_compare` — compares S3 LastModified timestamps +- `latest_directory_vs_ssm` — compares YYYYMMDD directory names against SSM last-run date +- `latest_file_prefix_vs_ssm` — lists files, extracts YYYYMM from names, compares to SSM + +Adding a new source only requires a JSON config entry — no code change. + +```bash +# Run Lambda unit tests +cd python_lambdas/check-source-freshness +pip install -r requirements.txt +python -m pytest tests/ -v +``` + +## Key Concepts + +### EMR Versions + +Pipelines are migrating from **EMR 5.34.0 → EMR 7.12.0** (CDP-118269). 
When touching a pipeline's `emr.json`: +- **EMR 5.x** uses `core-data-pipeline-latest.jar` +- **EMR 7.x / Spark 3** uses `core-data-pipeline-spark3-latest.jar` + +### Integration Test Pattern + +All integration tests follow the sovrn-overlap pattern: +- `conftest.py` provides session-scoped fixtures: SFN client, execution trigger, entered-states history +- Tests verify: step entered, S3 output exists, `_SUCCESS` marker present, schema correct +- InfluxDB metric steps are expected to fail in integration; tests accept SUCCEEDED or FAILED +- Auto-termination is set to 1800s idle timeout + +### Terraform / Terragrunt + +Each `terragrunt.hcl` includes: +- `schedule_expression` — set to `null` for integration/dev (no cron), cron expression for prod +- `replacement_vars` — substitutes template variables (ARNs, bucket names) in the ASL JSON +- `configurations` — maps S3 URIs for configs uploaded from the `pipelines/` directory + +## Recent Changes (as of 2026-06) + +- **CDP-118269**: Migrated `damlam-preparation` and `sovrn-weekly` from EMR 5.34.0 → 7.12.0 / Spark 3 +- **CDP-118989 / Integration tests**: Added full integration test suites for `marketops-overlap`, `idsync-overlap`, `fusion-behavior-preprocess`, `behavior-stitch`, `segment-adobe`, `segment-onboard`, `segment-aggregator`, `liveramp-stitch/thetradedesk`, `total-sketch-first-party` +- **Removed**: `fusion-behavior-preprocess` pipeline (was dormant since 2023; caused unexpected prod run) +- **CDP-118890**: Added `experian-data-processing` pipeline with CheckSourceFreshness Lambda-based gating +- **CDP-118946**: Threaded zip→district CSV mappings through `geo-location` pipeline for IP-inferred district fallback +- **CDP-118512**: Fixed `geo-location` `Should Run Full` choice to preserve backfill output when today's full already exists + +## Pipeline Lifecycle + +1. Feature branch → PR → main auto-deploys to dev6 via GHA +2. Manual `workflow_dispatch` to `integration` → run integration tests +3. 
Manual `workflow_dispatch` to `prod` after review sign-off From aa5064d332e6ce21f9f924b9ee3c7cd6cb9a4b26 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 01:59:32 +0000 Subject: [PATCH 2/2] fix: add pytest-timeout to integration test install command pytest --timeout requires the pytest-timeout plugin; add it to the pip install step so the command works in a fresh environment. https://claude.ai/code/session_01DC8F2D1FYKS5qiBsKCRMmk --- .../step-function-workflow-orchestrator/CLAUDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md b/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md index f58c965..8eb4efa 100644 --- a/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md +++ b/claude-md-proposals/step-function-workflow-orchestrator/CLAUDE.md @@ -72,7 +72,7 @@ Each pipeline's config in `pipelines//config//` is deployed ```bash # Install test dependencies -pip install boto3 pytest +pip install boto3 pytest pytest-timeout # Generate golden dataset python3 pipelines//tests/integration/create_golden_dataset_synthetic.py