Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,25 @@ That's it. Tasks are queued to Redis, workers process them in parallel, progress

## Supported Patterns

### Activity Metadata (Multi-Tenancy)

Attach arbitrary metadata when enqueueing tasks for filtering and tenant isolation:

```python
task = await ax.enqueue(
"process_document",
context,
metadata={"organization_id": "org-123", "user_id": "user-456"}
)

# Filter activities by metadata
activities = ax.activity.list(db, metadata_filter={"organization_id": "org-123"})
detail = ax.activity.detail(db, agent_id, metadata_filter={"organization_id": "org-123"})

# Access metadata programmatically (excluded from API serialization by default)
org_id = detail.metadata["organization_id"]
```

### Automatic Activity Tracking

Every task gets full lifecycle tracking without manual updates:
Expand Down Expand Up @@ -386,6 +405,7 @@ Activity tracking uses SQLAlchemy with two tables:
**`agentexec_activity`** - Main activity records
- `agent_id` - Unique identifier (UUID)
- `agent_type` - Task name
- `metadata` - JSON field for custom data (e.g., tenant info)
- `created_at`, `updated_at` - Timestamps

**`agentexec_activity_log`** - Status and progress
Expand Down Expand Up @@ -733,4 +753,5 @@ MIT License - see [LICENSE](LICENSE) for details.
- **npm**: [agentexec-ui](https://www.npmjs.com/package/agentexec-ui)
- **Documentation**: [docs/](docs/)
- **Example App**: [examples/openai-agents-fastapi/](examples/openai-agents-fastapi/)
- **Multi-Tenancy Example**: [examples/multi-tenancy/](examples/multi-tenancy/)
- **Issues**: [GitHub Issues](https://github.com/Agent-CI/agentexec/issues)
92 changes: 92 additions & 0 deletions examples/multi-tenancy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Multi-Tenancy with Activity Metadata

This example demonstrates how to use activity metadata for multi-tenant applications.

## Overview

When building multi-tenant applications, you often need to:
1. Associate background tasks with specific organizations/tenants
2. Filter activity views to only show tasks belonging to the current tenant
3. Ensure proper data isolation between tenants

The `metadata` parameter on `ax.enqueue()` and `pipeline.enqueue()` enables this by attaching arbitrary key-value pairs to activity records.

## Usage

### Enqueueing with Metadata

```python
import agentexec as ax

# Enqueue a task with organization context
task = await ax.enqueue(
"process_document",
DocumentContext(file_id="doc-123"),
metadata={"organization_id": "org-456", "user_id": "user-789"}
)
```

### Filtering Activities by Metadata

```python
from agentexec import activity

# List only activities for a specific organization
activities = activity.list(
session,
metadata_filter={"organization_id": "org-456"}
)

# Get activity detail with tenant validation
# Returns None if the activity doesn't belong to this organization
detail = activity.detail(
session,
agent_id="...",
metadata_filter={"organization_id": "org-456"}
)
```

### Pipeline Example

```python
pipeline = ax.Pipeline(pool)

class DocumentPipeline(pipeline.Base):
@pipeline.step(0)
async def extract(self, ctx: DocumentContext) -> ExtractedData:
...

# Enqueue pipeline with metadata
task = await pipeline.enqueue(
context=DocumentContext(file_id="doc-123"),
metadata={"organization_id": "org-456"}
)
```

## Running the Example

```bash
# Install dependencies
pip install agentexec

# Run the example
python example.py
```

## Database Schema

The metadata is stored as a JSON column on the `agentexec_activity` table:

```sql
ALTER TABLE agentexec_activity ADD COLUMN metadata JSON;
```

For PostgreSQL, this maps to JSONB which supports efficient filtering.

## Notes

- Metadata is immutable once set at enqueue time
- Filtering uses exact string matching on metadata values
- **Metadata is excluded from API serialization by default** to prevent accidental leakage of tenant info
- Access metadata programmatically via the `.metadata` attribute (e.g., `activity.metadata`)
- To include metadata in API responses, explicitly add it to your response model
188 changes: 188 additions & 0 deletions examples/multi-tenancy/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""Multi-tenancy example demonstrating activity metadata filtering.

This example shows how to:
1. Attach metadata (like organization_id) when enqueueing tasks
2. Filter activities by metadata for tenant isolation
3. Use metadata in both list and detail views
"""

import asyncio
from uuid import UUID

from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

import agentexec as ax


# --- Models ---


class DocumentContext(BaseModel):
"""Input context for document processing."""

file_id: str
filename: str


class ProcessedDocument(BaseModel):
"""Result of document processing."""

file_id: str
word_count: int
summary: str


# --- Setup ---

# Create in-memory SQLite database for demo
engine = create_engine("sqlite:///multi_tenant_demo.db", echo=False)
SessionLocal = sessionmaker(bind=engine)

# Create tables
ax.Base.metadata.create_all(bind=engine)

# Create worker pool
pool = ax.Pool(engine=engine)


# --- Task Definition ---


@pool.task("process_document")
async def process_document(
*,
agent_id: UUID,
context: DocumentContext,
) -> ProcessedDocument:
"""Simulate document processing."""
# Simulate some work
ax.activity.update(agent_id, "Extracting text...", percentage=25)
await asyncio.sleep(0.1)

ax.activity.update(agent_id, "Analyzing content...", percentage=50)
await asyncio.sleep(0.1)

ax.activity.update(agent_id, "Generating summary...", percentage=75)
await asyncio.sleep(0.1)

return ProcessedDocument(
file_id=context.file_id,
word_count=1234,
summary=f"Summary of {context.filename}",
)


# --- Demo Functions ---


async def enqueue_tasks_for_tenants():
"""Enqueue tasks for different organizations."""
print("\n=== Enqueueing Tasks ===\n")

# Organization A: Enqueue 2 tasks
task1 = await ax.enqueue(
"process_document",
DocumentContext(file_id="doc-001", filename="report.pdf"),
metadata={"organization_id": "org-A", "user_id": "user-1"},
)
print(f"Org A - Task 1: {task1.agent_id}")

task2 = await ax.enqueue(
"process_document",
DocumentContext(file_id="doc-002", filename="invoice.pdf"),
metadata={"organization_id": "org-A", "user_id": "user-2"},
)
print(f"Org A - Task 2: {task2.agent_id}")

# Organization B: Enqueue 1 task
task3 = await ax.enqueue(
"process_document",
DocumentContext(file_id="doc-003", filename="contract.pdf"),
metadata={"organization_id": "org-B", "user_id": "user-3"},
)
print(f"Org B - Task 1: {task3.agent_id}")

return task1, task2, task3


def list_activities_for_tenant(org_id: str):
"""List activities filtered by organization."""
print(f"\n=== Activities for {org_id} ===\n")

with SessionLocal() as session:
result = ax.activity.list(
session,
metadata_filter={"organization_id": org_id},
)

print(f"Total activities: {result.total}")
for item in result.items:
print(f" - {item.agent_id}: {item.agent_type} ({item.status})")
print(f" Metadata: {item.metadata}")


def get_activity_detail_with_tenant_check(agent_id: UUID, org_id: str):
"""Get activity detail with tenant validation."""
print(f"\n=== Detail for {agent_id} (checking {org_id}) ===\n")

with SessionLocal() as session:
# This returns None if the activity doesn't belong to the org
detail = ax.activity.detail(
session,
agent_id,
metadata_filter={"organization_id": org_id},
)

if detail:
print(f"Found: {detail.agent_type}")
print(f"Metadata: {detail.metadata}")
print(f"Logs: {len(detail.logs)} entries")
else:
print(f"Not found (or doesn't belong to {org_id})")


async def main():
"""Run the multi-tenancy demo."""
print("Multi-Tenancy Demo with Activity Metadata")
print("=" * 50)

# 1. Enqueue tasks for different organizations
task1, task2, task3 = await enqueue_tasks_for_tenants()

# 2. List all activities (no filter)
print("\n=== All Activities (no filter) ===\n")
with SessionLocal() as session:
all_activities = ax.activity.list(session)
print(f"Total: {all_activities.total}")
for item in all_activities.items:
print(f" - {item.agent_type}: {item.metadata}")

# 3. List activities filtered by organization
list_activities_for_tenant("org-A") # Should show 2 tasks
list_activities_for_tenant("org-B") # Should show 1 task
list_activities_for_tenant("org-C") # Should show 0 tasks

# 4. Detail view with tenant validation
# Try to access Org A's task as Org A (should work)
get_activity_detail_with_tenant_check(task1.agent_id, "org-A")

# Try to access Org A's task as Org B (should return None)
get_activity_detail_with_tenant_check(task1.agent_id, "org-B")

# 5. Filter by multiple metadata fields
print("\n=== Filter by org AND user ===\n")
with SessionLocal() as session:
result = ax.activity.list(
session,
metadata_filter={"organization_id": "org-A", "user_id": "user-1"},
)
print(f"Found {result.total} activities for org-A + user-1")

print("\n" + "=" * 50)
print("Demo complete!")


if __name__ == "__main__":
asyncio.run(main())
Loading