diff --git a/claude-md-updates/README.md b/claude-md-updates/README.md new file mode 100644 index 0000000..be11db8 --- /dev/null +++ b/claude-md-updates/README.md @@ -0,0 +1,71 @@ +# CLAUDE.md Updates + +This directory contains proposed CLAUDE.md files for 5 repositories that had active PRs merged into main (May–June 2026) by team members: SayaliPat, shrivastavakapil2000, JoeVsVolcano, mike-brant, nathan-resonate. + +## Files to Apply + +Each subdirectory contains a `CLAUDE.md` to be committed to the root of the corresponding repository: + +| Directory | Target Repository | Action | +|-----------|------------------|--------| +| `step-function-workflow-orchestrator/` | `resonate/step-function-workflow-orchestrator` | **Create** new CLAUDE.md | +| `batch-expression-modeling/` | `resonate/batch-expression-modeling` | **Replace** existing CLAUDE.md | +| `identity-graph/` | `resonate/identity-graph` | **Create** new CLAUDE.md | +| `batch-audience-delivery-syndication/` | `resonate/batch-audience-delivery-syndication` | **Create** new CLAUDE.md | +| `dos-data-pipeline/` | `resonate/dos-data-pipeline` | **Create** new CLAUDE.md | + +## How to Apply + +For each repository, create a branch and open a PR: + +```bash +# 1. Check out the target repo +cd /path/to/step-function-workflow-orchestrator +git checkout -b chore/add-claude-md + +# 2. Copy the file from this repo (resonate/.github) +# Assumes resonate/.github is cloned alongside the target repo +cp ../resonate-.github/claude-md-updates/step-function-workflow-orchestrator/CLAUDE.md ./CLAUDE.md +# Or download directly from GitHub: +# curl -o CLAUDE.md https://raw.githubusercontent.com/resonate/.github/main/claude-md-updates/step-function-workflow-orchestrator/CLAUDE.md + +git add CLAUDE.md +git commit -m "chore: add CLAUDE.md with project guidance for Claude Code" +git push -u origin chore/add-claude-md +# Then open PR via GitHub UI or: gh pr create --title "chore: add CLAUDE.md" +``` + +## What's Covered in Each File + +### step-function-workflow-orchestrator +- Pipeline inventory (12 active pipelines) +- EMR 5→7 migration notes (Spark 2→3, yarn vcore fix) +- Integration test patterns +- Dynamic dates lambda (directory mode vs flat-file sentinel mode) +- Recent changes: EMR migrations, QA envs, fusion-behavior-preprocess removal, geo district namespace fixes + +### batch-expression-modeling (UPDATE) +- All existing content preserved +- Added: formatter metrics lambda, stitch throttle protection (MaxConcurrency=2) +- Added: `delta_with_full_fallback` refresh type handling +- Added: Formatter output path layout (post CDP-118857 partition order change) + +### identity-graph +- 11 Spark pipeline jobs and their purposes +- Shared utilities (HashUtils, StagingWriter, AddressNormalizer, IpFilter, ScoringConfig) +- PRISM design overview and 6 tracks +- All jobs use scopt CLI args +- Recent changes: port from resonate-research, ExperianDataProcessor, PRISM docs + +### batch-audience-delivery-syndication +- Supported vendors (OpenX, Experian, Viant, BlockGraph) +- openx-publish-data-files: hardcoded .csv.gz extension (DO NOT revert to dynamic parsing) +- blockgraph-create-taxonomy-file: taxonomy generation, SPI=N constant +- Source path partition order (vendor=*/method=av/) + +### dos-data-pipeline +- district_source provenance (L2_CONFIRMED, L2_UNCONFIRMED, IP_INFERRED) +- IP-inferred district fallback via 4 ZIP→district CSVs +- ToBitmap gating on L2_CONFIRMED +- ZIP→district namespace requirements (L2 canonical vs floterial) +- GeoLocationFullBackfill: always re-derives all 4 districts diff --git a/claude-md-updates/batch-audience-delivery-syndication/CLAUDE.md b/claude-md-updates/batch-audience-delivery-syndication/CLAUDE.md new file mode 100644 index 0000000..dac4f8e --- /dev/null +++ b/claude-md-updates/batch-audience-delivery-syndication/CLAUDE.md @@ -0,0 +1,113 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository contains **Batch Audience Delivery — Syndication** workflows: the Lambda functions and Step Function state machines that publish formatted audience data files to syndicated delivery vendors (OpenX, Experian, Viant/OpenX). It works as a consumer of output from `batch-expression-modeling`'s formatter step. + +### Supported Vendors + +| Vendor | Lambda | State Machine | +|--------|--------|--------------| +| OpenX | `openx-publish-data-files` | `batch-audience-delivery-syndication` | +| Experian | `experian-publish-data-files` | `experian-syndication-workflow` | +| Viant | `viant-publish-files` | (see viant workflow) | +| BlockGraph | `blockgraph-create-taxonomy-file` | BlockGraph delivery workflow | + +## Repository Structure + +``` +├── workflows/ +│ ├── lambdas/ +│ │ ├── openx-publish-data-files/ # Renames + copies CSV.gz files to OpenX S3 +│ │ ├── blockgraph-create-taxonomy-file/ # Generates BlockGraph metadata CSV(s) +│ │ └── ... +│ └── step-functions/ # ASL state machine definitions +├── terraform/ +│ └── workflows/lambdas/ # Terraform per-lambda per-env +└── .github/workflows/ # CI/CD +``` + +## Common Development Tasks + +### Python Lambda Development + +Each Lambda has its own directory under `workflows/lambdas//`: + +```bash +# Run tests +cd workflows/lambdas/ +pytest test/ -v + +# Example +cd workflows/lambdas/openx-publish-data-files +pytest test/ -v # 6 tests +``` + +### Deploying Lambdas + +Lambda deployment is via GitHub Actions (see `.github/workflows/`). Deploy to a specific environment: + +```bash +gh workflow run .yml \ + -f environment=dev \ + --ref +``` + +### Terraform / Terragrunt + +```bash +cd terraform/workflows/lambdas// +aws sso login +terragrunt plan +terragrunt apply +``` + +## Key Lambda: openx-publish-data-files + +Copies Spark-formatted `.csv.gz` files from S3 source to OpenX destination, renaming to the pattern `resonate_syndication_{date}_Data_{N}.csv.gz`. + +**Important:** The extension is hardcoded as `.csv.gz` — do NOT revert to dynamic `split('.', 1)` extension parsing. This was fixed in CDP-118955 because the upstream `batch-expression-modeling` formatter (CDP-118857) changed Spark's codec-suffix separator from `-c000.csv.gz` to `.c000.csv.gz`, which broke dynamic parsing and silently stopped all OpenX audience refreshes. + +**Part number extraction:** Uses regex `part-(\d+)-.*\.csv` to find the part number, then increments by 1 for 1-indexed output filenames. + +## Key Lambda: blockgraph-create-taxonomy-file + +Generates BlockGraph metadata CSVs for a delivery run (CDP-118915): + +- **Initial format** (13 fields) — net-new segments not yet in BlockGraph +- **Refresh format** (8 fields) — segments already known to BlockGraph + +**Resolution:** +- Syndicated mode: queries BlockGraph ADS group to resolve audience set +- Custom mode: uses `audience_key_list` from event, filters to BlockGraph group hierarchy + +**SPI field:** Constant `N` — we deliver BGIDs (not SPI source data), so the "Created using Sensitive Personal Information" flag is `N` taxonomy-wide (Q5 resolved, CDP-118915). Env var `spi_value` can override if needed. + +**Output path:** `/batch-delivery-payload/metadata/resonate_metadata_{initial,refresh}_{ts}.csv` + +## Key Concepts + +### Source Path Layout (Post CDP-118857) + +The `batch-expression-modeling` formatter outputs data partitioned as: +``` +/date=/vendor=/method=/akey=/part-*.csv.gz +``` + +The source_prefix for syndication lambdas must concatenate `vendor=*/method=av/` (NOT the legacy `method=av/vendor=*/`). This was fixed in CDP-118937 for Experian, Yahoo, and custom delivery. + +### File Extension Warning + +Spark's `.partitionBy()` changes the codec suffix separator from `-c000.csv.gz` to `.c000.csv.gz`. Any lambda that dynamically parses file extensions from Spark output filenames will break after a `partitionBy` change. Always hardcode the expected extension (`.csv.gz`) after confirming `list_csv_files` already filters to that extension. + +### Delivery State Marker + +`blockgraph-create-taxonomy-file` reads the Delivery State marker from `state/known-segments/` to identify which PSIDs have already been delivered (refresh vs. initial routing). + +## Recent Changes (May–June 2026) + +- **openx-publish-data-files** (PR #46, CDP-118955): Hardcoded `.csv.gz` extension — fixes broken OpenX syndication after CDP-118857 Spark `partitionBy` change caused malformed filenames (last good delivery was 2026-04-26) +- **blockgraph-create-taxonomy-file** (PR #48, CDP-118915): New Lambda implementing BlockGraph taxonomy file generation — 18 unit tests, supports both syndicated and custom audience modes, SPI=N hardcoded +- **source_prefix path order** (PR #42, CDP-118937): Swapped `method=av/vendor=*` → `vendor=*/method=av/` to match new formatter output layout diff --git a/claude-md-updates/batch-expression-modeling/CLAUDE.md b/claude-md-updates/batch-expression-modeling/CLAUDE.md new file mode 100644 index 0000000..1cb8e9b --- /dev/null +++ b/claude-md-updates/batch-expression-modeling/CLAUDE.md @@ -0,0 +1,430 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository contains the **Batch Expression Modeling (BEM)** system for Resonate's audience delivery platform. It evaluates audience expressions against user behavioral data at scale, processes the results through identity stitching, and formats the output for various downstream vendors (Meta, TikTok, TheTradeDesk, OpenX, Experian, Viant, etc.). + +### High-Level Architecture + +The system consists of three main components orchestrated by AWS Step Functions: + +1. **BEM (Batch Expression Modeling)** - Scala/Spark job that evaluates ~350 audience expressions against 2TB+ of SuperTagAv data +2. **Stitch** - Scala/Spark job that joins evaluated expressions with vendor identity tables (cookies, HEMs, device IDs) +3. **Formatter** - Scala/Spark job that formats stitched data for specific vendor requirements + +Each component runs on AWS EMR clusters and is configured/orchestrated by Lambda functions and Step Functions state machines. + +## Repository Structure + +``` +├── src/main/scala/com/resonate/ +│ ├── bem/ # Batch Expression Modeling (evaluates expressions against SuperTagAv) +│ ├── stitch/ # Identity stitching (joins RCID with vendor IDs) +│ ├── delivery/ # Delivery formatting and vendor-specific logic +│ └── utils/ # Shared utilities +├── workflows/ +│ ├── lambdas/ # Lambda functions for pipeline configuration +│ │ ├── batch-audience-delivery-processing-config/ # Main config lambda +│ │ ├── batch-vendor-stitch/ # Stitch config lambda +│ │ ├── batch-delivery-formatter-publish-metrics/ # Formatter metrics lambda +│ │ ├── bem-gen-job-parameters/ # BEM job parameters +│ │ └── ... +│ └── step-functions/ # Step function state machines (ASL JSON) +│ ├── batch-audience-delivery-processing/ # Main pipeline +│ ├── batch-audience-delivery-daily-pipeline/ # Daily orchestrator +│ ├── bem-pipeline/ # BEM execution +│ ├── batch-stitch-pipeline/ # Stitch execution +│ └── batch-delivery-formatter-pipeline/ # Formatter execution +├── terraform/ # Infrastructure as code (one directory per Lambda/Step Function) +├── integration-tests/ # End-to-end integration tests +└── build.sbt # SBT build configuration +``` + +## Common Development Tasks + +### Scala/Spark Development + +**Build and test:** +```bash +# Compile Scala code +./compile.sh # or: sbt compile + +# Run tests +sbt test + +# Run specific test +sbt "testOnly com.resonate.bem.BatchExpressionModelJobTest" + +# Package JAR for EMR deployment +./package.sh # or: sbt assembly +# Output: target/batch-expression-modeling.jar +``` + +**AWS authentication:** +```bash +# Required before uploading JARs to S3 or accessing AWS resources +aws sso login +``` + +### Python Lambda Development + +**Lambda functions are in `workflows/lambdas/`**. Each has its own `requirements.txt`. + +**Run tests for a Lambda:** +```bash +cd workflows/lambdas/ +python -m pytest tests/ +``` + +**Common Lambda functions:** +- `batch-audience-delivery-processing-config` - Generates pipeline configuration from delivery schedule +- `bem-gen-job-parameters` - Generates EMR step parameters for BEM job +- `batch-vendor-stitch` - Generates stitch configuration per vendor +- `batch-delivery-formatter-publish-metrics` - Publishes formatter metrics to InfluxDB + +### Integration Tests + +**Run full integration test:** +```bash +cd integration-tests + +# First time: install dependencies +pip install -r requirements.txt + +# Run setup + execution + validation +bash run_full_integration_test.sh +``` + +The integration test: +1. Sets up test data in S3 with datetime-based paths (e.g., `integration/data/20250914_143022/`) +2. Triggers the `aud-delivery-proc-integration` step function +3. Polls for completion and validates SUCCESS status + +**Cost:** ~$0.25-0.30 per run, ~15-25 minutes with minimal test data. + +### GitHub Actions Workflows + +**Workflow files (in `.github/workflows/`):** + +| Workflow File | Description | +|--------------|-------------| +| `bem-jar.yml` | Builds and publishes the Scala JAR to S3 | +| `bem-lambdas.yml` | Deploys Lambda functions | +| `bem-step-functions.yml` | Deploys Step Functions | +| `bem-unit-tests.yml` | Runs Scala unit tests | + +**Deploy to an environment:** + +⚠️ **CRITICAL: Always specify `--ref` when deploying from a feature branch!** + +Without `--ref`, `gh workflow run` deploys from the **default branch (main)**, NOT your current branch. This is a common mistake that results in deploying old code. + +```bash +# Deploy JAR (required input: environment) +# JAR goes to qa (integration environment uses qa JAR) +gh workflow run bem-jar.yml -f environment=qa --ref + +# Deploy Lambdas (required input: environment) +gh workflow run bem-lambdas.yml -f environment=integration --ref + +# Deploy Step Functions (required inputs: environment, deployment_type) +gh workflow run bem-step-functions.yml -f environment=integration -f deployment_type=All --ref +``` + +**Step Functions deployment_type options:** +- `All` - Deploy all step functions +- `BEM Pipeline Step Function` +- `Batch Stitch Pipeline` +- `Batch Delivery Formatter Pipeline` +- `Audience Delivery Pipeline` +- `Audience Delivery Custom Delivery Pipeline` +- `Audience Delivery Processing Pipeline` +- `Audience Delivery Daily Pipeline` + +**Environment values:** +- `integration` - Integration testing environment (uses QA JAR) +- `dev`, `dev2`-`dev7` - Development environments +- `nonprod` - QA/staging environment +- `prod` - Production environment + +**Note:** The integration environment uses the QA JAR. When testing changes in integration: +1. Deploy JAR to `qa` +2. Deploy lambdas to `integration` +3. Deploy step functions to `integration` + +### Working with Step Functions + +Step functions are defined in `workflows/step-functions/*/statemachine/*.asl.json` and use **JSONata query language** (not JSONPath). + +**Key step functions:** +- `batch-audience-delivery-daily-pipeline` - Entry point triggered daily by EventBridge +- `batch-audience-delivery-processing` - Main pipeline orchestrating BEM → Stitch → Formatter +- `bem-pipeline` - EMR cluster creation → BEM job execution → cluster termination +- `batch-stitch-pipeline` - Parallel execution of stitch jobs per vendor +- `batch-delivery-formatter-pipeline` - Formatting for vendor-specific requirements + +## Key Concepts + +### Expression Evaluation (BEM) + +The BEM job (`BatchExpressionModelJob.scala`) evaluates audience expressions against SuperTagAv data: + +1. **Input:** + - `surveyexpressions.csv` - List of expressions to evaluate (e.g., `NOT (E999999998)`) + - SuperTagAv parquet data - User behavioral data with attribute bitmaps (evmap, avmap, cvmap) + - Models CSV - Expression evaluation models + +2. **Processing:** + - Reads SuperTagAv data (~2TB) with optimized schema (only needed bitmap columns) + - Evaluates each expression against each RCID's attribute bitmaps + - Uses `AudienceEvaluator` from `das-expression` library + - Outputs matches partitioned by `exprHash` + +3. **Output:** Parquet files with schema `(rcid, exprHash, score)` partitioned by `exprHash=` + +### Identity Stitching + +The Stitch job (`TableStitch.scala`) joins BEM output with vendor identity tables: + +1. **Input:** + - BEM cache (evaluated expressions with RCIDs) + - Vendor stitch tables (e.g., `tmid`, `hemsha2`, `viant_id` columns) + - Audience metadata (expression hash to audience ID mappings) + +2. **Processing:** + - Joins BEM cache with stitch table on `rcid` + - Maps `exprHash` to `audienceIds` and explodes + - Filters empty vendor keys + +3. **Output:** `(akey, vkey, score)` where `akey` = audience key, `vkey` = vendor key (e.g., cookie, email hash) + +### Delivery Configuration + +The `batch-audience-delivery-processing-config` Lambda generates pipeline configuration: + +- Queries RTP database for delivery schedules (via `RTP_PSQL_CONNECTION_STRING`) +- Determines which vendors need delivery based on schedule windows +- Generates S3 paths for tagav data, cookiejar, expressions, and output locations +- Sets EMR cluster configurations (instance types, counts, etc.) + +**Important fields:** +- `delivery_type` - "daily", "training", "custom" +- `tagav_date_folder` - Date folder for SuperTagAv input (format: YYYYMMDD) +- `cookie_jar_date` - Date folder for cookiejar data +- `start_time_of_delivery` / `end_time_of_delivery` - Time window for vendor deliveries (epoch milliseconds) + +### Formatter Output Path Layout + +Post CDP-118857, the Formatter Spark job partitions output as: +``` +/date=/vendor=/method=/akey=/part-*.csv.gz +``` + +Note the partition order: `vendor=*/method=*/` (NOT the legacy `method=av/vendor=*/`). + +Any downstream consumer parsing this path must use the new partition order. + +### `delta_with_full_fallback` Refresh Type + +The `_annotate_previous_date_partitions` function in `survey_expressions.py` checks `RefreshType.isDelta` — both `"delta"` and `"delta_with_full_fallback"` values are eligible for the previous-date-partition lookup. Do NOT check only for strict `"delta"` equality, or meta deliveries (the only `delta_with_full_fallback` vendor) will never get `previous_date_partition` stamped, effectively running as full refresh every time. + +### Stitch Pipeline Throttle Protection + +`batch-stitch-pipeline` uses `MaxConcurrency: 2` and a `Wait` state staggered by `$states.context.Map.Item.Index` seconds before each `EMR Add Step` (CDP-118972). This prevents `AddJobFlowSteps` API throttling (`EMR.AmazonElasticMapReduceException`) when submitting N steps in parallel. + +**Important:** The `EMR.ThrottlingException` retry was removed because Step Functions' top-level error-name match never fires for this error (the actual error is `EMR.AmazonElasticMapReduceException` with throttle info only in the cause string). + +## AWS Step Functions: Converting JSONPath to JSONata + +### Top-Level Changes + +Add `"QueryLanguage": "JSONata"` at the state machine or state level. + +### Field Replacements + +JSONPath's five fields are replaced with two in JSONata: +- **Arguments** - Replaces `Parameters` for sending data to integrated actions +- **Output** - Replaces `ResultPath`, `ResultSelector`, and `OutputPath` for transforming state output + +### JSONPath Expression Syntax + +- **JSONPath**: `"field.$": "$.path.to.value"` +- **JSONata**: `"field": "{% $states.input.path.to.value %}"` + +Remove the `.$` suffix and wrap expressions in `{% %}` delimiters. + +### ResultPath Conversions + +**Merge result into input at a path:** +```json +// JSONPath +"ResultPath": "$.ClusterCreationResult" + +// JSONata +"Output": "{% $merge([$states.input, {'ClusterCreationResult': $states.result}]) %}" +``` + +**Discard result, pass through input only:** +```json +// JSONPath +"ResultPath": null + +// JSONata +// Simply omit the Output field - input passes through by default +``` + +**Replace entire input with result (default):** +```json +// JSONPath +"ResultPath": "$" // or omit ResultPath + +// JSONata +// Omit Output field, or use: +"Output": "{% $states.result %}" +``` + +### OutputPath Conversions + +**Extract specific field from result:** +```json +// JSONPath +"OutputPath": "$.Payload" + +// JSONata +"Output": "{% $states.result.Payload %}" +``` + +### Catch Block Error Handling + +**Merge error into input:** +```json +// JSONPath +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "ResultPath": "$.Error", + "Next": "HandleError" + } +] + +// JSONata +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Output": "{% $merge([$states.input, {'Error': $states.errorOutput}]) %}", + "Next": "HandleError" + } +] +``` + +Note: `$states.errorOutput` is only available in Catch blocks. + +### Map State Conversions + +**ItemsPath:** +```json +// JSONPath +"ItemsPath": "$.step_configs" + +// JSONata +"Items": "{% $states.input.step_configs %}" +``` + +**ItemSelector:** +```json +// JSONPath +"ItemSelector": { + "step_config.$": "$$.Map.Item.Value", + "clusterId.$": "$.ClusterCreationResult.ClusterId" +} + +// JSONata +"ItemSelector": { + "step_config": "{% $states.context.Map.Item.Value %}", + "clusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" +} +``` + +**IMPORTANT:** Use `$states.context.Map.Item.Value` to access the current item in a Map state. There is **NO** `$states.item` variable - this is a common mistake that will cause "Field '$states.item' does not exist" errors. + +**⚠️ CRITICAL: Map States Must Preserve Input** + +By default, a Map state outputs an **array of results** from processing each item, which **loses the original input fields**. If subsequent states need access to fields from the original input (like a ClusterId from cluster creation), you MUST add an Output field to preserve the input. + +```json +// WRONG - This loses ClusterCreationResult from input +"Step Map": { + "Type": "Map", + "ItemSelector": { + "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" + }, + "Items": "{% $states.input.step_configs %}", + "Next": "TerminateCluster" // ❌ Will fail - ClusterCreationResult is lost! +} + +// CORRECT - Preserve input for next state +"Step Map": { + "Type": "Map", + "ItemSelector": { + "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" + }, + "Items": "{% $states.input.step_configs %}", + "Output": "{% $states.input %}", // ✅ Preserves all input fields + "Next": "TerminateCluster" +} +``` + +### Array Construction + +**Intrinsic function to array literal:** +```json +// JSONPath +"Args.$": "States.Array('cmd', '--flag', $.input.value, '--opt', $.input.other)" + +// JSONata - Use regular JSON array with individual JSONata expressions +"Args": [ + "cmd", + "--flag", + "{% $states.input.value %}", + "--opt", + "{% $states.input.other %}" +] +``` + +### Reserved Variables in JSONata + +- `$states.input` - Original input to the current state +- `$states.result` - Result from API/sub-workflow (Task, Parallel, Map states) +- `$states.errorOutput` - Error output (only in Catch blocks) +- `$states.context` - Execution metadata (StartTime, task token, Map.Item.Value, etc.) + - `$states.context.Map.Item.Value` - Current item in Map state iteration + - `$states.context.Map.Item.Index` - Current iteration index in Map state + +### Common Patterns + +**Pass through input unchanged:** +- Omit the Output field entirely, OR explicitly use `"Output": "{% $states.input %}"` + +**Add result as new field while preserving input:** +```jsonata +"Output": "{% $merge([$states.input, {'resultField': $states.result}]) %}" +``` + +**Create variables for use in later states:** +```jsonata +"Assign": { + "myVariable": "{% $states.input.someValue %}" +} +// Later states can reference {% $myVariable %} +``` + +## Recent Changes (May–June 2026) + +- **Batch-stitch AddJobFlowSteps throttle fix** (PR #263, CDP-118972): Added `MaxConcurrency=2` + index-based `Wait` state to stagger EMR step submissions; removed non-functional `EMR.ThrottlingException` retry +- **Formatter metrics lambda** (PR #253, CDP-118857): New `batch-delivery-formatter-publish-metrics` Lambda; routes per-audience rows to `com.resonate.delivery.format.count` and aggregate rows to `com.resonate.delivery.format.aggregate`; integrated into ASL after `EMR TerminateCluster Success` +- **`delta_with_full_fallback` lookback fix** (PR #253): `_annotate_previous_date_partitions` now accepts both `"delta"` and `"delta_with_full_fallback"` refresh types — meta deliveries now get `previous_date_partition` stamped correctly +- **Hive partition pruning for prior-side stitch read** (PR #253): `prunePartitions` takes `dates: Seq[String]` with static `col("date").isin(dates: _*)` filter; prevents OOM from scanning all date partitions for `delta_with_full_fallback` workloads +- **`urn` → `id` rename in Yahoo formatter** (PR #253): `YahooSyndicatedPayloadFormatter` output column renamed from `urn` to `id` +- **Source prefix path order fix** (PR #256, CDP-118937): Swapped `method=av/vendor=*` → `vendor=*/method=av/` in `batch-audience-custom-delivery.asl.json` to match new formatter partition layout diff --git a/claude-md-updates/dos-data-pipeline/CLAUDE.md b/claude-md-updates/dos-data-pipeline/CLAUDE.md new file mode 100644 index 0000000..7a39202 --- /dev/null +++ b/claude-md-updates/dos-data-pipeline/CLAUDE.md @@ -0,0 +1,126 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository contains the **Data-of-Scalars (DOS) Data Pipeline** — a Scala/Spark pipeline that processes geographic and district data for Resonate's political audience targeting product. It includes the geo-location pipeline (`GeoLocationDaily`, `GeoLocationFullBackfill`) and segment aggregation jobs. + +### Key Outputs + +| Dataset | S3 Path | +|---------|---------| +| Daily geo | `s3://resonate-core-datasets/data/geo/daily//` | +| Full geo | `s3://resonate-core-datasets/data/geo/full//` | +| Legislative geo | `s3://resonate-core-datasets/data/geo/legislative/full//` | +| Bitmap output | Used by ToBitmap / segment-aggregator downstream | + +## Repository Structure + +``` +├── src/main/scala/com/resonate/spark/apps/ +│ ├── GeoLocationDaily.scala # Daily geo enrichment job +│ ├── GeoLocationFullBackfill.scala # Full backfill job +│ ├── GeoLocationFull.scala # Rolling-full maintenance +│ └── ToBitmap.scala # Converts geo data to bitmap format +├── src/test/scala/ # Test suites +├── build.sbt # Scala 2.11.8, Spark 2.4.3 +└── pipelines/ + ├── geo-location/ + │ └── config/ + │ └── zip-district-mappings/ # ZIP→district CSV mapping files + │ ├── zip-congress-mapping.csv + │ ├── zip-proposed-congress-mapping.csv + │ ├── zip-state-senate-mapping.csv + │ └── zip-state-house-mapping.csv + └── segment-aggregator/ +``` + +## Common Development Tasks + +### Build and Test + +```bash +# Compile +sbt compile + +# Run all tests +sbt test + +# Run specific test +sbt "testOnly com.resonate.spark.apps.GeoLocationDailyTest" + +# Package for EMR +sbt assembly +``` + +**Note:** This project uses **Scala 2.11.8 / Spark 2.4.3** (older than the identity-graph repo). + +### Deploying + +Deployment is via GitHub Actions. See `.github/workflows/` for the deployment workflow targeting the `dos-data-pipeline` EMR job. + +### Updating ZIP→District Mappings + +The 4 CSV files under `pipelines/geo-location/config/zip-district-mappings/` are deployed to: +``` +s3://resonate-core-applications{-env}/dos/geo-location/configs/zip-district-mappings/ +``` + +They are versioned together as a snapshot and must use **L2 canonical namespace** format (e.g., `NHBELKNAP-01`, not `NH001`). + +**Schema:** Each file has columns `zip,state,`. + +## Key Concepts + +### District Source Provenance (`district_source`) + +Every geo record now carries a `district_source` field indicating how the district was resolved: + +| Value | Meaning | +|-------|---------| +| `L2_CONFIRMED` | L2 has a district AND `l2_party_confirmed = true` | +| `L2_UNCONFIRMED` | L2 has a district AND `l2_party_confirmed != true` | +| `IP_INFERRED` | No L2 district but zip→district mapping resolved via IP-geolocation | +| `null` | No district data available | + +Use `DistrictResolver` (the centralized helper) for all `district_source` derivation — do not duplicate the `when`/`coalesce` ladder. + +### IP-Inferred District Fallback + +`GeoLocationDaily` and `GeoLocationFullBackfill` both support IP-inferred district fallback: +- Uses the 4 ZIP→district CSV files as a lookup +- `GeoLocationDaily` resolves ZIP from NetAcuity IP lookup +- `GeoLocationFullBackfill` reads existing `zip` from the rolling-full (no second IP lookup needed) + +### ToBitmap Gating + +The L2-confirmed marker bit in ToBitmap is gated on `district_source = "L2_CONFIRMED"`. Do NOT set the marker unconditionally — `L2_UNCONFIRMED` and `IP_INFERRED` rows must not trigger the confirmed marker. + +### ZIP→District CSV Namespace Requirements + +The mapping CSVs **must** use L2 canonical district codes (matching `DistrictToEvkey.csv`): +- State house: county-based format, e.g., `NHBELKNAP-01` (NOT floterial `NH001`) +- State senate: L2 descriptive format, e.g., `MABRISTOL 14` (NOT floterial `MA023`) +- Congress: numeric, e.g., `MA01` + +Using floterial codes will cause rows to be silently dropped at the `state-legislative-districts-aggregation` join. + +**States requiring county-based namespace (state house):** NH, MA, MN, VT, MD, SD, ND +**States requiring L2 canonical namespace (state senate):** MA, AK, VT, DC, CT + +### GeoLocationFullBackfill + +- Does NOT take `districtColumnList` argument — always re-derives all 4 districts (congress, proposed_congress, state_senate, state_house) from scratch +- Reads `zip` from existing fullDf to run IP-fallback without a second NetAcuity lookup +- `GeoLocationFull.expectedCols` includes `zip` and `district_source` + +## Recent Changes (May–June 2026) + +- **IP-inferred district fallback** (PR #103, CDP-118946): Added `DistrictResolver` helper; both `GeoLocationDaily` and `GeoLocationFullBackfill` now support IP-inferred district lookup via 4 ZIP→district CSVs. Observed distribution: IP_INFERRED ~86.8%, L2_CONFIRMED ~9.9%, L2_UNCONFIRMED ~2.9%, null ~0.3% +- **`district_source` provenance** (PR #103): New column in geo output encoding how each district was resolved +- **ToBitmap L2-confirmed gating** (PR #103, CDP-118947): Marker bit now requires `district_source = "L2_CONFIRMED"` (was unconditional) +- **4-file ZIP→district mapping** (PR #103): Split consolidated CSV into 4 separate type-specific files with `zipDistrictMappingsBasePath` argument replacing `DistrictColumnList` +- **ZIP→district namespace fix — state house** (CDP-118946): Rewrote NH (PR #2 in step-function-workflow-orchestrator), then MA/MN/VT/MD/SD/ND (PR #3) from floterial → L2 canonical namespace; 0 unmatched state-house codes after fix +- **ZIP→district namespace fix — state senate** (CDP-118946): Rewrote MA/AK/VT/DC state senate codes; 2 remaining unmatched (stale data: `AK11`, `CT37`) +- **`state_senate`/`state_house` raw concat revert** (PR #15 in core-data-applications, CDP-118951): Reverted `parseDistrict` back to `concat(state, district)` for state-leg — numeric parsing was valid only for congress codes, not state-leg diff --git a/claude-md-updates/identity-graph/CLAUDE.md b/claude-md-updates/identity-graph/CLAUDE.md new file mode 100644 index 0000000..c8f2dfa --- /dev/null +++ b/claude-md-updates/identity-graph/CLAUDE.md @@ -0,0 +1,155 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository contains the **Identity Graph** (PRISM — Person Resolution and Identity System Map) Spark pipeline for Resonate. It builds and maintains the person identity graph by joining multi-source identifier data (L2, Experian, TapAd, LiveRamp) into a unified `identifier_index` and `persons` dataset. + +### Key Outputs + +| Dataset | S3 Path | Description | +|---------|---------|-------------| +| `identifier_index` | `s3://resonate-core-datasets/identity-graph/` | Cross-source identifier linkages | +| `persons` | `s3://resonate-core-datasets/identity-graph/persons/` | Deduplicated person records | +| JAR | `s3://resonate-core-applications/identity-graph/jars/identity-graph-latest.jar` | Published artifact | + +## Repository Structure + +``` +├── src/main/scala/com/resonate/spark/ +│ ├── apps/ +│ │ ├── identity/ # Core identity graph jobs +│ │ │ ├── ExperianPreprocessJob.scala +│ │ │ ├── L2ConsumerPreprocessJob.scala +│ │ │ ├── TapadIndexJob.scala +│ │ │ ├── L2IndexJob.scala +│ │ │ ├── L2ConsumerIndexJob.scala +│ │ │ ├── OnboardingMatchJob.scala +│ │ │ ├── OnboardingHouseholdLinksJob.scala +│ │ │ ├── OnboardingPersonsJob.scala +│ │ │ ├── MergeOnboardingLinksJob.scala +│ │ │ ├── PersonIdentityJob.scala +│ │ │ ├── CustomerMatchJob.scala +│ │ │ └── ExperianDataProcessor.scala +│ │ └── l2geo/ # L2 geo jobs (GeoLocationDaily, etc.) +│ └── utils/ +│ ├── HashUtils.scala # SHA256 person_id / household_id generation +│ ├── StagingWriter.scala # Cache-count-write-unpersist pattern +│ ├── AddressNormalizer.scala # USPS Pub 28 suffix expansion, nickname resolution +│ ├── IpFilter.scala # RFC1918 + datacenter IP blocklist +│ └── ScoringConfig.scala # Confidence scoring config +├── src/test/scala/ # Test suites (443 unit tests) +├── build.sbt # Scala 2.12.17, Spark 3.5.0 +├── docs/ +│ └── design/ # PRISM design docs (HTML) +│ ├── design_and_production_readiness.html +│ ├── consumer_interface_design.html +│ ├── project_plan.html +│ └── raw_dataset_columns.html +└── docs/scripts/ + └── build_columns_page.py # Regenerates raw_dataset_columns.html +``` + +## Common Development Tasks + +### Build and Test + +```bash +# Run all tests +sbt test + +# Run specific test suite +sbt "testOnly com.resonate.spark.apps.identity.ExperianPreprocessJobSpec" + +# Available test suites: +# ExperianPreprocessJobSpec, L2ConsumerPreprocessJobSpec, TapadIndexJobSpec +# L2IndexJobSpec, L2ConsumerIndexJobSpec, OnboardingMatchJobSpec +# OnboardingHouseholdLinksJobSpec, OnboardingPersonsJobSpec, MergeOnboardingLinksJobSpec +# PersonIdentityJobSpec, CustomerMatchJobSpec +# HashUtilsSpec, ScoringConfigSpec, SchemasSpec, ConfigSpec, IpFilterSpec +# StagingWriterSpec, AddressNormalizerSpec + +# Package JAR for EMR +sbt assembly +# Published to S3 via CI: s3://resonate-core-applications/identity-graph/jars/identity-graph-latest.jar +``` + +**JVM options (JDK 17):** The build.sbt automatically adds `--add-opens` flags for JDK 9+ to satisfy Spark 3.5 reflective access requirements. + +### Deploying + +Deployment follows GitOps — see [Confluence: Pipeline Scala SBT GitOps](https://resonate-jira.atlassian.net/wiki/spaces/ASD/pages/3683057840). + +The CI pipeline publishes the fat JAR to: +``` +s3://resonate-core-applications/identity-graph/jars/identity-graph-latest.jar +``` + +## Key Concepts + +### Pipeline Jobs + +**Preprocessing:** +- `ExperianPreprocessJob` — Normalizes Experian ConsumerView + digital graph; uses `min(id_value)` for deterministic aggregation +- `L2ConsumerPreprocessJob` — Preprocesses L2 consumer tab-delimited files +- `ExperianDataProcessor` — Converts Experian offline graph deliveries (CSV.gz → Parquet); supports glob patterns + +**Indexing:** +- `TapadIndexJob` — Experian/TapAd to `identifier_index` (direct + staging) +- `L2IndexJob` — L2 voter to `identifier_index`; `HOUSEHOLD_ID` null guard (filterNulls=true) +- `L2ConsumerIndexJob` — L2 consumer to `identifier_index` + +**Matching:** +- `OnboardingMatchJob` — 4-phase matching (blocking, corroboration, scoring, fallback); deterministic Phase 3 tie-breaker via `person_id.asc` +- `OnboardingHouseholdLinksJob` — Household CTV/TTD attribution via digital graph +- `MergeOnboardingLinksJob` — BL-30 multi-provider collapse + BL-4/BL-107 confidence scoring + +**Core:** +- `PersonIdentityJob` — Main orchestrator: persons + identifier_index from L2/LI/LR/Sovrn; null `identifier_value` guard before groupBy +- `CustomerMatchJob` — Ephemeral customer file matching (7 fallback tiers); deterministic tie-breaker + +### Shared Utilities + +| Utility | Purpose | +|---------|---------| +| `HashUtils` | Deterministic SHA256-based `person_id` and `household_id` generation | +| `StagingWriter` | Cache → count → write → unpersist pattern for staging partitions | +| `AddressNormalizer` | USPS Pub 28 suffix expansion, nickname resolution (up to 5 hops), soundex | +| `IpFilter` | RFC1918 + datacenter/cloud IP blocklist with IPv6 exclusion | +| `ScoringConfig` | Centralized confidence scoring, recency decay, artifact thresholds | + +### PRISM Design + +PRISM (Person Resolution and Identity System Map) has 6 delivery tracks: +- **Track A** — L2 ingestion +- **Track B** — Experian ingestion +- **Track C** — TapAd/LiveRamp linkage +- **Track D** — Consumer interface (Append/BlockGraph/GeoFix/Clean Room/Cortex) +- **Track E** — Operational readiness +- **Track F** — Monitoring/alerting + +Design docs are published at: +`s3://s3.nonprod.aws.resonatedigital.net/person-identity-graph/` + +### All Jobs Use Scopt CLI Args + +All jobs use `scopt` for command-line argument parsing (no `application.conf`). Run with: +```bash +spark-submit --class com.resonate.spark.apps.identity. \ + identity-graph-latest.jar \ + -- \ + -- +``` + +## Recent Changes (May–June 2026) + +- **11 Scala jobs ported** from `resonate-research` into this repo (PR #22): ExperianPreprocess, L2ConsumerPreprocess, TapadIndex, L2Index, L2ConsumerIndex, OnboardingMatch, OnboardingHouseholdLinks, OnboardingPersons, MergeOnboardingLinks, PersonIdentity, CustomerMatch — 443 unit tests, all passing +- **ExperianDataProcessor** added (PR #21, CDP-118890): converts Experian gzip CSV deliveries to Parquet using Spark +- **PRISM design docs** committed to `docs/design/` (PR #25): 6 HTML artifacts backing 20 open Jira tickets (CDP-118884, 118991–119016) +- **Bug fixes applied during port:** + - L2IndexJob `HOUSEHOLD_ID filterNulls` → true (prevents phantom household collapse) + - OnboardingMatchJob Phase 3 non-determinism → added `person_id.asc` tie-breaker + - ExperianPreprocessJob non-deterministic aggregation → `first()` → `min()` + - AddressNormalizer nickname chains → iterative lookup (max 5 hops) + - MergeOnboardingLinksJob + PersonIdentityJob → null `identifier_value` guard before groupBy diff --git a/claude-md-updates/step-function-workflow-orchestrator/CLAUDE.md b/claude-md-updates/step-function-workflow-orchestrator/CLAUDE.md new file mode 100644 index 0000000..5620789 --- /dev/null +++ b/claude-md-updates/step-function-workflow-orchestrator/CLAUDE.md @@ -0,0 +1,170 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository contains the **X-Men Step Function Workflow Orchestrator** — AWS Step Functions state machines and supporting infrastructure that orchestrate Resonate's core data pipelines. Each pipeline is a self-contained directory under `pipelines/` with its own EMR cluster config, Step Function ASL definition, Terraform infrastructure, and integration tests. + +### Active Pipelines + +| Pipeline | Purpose | +|----------|---------| +| `behavior-stitch` | Stitches behavioral signal to RCIDs | +| `topic-aggregation` | Aggregates topic signals | +| `idsync-overlap` | Computes identity-sync overlap metrics | +| `damlam-preparation` | Prepares DAMLAM data for audience modeling | +| `marketops-overlap` | Computes market-ops overlap metrics | +| `sovrn-weekly` | Weekly Sovrn behavioral preprocess | +| `geo-location` | GeoLocation daily + full backfill pipeline | +| `l2-processing` | L2 voter data processing and stitch | +| `thirdparty-enrichment` | Third-party enrichment (TPE) | +| `experian-data-processing` | Experian offline graph pipeline | +| `total-sketch-first-party` | First-party total sketch | +| `state-legislative-districts-aggregation` | State legislative district bitmap aggregation | + +## Repository Structure + +``` +├── pipelines// +│ ├── config/ +│ │ ├── emr.json # EMR cluster config (prod) +│ │ ├── emr-on-demand.json # EMR on-demand cluster config +│ │ ├── prod/params.json # Pipeline parameters (prod) +│ │ ├── dev/params.json # Pipeline parameters (dev) +│ │ ├── integration/ # Integration test configs +│ │ └── qa/ # QA environment configs +│ ├── statemachine/*.asl.json # Step Function ASL definition +│ ├── events//event.json # EventBridge event payload +│ ├── tests/integration/ # Python integration test suite +│ └── terraform/ # Terraform/Terragrunt modules +├── terraform/pipelines///terragrunt.hcl +├── .github/workflows/ # GitHub Actions +└── lambdas/ + └── dynamic-dates/ # Lambda for resolving dynamic date paths +``` + +## Common Development Tasks + +### Deploying a Pipeline + +```bash +# Deploy via GitHub Actions workflow_dispatch +# Navigate to Actions → "X-Men Step Function Workflow" +# Select: environment (dev/qa/integration/prod), step_function name + +# Or via gh CLI: +gh workflow run step_function.yml \ + -f environment=dev \ + -f step_function= +``` + +### Running Integration Tests + +```bash +# Upload golden data (once per pipeline) +python3 pipelines//tests/integration/create_golden_dataset_synthetic.py + +# Run tests (allow up to 90 min for EMR) +pytest pipelines//tests/integration/ -v --timeout=6000 +``` + +Integration tests: +- Deploy integration-specific Step Function via terragrunt +- Upload synthetic golden data to `s3://resonate-core-datasets-dev/integration-test//` +- Execute the `-integration` state machine +- Assert S3 output, `_SUCCESS` markers, and execution history + +### Terraform / Terragrunt + +```bash +# Plan infrastructure changes +cd terraform/pipelines// +terragrunt plan + +# Apply (requires AWS SSO login) +aws sso login +terragrunt apply +``` + +### EMR Configuration + +Each pipeline's `config/emr.json` specifies: +- `ReleaseLabel` — EMR version (currently migrating to `emr-7.12.0`) +- Instance types and counts +- YARN memory/vcore settings +- `AutoTerminationPolicy.IdleTimeout` + +**EMR 5 → 7 migration notes (CDP-118269):** +- Bump `ReleaseLabel` from `emr-5.34.0` → `emr-7.12.0` +- Update JAR path to `core-data-pipeline-spark3-latest.jar` +- Fix YARN vcores: set `yarn.nodemanager.resource.cpu-vcores` explicitly (EMR 7 no longer auto-detects with multiplier) +- Reduce executor memory for integration: r5.xlarge YARN limit is ~24576 MB with higher overhead factor on EMR 7 + +### Dynamic Dates Lambda + +The `lambdas/dynamic-dates/` Lambda resolves `` and `` tokens in event JSON path fields. + +**Two resolution modes:** +- **Directory mode** (prefix ends with `/`): lists subdirectories, picks latest with `_SUCCESS` +- **Flat-file mode** (prefix does NOT end with `/`): lists objects, parses date from key. The text after `>` is a **completion sentinel** — a date is only valid if `prefix + DATE + sentinel_suffix` is an actual S3 key + +```json +// Example dual-field flat-file pattern (TapAd Digital Graph) +"TapAdDigitalGraphSuccessMarker": "s3://bucket/path/resonate_ids_full__000000.parquet.master.trigger", +"TapAdDigitalGraphData": "s3://bucket/path/resonate_ids_full_<@tapadDate>_000000_part-*.parquet" +``` + +## Key Concepts + +### Pipeline Anatomy + +Each pipeline Step Function follows a standard pattern: +1. **EventBridge trigger** → Lambda (`dynamic-dates`) resolves date tokens +2. **Precondition check** → Lambda (`precondition-checker`) validates S3 inputs +3. **EMR Create Cluster** → Provisions cluster +4. **EMR Add Step(s)** → Runs Spark job(s) +5. **EMR Terminate Cluster** → Cleans up +6. **SNS Notify** → Success or failure notification + +### Environments + +| Environment | Account | Purpose | +|-------------|---------|---------| +| `dev` / `dev2`-`dev7` | dev | Individual developer sandboxes | +| `integration` | dev | CI integration testing (manual trigger, no cron) | +| `qa` | dev | QA testing (manual trigger, no cron) | +| `nonprod` | nonprod | Pre-production | +| `prod` | prod | Production (daily EventBridge cron) | + +### JAR Naming Convention + +- `core-data-pipeline-latest.jar` → Spark 2 (EMR 5) build (legacy) +- `core-data-pipeline-spark3-latest.jar` → Spark 3 (EMR 7) build (current standard) + +Pipeline-specific JARs (e.g., `identity-graph-latest.jar`) live in `s3://resonate-core-applications/`. + +### Geo-Location Pipeline + +The `geo-location` pipeline runs `GeoLocationDaily` and `GeoLocationFullBackfill` with: +- **zip→district CSV mappings** (4 files): `zip-congress-mapping.csv`, `zip-proposed-congress-mapping.csv`, `zip-state-senate-mapping.csv`, `zip-state-house-mapping.csv` +- Deployed to `s3://resonate-core-applications{-env}/dos/geo-location/configs/zip-district-mappings/` +- `district_source` provenance: `L2_CONFIRMED`, `L2_UNCONFIRMED`, `IP_INFERRED` + +**State machine backfill logic:** +The `Should Run Full` choice state checks `$.FullInputPath` before path-equality comparison — when backfill ran, `Use Backfill Output` sets `$.FullInputPath` to the backfill path and that must be preferred (CDP-118512 fix). + +### L2 Processing Pipeline + +Wired to discover TapAd Digital Graph data via flat-file sentinel pattern and LiveRamp Tradedesk Agg via directory pattern. Parameters `TapAdDigitalGraphSuccessMarker` / `TapAdDigitalGraphData` / `TapAdDigitalGraphLookbackDays` control the new HEM→RCID derivation path. + +## Recent Changes (May–June 2026) + +- **EMR 7 migration** (CDP-118269): `behavior-stitch`, `topic-aggregation`, `idsync-overlap`, `damlam-preparation`, `marketops-overlap`, `sovrn-weekly` all migrated to EMR 7.12.0 / Spark 3 +- **Integration test suites** added for `marketops-overlap`, `idsync-overlap`, `fusion-behavior-preprocess`, `behavior-stitch` +- **QA environments** added for `geo-location`, `l2-processing`, `thirdparty-enrichment`, `total-sketch-first-party` +- **`fusion-behavior-preprocess` removed** (CDP incident #16828 — pipeline was dormant; deploy on 2026-05-28 caused EC2 capacity failure) +- **geo-location zip→district mappings** rewritten to L2 canonical namespace for NH, MA, MN, VT, MD, SD, ND state house and senate (CDP-118946) +- **L2 stitch** wired to TapAd Digital Graph for HEM→RCID via flat-file sentinel pattern (CDP-118512) +- **Geo backfill fix**: `Should Run Full` choice now uses `IsPresent` check on `$.FullInputPath` before path-equality (CDP-118512) +- **Experian pipeline**: switched source from cross-account `tapad-resonate` to `resonate-experian-rld` bucket (CDP-118890)