CSV Intelligence Layer

A deterministic engine that transforms messy, user-uploaded CSVs into clean, schema-compliant, import-ready data.

⚠️ Beta Status: This project is currently in beta. Features and APIs may change.

A production-grade data ingestion and normalization service that treats CSVs as hostile input and processes them through a deterministic, multi-stage pipeline.

🚀 Live Demo

Try Live Demo · Watch Demo Video

Overview

This service functions like a compiler for data: inferring schema, reconciling it with user-defined expectations, normalizing types, validating constraints, and producing explainable outputs.

Key Features

  • Safe Streaming Parser: Handles malformed CSVs and detects encodings and delimiters
  • Type Inference: Automatically detects column types with confidence scores (see the sketch after this list)
  • Schema Reconciliation: Maps source columns to the canonical schema using heuristics
  • Human-in-the-Loop: Pauses the pipeline on ambiguity and exposes pending decisions via the API
  • Decision Persistence: Reuses mapping decisions for future ingestions
  • Explainability: Full audit trail of every decision made
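
Type inference scores each column by how many sampled values match a candidate type. The real heuristics live in src/services/type-inference.ts; the sketch below only illustrates the idea, and the function name, threshold, and regexes are assumptions, not the actual API:

// Illustrative sketch only; the real logic lives in src/services/type-inference.ts.
type InferredType = 'string' | 'number' | 'date' | 'email';

interface TypeGuess {
  type: InferredType;
  confidence: number; // share of sampled values matching the guessed type
}

// Most specific candidates first, so they win confidence ties.
const checks: Array<[InferredType, (v: string) => boolean]> = [
  ['email', (v) => /^[^@\s]+@[^@\s]+\.[^@\s]+$/.test(v)],
  ['number', (v) => v.trim() !== '' && !Number.isNaN(Number(v))],
  ['date', (v) => !Number.isNaN(Date.parse(v))],
];

function inferColumnType(samples: string[]): TypeGuess {
  if (samples.length === 0) return { type: 'string', confidence: 0 };
  let best: TypeGuess = { type: 'string', confidence: 1 }; // string always fits
  for (const [type, matches] of checks) {
    const confidence = samples.filter(matches).length / samples.length;
    // Prefer the most specific type that clears a (hypothetical) 0.8 threshold.
    if (confidence >= 0.8 && (best.type === 'string' || confidence > best.confidence)) {
      best = { type, confidence };
    }
  }
  return best;
}

// e.g. inferColumnType(['a@b.com', 'c@d.io']) → { type: 'email', confidence: 1 }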

Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│    PARSE    │ ──▶ │    INFER    │ ──▶ │     MAP     │ ──▶ │  VALIDATE   │ ──▶ │   OUTPUT    │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
                                              │
                                              ▼
                                    ┌─────────────────┐
                                    │  HUMAN REVIEW   │
                                    │   (if needed)   │
                                    └─────────────────┘
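
Each stage runs as a queue-backed worker, and the MAP stage parks an ingestion for human review instead of advancing it when the mapping is ambiguous. A minimal sketch of that hand-off with BullMQ follows; the queue names and the two declared service calls are illustrative stand-ins (see src/workers/ for the real wiring):

import { Queue, Worker } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };
const validateQueue = new Queue('validate', { connection });

// Hypothetical stand-ins for src/services/column-mapping.ts and the
// ingestion status update in the database.
declare function mapColumns(ingestionId: string): Promise<{ needsReview: boolean }>;
declare function markAwaitingReview(ingestionId: string): Promise<void>;

// MAP stage: enqueue VALIDATE only when no human decision is required.
new Worker('map', async (job) => {
  const { ingestionId } = job.data as { ingestionId: string };
  const result = await mapColumns(ingestionId);
  if (result.needsReview) {
    await markAwaitingReview(ingestionId); // pipeline pauses; POST /resolve resumes it
    return;
  }
  await validateQueue.add('validate', { ingestionId });
}, { connection });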

Tech Stack

Backend

  • Runtime: Node.js 20+, TypeScript
  • API: Fastify with Zod validation & Swagger/OpenAPI docs (see the sketch after this list)
  • Queue: BullMQ + Redis
  • Database: PostgreSQL + Drizzle ORM
  • Storage: Local filesystem (dev) / S3-ready (production)
  • AI: OpenAI integration for ambiguous column mapping
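
To give a flavor of the route style, here is a minimal sketch of a Fastify handler validating its payload with Zod. The handler body and response are illustrative; the real routes live in src/api/:

import Fastify from 'fastify';
import { z } from 'zod';

const app = Fastify();

// Mirrors the POST /schemas payload shown in Example Usage below.
const createSchemaBody = z.object({
  name: z.string(),
  version: z.string(),
  columns: z.array(
    z.object({
      name: z.string(),
      type: z.enum(['string', 'number', 'date', 'email']),
      required: z.boolean().optional(),
    }),
  ),
});

app.post('/schemas', async (request, reply) => {
  // Throws a ZodError when the payload is malformed.
  const body = createSchemaBody.parse(request.body);
  // ...persist the schema, then echo it back (sketch only)
  return reply.code(201).send(body);
});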

Frontend

  • Framework: React 18 + TypeScript + Vite
  • UI: TailwindCSS + shadcn/ui components
  • State: Zustand for history management
  • API Client: Native fetch with type-safe interfaces (see the sketch after this list)
  • Deployment: Vercel with Analytics
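
The API client is a thin typed wrapper over fetch. A hedged sketch of the pattern is below; the Ingestion shape and all status values other than awaiting_review are assumptions:

// Sketch of a frontend/src/lib-style API helper; shapes are illustrative.
const API_URL = import.meta.env.VITE_API_URL as string; // configured in .env

export interface Ingestion {
  id: string;
  status: 'processing' | 'awaiting_review' | 'completed' | 'failed';
}

export async function getIngestion(id: string): Promise<Ingestion> {
  const res = await fetch(`${API_URL}/ingestions/${id}`);
  if (!res.ok) throw new Error(`GET /ingestions/${id} failed with ${res.status}`);
  return (await res.json()) as Ingestion;
}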

Getting Started

Prerequisites

  • Node.js 20+
  • Docker & Docker Compose
  • pnpm (recommended) or npm

Setup

# Clone and install
git clone <repo-url>
cd csv-intelligence-layer
pnpm install

# Copy environment file
cp .env.example .env

# Start infrastructure (Postgres, Redis)
pnpm docker:up

# Generate database migrations
pnpm db:generate

# Run migrations
pnpm db:migrate

# Start development server (in separate terminals)
pnpm dev          # API server
pnpm worker:dev   # Background workers

Frontend Development

# Navigate to frontend directory
cd frontend

# Install dependencies
pnpm install

# Copy environment file and configure API URL
cp .env.example .env
# Edit .env and set VITE_API_URL to your backend URL

# Start development server
pnpm dev

# Build for production
pnpm build

# Preview production build
pnpm preview

The frontend will be available at http://localhost:5173 and includes:

  • Interactive CSV upload playground
  • Schema management interface
  • Real-time pipeline status tracking
  • Decision review panel for ambiguous mappings
  • Ingestion history viewer

API Endpoints

Method  Endpoint                                 Description
POST    /schemas                                 Create a canonical schema
GET     /schemas                                 List all schemas
GET     /schemas/:id                             Get schema by ID
POST    /ingestions?schemaId={uuid}              Upload CSV and start pipeline
GET     /ingestions/:id                          Get ingestion status and results
GET     /ingestions/:id/review                   Get pending decisions (awaiting_review status)
POST    /ingestions/:id/resolve                  Submit human decisions to resume pipeline
GET     /ingestions/:id/output?format=csv|json   Download cleaned data (CSV or JSON)
GET     /ingestions/:id/decisions                Get decision audit log
GET     /health                                  Health check
GET     /docs                                    Interactive Swagger API documentation

Example Usage

# 1. Create a canonical schema
curl -X POST http://localhost:3000/schemas \
  -H "Content-Type: application/json" \
  -d '{
    "name": "customers",
    "version": "1.0.0",
    "columns": [
      {"name": "email", "type": "email", "required": true},
      {"name": "name", "type": "string", "required": true},
      {"name": "signup_date", "type": "date"}
    ]
  }'

# 2. Upload a CSV for ingestion
curl -X POST "http://localhost:3000/ingestions?schemaId=<schema-id>" \
  -F "file=@customers.csv"

# 3. Check status
curl http://localhost:3000/ingestions/<ingestion-id>

# 4. If awaiting_review, resolve ambiguities
curl -X POST http://localhost:3000/ingestions/<id>/resolve \
  -H "Content-Type: application/json" \
  -d '{
    "decisions": [
      {"sourceColumn": "user_email", "targetColumn": "email"}
    ]
  }'

# 5. Download cleaned output
curl -O http://localhost:3000/ingestions/<id>/output
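
The same flow is scriptable. Below is a sketch in TypeScript using Node 20's built-in fetch and FormData that uploads a CSV and polls until the pipeline finishes or pauses for review; the create-response shape and every status name other than awaiting_review are assumptions:

import { readFile } from 'node:fs/promises';

const BASE = 'http://localhost:3000';

async function ingest(schemaId: string, csvPath: string): Promise<void> {
  // Upload the CSV as multipart form data.
  const form = new FormData();
  form.append('file', new Blob([await readFile(csvPath)]), 'customers.csv');
  const created = await fetch(`${BASE}/ingestions?schemaId=${schemaId}`, {
    method: 'POST',
    body: form,
  });
  const { id } = (await created.json()) as { id: string }; // assumed response shape

  // Poll until the pipeline completes or pauses for human review.
  for (;;) {
    const { status } = (await (await fetch(`${BASE}/ingestions/${id}`)).json()) as {
      status: string;
    };
    if (status === 'awaiting_review') {
      console.log(`Review needed: ${BASE}/ingestions/${id}/review`);
      return;
    }
    if (status !== 'processing') break; // 'processing' is an assumed status name
    await new Promise((r) => setTimeout(r, 1000));
  }
  console.log(`Cleaned output: ${BASE}/ingestions/${id}/output?format=csv`);
}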

Development

Project Structure

.
├── frontend/              # React frontend application
│   ├── src/
│   │   ├── components/   # UI components (schema, pipeline, upload)
│   │   ├── pages/        # Route pages (Home, Playground, Docs, About)
│   │   ├── lib/          # API client, config, utilities
│   │   ├── hooks/        # React hooks for data fetching
│   │   ├── stores/       # Zustand state management
│   │   └── types/        # TypeScript type definitions
│   └── vercel.json       # Vercel deployment config
│
├── src/                   # Backend application
│   ├── api/              # Fastify route handlers
│   │   ├── health.ts
│   │   ├── ingestions.ts
│   │   └── schemas.ts
│   ├── db/               # Database schema and connection
│   │   ├── index.ts
│   │   └── schema.ts
│   ├── services/         # Business logic
│   │   ├── column-mapping.ts    # AI-powered column mapping
│   │   ├── csv-parser.ts        # Streaming CSV parser
│   │   ├── storage.ts           # Storage abstraction layer
│   │   └── type-inference.ts    # Column type detection
│   ├── workers/          # BullMQ job processors
│   │   ├── index.ts             # Worker orchestration
│   │   ├── parse.worker.ts      # CSV parsing stage
│   │   ├── infer.worker.ts      # Type inference stage
│   │   ├── map.worker.ts        # Column mapping stage
│   │   ├── validate.worker.ts   # Data validation stage
│   │   ├── output.worker.ts     # Output generation stage
│   │   └── queues.ts            # Queue definitions
│   ├── types/            # TypeScript types and Zod schemas
│   ├── utils/            # Utilities (logger, waitForDeps)
│   ├── config.ts         # Environment configuration
│   ├── index.ts          # API server entry point
│   ├── prod-entry.ts     # Production entry point
│   └── server.ts         # Fastify server setup
│
└── docker-compose.yml    # Local dev infrastructure

Running Tests

pnpm test           # Watch mode
pnpm test:run       # Single run
pnpm test:coverage  # With coverage
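
The test runner isn't pinned down in this README; assuming Vitest (watch-by-default matches pnpm test above), a unit test for a service like type inference might look like the sketch below, where the import path and expected API are illustrative:

import { describe, expect, it } from 'vitest';
import { inferColumnType } from '../src/services/type-inference'; // hypothetical export

describe('type-inference', () => {
  it('flags a mostly-email column as email with a confidence score', () => {
    const guess = inferColumnType(['a@b.com', 'c@d.org', 'oops']);
    expect(guess.type).toBe('email');
    expect(guess.confidence).toBeCloseTo(2 / 3);
  });
});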

Database

pnpm db:generate    # Generate migrations from schema
pnpm db:migrate     # Apply migrations
pnpm db:studio      # Open Drizzle Studio (visual editor)
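
Migrations are generated from the Drizzle table definitions in src/db/schema.ts. A hedged sketch of what one table there might resemble, with all column names and defaults being assumptions:

import { jsonb, pgTable, text, timestamp, uuid } from 'drizzle-orm/pg-core';

// Illustrative only; the real definitions live in src/db/schema.ts.
export const ingestions = pgTable('ingestions', {
  id: uuid('id').primaryKey().defaultRandom(),
  schemaId: uuid('schema_id').notNull(),
  status: text('status').notNull().default('parsing'),
  decisions: jsonb('decisions'), // audit trail of mapping decisions
  createdAt: timestamp('created_at').notNull().defaultNow(),
});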

Roadmap

✅ Completed

  • Full 5-stage pipeline (parse → infer → map → validate → output)
  • All worker implementations (BullMQ-based)
  • OpenAI integration for intelligent column mapping
  • Human-in-the-loop review system
  • Decision audit logging
  • Interactive web UI (React + TailwindCSS)
  • Swagger/OpenAPI documentation
  • Multi-format output (CSV, JSON)
  • Production deployment (Railway + Vercel)
  • Complete S3 storage implementation for production scale

🚧 In Progress / Planned

  • Webhook notifications for pipeline completion
  • Comprehensive test suite (unit + integration)
  • Advanced validation rules engine
  • Batch ingestion support
  • Rate limiting and API key authentication
  • Data quality scoring and reports

License

MIT
