Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copy this to .env.local and fill in your values
# NEVER commit .env.local to git
# Copy this to .env and fill in your values
# NEVER commit .env to git

# Database
DATABASE_URL=postgresql://pulseops:dev_password_change_in_production@localhost:5432/pulseops_dev
Expand All @@ -13,6 +13,10 @@ KAFKA_BROKERS=localhost:9092
# API Configuration
PORT=3001
NODE_ENV=development
API_URL=http://localhost:3001
GRAPHQL_URL=http://localhost:3002/graphql
VITE_GRAPHQL_URL=http://localhost:3002/graphql
VITE_PULSEOPS_API_KEY=demo_key_change_this

# Security (generate secure values for production)
JWT_SECRET=your_jwt_secret_here_change_in_production
Expand All @@ -25,7 +29,7 @@ GOOGLE_CALLBACK_URL=http://localhost:3001/auth/google/callback

# Rate Limiting
RATE_LIMIT_WINDOW_MS=60000
RATE_LIMIT_MAX_REQUESTS=100
RATE_LIMIT_MAX_REQUESTS=20000

# Monitoring
LOG_LEVEL=info
163 changes: 99 additions & 64 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,83 +2,94 @@

Event-driven analytics backend with PostgreSQL time-series partitioning, Kafka event queue, Redis caching, and GraphQL API.

Designed to handle 1000+ events/second from heterogeneous sources, aggregate into daily/hourly metrics, and serve sub-second dashboard queries via cached GraphQL.
Designed to evaluate heterogeneous event ingestion, asynchronous aggregation, tenant-skew behavior, and dashboard query performance with benchmark evidence rather than hard-coded performance claims.

## Evidence
## Article Evidence

**Event Ingestion** — HTTP API accepts JSON events (any schema) and publishes to Kafka topic. Batches before writes reduce database load.
PulseOps includes architecture and benchmark artifacts for evidence-based writeups. The repository supports honest claims about the shape of the system: Fastify ingest, Kafka decoupling, worker aggregation, PostgreSQL raw/aggregate storage, Redis-backed GraphQL caching, synthetic skew generation, and k6 benchmark scripts.

- Max: 1000+ events/second per node (multi-instance deployment supported)
- Latency: p95 < 100ms ingestion time
- Rate limiter: 100 req/min per API key (configurable)
Do not publish specific throughput, latency, cache-hit, Kafka-lag, or availability numbers unless they are tied to a dated benchmark report under `docs/benchmarks/` with raw k6 output and environment notes.

(`services/ingest-api/src/handler.ts`)
- Evidence guide: `docs/article-evidence.md`
- Observability map: `docs/observability.md`
- Benchmark report template: `docs/benchmarks/YYYY-MM-DD-pulseops-benchmark.md`
- Clean full local benchmark: `docs/benchmarks/2026-06-16-clean-full-benchmark.md`
- Canonical local smoke report: `docs/benchmarks/2026-06-16-final-benchmark-smoke-pulseops-benchmark.md`
- Heavier ingest-scale report: `docs/benchmarks/2026-06-16-ingest-scale-pulseops-benchmark.md`
- Synthetic skew generator: `scripts/generate-skewed-events.ts`
- k6 load tests: `tests/load/`

**Event Processing** — Worker consumes from Kafka, aggregates events into hourly/daily metrics, writes to PostgreSQL. Handles out-of-order events and late arrivals via watermarking.
## Architecture Evidence

(`services/worker/src/processor.ts`)
**Event Ingestion** — HTTP API accepts validated JSON events and publishes them to Kafka topic `events-raw`. Batches reduce per-event request overhead and keep PostgreSQL writes out of the request path.

(`services/ingest-api/src/index.ts`)

**Event Processing** — Worker consumes from Kafka, writes raw events to PostgreSQL, and updates daily aggregates. Late-arrival behavior should be benchmarked before making correctness or freshness claims.

(`services/worker/src/index.ts`, `services/worker/src/processing.ts`, `services/worker/src/aggregators/daily.ts`)

**Storage** — PostgreSQL 16 with time-series optimization:
- Partitioned tables by date (automatic rotation)
- Retention policies (30-day default, configurable)
- Indexes on (timestamp, tenant_id, metric_name)
- Vacuum tuning for write-heavy workload
- `events` partitioned by timestamp in the initial schema
- `daily_aggregates` keyed by `org_id`, `project_id`, metric, date, and dimensions
- Tenant/time indexes for raw event lookup
- Aggregate lookup indexes for dashboard queries

(`services/ingest-api/migrations/` — schema definitions)
(`scripts/init-db.sql`, `migrations/006_performance_optimizations.sql`)

**Query Layer** — Apollo GraphQL API:
- Cached queries (Redis TTL: 5min for hourly, 1h for daily aggregates)
- Multi-tenant authorization (checked on resolver entry)
- OAuth 2.0 (Google integration for user auth)
- Query complexity limits (prevent DOS via deep nested queries)
- Redis cache with tenant/project cache-version keys and 300-second TTL
- Queries require `X-API-Key` and reject cross-tenant `orgId`/`projectId`
- Aggregate-backed paths when no property filters are supplied
- Raw event fallback for supported property filters

(`services/graphql-api/src/schema.ts`)

**Query Performance**:
- Cache hits: p95 < 50ms
- Database queries (cache miss): p95 < 500ms
- Dashboard (typical 5-10 queries): p95 < 2s total

(`scripts/load-test.js` — k6 benchmark script)
**Benchmark Tooling**:
- `scripts/generate-skewed-events.ts` produces hot/medium/quiet tenant workloads with late arrivals, duplicates, bursts, multiple metrics, and 7/30/90-day windows.
- `tests/load/ingest-throughput.js` measures ingest acceptance behavior.
- `tests/load/hot-tenant.js` stresses skewed tenant distribution.
- `tests/load/dashboard-query.js` measures GraphQL dashboard query behavior.
- `tests/load/backpressure.js` captures behavior under burst pressure.

**Dashboard** — React 18 + Recharts, renders:
- Real-time event count trends (updated via GraphQL subscriptions)
- Event count trends from GraphQL queries
- Custom metric dashboards (user-defined dimensions)
- Anomaly detection (threshold alerts)
- Timezone-aware charting

(`web/src/pages/Dashboard.tsx`)
(`web/src/components/Dashboard.tsx`)

**Infrastructure** — Docker Compose local dev stack:
- PostgreSQL 16 (time-series partitioned)
- Redis 7 (query cache)
- Kafka + Redpanda (message queue, 3 partitions)
- Apache Kafka (message queue, 3 partitions)
- Ingest API (Node.js, port 3001)
- GraphQL API (Node.js, port 3002)
- Worker (Node.js, Kafka consumer)
- React frontend (Vite, port 5173)
- React frontend (Vite, port 5173, run through `pnpm dev:web`)

All services configured with health checks + auto-restart.
Docker Compose health checks cover PostgreSQL, Redis, Kafka, and the app services; the Node app services also use restart policies.

(`docker-compose.yml`)

**Stack** — Node.js 20, Express, Apollo GraphQL, React 18, PostgreSQL 16, Redis 7, Kafka, Playwright (E2E), k6 (load testing), pnpm workspaces.
**Stack** — Node.js 20, Fastify, Apollo GraphQL, React 18, PostgreSQL 16, Redis 7, Kafka, Playwright (E2E), k6 (load testing), pnpm workspaces.

## How It Works

1. **Event source sends JSON** → POST to ingest API
2. **API validates + publishes** → Kafka topic (with batching)
3. **Worker consumes** → Aggregates into hourly buckets by (tenant_id, metric_name, dimension_values)
4. **Aggregates stored** → PostgreSQL partitioned tables
2. **API validates + authenticates** → binds org/project from `X-API-Key`
3. **API publishes** → Kafka topic `events-raw`
4. **Worker consumes** → writes idempotent raw events and updates daily aggregates
5. **GraphQL query** → Checks cache (Redis) → if miss, queries DB → returns to dashboard
6. **Dashboard renders** → Real-time updates via subscriptions
6. **Dashboard renders** → React queries GraphQL and can be refreshed by the client

## Getting Started

### One-Command Setup
```bash
pnpm bootstrap # Installs deps, starts Docker services, runs migrations
pnpm dev # Starts all 4 backend services + frontend concurrently
pnpm dev # Starts ingest API, GraphQL API, worker, and frontend concurrently
```

Visit `http://localhost:5173`.
Expand All @@ -90,6 +101,7 @@ docker-compose up -d

# Create schema
pnpm db:migrate
pnpm db:verify:fresh

# Seed sample data
pnpm db:seed
Expand All @@ -99,38 +111,42 @@ pnpm dev
```

### Services
- **Ingest API** (port 3001): `curl -X POST http://localhost:3001/ingest -H 'Content-Type: application/json' -d '{"event":"signup","user_id":"123"}'`
- **GraphQL API** (port 3002): `http://localhost:3002/graphql`
- **Ingest API** (port 3001): `curl -X POST http://localhost:3001/api/v1/events -H 'X-API-Key: demo_key_change_this' -H 'Content-Type: application/json' -d '{"event_name":"signup","user_id":"123"}'`; metrics at `http://localhost:3001/metrics`
- **GraphQL API** (port 3002): `http://localhost:3002/graphql`; metrics at `http://localhost:3002/metrics`
- **Worker** (metrics port 3003): `http://localhost:3003/metrics`
- **Frontend** (port 5173): `http://localhost:5173`

## API Examples

### Ingest Event
```bash
curl -X POST http://localhost:3001/ingest \
-H "Authorization: Bearer API_KEY" \
curl -X POST http://localhost:3001/api/v1/events \
-H "X-API-Key: demo_key_change_this" \
-H "Content-Type: application/json" \
-d '{
"event": "page_view",
"event_name": "page_view",
"user_id": "u123",
"page": "/pricing",
"timestamp": 1705945000
"properties": {
"page": "/pricing"
},
"timestamp": "2026-06-16T12:00:00.000Z"
}'
```

### Query Metrics (GraphQL)
```graphql
query {
metrics(
tenant: "acme-corp"
name: "page_views"
startTime: "2024-01-01T00:00:00Z"
endTime: "2024-01-31T23:59:59Z"
groupBy: ["page"]
orgId: "00000000-0000-0000-0000-000000000001"
projectId: "00000000-0000-0000-0000-000000000002"
startDate: "2026-06-01"
endDate: "2026-06-16"
) {
timestamp
value
dimensions
totalEvents
topEvents {
eventName
count
}
}
}
```
Expand All @@ -154,18 +170,38 @@ pnpm test:e2e # Playwright

### Load Testing
```bash
pnpm test:load # k6, 1000 RPS sustained for 5 minutes
pnpm --silent benchmark:generate -- --tenants 100 --events 100000 --days 30 --hot-tenant-ratio 0.6 --late-arrival-ratio 0.05 --duplicate-ratio 0.01 --output jsonl > docs/benchmarks/evidence/events.jsonl
RUN_ID=local-smoke API_URL=http://localhost:3001 GRAPHQL_URL=http://localhost:3002/graphql API_KEY=demo_key_change_this pnpm benchmark
pnpm benchmark:report -- --run-id local-smoke --output docs/benchmarks/local-smoke-pulseops-benchmark.md
RUN_ID=local-smoke pnpm validate:evidence # writes docs/benchmarks/latest-pulseops-benchmark.md
pnpm db:verify:fresh
API_URL=http://localhost:3001 API_KEY=demo_key_change_this pnpm benchmark:ingest
pnpm benchmark:seed-tenants -- --tenants 100 --hot-tenants 1 --medium-tenants 10 --manifest tmp/benchmark-tenants.json
TENANT_KEYS_FILE=tmp/benchmark-tenants.json API_URL=http://localhost:3001 pnpm benchmark:hot-tenant
RUN_ID=local-smoke TENANT_KEYS_FILE=tmp/benchmark-tenants.json pnpm benchmark:hot-db -- --require-complete
GRAPHQL_URL=http://localhost:3002/graphql API_KEY=demo_key_change_this ORG_ID=00000000-0000-0000-0000-000000000001 PROJECT_ID=00000000-0000-0000-0000-000000000002 pnpm benchmark:dashboard
RUN_ID=cache-smoke WARM_ITERATIONS=12 pnpm benchmark:cache -- --run-id cache-smoke --warm-iterations 12
pnpm benchmark:worker -- --run-id worker-catchup-smoke --events 1000 --batch-size 100 --poll-ms 500 --timeout-ms 60000
docker compose stop worker
pnpm prove:worker-retry-offsets -- --timeout-ms 120000 --poll-ms 500
docker compose start worker
API_URL=http://localhost:3001 API_KEY=demo_key_change_this pnpm benchmark:backpressure
```

## Performance Characteristics

| Metric | Target | Notes |
These are benchmark targets and measurement areas, not measured claims.

| Metric | Status | Notes |
|--------|--------|-------|
| Ingest throughput | 1000+ RPS | Single node; horizontally scalable |
| Ingest p95 latency | < 100ms | Network + Kafka publish |
| Query p95 (cached) | < 50ms | Redis hit |
| Query p95 (DB) | < 500ms | PostgreSQL + index |
| Availability | 99%+ | Demo target; prod requires multi-region |
| Ingest throughput | Measured locally | See `docs/benchmarks/2026-06-16-clean-full-benchmark.md` for the clean full local run, and `docs/benchmarks/2026-06-16-ingest-scale-pulseops-benchmark.md` for the heavier fixed-rate ingest runs. The 1000 RPS target was not sustained locally. |
| Ingest p95 latency | Measured locally | See dated benchmark reports; request acceptance latency is not aggregate visibility latency |
| Dashboard query p95 | Measured locally | See canonical smoke report; includes k6 dashboard smoke and cold/warm cache smoke |
| Worker catch-up | Measured locally | 200-event local smoke run; see canonical smoke report |
| Kafka lag | Measured locally | Smoke run returned lag to 0; heavier ingest-scale snapshot captured 10,254,305 queued messages. Do not claim a lag limit or freshness guarantee. |
| Tenant skew impact | Smoke measured locally | Canonical local smoke reconciled 249 persisted hot-test events with Kafka lag 0: hot 201, quiet 40, medium 8. Evidence: `docs/benchmarks/evidence/hot-tenant-db-2026-06-16-final-benchmark-smoke.json`; full long-duration skew benchmark still needed |
| Hot-tenant DB pressure | Measured locally when `benchmark:hot-db -- --require-complete` is run | Aggregate-key pressure, request/persistence/lag reconciliation, and after-run DB snapshot; not continuous lock sampling |
| Backpressure behavior | TBD | Record rate limits, errors, queue lag, and recovery |

## Deployment

Expand Down Expand Up @@ -194,21 +230,20 @@ Event Source
Worker (aggregation)
PostgreSQL 16 (time-series partitioned, auto-vacuum tuned)
PostgreSQL 16 (time-series partitioned)
GraphQL API (port 3002, Redis cache layer)
React Dashboard (port 5173, subscriptions for real-time updates)
React Dashboard (port 5173, polling refresh)
```

## Security

- **No secrets in git** — `.env` files gitignored, secrets in environment
- **API key hashing** — bcrypt, stored in PostgreSQL
- **Multi-tenant isolation** — All queries filtered by `tenant_id` at resolver entry
- **OAuth 2.0** — Google sign-in for user authentication
- **Rate limiting** — Token bucket per API key (100 req/min default)
- **SQL injection prevention** — Parameterized queries via ORM
- **API key hashing** — SHA-256 fingerprint lookup plus bcrypt verification
- **Multi-tenant isolation** — API keys bind org/project context; GraphQL rejects cross-tenant args
- **Rate limiting** — Token bucket per API-key fingerprint
- **SQL injection prevention** — Parameterized PostgreSQL queries
- **Automated scans** — ESLint security plugin, npm audit in CI

## License
Expand Down
38 changes: 31 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ services:
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
Expand All @@ -51,7 +52,7 @@ services:
volumes:
- kafka_data:/var/lib/kafka/data
healthcheck:
test: ["CMD-SHELL", "kafka-metadata.sh --snapshot /var/lib/kafka/data/__cluster_metadata-0/00000000000000000000.log --print --skip-record-metadata || exit 0"]
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list > /dev/null 2>&1"]
interval: 10s
timeout: 10s
retries: 5
Expand All @@ -69,7 +70,9 @@ services:
PORT: 3001
DATABASE_URL: postgresql://pulseops:dev_password_change_in_production@postgres:5432/pulseops_dev
REDIS_URL: redis://redis:6379
KAFKA_BROKERS: kafka:9092
KAFKA_BROKERS: kafka:29092
RATE_LIMIT_WINDOW_MS: 60000
RATE_LIMIT_MAX_REQUESTS: 20000
depends_on:
postgres:
condition: service_healthy
Expand All @@ -79,6 +82,11 @@ services:
condition: service_healthy
volumes:
- ./services/ingest-api/src:/app/src
healthcheck:
test: ["CMD-SHELL", "node -e \"fetch('http://localhost:3001/health').then(r => process.exit(r.ok ? 0 : 1)).catch(() => process.exit(1))\""]
interval: 10s
timeout: 5s
retries: 6
restart: unless-stopped

graphql-api:
Expand All @@ -100,25 +108,41 @@ services:
condition: service_healthy
volumes:
- ./services/graphql-api/src:/app/src
healthcheck:
test: ["CMD-SHELL", "node -e \"fetch('http://localhost:3002/health').then(r => process.exit(r.ok ? 0 : 1)).catch(() => process.exit(1))\""]
interval: 10s
timeout: 5s
retries: 6
restart: unless-stopped

worker:
build:
context: ./services/worker
dockerfile: Dockerfile
container_name: pulseops-worker
ports:
- "3003:3003"
environment:
NODE_ENV: development
DATABASE_URL: postgresql://pulseops:dev_password_change_in_production@postgres:5432/pulseops_dev
KAFKA_BROKERS: kafka:9092
REDIS_URL: redis://redis:6379
KAFKA_BROKERS: kafka:29092
KAFKA_GROUP_ID: pulseops-aggregators
WORKER_METRICS_PORT: 3003
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
kafka:
condition: service_healthy
volumes:
- ./services/worker/src:/app/src
healthcheck:
test: ["CMD-SHELL", "node -e \"fetch('http://localhost:3003/metrics').then(r => process.exit(r.ok ? 0 : 1)).catch(() => process.exit(1))\""]
interval: 10s
timeout: 5s
retries: 6
restart: unless-stopped

volumes:
Expand Down
Loading
Loading