From 98140091078a44b40157989ae96a7e571d7baf7d Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 16 Jan 2026 16:02:24 +0000 Subject: [PATCH 1/5] Add activity metadata support for multi-tenancy Add a JSON metadata field to activity records that can be set when enqueueing tasks and used for filtering in list/detail views. Changes: - Add metadata_ column (JSON) to Activity model - Update Activity.get_list() and get_by_agent_id() with metadata filtering - Add metadata field to ActivityDetailSchema and ActivityListItemSchema - Update activity.create/list/detail tracker functions with metadata params - Update Task.create() to accept and pass through metadata - Update queue.enqueue() to accept metadata parameter - Update Pipeline.enqueue() to accept metadata parameter - Add comprehensive tests for metadata functionality - Add multi-tenancy example demonstrating the feature Usage: task = await ax.enqueue( "process_document", context, metadata={"organization_id": "org-123"} ) activities = activity.list( session, metadata_filter={"organization_id": "org-123"} ) --- examples/multi-tenancy/README.md | 90 ++++++++++++++ examples/multi-tenancy/example.py | 188 ++++++++++++++++++++++++++++++ src/agentexec/activity/models.py | 52 ++++++++- src/agentexec/activity/schemas.py | 3 + src/agentexec/activity/tracker.py | 34 +++++- src/agentexec/core/queue.py | 11 ++ src/agentexec/core/task.py | 17 ++- src/agentexec/pipeline.py | 14 ++- tests/test_activity_tracking.py | 181 ++++++++++++++++++++++++++++ 9 files changed, 578 insertions(+), 12 deletions(-) create mode 100644 examples/multi-tenancy/README.md create mode 100644 examples/multi-tenancy/example.py diff --git a/examples/multi-tenancy/README.md b/examples/multi-tenancy/README.md new file mode 100644 index 0000000..20109da --- /dev/null +++ b/examples/multi-tenancy/README.md @@ -0,0 +1,90 @@ +# Multi-Tenancy with Activity Metadata + +This example demonstrates how to use activity metadata for multi-tenant applications. + +## Overview + +When building multi-tenant applications, you often need to: +1. Associate background tasks with specific organizations/tenants +2. Filter activity views to only show tasks belonging to the current tenant +3. Ensure proper data isolation between tenants + +The `metadata` parameter on `ax.enqueue()` and `pipeline.enqueue()` enables this by attaching arbitrary key-value pairs to activity records. + +## Usage + +### Enqueueing with Metadata + +```python +import agentexec as ax + +# Enqueue a task with organization context +task = await ax.enqueue( + "process_document", + DocumentContext(file_id="doc-123"), + metadata={"organization_id": "org-456", "user_id": "user-789"} +) +``` + +### Filtering Activities by Metadata + +```python +from agentexec import activity + +# List only activities for a specific organization +activities = activity.list( + session, + metadata_filter={"organization_id": "org-456"} +) + +# Get activity detail with tenant validation +# Returns None if the activity doesn't belong to this organization +detail = activity.detail( + session, + agent_id="...", + metadata_filter={"organization_id": "org-456"} +) +``` + +### Pipeline Example + +```python +pipeline = ax.Pipeline(pool) + +class DocumentPipeline(pipeline.Base): + @pipeline.step(0) + async def extract(self, ctx: DocumentContext) -> ExtractedData: + ... + +# Enqueue pipeline with metadata +task = await pipeline.enqueue( + context=DocumentContext(file_id="doc-123"), + metadata={"organization_id": "org-456"} +) +``` + +## Running the Example + +```bash +# Install dependencies +pip install agentexec + +# Run the example +python example.py +``` + +## Database Schema + +The metadata is stored as a JSON column on the `agentexec_activity` table: + +```sql +ALTER TABLE agentexec_activity ADD COLUMN metadata JSON; +``` + +For PostgreSQL, this maps to JSONB which supports efficient filtering. + +## Notes + +- Metadata is immutable once set at enqueue time +- Filtering uses exact string matching on metadata values +- The metadata field is included in both list and detail API responses diff --git a/examples/multi-tenancy/example.py b/examples/multi-tenancy/example.py new file mode 100644 index 0000000..f4a29fb --- /dev/null +++ b/examples/multi-tenancy/example.py @@ -0,0 +1,188 @@ +"""Multi-tenancy example demonstrating activity metadata filtering. + +This example shows how to: +1. Attach metadata (like organization_id) when enqueueing tasks +2. Filter activities by metadata for tenant isolation +3. Use metadata in both list and detail views +""" + +import asyncio +from uuid import UUID + +from pydantic import BaseModel +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +import agentexec as ax + + +# --- Models --- + + +class DocumentContext(BaseModel): + """Input context for document processing.""" + + file_id: str + filename: str + + +class ProcessedDocument(BaseModel): + """Result of document processing.""" + + file_id: str + word_count: int + summary: str + + +# --- Setup --- + +# Create in-memory SQLite database for demo +engine = create_engine("sqlite:///multi_tenant_demo.db", echo=False) +SessionLocal = sessionmaker(bind=engine) + +# Create tables +ax.Base.metadata.create_all(bind=engine) + +# Create worker pool +pool = ax.Pool(engine=engine) + + +# --- Task Definition --- + + +@pool.task("process_document") +async def process_document( + *, + agent_id: UUID, + context: DocumentContext, +) -> ProcessedDocument: + """Simulate document processing.""" + # Simulate some work + ax.activity.update(agent_id, "Extracting text...", percentage=25) + await asyncio.sleep(0.1) + + ax.activity.update(agent_id, "Analyzing content...", percentage=50) + await asyncio.sleep(0.1) + + ax.activity.update(agent_id, "Generating summary...", percentage=75) + await asyncio.sleep(0.1) + + return ProcessedDocument( + file_id=context.file_id, + word_count=1234, + summary=f"Summary of {context.filename}", + ) + + +# --- Demo Functions --- + + +async def enqueue_tasks_for_tenants(): + """Enqueue tasks for different organizations.""" + print("\n=== Enqueueing Tasks ===\n") + + # Organization A: Enqueue 2 tasks + task1 = await ax.enqueue( + "process_document", + DocumentContext(file_id="doc-001", filename="report.pdf"), + metadata={"organization_id": "org-A", "user_id": "user-1"}, + ) + print(f"Org A - Task 1: {task1.agent_id}") + + task2 = await ax.enqueue( + "process_document", + DocumentContext(file_id="doc-002", filename="invoice.pdf"), + metadata={"organization_id": "org-A", "user_id": "user-2"}, + ) + print(f"Org A - Task 2: {task2.agent_id}") + + # Organization B: Enqueue 1 task + task3 = await ax.enqueue( + "process_document", + DocumentContext(file_id="doc-003", filename="contract.pdf"), + metadata={"organization_id": "org-B", "user_id": "user-3"}, + ) + print(f"Org B - Task 1: {task3.agent_id}") + + return task1, task2, task3 + + +def list_activities_for_tenant(org_id: str): + """List activities filtered by organization.""" + print(f"\n=== Activities for {org_id} ===\n") + + with SessionLocal() as session: + result = ax.activity.list( + session, + metadata_filter={"organization_id": org_id}, + ) + + print(f"Total activities: {result.total}") + for item in result.items: + print(f" - {item.agent_id}: {item.agent_type} ({item.status})") + print(f" Metadata: {item.metadata}") + + +def get_activity_detail_with_tenant_check(agent_id: UUID, org_id: str): + """Get activity detail with tenant validation.""" + print(f"\n=== Detail for {agent_id} (checking {org_id}) ===\n") + + with SessionLocal() as session: + # This returns None if the activity doesn't belong to the org + detail = ax.activity.detail( + session, + agent_id, + metadata_filter={"organization_id": org_id}, + ) + + if detail: + print(f"Found: {detail.agent_type}") + print(f"Metadata: {detail.metadata}") + print(f"Logs: {len(detail.logs)} entries") + else: + print(f"Not found (or doesn't belong to {org_id})") + + +async def main(): + """Run the multi-tenancy demo.""" + print("Multi-Tenancy Demo with Activity Metadata") + print("=" * 50) + + # 1. Enqueue tasks for different organizations + task1, task2, task3 = await enqueue_tasks_for_tenants() + + # 2. List all activities (no filter) + print("\n=== All Activities (no filter) ===\n") + with SessionLocal() as session: + all_activities = ax.activity.list(session) + print(f"Total: {all_activities.total}") + for item in all_activities.items: + print(f" - {item.agent_type}: {item.metadata}") + + # 3. List activities filtered by organization + list_activities_for_tenant("org-A") # Should show 2 tasks + list_activities_for_tenant("org-B") # Should show 1 task + list_activities_for_tenant("org-C") # Should show 0 tasks + + # 4. Detail view with tenant validation + # Try to access Org A's task as Org A (should work) + get_activity_detail_with_tenant_check(task1.agent_id, "org-A") + + # Try to access Org A's task as Org B (should return None) + get_activity_detail_with_tenant_check(task1.agent_id, "org-B") + + # 5. Filter by multiple metadata fields + print("\n=== Filter by org AND user ===\n") + with SessionLocal() as session: + result = ax.activity.list( + session, + metadata_filter={"organization_id": "org-A", "user_id": "user-1"}, + ) + print(f"Found {result.total} activities for org-A + user-1") + + print("\n" + "=" * 50) + print("Demo complete!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/agentexec/activity/models.py b/src/agentexec/activity/models.py index 6ad4874..8d05565 100644 --- a/src/agentexec/activity/models.py +++ b/src/agentexec/activity/models.py @@ -3,11 +3,14 @@ import uuid from datetime import UTC, datetime +from typing import Any + from sqlalchemy import ( DateTime, Enum, ForeignKey, Integer, + JSON, String, Text, Uuid, @@ -58,6 +61,12 @@ def __tablename__(cls) -> str: default=lambda: datetime.now(UTC), onupdate=lambda: datetime.now(UTC), ) + metadata_: Mapped[dict[str, Any] | None] = mapped_column( + "metadata", + JSON, + nullable=True, + default=None, + ) logs: Mapped[list[ActivityLog]] = relationship( "ActivityLog", @@ -113,15 +122,19 @@ def get_by_agent_id( cls, session: Session, agent_id: str | uuid.UUID, + metadata_filter: dict[str, Any] | None = None, ) -> Activity | None: """Get an activity by agent_id. Args: session: SQLAlchemy session agent_id: The agent_id to look up (string or UUID) + metadata_filter: Optional dict of key-value pairs to filter by. + If provided and the activity's metadata doesn't match, + returns None (same as if not found). Returns: - Activity object or None if not found + Activity object or None if not found or metadata doesn't match Example: activity = Activity.get_by_agent_id(session, "abc-123") @@ -129,11 +142,26 @@ def get_by_agent_id( activity = Activity.get_by_agent_id(session, uuid.UUID("abc-123...")) if activity: print(f"Found activity: {activity.agent_type}") + + # With metadata filter (for multi-tenancy) + activity = Activity.get_by_agent_id( + session, + agent_id, + metadata_filter={"organization_id": "org-123"} + ) """ # Normalize to UUID if string if isinstance(agent_id, str): agent_id = uuid.UUID(agent_id) - return session.query(cls).filter_by(agent_id=agent_id).first() + + query = session.query(cls).filter_by(agent_id=agent_id) + + # Apply metadata filtering if provided + if metadata_filter: + for key, value in metadata_filter.items(): + query = query.filter(cls.metadata_[key].as_string() == str(value)) + + return query.first() @classmethod def get_list( @@ -141,6 +169,7 @@ def get_list( session: Session, page: int = 1, page_size: int = 50, + metadata_filter: dict[str, Any] | None = None, ) -> list[RowMapping]: """Get a paginated list of activities with summary information. @@ -148,16 +177,25 @@ def get_list( session: SQLAlchemy session to use for the query page: Page number (1-indexed) page_size: Number of items per page + metadata_filter: Optional dict of key-value pairs to filter by. + Activities must have metadata containing all specified keys + with exactly matching values. Returns: List of RowMapping objects (dict-like) with keys matching ActivitySummarySchema: agent_id, agent_type, latest_log_message, status, latest_log_timestamp, - percentage, started_at + percentage, started_at, metadata Example: results = Activity.get_list(session, page=1, page_size=20) for row in results: print(f"{row['agent_id']}: {row['latest_log_message']}") + + # Filter by organization + results = Activity.get_list( + session, + metadata_filter={"organization_id": "org-123"} + ) """ # Subquery to get the latest log for each agent latest_log_subq = select( @@ -198,6 +236,7 @@ def get_list( latest_log.c.created_at.label("latest_log_timestamp"), latest_log.c.percentage, started_at.c.started_at, + cls.metadata_.label("metadata"), ) .outerjoin( latest_log, @@ -206,6 +245,13 @@ def get_list( .outerjoin(started_at, cls.id == started_at.c.activity_id) ) + # Apply metadata filtering if provided + if metadata_filter: + for key, value in metadata_filter.items(): + # Use JSON path extraction for exact string matching + # This works across SQLite (for testing) and PostgreSQL (for production) + query = query.where(cls.metadata_[key].as_string() == str(value)) + # Custom ordering: active agents (running, queued) at the top is_active = case( (latest_log.c.status.in_([Status.RUNNING, Status.QUEUED]), 0), diff --git a/src/agentexec/activity/schemas.py b/src/agentexec/activity/schemas.py index f0ff930..dc3d6d2 100644 --- a/src/agentexec/activity/schemas.py +++ b/src/agentexec/activity/schemas.py @@ -1,5 +1,6 @@ import uuid from datetime import datetime +from typing import Any from pydantic import BaseModel, ConfigDict, Field, computed_field @@ -29,6 +30,7 @@ class ActivityDetailSchema(BaseModel): created_at: datetime updated_at: datetime logs: list[ActivityLogSchema] = Field(default_factory=list) + metadata: dict[str, Any] | None = Field(default=None, alias="metadata_") class ActivityListItemSchema(BaseModel): @@ -47,6 +49,7 @@ class ActivityListItemSchema(BaseModel): latest_log_timestamp: datetime | None = None percentage: int | None = 0 started_at: datetime | None = None + metadata: dict[str, Any] | None = None @computed_field def elapsed_time_seconds(self) -> int: diff --git a/src/agentexec/activity/tracker.py b/src/agentexec/activity/tracker.py index 9bcc7e8..12aeff8 100644 --- a/src/agentexec/activity/tracker.py +++ b/src/agentexec/activity/tracker.py @@ -1,4 +1,5 @@ import uuid +from typing import Any from sqlalchemy.orm import Session @@ -45,13 +46,17 @@ def create( message: str = "Agent queued", agent_id: str | uuid.UUID | None = None, session: Session | None = None, + metadata: dict[str, Any] | None = None, ) -> uuid.UUID: """Create a new agent activity record with initial queued status. Args: task_name: Name/type of the task (e.g., "research", "analysis") - initial_message: Initial log message (default: "Agent queued") + message: Initial log message (default: "Agent queued") agent_id: Optional custom agent ID (string or UUID). If not provided, one will be auto-generated. + session: Optional SQLAlchemy session. If not provided, uses global session factory. + metadata: Optional dict of arbitrary metadata to attach to the activity. + Useful for multi-tenancy (e.g., {"organization_id": "org-123"}). Returns: The agent_id (as UUID object) of the created record @@ -62,6 +67,7 @@ def create( activity_record = Activity( agent_id=agent_id, agent_type=task_name, + metadata_=metadata, ) db.add(activity_record) db.flush() @@ -208,6 +214,7 @@ def list( session: Session, page: int = 1, page_size: int = 50, + metadata_filter: dict[str, Any] | None = None, ) -> ActivityListSchema: """List activities with pagination. @@ -215,12 +222,26 @@ def list( session: SQLAlchemy session to use for the query page: Page number (1-indexed) page_size: Number of items per page + metadata_filter: Optional dict of key-value pairs to filter by. + Activities must have metadata containing all specified keys + with exactly matching values. Returns: ActivityList with list of ActivityListItemSchema items """ - total = session.query(Activity).count() - rows = Activity.get_list(session, page=page, page_size=page_size) + # Build base query for total count + query = session.query(Activity) + if metadata_filter: + for key, value in metadata_filter.items(): + query = query.filter(Activity.metadata_[key].as_string() == str(value)) + total = query.count() + + rows = Activity.get_list( + session, + page=page, + page_size=page_size, + metadata_filter=metadata_filter, + ) return ActivityListSchema( items=[ActivityListItemSchema.model_validate(row) for row in rows], @@ -233,17 +254,22 @@ def list( def detail( session: Session, agent_id: str | uuid.UUID, + metadata_filter: dict[str, Any] | None = None, ) -> ActivityDetailSchema | None: """Get a single activity by agent_id with all logs. Args: session: SQLAlchemy session to use for the query agent_id: The agent_id to look up + metadata_filter: Optional dict of key-value pairs to filter by. + If provided and the activity's metadata doesn't match, + returns None (same as if not found). Returns: ActivityDetailSchema with full log history, or None if not found + or if metadata doesn't match """ - if item := Activity.get_by_agent_id(session, agent_id): + if item := Activity.get_by_agent_id(session, agent_id, metadata_filter=metadata_filter): return ActivityDetailSchema.model_validate(item) return None diff --git a/src/agentexec/core/queue.py b/src/agentexec/core/queue.py index bf0328b..56b5316 100644 --- a/src/agentexec/core/queue.py +++ b/src/agentexec/core/queue.py @@ -29,6 +29,7 @@ async def enqueue( *, priority: Priority = Priority.LOW, queue_name: str | None = None, + metadata: dict[str, Any] | None = None, ) -> Task: """Enqueue a task for background execution. @@ -40,6 +41,8 @@ async def enqueue( context: Task context as a Pydantic BaseModel. priority: Task priority (Priority.HIGH or Priority.LOW). queue_name: Queue name. Defaults to CONF.queue_name. + metadata: Optional dict of arbitrary metadata to attach to the activity. + Useful for multi-tenancy (e.g., {"organization_id": "org-123"}). Returns: Task instance with typed context and agent_id for tracking. @@ -50,6 +53,13 @@ async def research(agent_id: UUID, context: ResearchContext): ... task = await ax.enqueue("research_company", ResearchContext(company="Acme")) + + # With metadata for multi-tenancy + task = await ax.enqueue( + "research_company", + ResearchContext(company="Acme"), + metadata={"organization_id": "org-123"} + ) """ push_func = { Priority.HIGH: state.backend.rpush, @@ -59,6 +69,7 @@ async def research(agent_id: UUID, context: ResearchContext): task = Task.create( task_name=task_name, context=context, + metadata=metadata, ) push_func( queue_name or CONF.queue_name, diff --git a/src/agentexec/core/task.py b/src/agentexec/core/task.py index cc1221d..0c6471f 100644 --- a/src/agentexec/core/task.py +++ b/src/agentexec/core/task.py @@ -215,7 +215,12 @@ def from_serialized(cls, definition: TaskDefinition, data: dict[str, Any]) -> Ta return task @classmethod - def create(cls, task_name: str, context: BaseModel) -> Task: + def create( + cls, + task_name: str, + context: BaseModel, + metadata: dict[str, Any] | None = None, + ) -> Task: """Create a new task with automatic activity tracking. This is a convenience method that creates both a Task instance and @@ -224,6 +229,8 @@ def create(cls, task_name: str, context: BaseModel) -> Task: Args: task_name: Name/type of the task (e.g., "research", "analysis") context: Task context as a Pydantic model + metadata: Optional dict of arbitrary metadata to attach to the activity. + Useful for multi-tenancy (e.g., {"organization_id": "org-123"}). Returns: Task instance with agent_id set @@ -232,10 +239,18 @@ def create(cls, task_name: str, context: BaseModel) -> Task: ctx = ResearchContext(company="Acme") task = Task.create("research_company", ctx) task.context.company # Typed access + + # With metadata for multi-tenancy + task = Task.create( + "research_company", + ctx, + metadata={"organization_id": "org-123"} + ) """ agent_id = activity.create( task_name=task_name, message=CONF.activity_message_create, + metadata=metadata, ) return cls( diff --git a/src/agentexec/pipeline.py b/src/agentexec/pipeline.py index 1e063c6..9801e79 100644 --- a/src/agentexec/pipeline.py +++ b/src/agentexec/pipeline.py @@ -11,9 +11,9 @@ TypeAlias, Union, cast, - get_type_hints, - get_origin, get_args, + get_origin, + get_type_hints, ) from uuid import UUID @@ -320,17 +320,23 @@ def decorator(func: StepHandler) -> StepHandler: return decorator - async def enqueue(self, context: BaseModel) -> Task: + async def enqueue( + self, + context: BaseModel, + metadata: dict[str, Any] | None = None, + ) -> Task: """Enqueue the pipeline to run on a worker. Args: context: Initial context passed to the first step + metadata: Optional dict of arbitrary metadata to attach to the activity. + Useful for multi-tenancy (e.g., {"organization_id": "org-123"}). Returns: Task instance for tracking the pipeline execution """ self._validate_type_flow() - return await queue.enqueue(self.name, context) + return await queue.enqueue(self.name, context, metadata=metadata) async def run(self, context: BaseModel) -> StepResult: """Execute the pipeline inline (blocking). diff --git a/tests/test_activity_tracking.py b/tests/test_activity_tracking.py index 2893552..72db7e8 100644 --- a/tests/test_activity_tracking.py +++ b/tests/test_activity_tracking.py @@ -353,3 +353,184 @@ def test_create_activity_with_string_agent_id(db_session: Session): ) assert agent_id == custom_id + + +# --- Metadata Tests --- + + +def test_create_activity_with_metadata(db_session: Session): + """Test creating activity with metadata.""" + agent_id = activity.create( + task_name="metadata_task", + message="Test with metadata", + session=db_session, + metadata={"organization_id": "org-123", "user_id": "user-456"}, + ) + + activity_record = Activity.get_by_agent_id(db_session, agent_id) + assert activity_record is not None + assert activity_record.metadata_ == {"organization_id": "org-123", "user_id": "user-456"} + + +def test_create_activity_without_metadata(db_session: Session): + """Test that metadata is None by default.""" + agent_id = activity.create( + task_name="no_metadata_task", + message="Test without metadata", + session=db_session, + ) + + activity_record = Activity.get_by_agent_id(db_session, agent_id) + assert activity_record is not None + assert activity_record.metadata_ is None + + +def test_list_activities_with_metadata_filter(db_session: Session): + """Test filtering activities by metadata.""" + # Create activities for different organizations + activity.create( + task_name="task_org_a", + message="Org A task", + session=db_session, + metadata={"organization_id": "org-A"}, + ) + activity.create( + task_name="task_org_a_2", + message="Org A task 2", + session=db_session, + metadata={"organization_id": "org-A"}, + ) + activity.create( + task_name="task_org_b", + message="Org B task", + session=db_session, + metadata={"organization_id": "org-B"}, + ) + + # Filter by org-A + result = activity.list( + db_session, + metadata_filter={"organization_id": "org-A"}, + ) + assert result.total == 2 + assert len(result.items) == 2 + for item in result.items: + assert item.metadata["organization_id"] == "org-A" + + # Filter by org-B + result = activity.list( + db_session, + metadata_filter={"organization_id": "org-B"}, + ) + assert result.total == 1 + assert result.items[0].metadata["organization_id"] == "org-B" + + # Filter by non-existent org + result = activity.list( + db_session, + metadata_filter={"organization_id": "org-C"}, + ) + assert result.total == 0 + + +def test_list_activities_with_multiple_metadata_filters(db_session: Session): + """Test filtering activities by multiple metadata fields.""" + activity.create( + task_name="task_1", + message="User 1 in Org A", + session=db_session, + metadata={"organization_id": "org-A", "user_id": "user-1"}, + ) + activity.create( + task_name="task_2", + message="User 2 in Org A", + session=db_session, + metadata={"organization_id": "org-A", "user_id": "user-2"}, + ) + + # Filter by both org and user + result = activity.list( + db_session, + metadata_filter={"organization_id": "org-A", "user_id": "user-1"}, + ) + assert result.total == 1 + + +def test_detail_activity_with_metadata(db_session: Session): + """Test getting activity detail includes metadata.""" + agent_id = activity.create( + task_name="detailed_metadata_task", + message="Test", + session=db_session, + metadata={"organization_id": "org-123"}, + ) + + result = activity.detail(db_session, agent_id) + assert result is not None + assert result.metadata == {"organization_id": "org-123"} + + +def test_detail_activity_with_metadata_filter_match(db_session: Session): + """Test detail returns activity when metadata filter matches.""" + agent_id = activity.create( + task_name="filter_match_task", + message="Test", + session=db_session, + metadata={"organization_id": "org-A"}, + ) + + result = activity.detail( + db_session, + agent_id, + metadata_filter={"organization_id": "org-A"}, + ) + assert result is not None + assert result.agent_id == agent_id + + +def test_detail_activity_with_metadata_filter_no_match(db_session: Session): + """Test detail returns None when metadata filter doesn't match.""" + agent_id = activity.create( + task_name="filter_no_match_task", + message="Test", + session=db_session, + metadata={"organization_id": "org-A"}, + ) + + # Try to access with wrong organization + result = activity.detail( + db_session, + agent_id, + metadata_filter={"organization_id": "org-B"}, + ) + assert result is None + + +def test_detail_activity_no_metadata_with_filter(db_session: Session): + """Test detail returns None when activity has no metadata but filter is applied.""" + agent_id = activity.create( + task_name="no_metadata_with_filter", + message="Test", + session=db_session, + ) + + result = activity.detail( + db_session, + agent_id, + metadata_filter={"organization_id": "org-A"}, + ) + assert result is None + + +def test_list_metadata_included_in_response(db_session: Session): + """Test that metadata is included in list item response.""" + activity.create( + task_name="list_metadata_task", + message="Test", + session=db_session, + metadata={"key1": "value1", "key2": "value2"}, + ) + + result = activity.list(db_session) + assert result.total == 1 + assert result.items[0].metadata == {"key1": "value1", "key2": "value2"} From c9d65ce8b3d6b732ebdde10673c34da01bf6145d Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 16 Jan 2026 17:35:44 +0000 Subject: [PATCH 2/5] Document activity metadata feature in README Add brief mentions of the metadata parameter for multi-tenancy: - New "Activity Metadata" section in Supported Patterns - Add metadata field to database schema docs - Update Module Reference for enqueue, activity.list/detail, pipeline - Link to multi-tenancy example --- README.md | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 752ff3e..54b3b32 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,22 @@ That's it. Tasks are queued to Redis, workers process them in parallel, progress ## Supported Patterns +### Activity Metadata (Multi-Tenancy) + +Attach arbitrary metadata when enqueueing tasks for filtering and tenant isolation: + +```python +task = await ax.enqueue( + "process_document", + context, + metadata={"organization_id": "org-123", "user_id": "user-456"} +) + +# Filter activities by metadata +activities = ax.activity.list(db, metadata_filter={"organization_id": "org-123"}) +detail = ax.activity.detail(db, agent_id, metadata_filter={"organization_id": "org-123"}) +``` + ### Automatic Activity Tracking Every task gets full lifecycle tracking without manual updates: @@ -386,6 +402,7 @@ Activity tracking uses SQLAlchemy with two tables: **`agentexec_activity`** - Main activity records - `agent_id` - Unique identifier (UUID) - `agent_type` - Task name +- `metadata` - JSON field for custom data (e.g., tenant info) - `created_at`, `updated_at` - Timestamps **`agentexec_activity_log`** - Status and progress @@ -560,6 +577,7 @@ The components are headless (no built-in styling) and work with any CSS framewor import agentexec as ax task = await ax.enqueue(task_name, context, priority=ax.Priority.LOW) +task = await ax.enqueue(task_name, context, metadata={"org_id": "..."}) # With metadata result = await ax.get_result(task, timeout=300) results = await ax.gather(task1, task2, task3) ``` @@ -586,16 +604,18 @@ pool.shutdown() # Graceful shutdown import agentexec as ax # Create activity (returns agent_id for tracking) -agent_id = ax.activity.create(task_name, message="Starting...") +agent_id = ax.activity.create(task_name, message="Starting...", metadata={"org_id": "..."}) # Update progress ax.activity.update(agent_id, message, percentage=50) ax.activity.complete(agent_id, message="Done") ax.activity.error(agent_id, error="Failed: ...") -# Query activities +# Query activities (with optional metadata filtering) activities = ax.activity.list(db, page=1, page_size=20) +activities = ax.activity.list(db, metadata_filter={"org_id": "..."}) activity = ax.activity.detail(db, agent_id=agent_id) +activity = ax.activity.detail(db, agent_id, metadata_filter={"org_id": "..."}) count = ax.activity.active_count(db) # Cleanup @@ -634,6 +654,9 @@ pipeline = ax.Pipeline(pool) class MyPipeline(pipeline.Base): @pipeline.step(0, "description") async def step_one(self, context): ... + +task = await pipeline.enqueue(context) +task = await pipeline.enqueue(context, metadata={"org_id": "..."}) # With metadata ``` ### Tracker @@ -733,4 +756,5 @@ MIT License - see [LICENSE](LICENSE) for details. - **npm**: [agentexec-ui](https://www.npmjs.com/package/agentexec-ui) - **Documentation**: [docs/](docs/) - **Example App**: [examples/openai-agents-fastapi/](examples/openai-agents-fastapi/) +- **Multi-Tenancy Example**: [examples/multi-tenancy/](examples/multi-tenancy/) - **Issues**: [GitHub Issues](https://github.com/Agent-CI/agentexec/issues) From bdc0738d08eacc896ab374d9db159393aa41b75f Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 16 Jan 2026 17:36:45 +0000 Subject: [PATCH 3/5] Keep metadata examples only in dedicated section --- README.md | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 54b3b32..141342a 100644 --- a/README.md +++ b/README.md @@ -577,7 +577,6 @@ The components are headless (no built-in styling) and work with any CSS framewor import agentexec as ax task = await ax.enqueue(task_name, context, priority=ax.Priority.LOW) -task = await ax.enqueue(task_name, context, metadata={"org_id": "..."}) # With metadata result = await ax.get_result(task, timeout=300) results = await ax.gather(task1, task2, task3) ``` @@ -604,18 +603,16 @@ pool.shutdown() # Graceful shutdown import agentexec as ax # Create activity (returns agent_id for tracking) -agent_id = ax.activity.create(task_name, message="Starting...", metadata={"org_id": "..."}) +agent_id = ax.activity.create(task_name, message="Starting...") # Update progress ax.activity.update(agent_id, message, percentage=50) ax.activity.complete(agent_id, message="Done") ax.activity.error(agent_id, error="Failed: ...") -# Query activities (with optional metadata filtering) +# Query activities activities = ax.activity.list(db, page=1, page_size=20) -activities = ax.activity.list(db, metadata_filter={"org_id": "..."}) activity = ax.activity.detail(db, agent_id=agent_id) -activity = ax.activity.detail(db, agent_id, metadata_filter={"org_id": "..."}) count = ax.activity.active_count(db) # Cleanup @@ -654,9 +651,6 @@ pipeline = ax.Pipeline(pool) class MyPipeline(pipeline.Base): @pipeline.step(0, "description") async def step_one(self, context): ... - -task = await pipeline.enqueue(context) -task = await pipeline.enqueue(context, metadata={"org_id": "..."}) # With metadata ``` ### Tracker From 2442bc51f9cc0719dcd1e27d9967e49b7d6d77ad Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 16 Jan 2026 17:39:42 +0000 Subject: [PATCH 4/5] Exclude metadata from API serialization by default Metadata is now excluded from model_dump() and JSON serialization to prevent accidental leakage of tenant info through API responses. The field is still accessible as an attribute for programmatic use (e.g., item.metadata), but won't appear in API responses unless explicitly included by the user. --- src/agentexec/activity/schemas.py | 4 ++-- tests/test_activity_tracking.py | 33 +++++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/agentexec/activity/schemas.py b/src/agentexec/activity/schemas.py index dc3d6d2..e326348 100644 --- a/src/agentexec/activity/schemas.py +++ b/src/agentexec/activity/schemas.py @@ -30,7 +30,7 @@ class ActivityDetailSchema(BaseModel): created_at: datetime updated_at: datetime logs: list[ActivityLogSchema] = Field(default_factory=list) - metadata: dict[str, Any] | None = Field(default=None, alias="metadata_") + metadata: dict[str, Any] | None = Field(default=None, alias="metadata_", exclude=True) class ActivityListItemSchema(BaseModel): @@ -49,7 +49,7 @@ class ActivityListItemSchema(BaseModel): latest_log_timestamp: datetime | None = None percentage: int | None = 0 started_at: datetime | None = None - metadata: dict[str, Any] | None = None + metadata: dict[str, Any] | None = Field(default=None, exclude=True) @computed_field def elapsed_time_seconds(self) -> int: diff --git a/tests/test_activity_tracking.py b/tests/test_activity_tracking.py index 72db7e8..5b5d0fa 100644 --- a/tests/test_activity_tracking.py +++ b/tests/test_activity_tracking.py @@ -522,8 +522,8 @@ def test_detail_activity_no_metadata_with_filter(db_session: Session): assert result is None -def test_list_metadata_included_in_response(db_session: Session): - """Test that metadata is included in list item response.""" +def test_list_metadata_accessible_as_attribute(db_session: Session): + """Test that metadata is accessible as an attribute on schema objects.""" activity.create( task_name="list_metadata_task", message="Test", @@ -533,4 +533,33 @@ def test_list_metadata_included_in_response(db_session: Session): result = activity.list(db_session) assert result.total == 1 + # Metadata is accessible as attribute for programmatic use assert result.items[0].metadata == {"key1": "value1", "key2": "value2"} + + +def test_metadata_excluded_from_serialization(db_session: Session): + """Test that metadata is excluded from JSON/dict serialization by default. + + This prevents accidental leakage of tenant info through API responses. + Users who want metadata in responses should explicitly include it. + """ + agent_id = activity.create( + task_name="serialization_test", + message="Test", + session=db_session, + metadata={"organization_id": "org-123", "secret": "sensitive"}, + ) + + # List view - metadata excluded from serialization + result = activity.list(db_session) + item_dict = result.items[0].model_dump() + assert "metadata" not in item_dict + + # Detail view - metadata excluded from serialization + detail = activity.detail(db_session, agent_id) + detail_dict = detail.model_dump() + assert "metadata" not in detail_dict + + # But still accessible as attribute for internal use + assert result.items[0].metadata == {"organization_id": "org-123", "secret": "sensitive"} + assert detail.metadata == {"organization_id": "org-123", "secret": "sensitive"} From 6debbd4b4d6c1330442fe0a3844524de1a013316 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 16 Jan 2026 17:40:04 +0000 Subject: [PATCH 5/5] Document that metadata is excluded from API serialization --- README.md | 3 +++ examples/multi-tenancy/README.md | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 141342a..ff0c8aa 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,9 @@ task = await ax.enqueue( # Filter activities by metadata activities = ax.activity.list(db, metadata_filter={"organization_id": "org-123"}) detail = ax.activity.detail(db, agent_id, metadata_filter={"organization_id": "org-123"}) + +# Access metadata programmatically (excluded from API serialization by default) +org_id = detail.metadata["organization_id"] ``` ### Automatic Activity Tracking diff --git a/examples/multi-tenancy/README.md b/examples/multi-tenancy/README.md index 20109da..a086c5a 100644 --- a/examples/multi-tenancy/README.md +++ b/examples/multi-tenancy/README.md @@ -87,4 +87,6 @@ For PostgreSQL, this maps to JSONB which supports efficient filtering. - Metadata is immutable once set at enqueue time - Filtering uses exact string matching on metadata values -- The metadata field is included in both list and detail API responses +- **Metadata is excluded from API serialization by default** to prevent accidental leakage of tenant info +- Access metadata programmatically via the `.metadata` attribute (e.g., `activity.metadata`) +- To include metadata in API responses, explicitly add it to your response model