TypeScript SDK for integrating human-in-the-loop checkpoints into AI agent workflows.
Datashift enables AI agents to submit tasks for human review before committing changes. Create review queues, define review types (approval, classification, labeling, scoring, augmentation), and integrate human oversight into any AI workflow — via REST API or MCP.
npm install @datashift/sdk

import {DatashiftRestClient} from '@datashift/sdk';
const datashift = new DatashiftRestClient({
apiKey: process.env.DATASHIFT_API_KEY!,
});
// AI agent enriches company data from external sources
const enrichedData = {
companyId: 'acme_corp_123',
original: {name: 'Acme Corp', industry: 'Unknown'},
enriched: {
name: 'Acme Corporation',
industry: 'Manufacturing',
employees: 5200,
revenue: '$1.2B',
linkedIn: 'https://linkedin.com/company/acme',
},
sources: ['LinkedIn', 'Crunchbase', 'SEC Filings'],
};
// Submit enrichment for human verification before updating CRM
const task = await datashift.task.submit({
queueKey: 'data-enrichment',
data: enrichedData,
summary: 'Verify Acme Corp enrichment before CRM update',
});
// Wait for reviewer approval
const result = await datashift.task.waitForReview(task.id, {
timeout: 300000, // 5 minutes
pollInterval: 5000, // Check every 5 seconds
});
// Apply verified changes to CRM
if (result.result.includes('approved')) {
await updateCRM(enrichedData.companyId, enrichedData.enriched);
}

Create an API key in the Datashift Console.
const datashift = new DatashiftRestClient({
apiKey: 'sk_live_...'
});

// Submit a task
const task = await datashift.task.submit({
queueKey: 'approvals',
data: {...},
summary: 'Short description',
});
// Get task by ID
// (named `fetchedTask` so it does not re-declare the `task` constant from the submit example)
const fetchedTask = await datashift.task.get(taskId);
// List tasks — both filters are optional
const tasks = await datashift.task.list({
  queueId: 'queue_123', // optional: string
  state: 'pending', // optional: 'pending' | 'queued' | 'reviewed'
});
// Wait for review
const reviewed = await datashift.task.waitForReview(taskId, {
timeout: 300000, // 5 minutes
pollInterval: 2000, // Start at 2s
maxPollInterval: 30000 // Back off to 30s
});

// List queues
const queues = await datashift.queue.list();
// Get queue config
const queue = await datashift.queue.get('refund-approvals');

// List reviews with filters
const reviews = await datashift.review.list({
queueKey: 'approvals',
reviewerType: 'human',
from: new Date('2024-01-01'),
limit: 50,
});
// Get review by ID
const review = await datashift.review.get(reviewId);

// Create webhook
const webhook = await datashift.webhook.create({
url: 'https://your-service.com/webhook',
events: ['task.reviewed'],
});
// Save webhook.secret securely!
// List webhooks
const webhooks = await datashift.webhook.list();
// Update webhook
const updated = await datashift.webhook.update(webhookId, {
url: 'https://new-url.com/webhook',
events: ['task.reviewed', 'task.created'],
active: true,
});
// Rotate webhook secret
const rotated = await datashift.webhook.rotateSecret(webhookId);
// Save rotated.secret securely!
// Send test event
await datashift.webhook.test(webhookId);
// Delete webhook
await datashift.webhook.delete(webhookId);

Webhooks provide asynchronous notifications when tasks are created or reviewed.
import express from 'express';
import {verifyWebhookSignature, parseWebhookEvent} from '@datashift/sdk';
import type {WebhookEvent} from '@datashift/sdk';
const app = express();
// IMPORTANT: Use raw body for signature verification
app.post('/webhook/datashift',
express.raw({type: 'application/json'}),
(req, res) => {
  // Header values are `string | string[] | undefined`, so narrow the value
  // instead of asserting `as string`; a missing header is rejected like a bad one.
  const signature = req.headers['x-datashift-signature'];
  // Verify signature
  if (typeof signature !== 'string' || !verifyWebhookSignature(req.body, signature, WEBHOOK_SECRET)) {
    res.status(401).send('Invalid signature');
    return;
  }
  // Parse event
  const event = parseWebhookEvent<WebhookEvent>(req.body);
  if (event.event === 'task.reviewed') {
    const {task, queue, reviews} = event.data;
    console.log(`Task ${task.id} in queue ${queue.key} reviewed`);
    // `reviews` is an array; guard the index access in case it is empty
    const firstReview = reviews[0];
    if (firstReview) {
      console.log(`Result: ${firstReview.result}`);
      console.log(`Reviewer: ${firstReview.reviewer.name}`);
    }
  } else if (event.event === 'task.created') {
    const {task, queue} = event.data;
    console.log(`Task ${task.id} created in queue ${queue.key}`);
  }
  res.status(200).send('OK');
}
);

| Event | Description |
|---|---|
| `task.created` | A new task was submitted for review |
| `task.reviewed` | A task has been reviewed (includes all reviews) |
// task.created
// Webhook payload whose `event` discriminant is the literal 'task.created'.
// `data` carries the task and the queue it belongs to; it has no `reviews` field.
interface TaskCreatedWebhookEvent {
event: 'task.created';
timestamp: string; // NOTE(review): string format not shown here — presumably ISO 8601; confirm
data: {
task: WebhookTaskData;
queue: WebhookQueueData;
};
}
// task.reviewed
// Webhook payload whose `event` discriminant is the literal 'task.reviewed'.
// Besides the task and its queue, `data` carries the array of reviews.
interface TaskReviewedWebhookEvent {
event: 'task.reviewed';
timestamp: string; // NOTE(review): string format not shown here — presumably ISO 8601; confirm
data: {
task: WebhookTaskData;
queue: WebhookQueueData;
reviews: WebhookReviewData[]; // All reviews (supports two-step workflows)
};
}
// Shape of the `task` object nested under `data` in webhook events.
// Fields typed `| null` may be null; `data` and `metadata` are string-keyed
// objects whose values are untyped (`unknown`) and must be narrowed before use.
interface WebhookTaskData {
id: string;
external_id: string | null;
state: string; // plain string here (not a literal union)
summary: string | null;
data: Record<string, unknown>;
metadata: Record<string, unknown>;
sla_deadline: string | null;
reviewed_at: string | null;
created_at: string;
}
// Shape of the `queue` object nested under `data` in webhook events.
interface WebhookQueueData {
key: string;
name: string;
review_type: string; // plain string here (not a literal union)
}
// Shape of each entry in the `reviews` array of a 'task.reviewed' webhook event.
interface WebhookReviewData {
result: string[]; // array of strings, not a single value
data: Record<string, unknown>;
comment: string | null;
reviewer: { name: string; type: string }; // NOTE(review): `type` is presumably 'ai' | 'human' — confirm
created_at: string;
}
// Union type for all webhook events
type WebhookEvent = TaskCreatedWebhookEvent | TaskReviewedWebhookEvent;

interface Task {
id: string;
queue_id: string;
external_id: string | null;
state: 'pending' | 'queued' | 'reviewed';
data: Record<string, unknown>;
context: Record<string, unknown>;
metadata: Record<string, unknown>;
summary: string | null;
sla_deadline: string | null;
reviewed_at: string | null;
created_at: string;
updated_at: string;
reviews?: Review[];
}

interface Queue {
id: string;
key: string;
name: string;
description: string | null;
review_type: 'approval' | 'labeling' | 'classification' | 'scoring' | 'augmentation';
review_options: ReviewOption[];
assignment: 'manual' | 'round_robin' | 'ai_first';
sla_minutes: number | null;
deleted_at: string | null;
}

interface Review {
id: string;
task_id: string;
reviewer_id: string | null;
result: string[];
data: Record<string, unknown>;
comment: string | null;
created_at: string;
}

interface ListReviewsFilters {
taskId?: string;
reviewerId?: string;
queueKey?: string;
reviewerType?: 'ai' | 'human';
from?: Date | string;
to?: Date | string;
limit?: number;
offset?: number;
}

interface ReviewResult {
task_id: string;
result: string[];
data: Record<string, unknown>;
reviewed_at: string;
review: Review;
}

The SDK provides typed error classes for common error scenarios.
import {
DatashiftError,
AuthenticationError,
NotFoundError,
ValidationError,
TimeoutError,
RateLimitError,
ServerError,
} from '@datashift/sdk';
try {
const result = await datashift.task.waitForReview(taskId, {timeout: 60000});
} catch (error) {
if (error instanceof TimeoutError) {
console.log('Review not finished in time');
} else if (error instanceof AuthenticationError) {
console.log('Invalid credentials');
} else if (error instanceof NotFoundError) {
console.log('Task not found');
} else if (error instanceof RateLimitError) {
console.log(`Rate limited. Retry after ${error.retryAfter}s`);
} else if (error instanceof DatashiftError) {
console.log(`API error: ${error.message}`);
}
}

interface RestClientConfig {
apiKey: string; // Required
baseUrl?: string; // Default: 'https://api.datashift.io'
timeout?: number; // Request timeout (default: 30000ms)
retries?: number; // Retry count (default: 3)
retryDelay?: number; // Initial retry delay (default: 1000ms)
}

interface WaitOptions {
timeout?: number; // Max wait time (default: 300000ms)
pollInterval?: number; // Poll interval (default: 2000ms)
maxPollInterval?: number; // Max poll interval for backoff (default: 30000ms)
}

import {DatashiftRestClient, verifyWebhookSignature, parseWebhookEvent} from '@datashift/sdk';
import type {WebhookEvent} from '@datashift/sdk';
import express from 'express';
const datashift = new DatashiftRestClient({
apiKey: process.env.DATASHIFT_API_KEY!,
});
// Store pending tasks (use Redis/DB in production)
// Maps a task ID to the callback to invoke once that task's review arrives.
const pendingTasks = new Map<string, (result: any) => void>();

/**
 * Submit a task to the 'reviews' queue and return immediately with its ID.
 *
 * @param data - Payload to submit for review.
 * @param onReviewed - Optional callback stored in `pendingTasks` under the new
 *   task's ID. Without it nothing is registered and the map stays empty, which
 *   is what the previous version always did.
 * @returns The ID of the submitted task.
 */
async function submitForReview(
  data: any,
  onReviewed?: (result: any) => void,
): Promise<string> {
  const task = await datashift.task.submit({
    queueKey: 'reviews',
    data,
  });
  if (onReviewed) {
    pendingTasks.set(task.id, onReviewed);
  }
  return task.id;
}
// Webhook handler
const app = express();
app.post('/webhook/datashift',
express.raw({type: 'application/json'}),
(req, res) => {
  // Header values are `string | string[] | undefined`, so narrow the value
  // instead of asserting `as string`; a missing header is rejected like a bad one.
  const signature = req.headers['x-datashift-signature'];
  if (
    typeof signature !== 'string' ||
    !verifyWebhookSignature(req.body, signature, process.env.WEBHOOK_SECRET!)
  ) {
    res.status(401).send('Invalid signature');
    return;
  }
  const event = parseWebhookEvent<WebhookEvent>(req.body);
  if (event.event === 'task.reviewed') {
    // Hand the event data to the callback registered for this task, then forget it
    const taskId = event.data.task.id;
    const resolver = pendingTasks.get(taskId);
    if (resolver) {
      resolver(event.data);
      pendingTasks.delete(taskId);
    }
  }
  res.status(200).send('OK');
}
);
app.listen(3000);

- Node.js 18+
- TypeScript 5.0+ (for type definitions)
- Datashift Website — Product overview and sign up
- Documentation — Guides, API reference, and examples
- MCP Server — Connect AI agents via Model Context Protocol
- Console — Manage queues, reviewers, and API keys
- GitHub — SDKs and sample projects
MIT