Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions claude-md-proposals/batch-expression-modeling.CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# CLAUDE.md update — resonate/batch-expression-modeling
# Append the following section to the END of the existing CLAUDE.md

---

## Vendor Configuration (`vendor-config.ini`) — Recent Additions

File: `workflows/lambdas/batch-audience-delivery-config/vendor-config.ini`

### New Config Keys

| Key | Default | Purpose |
|---|---|---|
| `stitch_columns` | — | Comma-separated multi-column stitch (use instead of `stitch_column` for vendors that join on multiple fields, e.g. BlockGraph address). |
| `audience_bitmap_path` | `tagav` | Bitmap source for audience evaluation. Set to `person_jar` for person-keyed (RID) vendors like BlockGraph. |

All existing single-column vendors continue to use `stitch_column` (singular). `get_audience_bitmap_path(delivery_type)` returns the configured source (default `tagav`).

### BlockGraph Delivery (CDP-118913)

BlockGraph is **person-keyed (RID)**, not cookie-keyed (RCID). Two new sections in `vendor-config.ini`:

- `[blockgraph_syndicated]` — syndication delivery via normalized RID→ADDRESS table
- `[blockgraph_custom]` — custom delivery, same stitch shape

Key differences from cookie-keyed vendors:
- `stitch_columns = norm_address_line,norm_city,norm_state,norm_zip,zip_plus4` — normalized address stitch (5 columns)
- `stitch_table_name = person_identity_graph_beta` — beta RID→ADDRESS table
- `audience_bitmap_path = person_jar` — evaluates audiences against the personJar bitmap (not tagav)
- `person_jar_path` is supplied per-run in the delivery event (`delivery_config.person_jar_path`)

### Formatter Output Path Layout Change (CDP-118857 / CDP-118937)

The batch-delivery-formatter flipped partition order from `date=*/method=av/vendor=*/akey=*` to `date=*/vendor=*/method=*/akey=*`. All downstream consumer state machines were updated accordingly:
- `batch-audience-custom-delivery` (#256)
- `batch-audience-delivery-syndication` (yahoo, experian, openx/viant variants)

If you see `No files were able to be copied` errors after a formatter run, check that the consumer ASL's `source_prefix` uses `vendor=*/method=*` order.

### Formatter Metrics Lambda (CDP-118857)

`batch-delivery-formatter-pipeline` now invokes `batch-delivery-formatter-publish-metrics` Lambda after each successful EMR run. Lambda ARN injected via `PublishMetricsFunctionArn` in each pipeline's `terragrunt.hcl`.

Metrics published to InfluxDB:
- `com.resonate.delivery.format.count` — per-audience row counts
- `com.resonate.delivery.format.aggregate` — aggregate metrics (vendor + method + delivery_type tags, no audience_key)

### Batch Stitch Throttling (CDP-118972)

`batch-stitch-pipeline` uses `MaxConcurrency=2` on the stitch Map state to prevent bursting `AddJobFlowSteps` API calls. A stagger wait of `(Map.Item.Index × wait_seconds)` is inserted before each step submission.
112 changes: 112 additions & 0 deletions claude-md-proposals/dos-data-pipeline.CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# CLAUDE.md — resonate/dos-data-pipeline
# This is a NEW file to be created at the root of the dos-data-pipeline repo

# dos-data-pipeline

## Project Purpose and Architecture Overview

This repo contains the **DOS (Data Operations Systems) data pipeline** — Scala/Spark ETL jobs and configuration for geo-location enrichment, segment aggregation, and political district bitmap preparation for Resonate's audience platform.

**Key pipelines:**
- **geo-location** — Daily enrichment of RCIDs with geographic + political district data (congress, state-senate, state-house); full backfill for historical correction. Feeds `ToBitmap` which sets marker bits.
- **segment-aggregator** — Aggregates behavioral segment data.

**Deployment:** GitHub Actions. Config in this repo deploys directly. Other pipelines are activated by `datapipeline-trigger` in `lambda-modeling`; their config comes from `services-api-configuration`.

---

## Repository Structure

```
src/main/scala/com/resonate/dos/
GeoLocationDaily.scala # Daily geo enrichment: IP → zip → district + district_source
GeoLocationFullBackfill.scala # Backfills historical dates (daily-on-multi-day-pixels approach)
GeoLocationFull.scala # Merges daily+backfill into rolling-full
ToBitmap.scala # Converts geo-location output to bitmap; gates L2-confirmed bit
DistrictResolver.scala # Centralizes L2-vs-IP district coalescing + district_source derivation

src/test/scala/com/resonate/dos/
GeoLocationDailyTest.scala
GeoLocationFullBackfillTest.scala
ToBitmapTest.scala
...

config/geo-location/
zip-district-mappings/ # 4 CSV files (one per district type)
zip-congress-mapping.csv # 34,785 rows — L2 mode-per-zip, all 51 states
zip-proposed-congress-mapping.csv # 8,362 rows — 2026 redistricted (CA/MO/NC/OH/TX/UT/VA)
zip-state-senate-mapping.csv # 33,527 rows
zip-state-house-mapping.csv # 32,620 rows
```

**Scala version:** 2.11.8 | **Spark version:** 2.4.3 (Spark 2 — not yet migrated to Spark 3)

---

## Key Commands

```bash
# Run all unit tests
sbt test

# Run a specific test suite
sbt "testOnly com.resonate.dos.GeoLocationDailyTest"
sbt "testOnly com.resonate.dos.ToBitmapTest"

# Build assembly JAR
sbt assembly

# Deploy — push to main triggers GitHub Actions deployment pipeline
```

---

## Key Concepts

### district_source Provenance (CDP-118946 / CDP-118947)

Every RCID in geo-location output carries a `district_source` column set by `DistrictResolver`:

| Value | Meaning |
|---|---|
| `L2_CONFIRMED` | L2 has a district AND `l2_party_confirmed = true`. **Only this value sets the L2-confirmed marker bit in ToBitmap.** |
| `L2_UNCONFIRMED` | L2 has a district AND `l2_party_confirmed != true`. |
| `IP_INFERRED` | No L2 district; NetAcuity IP → zip → 4-CSV-mapping found a district. |
| `null` | No district data from any source. |

**Observed distribution (dev 2026-05-12):** IP_INFERRED ~86.8%, L2_CONFIRMED ~9.9%, L2_UNCONFIRMED ~2.9%, null ~0.4%.

**ToBitmap gating (CDP-118947):** The L2-confirmed marker bit is set **only** when `district_source = "L2_CONFIRMED"`. Before this fix (CDP-118947) the bit was set unconditionally (broken for L2_UNCONFIRMED and IP_INFERRED rows).

### Zip→District Mappings (4-file Shape, CDP-118946)

The district lookup uses 4 separate CSV files (one per district type) rather than a single consolidated CSV. Schema: `zip, state, <district-type>`. Files version together as a snapshot.

Passed to Spark jobs as a single `zipDistrictMappingsBasePath` argument pointing at the directory. `DistrictResolver` reads all 4 files by convention from that path.

Deployed to S3 via terragrunt `configurations` entries in the geo-location pipeline. After updating CSVs, deploy via the geo-location GitHub Actions workflow.

**IP-inferred fallback flow:** `GeoLocationDaily` calls NetAcuity to resolve the RCID's IP to a zip, then joins the 4 CSVs for the district. State-leg mappings are many-to-many and collapsed to 1:1 using rank-based dedup with `sort_array` for deterministic tiebreaking.

### GeoLocationFullBackfill Rewrite (CDP-118946)

Rewritten as **daily-on-multi-day-pixels**: applies daily geo enrichment logic to each date in the backfill range rather than re-reading the prior rolling-full. Key changes:

- No longer takes `DistrictColumnList` arg — always re-derives all 4 districts (prevents stale labels)
- Reads existing `zip` from `fullDf`; applies same IP-fallback as Daily (no second NetAcuity call)
- `GeoLocationFull.expectedCols` extended with `zip` and `district_source`
- Orchestrator passes the date range as a rendered glob to backfill

### Backfill `Should Run Full` Bug Fix (CDP-118512)

A bug in the geo-location state machine (`geo_location.asl.json` in `step-function-workflow-orchestrator`) caused a pre-existing geo full for today to override backfill output. Fixed by adding an `IsPresent` check on `$.FullInputPath` before the path-equality choice. If backfill output is not propagating to the full-build stage, check the `Should Run Full` choice state.

---

## Project-Specific Rules and Gotchas

- **Spark 2.4.3**: This repo is still on Spark 2. Do not apply Spark 3-specific optimizations (EMR 7 migration for this repo is not yet underway).
- **State-leg mappings are many-to-many**: Do not assume 1:1 zip→district. `DistrictResolver` handles collapsing with `sort_array` for deterministic hash-based tiebreaking.
- **CSV versioning**: The 4 zip-district CSVs version together as a snapshot. Updating one requires coordinated update of all 4 if the underlying data changes.
- **`GeoLocationFull.expectedCols`**: If you add a new column to `GeoLocationDaily`, also add it to `expectedCols` in `GeoLocationFull.scala` so it flows through rolling-full and backfill.
- **NetAcuity in Backfill**: `GeoLocationFullBackfill` does NOT call NetAcuity. It reads the `zip` column already resolved by `GeoLocationDaily` and applies the CSV-based district lookup.
181 changes: 181 additions & 0 deletions claude-md-proposals/identity-graph.CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# CLAUDE.md — resonate/identity-graph
# This is a NEW file to be created at the root of the identity-graph repo

# identity-graph

## Project Purpose and Architecture Overview

This repo contains the **Identity Graph** — Resonate's Person Resolution and Identity System (PRISM). It has two layers:

1. **Scala/Spark Jobs** (`src/main/scala/`) — 11 batch ETL jobs that build the underlying identity tables (identifier index, persons) from raw sources (L2, Experian, TapAd, LiveIntent, etc.)
2. **prism_dbt** (`dbt/prism_dbt/`) — PRISM's consumer-facing dbt package v1.0. Exposes four primitives (`waterfall_match`, `identifier_expand`, `persons_project`, `lookup`) as Lambda-invokable Snowflake service models.

**Production JAR path:** `s3://resonate-core-applications/identity-graph/jars/identity-graph-latest.jar`

**Design docs (published to S3):**
- `https://s3.nonprod.aws.resonatedigital.net/person-identity-graph/index.html` — hub
- `docs/design/design_and_production_readiness.html` — main architecture doc
- `docs/design/consumer_interface_design.html` — consumer integration (Append/BlockGraph/GeoFix/Cortex)
- `docs/design/project_plan.html` — 6 tracks, Jira ticket DAG

---

## Repository Structure

```
src/main/scala/com/resonate/spark/apps/
ExperianPreprocessJob.scala # Normalizes Experian ConsumerView + digital graph
L2ConsumerPreprocessJob.scala # Preprocesses L2 consumer tab-delimited files
TapadIndexJob.scala # Experian/TapAd → identifier_index
L2IndexJob.scala # L2 voter → identifier_index
L2ConsumerIndexJob.scala # L2 consumer → identifier_index
OnboardingMatchJob.scala # 4-phase matching (blocking, corroboration, scoring, fallback)
OnboardingHouseholdLinksJob.scala # Household CTV/TTD attribution via digital graph
OnboardingPersonsJob.scala # TapAd-only person records (deprecated: BL-106)
MergeOnboardingLinksJob.scala # Multi-provider collapse + confidence scoring
PersonIdentityJob.scala # Main orchestrator: persons + identifier_index
CustomerMatchJob.scala # Ephemeral customer file matching (7 fallback tiers)

src/main/scala/com/resonate/spark/utils/
HashUtils.scala # Deterministic SHA-256 person_id + household_id
StagingWriter.scala # Cache/count/write/unpersist pattern
AddressNormalizer.scala # USPS Pub 28 normalization + nickname resolution (max 5 hops)
IpFilter.scala # RFC1918 + cloud IP blocklist
ScoringConfig.scala # Centralized confidence scoring

dbt/prism_dbt/ # Consumer-facing dbt package (see dbt/prism_dbt/README.md)
macros/ # waterfall_match, identifier_expand, persons_project, lookup
models/service/ # Lambda-invokable: waterfall.sql, identifier_expand.sql, persons_project.sql
snowflake/
function/name_address_hash/ # NAME_ADDRESS_HASH Snowflake UDF
stored_procedure/prism_lookup/ # PRISM_LOOKUP SP
stored_procedure/name_address_lookup/ # NAME_ADDRESS_LOOKUP SP

docs/design/ # HTML design artifacts (published to S3)
docs/scripts/ # build_columns_page.py, sample_parquet.py

.github/workflows/
snowflake_dbt.yml # Deploy prism_dbt via snow dbt deploy
snowflake_stored_procedure.yml # Deploy PRISM_LOOKUP / NAME_ADDRESS_LOOKUP
snowflake_function.yml # Deploy NAME_ADDRESS_HASH UDF
```

---

## Key Commands

### Scala/Spark Jobs (SBT)

```bash
# Run all unit tests (443 tests across 20 suites)
sbt test

# Run a specific test suite
sbt "testOnly com.resonate.spark.apps.PersonIdentityJobSpec"
sbt "testOnly com.resonate.spark.apps.CustomerMatchJobSpec"

# Build assembly fat JAR for EMR deployment
sbt assembly

# Upload to S3 (prod)
aws s3 cp target/identity-graph-latest.jar \
s3://resonate-core-applications/identity-graph/jars/identity-graph-latest.jar
```

See the [deployment wiki](https://resonate-jira.atlassian.net/wiki/spaces/ASD/pages/3683057840/Pipeline+Scala+SBT+GitOps) for the GitOps deploy process.

### prism_dbt (Snowflake Native dbt Package)

```bash
cd dbt/prism_dbt

# Install dbt dependencies
dbt deps

# Local dev (requires ~/.dbt/profiles.yml — copy from profiles.yml.example)
dbt seed --profiles-dir <your-profiles-dir>
dbt run --select waterfall --profiles-dir <your-profiles-dir>
dbt test --profiles-dir <your-profiles-dir>

# Offline unit tests only (no Snowflake needed — fast)
dbt test --select test_type:unit # 16 tests

# All tests (includes live Snowflake singular tests against resonate-dev)
dbt test # 25 tests total
```

### Snowflake Deployments (GitHub Actions)

All Snowflake deployments go through GitHub Actions. Do not run `snow dbt deploy` manually from the CLI.

| Workflow | What it deploys |
|---|---|
| `snowflake_dbt.yml` | `prism_dbt` package via `snow dbt deploy` (runs `dbt parse` as fast-fail gate) |
| `snowflake_stored_procedure.yml` | `PRISM_LOOKUP` or `NAME_ADDRESS_LOOKUP` stored procedures |
| `snowflake_function.yml` | `NAME_ADDRESS_HASH` UDF |

---

## PRISM dbt Package (prism_dbt v1.0)

### The Four Primitives

| Macro | Direction | Purpose |
|---|---|---|
| `waterfall_match` | identifiers → 1 person_id per row | Resolve mixed-identifier customer rows to a canonical `person_id`. First-match-wins by priority. |
| `identifier_expand` | person_id → identifiers of ONE type | Fan out person_ids to deliverable identifiers. Call once per identifier type. |
| `persons_project` | person_id → persons attributes | Project selected `persons` columns (name, address, lalvoterid, …) onto person_ids. |
| `lookup` | one identifier → one person + linked IDs | Single-row debug. Also deployed as `PRISM_LOOKUP` stored procedure. |

### v1.0 Data Sources
- `PERSONS_LATEST` + `IDENTIFIER_INDEX_LATEST` (filtered to `identifier_rank = 1`)
- `CROSSWALK_LATEST` exists but is **NOT used** in v1.0 (missing metadata for confidence filtering)
- `ZIP11` routes automatically to `persons.zip11` (not `identifier_index`) via `_internal/lookup_strategy.sql`

### Service Model Invocation

```sql
EXECUTE DBT PROJECT RESONATE.PRISM.prism_dbt
ARGS = $$run --select waterfall --vars '{
"input_relation": "MY_DB.MY_SCHEMA.my_input",
"consumer": "append",
"job_run_id": "run_001",
"waterfall_order": [
{"identifier_type": "HEM_MD5", "stitch_label": "hem"},
{"identifier_type": "HEM_SHA256", "stitch_label": "hem"},
{"identifier_type": "MAID", "stitch_label": "maid"}
]
}'$$;
```

### NAME_ADDRESS_HASH UDF

Normalizes raw PII to a SHA-256 hash for name+address waterfall steps:

```sql
SELECT NAME_ADDRESS_HASH(first_name, last_name, address_line, zip) AS NAME_ADDRESS FROM my_input;
```

v1.0: `LOWER + TRIM + strip non-alphanumeric + LPAD zip + pipe-concat + SHA-256`. Does not yet do canonical-first-name resolution (Bill ≠ William — planned v1.1).

---

## Key Concepts and Bug Fixes in v1.0

- **ZIP11 routing**: `waterfall_match` auto-routes `ZIP11` steps to `persons.zip11`. Just include `{"identifier_type": "ZIP11", "stitch_label": "zip11"}` in `waterfall_order`.
- **Fan-out tiebreak**: Always deterministic. Order: confidence DESC → source_count DESC → max_effective_weight DESC → person_id ASC.
- **`L2IndexJob` HOUSEHOLD_ID null guard**: `filterNulls = true` — prevents phantom household collapse.
- **`OnboardingMatchJob` Phase 3 tiebreak**: `ORDER BY person_id ASC` as final tiebreak — fixes non-determinism.
- **`ExperianPreprocessJob` aggregation**: `min(id_value)` not `first(id_value)` — deterministic.
- **Null guards**: `identifier_value` filtered before `groupBy` in `PersonIdentityJob` and `MergeOnboardingLinksJob`.

---

## Project-Specific Rules

- **All Snowflake deployments via GitHub Actions** — never `snow dbt deploy` from the CLI.
- **`snowflake_profiles/profiles.yml` is a deploy stub** (account/user = `'_'`). Do not put real credentials there.
- **dbt unit tests are offline** — run `--select test_type:unit` first for fast feedback.
- **Singular tests require `resonate-dev` Snowflake** — live mock tables in `RESONATE.PRISM.*` (provisioned from `.context/cdp-119018-mock-tables.sql`).
- **person_id ≠ RID**: PRISM produces `person_id`. `RID` (Resonate-internal) is owned by Append's downstream pipeline. `waterfall_match` output column is `prism_matched_person_id`.
- **SemVer**: v1.0 is the Append cutover baseline. Release tagging tracked in CDP-119015.
Loading