⚠️ Beta Status: This project is currently in beta. Features and APIs may change.
A production-grade data ingestion and normalization service that treats CSVs as hostile input and processes them through a deterministic, multi-stage pipeline.
- Frontend: https://csv-intelligence.vercel.app/
- Backend API: https://csv-intelligence-layer-production.up.railway.app
This service functions like a compiler for data: inferring schema, reconciling it with user-defined expectations, normalizing types, validating constraints, and producing explainable outputs.
- Safe Streaming Parser: Handles malformed CSVs, detects encoding/delimiters
- Type Inference: Automatically detects column types with confidence scores (see the sketch after this list)
- Schema Reconciliation: Maps source columns to canonical schema using heuristics
- Human-in-the-Loop: Pauses pipeline on ambiguity, exposes decisions via API
- Decision Persistence: Reuses mapping decisions for future ingestions
- Explainability: Full audit trail of every decision made
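To make the confidence-scored inference concrete, here is a minimal TypeScript sketch. The `inferColumnType` helper, its matchers, and the 0.9 threshold are illustrative assumptions, not the actual logic in `src/services/type-inference.ts`:

```typescript
// Illustrative only: the real implementation lives in
// src/services/type-inference.ts and may use different rules.
type InferredType = "string" | "number" | "date" | "email" | "boolean";

interface Inference {
  type: InferredType;
  confidence: number; // fraction of sampled values matching the type
}

function inferColumnType(samples: string[]): Inference {
  const matchers: Array<[InferredType, (v: string) => boolean]> = [
    ["email", (v) => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(v)],
    ["number", (v) => v.trim() !== "" && !Number.isNaN(Number(v))],
    ["boolean", (v) => /^(true|false|yes|no)$/i.test(v)],
    ["date", (v) => !Number.isNaN(Date.parse(v))],
  ];
  let best: Inference = { type: "string", confidence: 1 }; // fallback
  for (const [type, matches] of matchers) {
    const confidence = samples.filter(matches).length / samples.length;
    // Keep the matcher that clears the threshold with the best score.
    if (confidence > 0.9 && (best.type === "string" || confidence > best.confidence)) {
      best = { type, confidence };
    }
  }
  return best;
}
```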
```
┌───────────┐     ┌───────────┐     ┌───────────┐     ┌───────────┐     ┌───────────┐
│   PARSE   │ ──▶ │   INFER   │ ──▶ │    MAP    │ ──▶ │ VALIDATE  │ ──▶ │  OUTPUT   │
└───────────┘     └───────────┘     └───────────┘     └───────────┘     └───────────┘
                                          │
                                          ▼
                                 ┌─────────────────┐
                                 │  HUMAN REVIEW   │
                                 │   (if needed)   │
                                 └─────────────────┘
```
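Each stage runs as a BullMQ worker and enqueues the next stage on success. The sketch below shows the hand-off pattern only; the queue names, payload shape, and connection settings are assumptions, and the real definitions live in `src/workers/queues.ts`:

```typescript
import { Queue, Worker } from "bullmq";

// Assumed connection and payload; see src/workers/queues.ts for the
// actual queue definitions.
const connection = { host: "localhost", port: 6379 };

interface StageJob {
  ingestionId: string;
}

const inferQueue = new Queue<StageJob>("infer", { connection });

// The parse stage hands off to the infer stage when it succeeds. A stage
// that hits ambiguity would instead mark the ingestion awaiting_review and
// stop; the pipeline resumes after POST /ingestions/:id/resolve.
new Worker<StageJob>(
  "parse",
  async (job) => {
    // ...stream-parse the uploaded CSV and persist intermediate results...
    await inferQueue.add("infer", { ingestionId: job.data.ingestionId });
  },
  { connection },
);
```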
- Runtime: Node.js 20+, TypeScript
- API: Fastify with Zod validation & Swagger/OpenAPI docs (route sketch after this list)
- Queue: BullMQ + Redis
- Database: PostgreSQL + Drizzle ORM
- Storage: Local filesystem (dev) / S3-ready (production)
- AI: OpenAI integration for ambiguous column mapping
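As a taste of the Fastify-plus-Zod combination, here is a hedged sketch of what a schema-creation route could look like. The body shape is inferred from the curl example later in this README; the real handler lives in `src/api/schemas.ts` and will differ in detail:

```typescript
import Fastify from "fastify";
import { z } from "zod";

// Column types below are guessed from the examples in this README.
const CreateSchemaBody = z.object({
  name: z.string().min(1),
  version: z.string(),
  columns: z.array(
    z.object({
      name: z.string(),
      type: z.enum(["string", "number", "date", "email", "boolean"]),
      required: z.boolean().optional(),
    }),
  ),
});

const app = Fastify();

app.post("/schemas", async (request, reply) => {
  const parsed = CreateSchemaBody.safeParse(request.body);
  if (!parsed.success) {
    return reply.code(400).send({ errors: parsed.error.issues });
  }
  // ...persist with Drizzle and return the stored schema...
  return reply.code(201).send(parsed.data);
});
```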
- Framework: React 18 + TypeScript + Vite
- UI: TailwindCSS + shadcn/ui components
- State: Zustand for history management (store sketch after this list)
- API Client: Native fetch with type-safe interfaces
- Deployment: Vercel with Analytics
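The history store is small enough to sketch. The field names below are hypothetical; the real store sits in `frontend/src/stores/` and may track more state:

```typescript
import { create } from "zustand";

// Hypothetical shape for the ingestion-history store.
interface IngestionSummary {
  id: string;
  status: string;
  createdAt: string;
}

interface HistoryState {
  ingestions: IngestionSummary[];
  add: (entry: IngestionSummary) => void;
  clear: () => void;
}

export const useHistoryStore = create<HistoryState>((set) => ({
  ingestions: [],
  add: (entry) => set((s) => ({ ingestions: [entry, ...s.ingestions] })),
  clear: () => set({ ingestions: [] }),
}));
```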
- Node.js 20+
- Docker & Docker Compose
- pnpm (recommended) or npm
# Clone and install
git clone <repo-url>
cd csv-intelligence-layer
pnpm install
# Copy environment file
cp .env.example .env
# Start infrastructure (Postgres, Redis)
pnpm docker:up
# Generate database migrations
pnpm db:generate
# Run migrations
pnpm db:migrate
# Start development server (in separate terminals)
pnpm dev # API server
pnpm worker:dev  # Background workers

# Navigate to frontend directory
cd frontend
# Install dependencies
pnpm install
# Copy environment file and configure API URL
cp .env.example .env
# Edit .env and set VITE_API_URL to your backend URL
# Start development server
pnpm dev
# Build for production
pnpm build
# Preview production build
pnpm preview

The frontend will be available at http://localhost:5173 and includes:
- Interactive CSV upload playground
- Schema management interface
- Real-time pipeline status tracking
- Decision review panel for ambiguous mappings
- Ingestion history viewer
| Method | Endpoint | Description |
|---|---|---|
| POST | `/schemas` | Create a canonical schema |
| GET | `/schemas` | List all schemas |
| GET | `/schemas/:id` | Get schema by ID |
| POST | `/ingestions?schemaId={uuid}` | Upload CSV and start pipeline |
| GET | `/ingestions/:id` | Get ingestion status and results |
| GET | `/ingestions/:id/review` | Get pending decisions (`awaiting_review` status) |
| POST | `/ingestions/:id/resolve` | Submit human decisions to resume pipeline |
| GET | `/ingestions/:id/output?format=csv\|json` | Download cleaned data (CSV or JSON) |
| GET | `/ingestions/:id/decisions` | Get decision audit log |
| GET | `/health` | Health check |
| GET | `/docs` | Interactive Swagger API documentation |
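The frontend talks to these endpoints through plain `fetch` wrapped in typed functions. A minimal sketch, assuming response field names the actual API may spell differently:

```typescript
const API_URL = "http://localhost:3000"; // or import.meta.env.VITE_API_URL

// Assumed status values; only awaiting_review is confirmed by this README.
interface Ingestion {
  id: string;
  status: "processing" | "awaiting_review" | "completed" | "failed";
}

export async function getIngestion(id: string): Promise<Ingestion> {
  const res = await fetch(`${API_URL}/ingestions/${id}`);
  if (!res.ok) throw new Error(`GET /ingestions/${id} failed: ${res.status}`);
  return (await res.json()) as Ingestion;
}

export async function resolveIngestion(
  id: string,
  decisions: Array<{ sourceColumn: string; targetColumn: string }>,
): Promise<void> {
  const res = await fetch(`${API_URL}/ingestions/${id}/resolve`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ decisions }),
  });
  if (!res.ok) throw new Error(`resolve failed: ${res.status}`);
}
```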
# 1. Create a canonical schema
curl -X POST http://localhost:3000/schemas \
-H "Content-Type: application/json" \
-d '{
"name": "customers",
"version": "1.0.0",
"columns": [
{"name": "email", "type": "email", "required": true},
{"name": "name", "type": "string", "required": true},
{"name": "signup_date", "type": "date"}
]
}'
# 2. Upload a CSV for ingestion
curl -X POST "http://localhost:3000/ingestions?schemaId=<schema-id>" \
-F "file=@customers.csv"
# 3. Check status
curl http://localhost:3000/ingestions/<ingestion-id>
# 4. If awaiting_review, resolve ambiguities
curl -X POST http://localhost:3000/ingestions/<id>/resolve \
-H "Content-Type: application/json" \
-d '{
"decisions": [
{"sourceColumn": "user_email", "targetColumn": "email"}
]
}'
# 5. Download cleaned output
curl -O http://localhost:3000/ingestions/<id>/output
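Scripted clients typically repeat step 3 until the pipeline settles. A small TypeScript polling sketch (status names other than `awaiting_review` are assumptions about the service's contract):

```typescript
const API_URL = "http://localhost:3000";

// Poll until the ingestion leaves the processing state; the caller then
// either downloads output or submits decisions via /resolve.
async function waitForIngestion(id: string): Promise<string> {
  for (;;) {
    const res = await fetch(`${API_URL}/ingestions/${id}`);
    if (!res.ok) throw new Error(`status check failed: ${res.status}`);
    const { status } = (await res.json()) as { status: string };
    if (status !== "processing") return status;
    await new Promise((resolve) => setTimeout(resolve, 1_000));
  }
}
```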
```
├── frontend/                  # React frontend application
│   ├── src/
│   │   ├── components/        # UI components (schema, pipeline, upload)
│   │   ├── pages/             # Route pages (Home, Playground, Docs, About)
│   │   ├── lib/               # API client, config, utilities
│   │   ├── hooks/             # React hooks for data fetching
│   │   ├── stores/            # Zustand state management
│   │   └── types/             # TypeScript type definitions
│   └── vercel.json            # Vercel deployment config
│
├── src/                       # Backend application
│   ├── api/                   # Fastify route handlers
│   │   ├── health.ts
│   │   ├── ingestions.ts
│   │   └── schemas.ts
│   ├── db/                    # Database schema and connection
│   │   ├── index.ts
│   │   └── schema.ts
│   ├── services/              # Business logic
│   │   ├── column-mapping.ts  # AI-powered column mapping
│   │   ├── csv-parser.ts      # Streaming CSV parser
│   │   ├── storage.ts         # Storage abstraction layer
│   │   └── type-inference.ts  # Column type detection
│   ├── workers/               # BullMQ job processors
│   │   ├── index.ts           # Worker orchestration
│   │   ├── parse.worker.ts    # CSV parsing stage
│   │   ├── infer.worker.ts    # Type inference stage
│   │   ├── map.worker.ts      # Column mapping stage
│   │   ├── validate.worker.ts # Data validation stage
│   │   ├── output.worker.ts   # Output generation stage
│   │   └── queues.ts          # Queue definitions
│   ├── types/                 # TypeScript types and Zod schemas
│   ├── utils/                 # Utilities (logger, waitForDeps)
│   ├── config.ts              # Environment configuration
│   ├── index.ts               # API server entry point
│   ├── prod-entry.ts          # Production entry point
│   └── server.ts              # Fastify server setup
│
└── docker-compose.yml         # Local dev infrastructure
```
pnpm test # Watch mode
pnpm test:run # Single run
pnpm test:coverage  # With coverage

pnpm db:generate    # Generate migrations from schema
pnpm db:migrate     # Apply migrations
pnpm db:studio      # Open Drizzle Studio (visual editor)

- Full 5-stage pipeline (parse → infer → map → validate → output)
- All worker implementations (BullMQ-based)
- OpenAI integration for intelligent column mapping
- Human-in-the-loop review system
- Decision audit logging
- Interactive web UI (React + TailwindCSS)
- Swagger/OpenAPI documentation
- Multi-format output (CSV, JSON)
- Production deployment (Railway + Vercel)
- Complete S3 storage implementation for production scale
- Webhook notifications for pipeline completion
- Comprehensive test suite (unit + integration)
- Advanced validation rules engine
- Batch ingestion support
- Rate limiting and API key authentication
- Data quality scoring and reports
MIT