diff --git a/claude-md-updates/README.md b/claude-md-updates/README.md new file mode 100644 index 0000000..3327846 --- /dev/null +++ b/claude-md-updates/README.md @@ -0,0 +1,25 @@ +# CLAUDE.md Updates + +This directory contains updated (or new) `CLAUDE.md` files for 5 repositories that have seen recent merged PRs from the team. These files were generated by reviewing merged PRs and main-branch commits as of 2026-06-06. + +## Repos Covered + +| Repo | Action | Key Changes | +|---|---|---| +| [step-function-workflow-orchestrator](./step-function-workflow-orchestrator/CLAUDE.md) | **Update** | Added EMR 7.12 migration tracker, decommissioned pipelines section, CheckSourceFreshness Lambda docs, Experian bucket change | +| [batch-audience-delivery-syndication](./batch-audience-delivery-syndication/CLAUDE.md) | **Create** | New repo — documents BlockGraph delivery Lambdas (T06/T07/T08), OpenX path layout fix, testing patterns | +| [identity-graph](./identity-graph/CLAUDE.md) | **Create** | New repo — documents PRISM Scala/Spark pipeline jobs (11 jobs) + prism_dbt v1.0 package (4 macros, 3 service models, NAME_ADDRESS_HASH UDF) | +| [batch-expression-modeling](./batch-expression-modeling/CLAUDE.md) | **Update** | Added BlockGraph vendor support (stitch_columns, audience_bitmap_path config keys), batch-stitch rate-limiting docs, formatter path layout change | +| [core-data-pipelines-spark](./core-data-pipelines-spark/CLAUDE.md) | **Update** | Added Sovrn Spark 3 fixes, full app inventory, deprecated CookieJarSampler note, security note on credentials | + +## How to Apply + +Each CLAUDE.md file should be copied to the root of the corresponding repo and committed as a PR. The files in this directory represent the **complete** intended content (not a diff). 
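The copy step described under "How to Apply" could be scripted roughly as follows. This is a minimal sketch, assuming each target repo is checked out under a common parent directory and shares its name with the matching subdirectory here. `apply_claude_md`, `updates_dir`, and `repos_root` are illustrative names, not existing tooling; committing and opening the PR are left to the usual workflow.

```python
from pathlib import Path
import shutil


def apply_claude_md(updates_dir: Path, repos_root: Path, dry_run: bool = True):
    """Copy <updates_dir>/<repo>/CLAUDE.md to <repos_root>/<repo>/CLAUDE.md.

    Returns the (source, destination) pairs that were (or would be) copied.
    Repos that are not checked out under repos_root are skipped.
    """
    copied = []
    for src in sorted(updates_dir.glob("*/CLAUDE.md")):
        repo_dir = repos_root / src.parent.name
        if not repo_dir.is_dir():
            continue  # assumption: a missing checkout is skipped, not an error
        dst = repo_dir / "CLAUDE.md"
        if not dry_run:
            # Full-file overwrite: the files here are complete content, not diffs.
            shutil.copyfile(src, dst)
        copied.append((src, dst))
    return copied
```

Running with the default `dry_run=True` first lists what would be overwritten without touching any repo.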
+ +## PR Authors Whose Work Is Reflected + +- **SayaliPat** — identity-graph (prism_dbt, Scala pipeline jobs, Experian pipeline fixes) +- **shrivastavakapil2000** — step-function-workflow-orchestrator (EMR 7.12 migrations, decommissions), core-data-pipelines-spark (Sovrn fixes, TopicTag port) +- **JoeVsVolcano** — step-function-workflow-orchestrator (shuffle partitions, lost files) +- **mike-brant** — batch-expression-modeling (formatter, BlockGraph config), step-function-workflow-orchestrator (geo-location fixes), batch-audience-delivery-syndication (formatter path swap) +- **nathan-resonate** — batch-audience-delivery-syndication (BlockGraph T06/T07/T08 Lambdas), batch-expression-modeling (BlockGraph config lambda) diff --git a/claude-md-updates/batch-audience-delivery-syndication/CLAUDE.md b/claude-md-updates/batch-audience-delivery-syndication/CLAUDE.md new file mode 100644 index 0000000..043ac8c --- /dev/null +++ b/claude-md-updates/batch-audience-delivery-syndication/CLAUDE.md @@ -0,0 +1,131 @@ +# batch-audience-delivery-syndication + +## Project Purpose and Architecture Overview + +This repo contains Lambda functions and infrastructure for syndicated and custom **batch audience delivery** — the post-BEM/Stitch/Formatter step that uploads audience files to third-party platforms (OpenX, Viant, Experian, BlockGraph/FreeWheel, and others). + +Each Lambda handles one stage of a vendor delivery workflow: config resolution → file renaming / format transformation → upload to vendor S3 / SFTP. + +**Top-level layout:** + +``` +workflows/ + lambdas/ # Python Lambda source — one subdir per function + openx-publish-data-files/ + viant-publish-files/ + experian-syndication-notify/ + blockgraph-create-taxonomy-file/ # T06: generates BlockGraph metadata CSV + blockgraph-rename-files/ # T07: concatenates + renames Spark output + blockgraph-publish-files/ # T08: cross-account upload to BlockGraph S3 + ... 
+terraform/ + workflows/ + lambdas/ # Terragrunt configs per Lambda per environment + / + dev/ + prod/ + (qa/ where applicable) +.github/ + workflows/ + lambda.yml # Manual deploy workflow for individual Lambdas + all.yml # CI: auto-discovers all Lambda dirs and runs tests +``` + +--- + +## Key Commands + +### Python Lambda Development + +Each Lambda has its own `requirements.txt`. Use `pip` or `uv` to install dependencies per Lambda. + +```bash +# Run tests for a specific Lambda (from the Lambda dir) +cd workflows/lambdas/ +pip install -r requirements.txt +python -m pytest tests/ -v + +# Run all Lambda tests (from repo root) +for dir in workflows/lambdas/*/; do + (cd "$dir" && python -m pytest tests/ -q 2>/dev/null || true) +done +``` + +### GitHub Actions Deployment + +All deploys go through GitHub Actions. The `lambda.yml` workflow supports manual dispatch with an `environment` input. + +```bash +# Deploy a specific Lambda (via gh CLI from your machine) +gh workflow run lambda.yml \ + -f lambda=blockgraph-publish-files \ + -f environment=dev \ + --ref + +# Check deployment status +gh run list --workflow=lambda.yml --limit=5 +gh run watch +``` + +**Environments:** `dev`, `qa`, `prod` (not all Lambdas have all environments — check the `terraform/` dir). + +--- + +## Lambda Inventory + +### OpenX +- **`openx-publish-data-files`** — Uploads audience segment files to OpenX S3 (`resonate-openx-syndication` bucket). Outputs `*.csv.gz` files (hardcoded extension as of PR #46). + +### Viant +- **`viant-publish-files`** — Publishes audience files to Viant's S3 via cross-account creds from SSM. + +### Experian +- **`experian-syndication-notify`** — Notifies Experian after delivery. + +### BlockGraph / FreeWheel (CDP-118694 epic) + +The BlockGraph pipeline delivers RID-keyed (person-keyed) audience data to BlockGraph's S3 bucket using BG-issued cross-account credentials. 
Three Lambdas implement the delivery chain: + +| Lambda | Ticket | Purpose | +|---|---|---| +| `blockgraph-create-taxonomy-file` | CDP-118915 (T06) | Generates metadata CSV(s): 13-field (initial/net-new) or 8-field (refresh/known) per BlockGraph spec. Reads audience set from ADS (syndicated) or event `audience_key_list` (custom). Routes by delivery state (known PSIDs). | +| `blockgraph-rename-files` | CDP-118916 (T07) | Concatenates per-audience Spark output parts into a single `resonate__.csv.gz`. Uses S3 multipart copy for large files, download-concat-upload fallback for small parts. | +| `blockgraph-publish-files` | CDP-118917 (T08) | Uploads renamed segment files and metadata CSVs to BlockGraph's S3 (`auto/segment/upload/`, `auto/segment/metadata/`). Uses BG-issued cross-account credentials stored in SSM under a BlockGraph-specific prefix (see `terraform/workflows/lambdas/blockgraph-publish-files/` for authoritative parameter names); writes delivery-state delta (net-new PSIDs) to our own bucket. | + +**BlockGraph delivery key facts:** +- Person-keyed (RID), not cookie-keyed (RCID) — audiences evaluated against a personJar bitmap +- Stitch table: `person_identity_graph_beta`; stitch columns: `norm_address_line, norm_city, norm_state, norm_zip, zip_plus4` +- Taxonomy metadata paths: `/batch-delivery-payload/metadata/resonate_metadata_{initial,refresh}_.csv` +- State file path: `/state/known-segments/run_date=YYYYMMDD/run_.csv` +- Two delivery modes: `blockgraph_syndicated` (ADS-sourced) and `blockgraph_custom` (event `audience_key_list`) +- SSM keys: BG-issued credentials stored under a BlockGraph-specific SSM prefix (see `terraform/workflows/lambdas/blockgraph-publish-files/` for authoritative names) + +--- + +## Infrastructure Notes + +- **IAM:** Each Lambda has its own execution role in `terraform/workflows/lambdas///`. The `blockgraph-publish-files` role has **no** direct permission on BlockGraph's S3 — writes happen via SSM-stored BG credentials. 
+- **No `terragrunt apply` locally** — all infra changes go through GitHub Actions. +- **Multipart upload abort:** `blockgraph-rename-files` calls `abort_multipart_upload` on failure so partial uploads don't accumulate. + +--- + +## Formatter Path Layout + +As of CDP-118857 / CDP-118937 (May 2026), the batch-expression-modeling Formatter outputs partitions in the order: +``` +/date=/vendor=/method=/akey=/ +``` + +Previous layout was `method=av/vendor=*/` — the swap caused "No files were able to be copied" errors in this repo's ASLs. Any ASL that constructs a `source_prefix` must use the new `vendor=*/method=*/` order. + +--- + +## Testing + +- Each Lambda's `tests/` directory uses `pytest` with `moto` or `unittest.mock` for S3/SSM simulation. +- `blockgraph-rename-files` uses an in-memory `FakeS3` that byte-checks gzip concatenation output. +- `blockgraph-create-taxonomy-file`: 18 unit tests (100% of ticket cases a–i). +- `blockgraph-rename-files`: 23 unit tests (100% line coverage). +- `blockgraph-publish-files`: 23 unit tests (94% coverage). +- CI (`all.yml`) auto-discovers all Lambda dirs and runs their test suites. diff --git a/claude-md-updates/batch-expression-modeling/CLAUDE.md b/claude-md-updates/batch-expression-modeling/CLAUDE.md new file mode 100644 index 0000000..be49e8a --- /dev/null +++ b/claude-md-updates/batch-expression-modeling/CLAUDE.md @@ -0,0 +1,535 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This repository contains the **Batch Expression Modeling (BEM)** system for Resonate's audience delivery platform. It evaluates audience expressions against user behavioral data at scale, processes the results through identity stitching, and formats the output for various downstream vendors (Meta, TikTok, TheTradeDesk, BlockGraph, etc.). 
+ +### High-Level Architecture + +The system consists of three main components orchestrated by AWS Step Functions: + +1. **BEM (Batch Expression Modeling)** - Scala/Spark job that evaluates ~350 audience expressions against 2TB+ of SuperTagAv data +2. **Stitch** - Scala/Spark job that joins evaluated expressions with vendor identity tables (cookies, HEMs, device IDs, or RID+address for BlockGraph) +3. **Formatter** - Scala/Spark job that formats stitched data for specific vendor requirements + +Each component runs on AWS EMR clusters and is configured/orchestrated by Lambda functions and Step Functions state machines. + +## Repository Structure + +``` +├── src/main/scala/com/resonate/ +│ ├── bem/ # Batch Expression Modeling (evaluates expressions against SuperTagAv) +│ ├── stitch/ # Identity stitching (joins RCID with vendor IDs) +│ ├── delivery/ # Delivery formatting and vendor-specific logic +│ └── utils/ # Shared utilities +├── workflows/ +│ ├── lambdas/ # Lambda functions for pipeline configuration +│ │ ├── batch-audience-delivery-processing-config/ # Main config lambda +│ │ ├── batch-vendor-stitch/ # Stitch config lambda +│ │ ├── bem-gen-job-parameters/ # BEM job parameters +│ │ └── ... 
+│ └── step-functions/ # Step function state machines (ASL JSON) +│ ├── batch-audience-delivery-processing/ # Main pipeline +│ ├── batch-audience-delivery-daily-pipeline/ # Daily orchestrator +│ ├── bem-pipeline/ # BEM execution +│ ├── batch-stitch-pipeline/ # Stitch execution +│ └── batch-delivery-formatter-pipeline/ # Formatter execution +├── terraform/ # Infrastructure as code (one directory per Lambda/Step Function) +├── integration-tests/ # End-to-end integration tests +└── build.sbt # SBT build configuration +``` + +## Common Development Tasks + +### Scala/Spark Development + +**Build and test:** +```bash +# Compile Scala code +./compile.sh # or: sbt compile + +# Run tests +sbt test + +# Run specific test +sbt "testOnly com.resonate.bem.BatchExpressionModelJobTest" + +# Package JAR for EMR deployment +./package.sh # or: sbt assembly +# Output: target/batch-expression-modeling.jar +``` + +**AWS authentication:** +```bash +# Required before uploading JARs to S3 or accessing AWS resources +aws sso login +``` + +### Python Lambda Development + +**Lambda functions are in `workflows/lambdas/`**. Each has its own `requirements.txt`. + +**Run tests for a Lambda:** +```bash +cd workflows/lambdas/ +python -m pytest tests/ +``` + +**Common Lambda functions:** +- `batch-audience-delivery-processing-config` - Generates pipeline configuration from delivery schedule +- `bem-gen-job-parameters` - Generates EMR step parameters for BEM job +- `batch-vendor-stitch` - Generates stitch configuration per vendor + +### Integration Tests + +**Run full integration test:** +```bash +cd integration-tests + +# First time: install dependencies +pip install -r requirements.txt + +# Run setup + execution + validation +bash run_full_integration_test.sh +``` + +The integration test: +1. Sets up test data in S3 with datetime-based paths (e.g., `integration/data/20250914_143022/`) +2. Triggers the `aud-delivery-proc-integration` step function +3. 
Polls for completion and validates SUCCESS status + +**Cost:** ~$0.25-0.30 per run, ~15-25 minutes with minimal test data. + +### GitHub Actions Workflows + +**Workflow files (in `.github/workflows/`):** + +| Workflow File | Description | +|--------------|-------------| +| `bem-jar.yml` | Builds and publishes the Scala JAR to S3 | +| `bem-lambdas.yml` | Deploys Lambda functions | +| `bem-step-functions.yml` | Deploys Step Functions | +| `bem-unit-tests.yml` | Runs Scala unit tests | + +**Deploy to an environment:** + +⚠️ **CRITICAL: Always specify `--ref` when deploying from a feature branch!** + +Without `--ref`, `gh workflow run` deploys from the **default branch (main)**, NOT your current branch. This is a common mistake that results in deploying old code. + +```bash +# Deploy JAR (required input: environment) +# JAR goes to qa (integration environment uses qa JAR) +gh workflow run bem-jar.yml -f environment=qa --ref + +# Deploy Lambdas (required input: environment) +gh workflow run bem-lambdas.yml -f environment=integration --ref + +# Deploy Step Functions (required inputs: environment, deployment_type) +gh workflow run bem-step-functions.yml -f environment=integration -f deployment_type=All --ref +``` + +**Example deploying all components from a feature branch:** +```bash +BRANCH="feature/CDP-123456-my-feature" +gh workflow run bem-jar.yml -f environment=qa --ref $BRANCH +gh workflow run bem-lambdas.yml -f environment=integration --ref $BRANCH +gh workflow run bem-step-functions.yml -f environment=integration -f deployment_type=All --ref $BRANCH +``` + +**Step Functions deployment_type options:** +- `All` - Deploy all step functions +- `BEM Pipeline Step Function` +- `Batch Stitch Pipeline` +- `Batch Delivery Formatter Pipeline` +- `Audience Delivery Pipeline` +- `Audience Delivery Custom Delivery Pipeline` +- `Audience Delivery Processing Pipeline` +- `Audience Delivery Daily Pipeline` + +**Environment values:** +- `integration` - Integration testing 
environment (uses QA JAR) +- `dev`, `dev2`-`dev7` - Development environments +- `nonprod` - QA/staging environment +- `prod` - Production environment + +**Check deployment status:** +```bash +# List recent runs for a workflow +gh run list --workflow=bem-jar.yml --limit=3 +gh run list --workflow=bem-lambdas.yml --limit=3 +gh run list --workflow=bem-step-functions.yml --limit=3 + +# Watch a specific run +gh run watch +``` + +**Note:** The integration environment uses the QA JAR. When testing changes in integration: +1. Deploy JAR to `qa` +2. Deploy lambdas to `integration` +3. Deploy step functions to `integration` + +### Working with Step Functions + +Step functions are defined in `workflows/step-functions/*/statemachine/*.asl.json` and use **JSONata query language** (not JSONPath). + +**Key step functions:** +- `batch-audience-delivery-daily-pipeline` - Entry point triggered daily by EventBridge +- `batch-audience-delivery-processing` - Main pipeline orchestrating BEM → Stitch → Formatter +- `bem-pipeline` - EMR cluster creation → BEM job execution → cluster termination +- `batch-stitch-pipeline` - Parallel execution of stitch jobs per vendor +- `batch-delivery-formatter-pipeline` - Formatting for vendor-specific requirements + +## Key Concepts + +### Expression Evaluation (BEM) + +The BEM job (`BatchExpressionModelJob.scala`) evaluates audience expressions against SuperTagAv data: + +1. **Input:** + - `surveyexpressions.csv` - List of expressions to evaluate (e.g., `NOT (E999999998)`) + - SuperTagAv parquet data - User behavioral data with attribute bitmaps (evmap, avmap, cvmap) + - Models CSV - Expression evaluation models + +2. **Processing:** + - Reads SuperTagAv data (~2TB) with optimized schema (only needed bitmap columns) + - Evaluates each expression against each RCID's attribute bitmaps + - Uses `AudienceEvaluator` from `das-expression` library + - Outputs matches partitioned by `exprHash` + +3. 
**Output:** Parquet files with schema `(rcid, exprHash, score)` partitioned by `exprHash=` + +### Identity Stitching + +The Stitch job (`TableStitch.scala`) joins BEM output with vendor identity tables: + +1. **Input:** + - BEM cache (evaluated expressions with RCIDs) + - Vendor stitch tables (e.g., `tmid`, `hemsha2`, `viant_id` columns) + - Audience metadata (expression hash to audience ID mappings) + +2. **Processing:** + - Joins BEM cache with stitch table on `rcid` + - Maps `exprHash` to `audienceIds` and explodes + - Filters empty vendor keys + +3. **Output:** `(akey, vkey, score)` where `akey` = audience key, `vkey` = vendor key (e.g., cookie, email hash) + +### Delivery Configuration + +The `batch-audience-delivery-processing-config` Lambda generates pipeline configuration: + +- Queries RTP database for delivery schedules (via `RTP_PSQL_CONNECTION_STRING`) +- Determines which vendors need delivery based on schedule windows +- Generates S3 paths for tagav data, cookiejar, expressions, and output locations +- Sets EMR cluster configurations (instance types, counts, etc.) + +**Important fields:** +- `delivery_type` - "daily", "training", "custom" +- `tagav_date_folder` - Date folder for SuperTagAv input (format: YYYYMMDD) +- `cookie_jar_date` - Date folder for cookiejar data +- `start_time_of_delivery` / `end_time_of_delivery` - Time window for vendor deliveries (epoch milliseconds) + +### BlockGraph Vendor Support (CDP-118913) + +BlockGraph (BG) is a **person-keyed (RID)** delivery type — distinct from cookie-keyed (RCID) vendors. + +**Two delivery types registered:** +- `blockgraph_syndicated` — ADS-sourced audience set from the BlockGraph ADS group +- `blockgraph_custom` — audience set supplied via event `audience_key_list` + +**New generic config keys added to `vendor-config.ini`:** + +| Key | Description | +|---|---| +| `stitch_columns` | Plural form for multi-column stitch (e.g., `norm_address_line, norm_city, norm_state, norm_zip, zip_plus4`). 
Existing single-column vendors continue using `stitch_column`. | +| `audience_bitmap_path` | Logical name for the audience-evaluation source. Defaults to `tagav`; BlockGraph uses `person_jar`. | + +**BG stitch table:** `person_identity_graph_beta` + +**`person_jar_path`** is supplied per-run in the delivery event and passed through to the output config. Missing `person_jar_path` fails fast before any ADS/RTP work. + +To query `audience_bitmap_path` for a vendor: `get_audience_bitmap_path(delivery_type)` in `survey_expressions.py`. + +### Formatter Output Path Layout (Post-CDP-118857) + +As of May 2026, the Formatter outputs to: +``` +/date=/vendor=/method=/akey=/ +``` + +Previous layout was `method=av/vendor=*/`. Any ASL or Lambda that constructs a `source_prefix` for reading Formatter output must use the new `vendor=*/method=*/` order. + +### Batch-Stitch Rate Limiting (CDP-118972) + +The `batch-stitch-pipeline` Map state uses `MaxConcurrency=2` and staggers `AddJobFlowSteps` submissions by `Map.Item.Index * 5s` to avoid EMR API throttling when many vendors submit steps to the same cluster simultaneously. + +## AWS Step Functions: Converting JSONPath to JSONata + +### Top-Level Changes + +Add `"QueryLanguage": "JSONata"` at the state machine or state level. + +### Field Replacements + +JSONPath's five fields are replaced with two in JSONata: +- **Arguments** - Replaces `Parameters` for sending data to integrated actions +- **Output** - Replaces `ResultPath`, `ResultSelector`, and `OutputPath` for transforming state output + +### JSONPath Expression Syntax + +- **JSONPath**: `"field.$": "$.path.to.value"` +- **JSONata**: `"field": "{% $states.input.path.to.value %}"` + +Remove the `.$` suffix and wrap expressions in `{% %}` delimiters. 
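The `.$` suffix rule above is mechanical enough to sketch as a helper. The following is an illustrative Python function (the name `to_jsonata` is hypothetical, not part of this repo) that converts a single JSONPath-style entry. It assumes only plain input references (`$` or `$.a.b`); intrinsic functions such as `States.Array(...)` and context-object paths (`$$.…`) are rejected so they can be converted by hand.

```python
def to_jsonata(key, value):
    """Convert one JSONPath-style field to its JSONata equivalent.

    Returns a (key, value) pair. Entries without a '.$' suffix are static
    values and are returned unchanged.
    """
    if not key.endswith(".$") or not isinstance(value, str):
        return key, value
    if value == "$":
        expr = "$states.input"
    elif value.startswith("$."):
        # '$.path.to.value' becomes '$states.input.path.to.value'
        expr = "$states.input." + value[2:]
    else:
        # Intrinsics and '$$.' context paths need manual conversion.
        raise ValueError(f"unsupported JSONPath expression: {value!r}")
    # Drop the '.$' suffix and wrap the expression in {% %} delimiters.
    return key[:-2], "{% " + expr + " %}"
```

For example, `to_jsonata("field.$", "$.path.to.value")` yields the JSONata form shown in the bullet above. Raising on anything unrecognized is a deliberate choice here: a silent pass-through would leave half-converted states that fail only at execution time.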
+ +### ResultPath Conversions + +**Merge result into input at a path:** +```json +// JSONPath +"ResultPath": "$.ClusterCreationResult" + +// JSONata +"Output": "{% $merge([$states.input, {'ClusterCreationResult': $states.result}]) %}" +``` + +**Discard result, pass through input only:** +```json +// JSONPath +"ResultPath": null + +// JSONata +"Output": "{% $states.input %}" +``` + +**Replace entire input with result (default):** +```json +// JSONPath +"ResultPath": "$" // or omit ResultPath + +// JSONata +// Omit Output field, or use: +"Output": "{% $states.result %}" +``` + +### OutputPath Conversions + +**Extract specific field from result:** +```json +// JSONPath +"OutputPath": "$.Payload" + +// JSONata +"Output": "{% $states.result.Payload %}" +``` + +### Catch Block Error Handling + +**Merge error into input:** +```json +// JSONPath +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "ResultPath": "$.Error", + "Next": "HandleError" + } +] + +// JSONata +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Output": "{% $merge([$states.input, {'Error': $states.errorOutput}]) %}", + "Next": "HandleError" + } +] +``` + +Note: `$states.errorOutput` is only available in Catch blocks. + +### Map State Conversions + +**ItemsPath:** +```json +// JSONPath +"ItemsPath": "$.step_configs" + +// JSONata +"Items": "{% $states.input.step_configs %}" +``` + +**ItemSelector:** +```json +// JSONPath +"ItemSelector": { + "step_config.$": "$$.Map.Item.Value", + "clusterId.$": "$.ClusterCreationResult.ClusterId" +} + +// JSONata +"ItemSelector": { + "step_config": "{% $states.context.Map.Item.Value %}", + "clusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" +} +``` + +**IMPORTANT:** Use `$states.context.Map.Item.Value` to access the current item in a Map state. There is **NO** `$states.item` variable - this is a common mistake that will cause "Field '$states.item' does not exist" errors. 
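Because the `$states.item` mistake only surfaces at execution time, it may be worth catching before deploy. The sketch below is a hypothetical lint helper (not an existing script in this repo) that walks a parsed `*.asl.json` document and reports the location of every string referencing `$states.item`.

```python
import json
import re

# Matches the non-existent variable, but not longer names such as $states.items.
BAD_REF = re.compile(r"\$states\.item\b")


def find_bad_refs(node, path="$"):
    """Return the paths of all string values that reference $states.item."""
    hits = []
    if isinstance(node, dict):
        for key, child in node.items():
            hits += find_bad_refs(child, f"{path}.{key}")
    elif isinstance(node, list):
        for index, child in enumerate(node):
            hits += find_bad_refs(child, f"{path}[{index}]")
    elif isinstance(node, str) and BAD_REF.search(node):
        hits.append(path)
    return hits


def lint_asl_file(filename):
    """Load one ASL JSON file and return the offending paths (empty if clean)."""
    with open(filename, encoding="utf-8") as handle:
        return find_bad_refs(json.load(handle))
```

A pre-deploy check could call `lint_asl_file` on each state machine definition and fail if any list is non-empty. The reported paths use a simple dotted notation chosen for readability in this sketch.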
+ +**⚠️ CRITICAL: Map States Must Preserve Input** + +By default, a Map state outputs an **array of results** from processing each item, which **loses the original input fields**. If subsequent states need access to fields from the original input (like a ClusterId from cluster creation), you MUST add an Output field to preserve the input. + +```json +// WRONG - This loses ClusterCreationResult from input +"Step Map": { + "Type": "Map", + "ItemSelector": { + "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" + }, + "Items": "{% $states.input.step_configs %}", + "Next": "TerminateCluster" // ❌ Will fail - ClusterCreationResult is lost! +} + +// CORRECT - Preserve input for next state +"Step Map": { + "Type": "Map", + "ItemSelector": { + "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" + }, + "Items": "{% $states.input.step_configs %}", + "Output": "{% $states.input %}", // ✅ Preserves all input fields + "Next": "TerminateCluster" +} +``` + +**Common Error:** +``` +The JSONata expression '$states.input.ClusterCreationResult.ClusterId' specified for the field 'Arguments/ClusterId' returned nothing (undefined). +``` + +This error means the Map state didn't preserve the input, so subsequent states can't access fields that were in the original input. + +### Array Construction + +**Intrinsic function to array literal:** +```json +// JSONPath +"Args.$": "States.Array('cmd', '--flag', $.input.value, '--opt', $.input.other)" + +// JSONata - Use regular JSON array with individual JSONata expressions +"Args": [ + "cmd", + "--flag", + "{% $states.input.value %}", + "--opt", + "{% $states.input.other %}" +] +``` + +### Reserved Variables in JSONata + +- `$states.input` - Original input to the current state +- `$states.result` - Result from API/sub-workflow (Task, Parallel, Map states) +- `$states.errorOutput` - Error output (only in Catch blocks) +- `$states.context` - Execution metadata (StartTime, task token, Map.Item.Value, etc.) 
+ - `$states.context.Map.Item.Value` - Current item in Map state iteration + - `$states.context.Map.Item.Index` - Current iteration index in Map state + +### ⚠️ Common Pitfall: $states.item Does NOT Exist + +**Error:** `Field '$states.item' does not exist` + +**WRONG:** +```json +"ItemSelector": { + "item": "{% $states.item %}" // ❌ This will fail! +} +``` + +**CORRECT:** +```json +"ItemSelector": { + "item": "{% $states.context.Map.Item.Value %}" // ✅ Use this instead +} +``` + +Many online examples and AI suggestions incorrectly reference `$states.item`, but this variable does not exist in AWS Step Functions JSONata. Always use `$states.context.Map.Item.Value` to access the current Map iteration item. + +### Key Differences from JSONPath + +1. **No `.$` suffix** - Regular field names with JSONata expressions in `{% %}` +2. **No Path fields** - InputPath, ResultPath, OutputPath are replaced by Arguments and Output +3. **Assign vs Output** - `Assign` creates variables accessible in all future states; `Output` only affects the immediate next state +4. **Parallel processing** - Assign and Output are processed in parallel; variable assignments don't affect Output + +### Common Patterns + +**Pass through input unchanged (Task states):** +- Always set `"Output": "{% $states.input %}"` explicitly — omitting `Output` on a Task state causes the Task's result to replace the entire input, losing all previous data. + +**IMPORTANT:** Any Task state that needs to pass data to subsequent states must preserve the input explicitly with `"Output": "{% $states.input %}"`. Without it, the Task's result replaces the entire input. + +**Common scenarios requiring input preservation:** + +1. **After Catch blocks** - When error info was merged into input +2. **Cleanup/termination tasks** - When subsequent states need original context (e.g., Lambda functions that publish metrics need delivery_config) +3. 
**Between pipeline stages** - When chaining multiple operations that all need access to the original request + +```json +// Example 1: Catch block merges error into input +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Output": "{% $merge([$states.input, {'Error': $states.errorOutput}]) %}", + "Next": "CleanupTask" + } +] + +// The CleanupTask MUST preserve input if Error is needed later +"CleanupTask": { + "Type": "Task", + "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster", + "Arguments": { "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" }, + "Output": "{% $states.input %}", // ✅ Preserves Error field + "Next": "NotifyFailure" +} + +// Example 2: EMR termination before Lambda that needs delivery config +"EMR TerminateCluster Success": { + "Type": "Task", + "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster", + "Arguments": { "ClusterId": "{% $states.input.ClusterCreationResult.ClusterId %}" }, + "Output": "{% $states.input %}", // ✅ Lambda needs delivery_config from input + "Next": "Publish Metrics" +} +``` + +**Add result as new field while preserving input:** +```jsonata +"Output": "{% $merge([$states.input, {'resultField': $states.result}]) %}" +``` + +**Transform and extract specific data:** +```jsonata +"Output": { + "field1": "{% $states.input.someValue %}", + "field2": "{% $states.result.someOtherValue %}" +} +``` + +**Create variables for use in later states:** +```jsonata +"Assign": { + "myVariable": "{% $states.input.someValue %}" +} +// Later states can reference {% $myVariable %} +``` + +### Reference Documentation + +- [AWS Step Functions - Transforming data with JSONata](https://docs.aws.amazon.com/step-functions/latest/dg/transforming-data.html) +- [AWS Step Functions - Passing data between states with variables](https://docs.aws.amazon.com/step-functions/latest/dg/workflow-variables.html) diff --git a/claude-md-updates/core-data-pipelines-spark/CLAUDE.md 
b/claude-md-updates/core-data-pipelines-spark/CLAUDE.md new file mode 100644 index 0000000..07cefd3 --- /dev/null +++ b/claude-md-updates/core-data-pipelines-spark/CLAUDE.md @@ -0,0 +1,68 @@ +# core-data-pipelines-spark + +Spark 3.5 / Scala 2.12 / JDK 17 upgrade of `core-data-pipeline`. Runs on EMR 7.12. + +## Build + +**JDK 17 is required for all builds and tests.** + +```bash +export JAVA_HOME=/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home +sbt compile # compile +sbt test # ~210+ tests across all suites +sbt assembly # fat jar for EMR +``` + +## Key Paths + +- Spark apps: `src/main/scala/com/resonate/spark/apps/` +- Shared utils: `src/main/scala/com/resonate/spark/utils/` +- UDAFs / UDFs: `src/main/scala/com/resonate/spark/functions/` +- Tests: `src/test/scala/com/resonate/spark/` + +## Spark Apps + +| App | Package | Notes | +|---|---|---| +| `SovrnLogMetrics` | `sampling` | Computes Sovrn overlap densities; migrated to Spark 3. Null-safe `meanDensities` helper (see below). | +| `CookieJarSampler` | `sampling` | **@deprecated** — `cookiejar-sample-export` pipeline decommissioned (Havas no longer a client). Code retained for reference only. | +| `TopicTagCookiejar` | `topictagmetrics` | Topic-tag sketch for cookiejar populations. | +| `TopicTagSketch` | `topictagmetrics` | Topic-tag sketch (ported from das-pipeline). | +| `TopicTagMetricsParcelsUpload` | `topictagmetrics` | Uploads tag parcels; uses 4-arg flat parcel layout (reverted from 6-arg in PR #28). | +| `BitmapSketch` | `bitmap` | Bitmap-backed sketch UDAFs. 
| + +## Testing Patterns + +- Tests use `QueryTest with SharedSparkSession` (Spark's built-in test framework) +- UDF tests: use companion objects to avoid serialization issues +- Spark 3 requires `spark.sql.legacy.allowUntypedScalaUDF=true` for old-style UDFs +- `from_unixtime` uses local timezone — use midday timestamps in tests +- `trim()` only strips spaces (0x20), not tabs +- Pre-existing flake in `CookieJarSamplerTest` (`should maintain correct counts after sampling`) — RNG tolerance issue, unrelated to any current changes, does not block merge + +## Dependencies + +- `das-expression-0.6.jar` — pure Java, JDK 8 bytecode, works on JDK 17 as-is. Provides `BitmapUtils` and `AudienceEvaluator`. +- `influxdb-java` — replaced `reactiveinflux_2.11`. Wrapper at `com.resonate.spark.utils.InfluxDBWriter`. +- Jackson pinned at 2.15.2 to match Spark 3.5.4 bundled version. + +## Sovrn Spark 3 Notes (CDP-118269) + +`SovrnLogMetrics` required two fixes for Spark 3 compatibility: + +1. **Null-safe `meanDensities` helper** — In Spark 3, `DataFrame.summary("mean")` returns `null` for empty groups (Spark 2 returned `"NaN"`). The fix extracts the conversion into `meanDensities(mean: Row)` which maps `null → Double.NaN`. + +2. **Empty-group density filter** — Drop zero-density groups before writing to InfluxDB to avoid downstream metric pollution. + +## Rules + +- **Tests first, then upgrade**: Any workflow must have unit + integration tests BEFORE deploying. +- **JDK 17 only**: Do not build or test with JDK 8. +- **Spot instances**: Every EMR workflow must use spot for core fleet with capacity-optimized allocation and on-demand fallback. +- **No hardcoded credentials**: `ParcelManager` had a dead branch with hardcoded IAM credentials — it was removed (security fix). Never add IAM keys to source code. +- **CookieJarSampler is deprecated**: Do not add new features or fix non-critical bugs in `CookieJarSampler`. It is kept for historical reference only. 
+ +## Related Repos + +- `step-function-workflow-orchestrator` — EMR configs, step functions, deployment +- `core-data-pipeline` — original Spark 2.4 / Scala 2.11 repo (do not modify) diff --git a/claude-md-updates/identity-graph/CLAUDE.md b/claude-md-updates/identity-graph/CLAUDE.md new file mode 100644 index 0000000..e3ef8bf --- /dev/null +++ b/claude-md-updates/identity-graph/CLAUDE.md @@ -0,0 +1,200 @@ +# identity-graph + +## Project Purpose and Architecture Overview + +This repo is the home of **PRISM** (Person Resolution and Identity System Map) — Resonate's person-centric identity graph. It contains two distinct sub-systems: + +1. **Scala/Spark pipeline jobs** — batch EMR jobs that build the identity graph from raw L2, Experian/Tapad, LiveIntent, LiveRamp, Sovrn, and customer data. +2. **`prism_dbt` v1.0** — a Snowflake-native dbt package exposing consumer-facing macros for identity resolution (waterfall match, identifier expand, persons project, lookup). + +**Top-level layout:** + +``` +src/ + main/scala/com/resonate/ + identity/ # Spark pipeline jobs (11 jobs) + ExperianPreprocessJob + L2ConsumerPreprocessJob + TapadIndexJob + L2IndexJob + L2ConsumerIndexJob + OnboardingMatchJob + OnboardingHouseholdLinksJob + OnboardingPersonsJob # deprecated (BL-106) + MergeOnboardingLinksJob + PersonIdentityJob + CustomerMatchJob + shared/ # Shared Spark utilities + HashUtils # SHA256-based person_id / household_id + StagingWriter # cache/count/write/unpersist pattern + AddressNormalizer # USPS Pub 28, nickname chains, soundex + IpFilter # RFC1918 + cloud IP blocklist + ScoringConfig # confidence scoring, recency decay + test/scala/com/resonate/ # 443 unit tests across 20 test suites +dbt/ + prism_dbt/ # dbt package (consumer-facing) + macros/ # 4 primitive macros + internal helpers + waterfall_match.sql + identifier_expand.sql + persons_project.sql + lookup.sql + _internal/ # routing, validation helpers + models/ + service/ # 3 Lambda-invokable service models + 
waterfall.sql + identifier_expand.sql + persons_project.sql + seeds/ # Test seed data + tests/ # dbt unit_tests framework tests (33 tests) + snowpark_udf/ + NAME_ADDRESS_HASH/ # Canonical name+address → person_id hash UDF + snowpark_stored_procedures/ + PRISM_LOOKUP/ # Single-row debug lookup SP + NAME_ADDRESS_LOOKUP/ # Resolve by raw PII SP +docs/ + design/ # PRISM design + project plan HTML artifacts + index.html # Documentation hub + design_and_production_readiness.html + consumer_interface_design.html + project_plan.html + raw_dataset_columns.html + id_path_explorer.html + scripts/ + build_columns_page.py # Regenerates raw_dataset_columns.html +.github/ + workflows/ + snowflake_dbt.yml # Deploy prism_dbt via `snow dbt deploy` + snowflake_stored_procedure.yml # Deploy PRISM_LOOKUP + NAME_ADDRESS_LOOKUP SPs + snowflake_function.yml # Deploy NAME_ADDRESS_HASH UDF + ci.yml # Run sbt tests on PRs +``` + +--- + +## Key Commands + +### Scala/Spark (Identity Pipeline Jobs) + +**JDK 17 is required.** + +```bash +export JAVA_HOME=/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home + +sbt compile # compile +sbt test # 443 unit tests, 20 suites +sbt assembly # fat jar for EMR (output: target/identity-graph.jar) +``` + +**Deploy JAR to S3:** +```bash +aws sso login +aws s3 cp target/identity-graph.jar \ + s3://resonate-core-applications/identity-graph/jars/identity-graph-latest.jar +``` + +### dbt (prism_dbt Package) + +```bash +# Install Snowflake CLI (snow) +# See https://docs.snowflake.com/en/developer-guide/snowflake-cli + +# Validate dbt models (no connection needed) +cd dbt/prism_dbt +dbt parse + +# Run all tests against RESONATE-DEV +dbt seed && dbt run && dbt test + +# Run specific test +dbt test --select test_waterfall_match +``` + +**Note:** All 33 dbt tests must pass against dev Snowflake before merging to main. The CI workflow runs `dbt parse` as a gate; full `dbt test` runs are manual against RESONATE-DEV. 
+ +### GitHub Actions Workflows + +| Workflow | Trigger | What it does | +|---|---|---| +| `ci.yml` | PR to main | `sbt test` — runs all 443 Scala unit tests | +| `snowflake_dbt.yml` | Manual | `snow dbt deploy` — deploys prism_dbt package to Snowflake | +| `snowflake_stored_procedure.yml` | Manual | Deploys PRISM_LOOKUP + NAME_ADDRESS_LOOKUP SPs | +| `snowflake_function.yml` | Manual | Deploys NAME_ADDRESS_HASH UDF | + +Workflows substitute `__PRISM_DB__` / `__PRISM_SCHEMA__` placeholders at deploy time. + +--- + +## Spark Jobs: Key Concepts + +### Data Flow + +``` +Raw sources (S3) + ├── Experian ConsumerView + digital graph → ExperianPreprocessJob + ├── L2 consumer tab-delimited → L2ConsumerPreprocessJob + └── ... + ↓ + identifier_index (Parquet, partitioned by identifier_type) + persons (Parquet — person_id, household_id, attributes) + ↓ + OnboardingMatchJob → 4-phase match (blocking, corroboration, scoring, fallback) + MergeOnboardingLinksJob → BL-30 multi-provider collapse + BL-4/BL-107 confidence scoring + PersonIdentityJob → main orchestrator + CustomerMatchJob → ephemeral 7-tier customer file matching +``` + +### identifier_index Schema +`(identifier_type, identifier_value, person_id, household_id, confidence, last_seen, run_date)` + +Partitioned by `identifier_type`. Supported types: `HEM_MD5`, `HEM_SHA1`, `HEM_SHA256`, `MAID`, `ZIP11`, `IP_ADDRESS`, `HEM_INDIRECT`, `MAID_INDIRECT`, `HOUSEHOLD_ID`. + +### Important Bug Fixes Already Applied + +- **L2IndexJob HOUSEHOLD_ID null guard** — `filterNulls=true` prevents phantom household collapse. +- **OnboardingMatchJob Phase 3 tie-breaker** — `person_id.asc` added to `selectBestMatch` window for determinism. +- **ExperianPreprocessJob** — `min(id_value)` instead of `first(id_value)` for deterministic aggregation. +- **MergeOnboardingLinksJob / PersonIdentityJob** — null `identifier_value` filter before `groupBy`. +- **AddressNormalizer nickname chains** — iterative lookup (max 5 hops) for transitive nicknames. 
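
The nickname-chain fix listed above (iterative lookup, max 5 hops) can be sketched as follows. This is an illustrative Python version under stated assumptions, not the Scala code in `AddressNormalizer`: the function name `resolve_nickname`, the dict-based nickname table, the lower-casing, and the cycle guard are all hypothetical choices made for the example.

```python
from typing import Dict

MAX_HOPS = 5  # the hop limit mentioned in the bug-fix list


def resolve_nickname(name: str, nicknames: Dict[str, str], max_hops: int = MAX_HOPS) -> str:
    """Follow nickname -> canonical links iteratively, stopping after max_hops.

    Stops early when the current name has no further mapping, or when the next
    name was already visited (guards against cyclic tables).
    """
    current = name.lower()
    seen = {current}
    for _ in range(max_hops):
        nxt = nicknames.get(current)
        if nxt is None or nxt in seen:
            break
        seen.add(nxt)
        current = nxt
    return current


# Hypothetical nickname table with a transitive chain: billy -> bill -> will -> william.
table = {"billy": "bill", "bill": "will", "will": "william"}
canonical = resolve_nickname("Billy", table)

# A chain longer than the hop limit is cut off after five hops.
long_chain = {f"n{i}": f"n{i + 1}" for i in range(7)}
capped = resolve_nickname("n0", long_chain)
```

An iterative loop with an explicit bound keeps the worst case predictable even when the nickname table contains long or cyclic chains.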
+ +--- + +## prism_dbt: Key Concepts + +### 4 Primitive Macros (Consumer-Facing) + +| Macro | Purpose | +|---|---| +| `waterfall_match(waterfall_order, input_column_aliases)` | Multi-step waterfall resolution: for each step in `waterfall_order`, looks up the identifier in `identifier_index` and returns the matched `person_id` + confidence. Routes ZIP11 to `persons.zip11`, all other types to `identifier_index`. | +| `identifier_expand(identifier_type, input_col)` | Expands one identifier type to all associated person_ids. Single identifier_type per call. | +| `persons_project(person_ids, output_columns)` | Projects person attributes from the `persons` table for a set of person_ids. | +| `lookup(identifier_type, identifier_value)` | Single-identifier debug/point lookup. Results are ordered by confidence, then `person_id`, so confidence ties resolve deterministically. | + +### Service Models (Lambda-Invokable) + +Three Snowflake-native service models under `models/service/` are invokable via `EXECUTE DBT PROJECT … --select --vars ''`. Each wraps a primitive macro for Lambda-style invocation by Append, BlockGraph, Cortex, etc. + +### Consumer Integration (Append v1.0) + +**Supported identifier types for Append cutover:** +- `HEM_MD5`, `HEM_SHA1`, `HEM_SHA256` — HEM type selected from sibling `HEM_TYPE` column +- `MAID` — single type + +**Not in default waterfall (v1.0):** +- `ZIP11` — supported by macros (routes to `persons.zip11`), but not in default waterfall until WU-26 lands +- `ip_address`, `hem_indirect`, `maid_indirect` — opt-in or obsoleted + +### NAME_ADDRESS_HASH UDF + +Canonical name+address → person_id hash recipe. Deployed to `RESONATE.PRISM`. Does **not** do canonical first-name lookup (`Bill` ≠ `William`) or address-abbreviation expansion in v1.0 — tracked for future versions. + +--- + +## Project-Specific Rules and Gotchas + +- **dbt `dbt_project.yml` schema:** Consumers declare their own `waterfall_order` and `input_column_aliases`. 
Zero changes to `prism_dbt` are needed for new consumers — just add `prism_dbt` to their `packages.yml`. +- **Spark 3 / JDK 17:** Tests require `spark.sql.legacy.allowUntypedScalaUDF=true`. The `--add-opens` JVM flags in `build.sbt` are conditional on JDK 9+ to avoid warnings. +- **StagingWriter pattern:** All heavy Spark jobs use the `StagingWriter` cache/count/write/unpersist pattern to avoid OOM on large datasets. Do not skip the `unpersist` call. +- **PRISM_LOOKUP SP:** Single-row debug tool only — not intended for batch use. +- **Snowflake deploy:** Workflows substitute `__PRISM_DB__` / `__PRISM_SCHEMA__` at deploy time. Local testing uses `RESONATE-DEV` database. +- **Design docs:** HTML files under `docs/design/` are the canonical source of truth and are published to `s3://s3.nonprod.aws.resonatedigital.net/person-identity-graph/`. To regenerate `raw_dataset_columns.html`, run `docs/scripts/build_columns_page.py` against live source headers. +- **Raw source samples:** Do NOT commit raw L2/Experian header rows from prod buckets. The `docs/README.md` explicitly warns about this. diff --git a/claude-md-updates/step-function-workflow-orchestrator/CLAUDE.md b/claude-md-updates/step-function-workflow-orchestrator/CLAUDE.md new file mode 100644 index 0000000..a83e252 --- /dev/null +++ b/claude-md-updates/step-function-workflow-orchestrator/CLAUDE.md @@ -0,0 +1,220 @@ +# step-function-workflow-orchestrator + +## Project Purpose and Architecture Overview + +This repo contains the X-Men team's AWS Step Function pipelines and the supporting Lambda functions that power the Connected Profiles data platform. It is **not** a single application — it is a collection of ~50 independent pipelines (state machines) and ~30 Lambda functions grouped by runtime. 
+ +**Top-level layout:** + +``` +pipelines/ # Step Function definitions — one subdir per pipeline + / + statemachine/ # ASL JSON state machine definition + config/ # EMR cluster configs + per-environment params.json + events/ # EventBridge event payloads per environment + tests/ # Unit + integration tests +terraform/ + pipelines/ # Terragrunt config for deploying step functions + //terragrunt.hcl + python_lambdas/ # Terragrunt config for Python lambdas + java_lambdas/ # Terragrunt config for Java lambdas + javascript-lambdas/ + lambda.hcl # Shared lambda Terragrunt root config + step_function.hcl # Shared step function Terragrunt root config +python_lambdas/ # Python Lambda source (one subdir per function) +java_lambdas/ # Java Lambda source (dos-lambdas, metrics-formatter) +javascript-lambdas/ # Node.js Lambda source (human-approval-*) +scripts/ # Operational helper scripts (Python, not deployed) +``` + +**Infrastructure accounts:** +- Non-prod: default AWS account (no `--profile` flag needed) +- Prod: `arn:aws:iam:::role/ProdTerraform` (assumed via `role_arn` in `step_function.hcl` — see the actual value in `terraform/step_function.hcl`) + +**State bucket:** `resonate-terraforming-state` (non-prod) / `resonate-terraforming-state-prod` (prod) +**State key prefix:** `environments/mgmt/lambdas/x_men/` (lambdas) or `environments/mgmt/lambdas/x_men|agents_of_shield/` (step functions) + +--- + +## Key Commands + +### Python Lambdas + +All Python lambdas live under `python_lambdas//`. Dependencies are per-lambda `requirements.txt` files. The workspace-level dev tooling uses `uv` (see `python_lambdas/pyproject.toml`). 
+ +```bash +# Install dev deps (from python_lambdas/) +uv sync + +# Run unit tests for a specific lambda +cd python_lambdas/ +pytest tests/ + +# Run unit tests across all Python lambdas (from python_lambdas/) +pytest + +# Lint +flake8 python_lambdas/ + +# Build/run a single lambda locally with SAM +cd python_lambdas/ +sam build --use-container +sam local invoke --event events/event.json +``` + +### Java Lambdas + +```bash +# Build dos-lambdas fat JAR (shadow JAR) +cd java_lambdas/dos-lambdas +./gradlew shadowJar + +# Build metrics-formatter fat JAR +cd java_lambdas/metrics-formatter +./gradlew shadowJar + +# Run tests +./gradlew test +``` + +### JavaScript Lambdas + +```bash +# No build step — plain Node.js +cd javascript-lambdas/human-approval-emailer +npm install + +cd javascript-lambdas/human-approval-callback +npm install +``` + +### Integration Tests (Step Functions) + +Integration tests start the real Step Function in AWS and poll to completion. They require AWS credentials for the dev/integration environment. + +```bash +cd pipelines//tests/integration +pytest --state-machine-name -integration +# e.g. +pytest --state-machine-name behavior-stitch-integration +pytest --state-machine-name marketops-overlap-integration +pytest --state-machine-name segment-aggregator-integration +pytest --state-machine-name total-sketch-first-party-integration +``` + +### Operational Scripts + +```bash +# Generate environment-specific configs from integration config (run from scripts/) +python3 generate_environment_configs.py [path-to-pipelines] +# e.g. +python3 generate_environment_configs.py dev6 + +# Mark DOS unprocessed files as processed (recovery helper, run from scripts/) +python3 mark_dos_unprocessed_as_processed.py \ + --onboard_path s3://... \ + --completion_status_path s3://... +``` + +--- + +## Terraform / Infrastructure Specifics + +All deployments go through **GitHub Actions** — there are no local `terragrunt apply` commands in normal workflow. 
Use the Actions UI (or `workflow_dispatch`) for all deploys. + +### GitHub Actions Workflows + +| Workflow | Trigger | What it deploys | +|---|---|---| +| `step_function.yml` | push to `pipelines/**` or manual | Step Functions via `terraform/pipelines///` | +| `python_lambda.yml` | push to `python_lambdas/**` or manual | Python Lambdas via `terraform/python_lambdas///` | +| `java_lambda.yml` | push to `java_lambdas/**` or manual | Java Lambdas via `terraform/java_lambdas///` | +| `nodejs_lambda.yml` | push to `javascript-lambdas/**` or manual | JS Lambdas via `terraform/javascript-lambdas///` | +| `unzip-copy.yml` | push to `common/unzip-copy/**` or manual | Docker image → ECR → ECS task definition | + +**Supported environments by workflow:** +- `step_function.yml`: `dev`, `dev2`–`dev9`, `integration`, `qa`, `prod` +- `python_lambda.yml`: `dev`, `dev2`–`dev9`, `nonprod`, `qa`, `prod` +- `java_lambda.yml` / `nodejs_lambda.yml`: varies per pipeline's deployed env folders +- `unzip-copy.yml`: `dev`, `prod` only + +### Terragrunt Module Sources + +- **Step functions:** `git::ssh://github.com/resonate/resonate-terraform.git//modules/resources/step_function` +- **Lambdas:** `git::ssh://github.com/resonate/resonate-terraform.git//modules/resources/lambda` +- Terraform version pinned to **1.5.7** in all workflows. + +### Pipeline Config Layout + +Each pipeline has per-environment configs at `pipelines//config//params.json` and shared configs at `pipelines//config/emr.json`, `emr-on-demand.json`, `steps.json`. The `generate_environment_configs.py` script derives `dev6` (or other dev variants) configs from the `integration` config by rewriting S3 bucket suffixes. + +### Lambda ARN Injection + +State machine ASL files use template variables like `${DynamicDatesFunctionArn}`. These are resolved at deploy time via `lambda_replacement_vars` in each pipeline's `terragrunt.hcl`. 
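
To make the Lambda ARN injection concrete, here is a small Python sketch of what the substitution amounts to. It only mimics the effect; the real replacement is performed by Terraform/Terragrunt at deploy time, and nothing here reflects that module's internals. The function name `render_asl`, the sample state machine, and the account ID in the ARN are made up for illustration. A regex is used so that ASL JSONPath strings such as `$.dates` are left untouched.

```python
import json
import re
from typing import Dict

# Matches ${SomeName} placeholders only; bare "$." JSONPath prefixes are not matched.
_PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def render_asl(template: str, replacement_vars: Dict[str, str]) -> str:
    """Replace every ${Name} placeholder with its value, failing on unknown names."""

    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in replacement_vars:
            raise KeyError(f"no replacement value for ${{{name}}}")
        return replacement_vars[name]

    return _PLACEHOLDER.sub(substitute, template)


# Hypothetical one-state machine that references a Lambda by template variable.
asl_template = json.dumps(
    {
        "StartAt": "DynamicDates",
        "States": {
            "DynamicDates": {
                "Type": "Task",
                "Resource": "${DynamicDatesFunctionArn}",
                "ResultPath": "$.dates",
                "End": True,
            }
        },
    }
)

# Placeholder ARN with a fake account ID, standing in for a lambda_replacement_vars entry.
rendered = json.loads(
    render_asl(
        asl_template,
        {"DynamicDatesFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:dynamic-dates"},
    )
)
```

Raising on an unknown placeholder is a deliberate choice in this sketch: a template variable with no matching replacement entry is easier to catch before deploy than as an invalid `Resource` at execution time.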
+ +### unzip-copy is Different + +`python_lambdas/unzip-copy` is deployed as a **Docker container to ECS** (not as a Lambda ZIP). It has its own `Dockerfile`, `taskdef-dev.json` / `taskdef-prod.json`, and a dedicated `unzip-copy.yml` workflow. + +--- + +## EMR 7.12 Migration (CDP-118269) + +Several pipelines are being migrated from EMR 5.34 (Spark 2) to EMR 7.12 (Spark 3). As of June 2026, the following have been migrated to `emr-7.12.0`: + +| Pipeline | Status | Notes | +|---|---|---| +| `behavior-stitch` | Migrated | YARN vcore fix required (`yarn.nodemanager.resource.cpu-vcores=16`) | +| `topic-aggregation` | Migrated | Use `--master yarn` for Spark 3 | +| `idsync-overlap` | Migrated | | +| `marketops-overlap` | Migrated | | +| `sovrn-weekly` | Migrated | | +| `maid-onboarder` | Migrated | | +| `damlam-preparation` | Migrated | | +| `sovrn-overlap` | Migrated | Null-safe `meanDensities` fix required (see core-data-pipelines-spark) | + +**JAR naming convention:** Dev builds use `-dev` bucket suffix (e.g. `core-data-pipelines-spark-dev-latest.jar`); prod uses `core-data-pipeline-spark3-latest.jar`. Do **not** point dev configs at the prod bucket. + +--- + +## Key Lambdas + +| Lambda | Purpose | +|---|---| +| `check-source-freshness` | Config-driven freshness check for 6 data sources across 3 S3 buckets. Two detection strategies: `timestamp_compare` (S3 LastModified) and `latest_directory_vs_ssm` (YYYYMMDD dirs vs SSM last-run). Adding a new source requires only a JSON config entry — no code change. | +| `dynamic-dates` | Computes run-date windows for pipeline schedules | +| `human-approval-emailer` / `human-approval-callback` | Step Function task token approval workflow via email (Node.js; source under `javascript-lambdas/`) | + +--- + +## Decommissioned Pipelines (Do Not Re-Deploy) + +The following pipelines have been intentionally removed from this repo. 
Their Step Function infrastructure, EventBridge schedules, and IAM roles should be torn down in AWS before or after the PR that removes them: + +| Pipeline | Removed In | Reason | +|---|---|---| +| `fusion-behavior-preprocess` (deployed as `sovrn-identity-daily`) | PR #734 | Dormant since 2023; accidentally deployed 2026-05-28; failed on EC2 capacity; confirmed not needed | +| `cookiejar-sample-export` | PR #748 | Havas is no longer a client; pipeline was dormant and accidentally re-deployed alongside infra changes | + +The **pipeline code** for `cookiejar-sample-export` is retained under `pipelines/cookiejar-sample-export/` with a `DEPRECATED.md` for historical reference. The `CookieJarSampler` Scala app in `core-data-pipelines-spark` is similarly marked `@deprecated`. + +--- + +## Experian Pipeline Notes + +The Experian pipeline (`experian-data-processing`) reads from `s3://resonate-experian-rld/` (Resonate's own bucket). The previous source (`s3://tapad-resonate/`) required cross-account `AssumeRole` which caused cascading IAM issues and was removed (PR #641). No `role_arn` / `SourceRoleArn` is needed for the Resonate-owned bucket. + +--- + +## Project-Specific Rules and Gotchas + +- **Never run `terragrunt apply` locally** — all infra changes must go through GitHub Actions CI/CD. +- **Config drift in dev variants:** When creating a new dev environment (e.g. `dev7`), run `scripts/generate_environment_configs.py dev7` to generate correct S3 bucket paths — do not hand-edit `params.json` copies. +- **S3 bucket naming convention:** Non-prod buckets append `-` (e.g. `resonate-core-datasets-dev`). The `generate_environment_configs.py` script handles rewriting these automatically. +- **Lambda ARN template variables** (`${XxxFunctionArn}`) in ASL JSON are **not** real ARNs — they are substituted by Terraform at deploy time using `lambda_replacement_vars` in terragrunt.hcl. 
+- **Prod-only step function scheduling:** Step functions are only enabled (`is_enabled = true`) in prod environments; all other environments deploy the state machine but leave the EventBridge schedule disabled. +- **Python runtime:** Local dev tooling requires Python 3.13+ (enforced by `python_lambdas/pyproject.toml`). The AWS runtime deployed per-lambda varies: currently `python3.7`, `python3.9`, `python3.11`, `python3.12`, or `python3.13` depending on the function — check the relevant `terraform/python_lambdas//terragrunt.hcl`. Dev tooling uses `uv`; individual lambdas declare their own `requirements.txt`. +- **Java lambdas use shadow JARs:** The Gradle `shadowJar` task produces the fat JAR uploaded to Lambda. Do not use the plain `jar` task. +- **Integration tests are long-running:** The behavior-stitch integration test has a 2-hour timeout (`EXECUTION_TIMEOUT = 7200`). Do not run locally unless you intend to wait. +- **SNS topics for failure/pass alerts:** Pipelines use `sns_topic_replacement_vars` in each environment's `terragrunt.hcl` to inject `TopicFailArn`, `TopicPassArn`, and (where applicable) `TopicWarnArn`. Topic names are environment-specific: dev environments typically use `datapipeline-fail-qa-topic` / `datapipeline-pass-qa-topic`, dev6 uses `datapipeline-fail-dev6-topic` / `datapipeline-pass-dev6-topic`, and prod pipelines use pipeline-specific topics (e.g. `dos-datapipeline-fail-prod-topic`, `modeling-team-pagerduty`). These SNS topics are managed outside this repo. +- **EMR 7.12 dev jars:** When writing dev configs for EMR 7.12 pipelines, the jar must point to the `-dev` S3 bucket, not the prod bucket. The convention is `s3://resonate-core-datasets-dev/...` — never `s3://resonate-core-datasets/...` for dev/integration configs.
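
The last rule above (dev and integration configs must never reference `s3://resonate-core-datasets/...`) lends itself to a simple pre-commit style check. The following Python sketch is one possible way to do that and is not part of the repo: the function name `prod_bucket_references` and the sample config paths are hypothetical, and only the two bucket names come from the rule itself.

```python
import re
from typing import List

PROD_BUCKET = "resonate-core-datasets"  # bucket that dev/integration configs must not use

# Captures the bucket name and the optional key of each S3 URI found in the text.
_S3_URI = re.compile(r"s3://([^/\s\"']+)(/[^\s\"']*)?")


def prod_bucket_references(config_text: str) -> List[str]:
    """Return every S3 URI in config_text whose bucket is exactly the prod bucket.

    The bucket is compared as a whole name, so "resonate-core-datasets-dev"
    is not flagged even though it starts with the prod bucket's name.
    """
    return [
        match.group(0)
        for match in _S3_URI.finditer(config_text)
        if match.group(1) == PROD_BUCKET
    ]


# Hypothetical dev config fragment: the first jar path is fine, the second is a violation.
sample_config = """
{
  "jar": "s3://resonate-core-datasets-dev/jars/example-latest.jar",
  "fallbackJar": "s3://resonate-core-datasets/jars/example-latest.jar"
}
"""
violations = prod_bucket_references(sample_config)
```

Comparing the full bucket name rather than using a prefix match matters here, because the dev bucket name begins with the prod bucket name.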