-
Notifications
You must be signed in to change notification settings - Fork 325
feat: Support for database backend Task Store #259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
b00905a
feat(db): support multiple SQL backends with optional extras (cont. #…
zboyles 88a9f38
Merge branch 'main' into sql-support
kthota-g 8ab1bb6
Use external AsyncEngine for TaskStore
kthota-g dccd507
Merge main into sql-support
kthota-g fbf03f3
Refactoring and bug fixes
kthota-g a6be65a
fix types test
kthota-g 82236fd
Merge branch 'main' into sql-support
kthota-g 098e0a5
Address review comments
kthota-g 8ab33b4
Refactoring
kthota-g 13236e5
fix linting errors
kthota-g 6497590
fix linting errors
kthota-g ef5ab00
Remove overrides
kthota-g 60379e9
Fix linting errors
kthota-g 45aa3ed
fix project name
kthota-g 3427bee
Spelling/formatting
holtskinner 3b5f8d6
Update .github/workflows/unit-tests.yml
holtskinner a579d61
Merge branch 'main' into sql-support
holtskinner 23163ef
Merge branch 'sql-support' of https://github.com/a2aproject/a2a-pytho…
holtskinner 6e65327
Formatting
holtskinner d44c30f
Merge main into sql-support
kthota-g e572806
Merge branch 'main' into sql-support
kthota-g 9df61f1
Remove pg docker image
kthota-g File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| -- Create a dedicated user for the application | ||
| CREATE USER a2a WITH PASSWORD 'a2a_password'; | ||
|
|
||
| -- Create the tasks database | ||
| CREATE DATABASE a2a_tasks; | ||
kthota-g marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| GRANT ALL PRIVILEGES ON DATABASE a2a_test TO a2a; | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,210 @@ | ||
| from typing import TYPE_CHECKING, Any, Generic, TypeVar | ||
|
|
||
|
|
||
| if TYPE_CHECKING: | ||
| from typing_extensions import override | ||
| else: | ||
|
|
||
| def override(func): # noqa: ANN001, ANN201 | ||
| """Override decorator.""" | ||
| return func | ||
|
|
||
|
|
||
| from pydantic import BaseModel | ||
|
|
||
| from a2a.types import Artifact, Message, TaskStatus | ||
|
|
||
|
|
||
| try: | ||
| from sqlalchemy import JSON, Dialect, String | ||
| from sqlalchemy.orm import ( | ||
| DeclarativeBase, | ||
| Mapped, | ||
| declared_attr, | ||
| mapped_column, | ||
| ) | ||
| from sqlalchemy.types import TypeDecorator | ||
| except ImportError as e: | ||
| raise ImportError( | ||
| 'Database models require SQLAlchemy. ' | ||
| 'Install with one of: ' | ||
| "'pip install a2a-sdk[postgresql]', " | ||
| "'pip install a2a-sdk[mysql]', " | ||
| "'pip install a2a-sdk[sqlite]', " | ||
| "or 'pip install a2a-sdk[sql]'" | ||
| ) from e | ||
|
|
||
|
|
||
| T = TypeVar('T', bound=BaseModel) | ||
|
|
||
|
|
||
| class PydanticType(TypeDecorator[T], Generic[T]): | ||
| """SQLAlchemy type that handles Pydantic model serialization.""" | ||
|
|
||
| impl = JSON | ||
| cache_ok = True | ||
|
|
||
| def __init__(self, pydantic_type: type[T], **kwargs: dict[str, Any]): | ||
| """Initialize the PydanticType. | ||
|
|
||
| Args: | ||
| pydantic_type: The Pydantic model type to handle. | ||
| **kwargs: Additional arguments for TypeDecorator. | ||
| """ | ||
| self.pydantic_type = pydantic_type | ||
| super().__init__(**kwargs) | ||
|
|
||
| def process_bind_param( | ||
| self, value: T | None, dialect: Dialect | ||
| ) -> dict[str, Any] | None: | ||
| """Convert Pydantic model to a JSON-serializable dictionary for the database.""" | ||
| if value is None: | ||
| return None | ||
| return ( | ||
| value.model_dump(mode='json') | ||
| if isinstance(value, BaseModel) | ||
| else value | ||
| ) | ||
|
|
||
| def process_result_value( | ||
| self, value: dict[str, Any] | None, dialect: Dialect | ||
| ) -> T | None: | ||
| """Convert a JSON-like dictionary from the database back to a Pydantic model.""" | ||
| if value is None: | ||
| return None | ||
| return self.pydantic_type.model_validate(value) | ||
|
|
||
|
|
||
| class PydanticListType(TypeDecorator, Generic[T]): | ||
| """SQLAlchemy type that handles lists of Pydantic models.""" | ||
|
|
||
| impl = JSON | ||
| cache_ok = True | ||
|
|
||
| def __init__(self, pydantic_type: type[T], **kwargs: dict[str, Any]): | ||
| """Initialize the PydanticListType. | ||
|
|
||
| Args: | ||
| pydantic_type: The Pydantic model type for items in the list. | ||
| **kwargs: Additional arguments for TypeDecorator. | ||
| """ | ||
| self.pydantic_type = pydantic_type | ||
| super().__init__(**kwargs) | ||
|
|
||
| def process_bind_param( | ||
| self, value: list[T] | None, dialect: Dialect | ||
| ) -> list[dict[str, Any]] | None: | ||
| """Convert a list of Pydantic models to a JSON-serializable list for the DB.""" | ||
| if value is None: | ||
| return None | ||
| return [ | ||
| item.model_dump(mode='json') | ||
| if isinstance(item, BaseModel) | ||
| else item | ||
| for item in value | ||
| ] | ||
|
|
||
| def process_result_value( | ||
| self, value: list[dict[str, Any]] | None, dialect: Dialect | ||
| ) -> list[T] | None: | ||
| """Convert a JSON-like list from the DB back to a list of Pydantic models.""" | ||
| if value is None: | ||
| return None | ||
| return [self.pydantic_type.model_validate(item) for item in value] | ||
|
|
||
|
|
||
| # Base class for all database models | ||
| class Base(DeclarativeBase): | ||
| """Base class for declarative models in A2A SDK.""" | ||
|
|
||
|
|
||
| # TaskMixin that can be used with any table name | ||
| class TaskMixin: | ||
| """Mixin providing standard task columns with proper type handling.""" | ||
|
|
||
| id: Mapped[str] = mapped_column(String(36), primary_key=True, index=True) | ||
| contextId: Mapped[str] = mapped_column(String(36), nullable=False) # noqa: N815 | ||
| kind: Mapped[str] = mapped_column( | ||
| String(16), nullable=False, default='task' | ||
| ) | ||
|
|
||
| # Properly typed Pydantic fields with automatic serialization | ||
| status: Mapped[TaskStatus] = mapped_column(PydanticType(TaskStatus)) | ||
| artifacts: Mapped[list[Artifact] | None] = mapped_column( | ||
| PydanticListType(Artifact), nullable=True | ||
| ) | ||
| history: Mapped[list[Message] | None] = mapped_column( | ||
| PydanticListType(Message), nullable=True | ||
| ) | ||
|
|
||
| # Using declared_attr to avoid conflict with Pydantic's metadata | ||
| @declared_attr | ||
| @classmethod | ||
| def task_metadata(cls) -> Mapped[dict[str, Any] | None]: | ||
| """Define the 'metadata' column, avoiding name conflicts with Pydantic.""" | ||
| return mapped_column(JSON, nullable=True, name='metadata') | ||
|
|
||
| @override | ||
| def __repr__(self) -> str: | ||
| """Return a string representation of the task.""" | ||
| repr_template = ( | ||
| '<{CLS}(id="{ID}", contextId="{CTX_ID}", status="{STATUS}")>' | ||
| ) | ||
| return repr_template.format( | ||
| CLS=self.__class__.__name__, | ||
| ID=self.id, | ||
| CTX_ID=self.contextId, | ||
| STATUS=self.status, | ||
| ) | ||
|
|
||
|
|
||
| def create_task_model( | ||
| table_name: str = 'tasks', base: type[DeclarativeBase] = Base | ||
| ) -> type: | ||
| """Create a TaskModel class with a configurable table name. | ||
|
|
||
| Args: | ||
| table_name: Name of the database table. Defaults to 'tasks'. | ||
| base: Base declarative class to use. Defaults to the SDK's Base class. | ||
|
|
||
| Returns: | ||
| TaskModel class with the specified table name. | ||
|
|
||
| Example: | ||
| # Create a task model with default table name | ||
| TaskModel = create_task_model() | ||
|
|
||
| # Create a task model with custom table name | ||
| CustomTaskModel = create_task_model('my_tasks') | ||
|
|
||
| # Use with a custom base | ||
| from myapp.database import Base as MyBase | ||
| TaskModel = create_task_model('tasks', MyBase) | ||
| """ | ||
|
|
||
| class TaskModel(TaskMixin, base): | ||
| __tablename__ = table_name | ||
|
|
||
| @override | ||
| def __repr__(self) -> str: | ||
| """Return a string representation of the task.""" | ||
| repr_template = '<TaskModel[{TABLE}](id="{ID}", contextId="{CTX_ID}", status="{STATUS}")>' | ||
| return repr_template.format( | ||
| TABLE=table_name, | ||
| ID=self.id, | ||
| CTX_ID=self.contextId, | ||
| STATUS=self.status, | ||
| ) | ||
|
|
||
| # Set a dynamic name for better debugging | ||
| TaskModel.__name__ = f'TaskModel_{table_name}' | ||
| TaskModel.__qualname__ = f'TaskModel_{table_name}' | ||
|
|
||
| return TaskModel | ||
|
|
||
|
|
||
| # Default TaskModel for backward compatibility | ||
| class TaskModel(TaskMixin, Base): | ||
| """Default task model with standard table name.""" | ||
|
|
||
| __tablename__ = 'tasks' |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.