Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,17 @@ BUCKET_NAME=my-media-bucket
GCS_SA_PATH=.secrets/service-account.json
BUCKET_SA_PATH=.secrets/service-account.json

# S3 / S3-compatible (when BUCKET_PROVIDER=s3) — not yet implemented (sub-project 4)
# BUCKET_REGION=us-east-1
# BUCKET_ACCESS_KEY=
# BUCKET_SECRET_KEY=
# BUCKET_ENDPOINT_URL= # optional — for MinIO or S3-compatible endpoints
# S3 / S3-compatible (when BUCKET_PROVIDER=s3, e.g. AWS S3 or MinIO)
# S3_* names are read by both the Go API and the Python worker.
# S3_BUCKET_NAME=my-media-bucket
# S3_REGION=us-east-1
# S3_ACCESS_KEY_ID=
# S3_SECRET_ACCESS_KEY=
# S3_ENDPOINT_URL=http://localhost:9000 # internal/server-side endpoint (MinIO / S3-compatible). Leave unset for AWS S3.
# S3_PUBLIC_ENDPOINT_URL=http://localhost:9000 # optional client-facing endpoint for presigned + public URLs.
# # Set this when internal services reach the store by a private host
# # (e.g. http://minio:9000) that external clients cannot resolve.
# # Falls back to S3_ENDPOINT_URL when empty.

# ─── OpenTelemetry ────────────────────────────────────────────────────────────
OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317
Expand Down
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ coverage/
.vscode/
*.code-workspace
# Ignore generated documentation
docs/
docs/superpowers/
docs/*.json
docs/ASSESSMENT.md
docs/FIX_PLAN.md
doc/
*.pdf
# Ignore custom files
Expand All @@ -118,3 +121,5 @@ API_DOCUMENTATION.md
.env.local
.secrets/
docker-compose.override.yml

.pytest_cache/
4 changes: 2 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Entry: `worker/__main__.py` → `consumer/main.py`
- `_handle_job` takes a `SELECT … FOR UPDATE` lock, marks the row `in_progress`, calls `process_asset_dispatch`, then marks `done` + acks the stream message. On failure it re-queues (up to `MAX_JOB_ATTEMPTS`).
- `_recover_stuck_pending` re-adds `pending/in_progress` jobs older than 2 min back to the stream (recovery path, called when no messages available).
- `worker/processing/processor.py` — `process_asset_dispatch` routes by asset type to `images.py` or `videos.py`.
- `worker/storage/` — `StorageX` ABC; `GCSStorage` is the concrete impl.
- `worker/storage/` — `StorageX` ABC; `GCSStorage` and `S3Storage` concrete impls (selected by a factory in `worker/storage/__init__.py`). `S3Storage` mirrors the Go split-endpoint behavior: object I/O uses `endpoint_url`, persisted variant URLs use `public_endpoint_url`.
- `worker/utils/metrics.py` — Prometheus metrics via `prometheus_client`.

### Shared concerns
Expand All @@ -84,7 +84,7 @@ Entry: `worker/__main__.py` → `consumer/main.py`

**Error types (Go):** `pkg/errors` has typed API errors (`NotFoundError`, `BadRequestError`, `UnauthorizedError`, `ConflictError`, `InternalServerErrorError`) each embedding `*ApiError` (carries `StatusCode`). Handler layer type-asserts on these to set HTTP status. Use `fmt.Errorf("op: %w", err)` for internal wrapping; use `errors.New*` constructors (e.g. `errors.NewNotFoundError`) at the service/handler boundary.

**Storage (`pkg/utils/storagex`):** `StorageX` interface with `PutObject`, `GetObject`, `GeneratePresignedURL`, `PublicURL`, `DeleteObject`. Current impl: `GCSStorage`. S3/MinIO provider types exist in config but are not yet implemented.
**Storage (`pkg/utils/storagex`):** `StorageX` interface with `PutObject`, `GetObject`, `GeneratePresignedURL`, `PublicURL`, `DeleteObject`. Implementations: `GCSStorage` and `s3Storage` (S3 / S3-compatible MinIO). The S3 impl supports a split endpoint — `Endpoint` (internal/server-side) for object I/O and `PublicEndpoint` (client-facing) for presigned + public URLs; presigning happens against the public endpoint because SigV4 signs the Host header.

**OTel:** Full tracing + metrics on the API side. Go instruments are in `internal/metrics/metrics.go`. Collector config at `observability/otel-collector.yml`; Grafana/Loki/Tempo/Prometheus configs in `observability/`. Python side uses `prometheus_client` (not OTel).

Expand Down
149 changes: 128 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Create a `.env.local` file in the project root (`development` → `.env.local`,
# Server
ENV=development
HOST=0.0.0.0
PORT=8080
PORT=5010
LOG_LEVEL=DEBUG

# Database
Expand Down Expand Up @@ -108,7 +108,11 @@ S3_BUCKET_NAME=your-bucket-name
S3_REGION=us-east-1
S3_ACCESS_KEY_ID=your-access-key
S3_SECRET_ACCESS_KEY=your-secret-key
S3_ENDPOINT_URL=http://localhost:9000 # set for MinIO / S3-compatible stores
S3_ENDPOINT_URL=http://localhost:9000 # internal/server-side endpoint (MinIO / S3-compatible)
# Optional client-facing endpoint baked into presigned + public URLs. Set this
# when internal services reach the store by a private host (e.g. http://minio:9000)
# that external clients cannot resolve. Falls back to S3_ENDPOINT_URL when empty.
S3_PUBLIC_ENDPOINT_URL=http://localhost:9000

# Worker
STREAM_NAME=media:jobs
Expand Down Expand Up @@ -159,8 +163,27 @@ python -m worker # worker

### 6. Test the API

All `/api/v1` routes require a Bearer token — an AES-256-GCM token carrying a
user id, signed with `ENCRYPTION_KEY` (see [`pkg/utils/crypt.go`](pkg/utils/crypt.go)).
Mint one for local testing:

```bash
TOKEN=$(python3 - <<'PY'
import base64, os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = b"change_me_to_a_32_byte_secret____" # your 32-byte ENCRYPTION_KEY
nonce = os.urandom(12)
ct = AESGCM(key).encrypt(nonce, b"demo-user", None)
print(base64.urlsafe_b64encode(nonce + ct).rstrip(b"=").decode())
PY
)
```

Request a presigned upload URL:

```bash
curl -X POST http://localhost:8080/api/v1/assets/upload \
curl -X POST http://localhost:5010/api/v1/storage/presign \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"fileName": "image.jpg",
Expand All @@ -169,6 +192,19 @@ curl -X POST http://localhost:8080/api/v1/assets/upload \
}'
```

Upload the file to the returned `uploadUrl`, then mark the asset complete to
enqueue processing:

```bash
curl -X PUT "<uploadUrl>" -H "Content-Type: image/jpeg" --data-binary @image.jpg

curl "http://localhost:5010/api/v1/assets/<assetId>/complete" \
-H "Authorization: Bearer $TOKEN"
```

> Prefer the scripted path? [`scripts/demo-e2e.sh`](scripts/demo-e2e.sh) runs this
> entire flow (image + video + webhooks) end-to-end — see **Run the demo** below.

## 🐳 Docker Deployment

### Pull the published image (GHCR)
Expand Down Expand Up @@ -199,9 +235,12 @@ kubectl apply -f deploy/k8s/

## 📖 API Documentation

### Upload Asset
All `/api/v1` routes require an `Authorization: Bearer <token>` header (see
[Test the API](#6-test-the-api) for how to mint a token).

### Request a presigned upload URL

**Endpoint:** `POST /api/v1/assets/upload`
**Endpoint:** `POST /api/v1/storage/presign`

**Request:**
```json
Expand All @@ -215,32 +254,86 @@ kubectl apply -f deploy/k8s/
**Response:**
```json
{
"uploadUrl": "https://<storage-host>/...",
"assetId": "550e8400-e29b-41d4-a716-446655440000",
"method": "PUT",
"headers": {
"Content-Type": "image/jpeg"
},
"objectPath": "media/raw/550e8400-e29b-41d4-a716-446655440000",
"publicUrl": "https://<storage-host>/...",
"expiresAt": 1702468800
"status": "success",
"data": {
"uploadUrl": "http://localhost:9000/...",
"assetId": "550e8400-e29b-41d4-a716-446655440000",
"method": "PUT",
"headers": { "Content-Type": "image/jpeg" },
"objectPath": "example.jpg",
"publicUrl": "http://localhost:9000/...",
"expiresAt": 300
}
}
```

> The `uploadUrl` / `publicUrl` host depends on the configured storage provider (GCS, S3, or a MinIO endpoint).
> The `uploadUrl` / `publicUrl` host comes from the configured storage provider.
> For MinIO it is `S3_PUBLIC_ENDPOINT_URL` (the client-facing endpoint), so the
> URL is reachable from wherever the client runs — see [Storage Providers](#storage-providers).

### Mark Asset as Uploaded
### Mark an asset complete (enqueue processing)

**Endpoint:** `POST /api/v1/assets/{assetId}/uploaded`
**Endpoint:** `GET /api/v1/assets/{assetId}/complete`

Verifies the raw object exists in storage, transitions the asset to `uploaded`,
creates the processing job, and enqueues it (transactionally, via the outbox).

**Response:**
```json
{
"message": "Asset marked as uploaded",
"assetId": "550e8400-e29b-41d4-a716-446655440000"
"status": "success",
"message": "Asset marked as uploaded"
}
```

### Webhooks

Register an endpoint to receive processing-lifecycle events.

**Endpoints:**
- `POST /api/v1/webhooks` — register `{ "url", "secret", "events" }`
- `GET /api/v1/webhooks` — list your registrations
- `DELETE /api/v1/webhooks/{id}` — remove a registration

**Events:** `job.starting`, `job.started`, `job.done`, `job.failed`.

Deliveries are signed: each POST carries an `X-Webhook-Signature: sha256=<hmac>`
header computed over the JSON body using your registration `secret` (stored
encrypted at rest). A background dispatcher delivers pending events with
exponential-backoff retries and tracks them in the `webhook_deliveries` table.

```bash
curl -X POST http://localhost:5010/api/v1/webhooks \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"url": "https://example.com/hooks/mpiper",
"secret": "my-signing-secret",
"events": ["job.starting", "job.started", "job.done", "job.failed"]
}'
```

## 🎬 Run the demo

[`scripts/demo-e2e.sh`](scripts/demo-e2e.sh) drives the entire pipeline from the
host — exactly like a real client — for both an image and a video, including
webhook delivery. Bring the stack up **with the webhooks overlay**, then run it:

```bash
docker compose -f docker-compose.yml -f docker-compose.webhooks.yml up -d --build

./scripts/demo-e2e.sh
```

For each asset it presigns an upload, PUTs the file straight to MinIO over the
public `localhost:9000` endpoint, marks it complete, waits for the worker to
produce variants, fetches a variant back over HTTP, and asserts the
`job.starting → job.started → job.done` webhooks were delivered. It prints a
PASS/FAIL summary and exits non-zero on any failure.

Requirements on the host: `bash`, `curl`, `jq`, `docker`, and a `python3` with
the `cryptography` package (used only to mint the auth token).

## 🔧 Development

### Project Structure
Expand Down Expand Up @@ -318,6 +411,20 @@ MPiper selects a storage backend via `BUCKET_PROVIDER`:

Both the Go API and the Python worker share the same provider selection and env vars, so a single configuration drives the whole pipeline.

#### Internal vs public endpoints (`S3_PUBLIC_ENDPOINT_URL`)

When the store is reachable by a different host internally than externally —
the classic Docker case, where services talk to `http://minio:9000` but a
browser or a host-run client must use `http://localhost:9000` — set both:

- `S3_ENDPOINT_URL` — the **internal/server-side** endpoint used for object I/O (`http://minio:9000`)
- `S3_PUBLIC_ENDPOINT_URL` — the **client-facing** endpoint baked into presigned upload URLs and persisted variant URLs (`http://localhost:9000`)

This matters because SigV4 signs the `Host` header: a presigned URL must be
generated against the exact host the client will connect to, so it can't simply
be rewritten afterwards. When `S3_PUBLIC_ENDPOINT_URL` is unset it falls back to
`S3_ENDPOINT_URL` (single-endpoint behavior).

### Observability

The API emits OpenTelemetry traces and metrics; the worker exposes Prometheus metrics. The `observability/` directory contains a ready-to-run collector plus Grafana, Tempo, Loki, and Prometheus configuration.
Expand Down Expand Up @@ -368,9 +475,9 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file
## 📊 Roadmap

- [x] Support for AWS S3 / MinIO storage
- [x] Webhook delivery tracking (schema)
- [x] Webhook delivery with HMAC signing + retry tracking
- [x] Video transcoding with FFmpeg (poster, 720p, preview)
- [ ] Support for Azure Blob Storage
- [ ] Video transcoding with FFmpeg
- [ ] Admin dashboard
- [ ] Batch processing API
- [ ] CDN integration
Expand Down
78 changes: 78 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
Expand All @@ -13,7 +14,11 @@ import (
"github.com/rndmcodeguy20/mpiper/internal/config"
"github.com/rndmcodeguy20/mpiper/internal/database"
"github.com/rndmcodeguy20/mpiper/internal/metrics"
"github.com/rndmcodeguy20/mpiper/internal/outbox"
"github.com/rndmcodeguy20/mpiper/internal/queue"
"github.com/rndmcodeguy20/mpiper/internal/repository"
"github.com/rndmcodeguy20/mpiper/internal/server"
"github.com/rndmcodeguy20/mpiper/internal/webhook"
"github.com/rndmcodeguy20/mpiper/pkg/logger"
"go.uber.org/zap"
)
Expand All @@ -27,6 +32,16 @@ var (
)

func main() {
// --health-check is used by the container HEALTHCHECK. It must be a
// lightweight probe against the already-running server — NOT a second
// server boot (which would fail to bind the port). Exit 0 if /healthz is OK.
for _, arg := range os.Args[1:] {
if arg == "--health-check" {
runHealthCheck()
return
}
}

cfg, err := config.InitializeConfig(config.ToEnvironment(Env))
if err != nil {
panic(err)
Expand Down Expand Up @@ -91,6 +106,46 @@ func main() {
baseLogger.Info("Migrations applied successfully")
}

// --- Outbox relay ---
rc, err := queue.MustGetRedisClient(&cfg.Redis, baseLogger)
if err != nil {
baseLogger.Sugar().Fatalf("Failed to create Redis client: %v", err)
}
rq := queue.NewRedisQueue(serverCtx, rc, queue.RedisQueueOptions{
QueueName: "media:jobs",
ConnectionTimeOut: 2 * time.Second,
MaxStreamLength: 10_000,
MaxRetries: 3,
RetryInterval: 2 * time.Second,
EnableMetrics: true,
}, baseLogger, m)

outboxRepo := repository.NewOutboxRepository(db, baseLogger)
relay := outbox.NewRelay(outboxRepo, rq, baseLogger, m, cfg.Outbox.RelayInterval, cfg.Outbox.RelayBatch)
_ = m.RegisterOutboxPendingFunc(func(ctx context.Context) (int64, error) {
return outboxRepo.CountPending(ctx)
})
go relay.Start(serverCtx)
go relay.StartCleanup(serverCtx, cfg.Outbox.Retention)

// --- Webhook dispatcher ---
webhookDispatcher := webhook.NewDispatcher(db, baseLogger, webhook.DispatcherConfig{
PollInterval: cfg.Webhook.PollInterval,
BatchSize: cfg.Webhook.BatchSize,
Timeout: cfg.Webhook.Timeout,
MaxAttempts: cfg.Webhook.MaxAttempts,
EncryptionKey: cfg.EncryptionKey,
Retention: cfg.Webhook.Retention,
})
go webhookDispatcher.Start(serverCtx)
go webhookDispatcher.StartCleanup(serverCtx)

_ = m.RegisterWebhookPendingFunc(func(ctx context.Context) (int64, error) {
var count int64
err := db.GetContext(ctx, &count, `SELECT COUNT(*) FROM webhook_deliveries WHERE status = 'pending'`)
return count, err
})

srv := server.NewServer(db, cfg, m)
go func() {
if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) {
Expand All @@ -107,3 +162,26 @@ func main() {
baseLogger.Error("shutdown failed", zap.Error(err))
}
}

// runHealthCheck performs a lightweight HTTP probe against the running server's
// /healthz endpoint and exits 0 (healthy) or 1 (unhealthy). It deliberately
// avoids the full startup path so it can run as a container HEALTHCHECK without
// contending for the listen port.
func runHealthCheck() {
port := os.Getenv("PORT")
if port == "" {
port = "5010"
}
client := &http.Client{Timeout: 3 * time.Second}
resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%s/healthz", port))
if err != nil {
fmt.Fprintf(os.Stderr, "health check failed: %v\n", err)
os.Exit(1)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
fmt.Fprintf(os.Stderr, "health check failed: status %d\n", resp.StatusCode)
os.Exit(1)
}
os.Exit(0)
}
Loading
Loading