Python: Add Cosmos DB NoSQL Checkpoint Storage for Python Workflows #4916

aayush3011 wants to merge 8 commits into microsoft:main from
Conversation
Add native Cosmos DB NoSQL support for workflow checkpoint storage in the Python agent-framework-azure-cosmos package, achieving parity with the existing .NET CosmosCheckpointStore.

New files:
- `_checkpoint_storage.py`: CosmosCheckpointStorage implementing the CheckpointStorage protocol with 6 methods (save, load, list_checkpoints, delete, get_latest, list_checkpoint_ids)
- `test_cosmos_checkpoint_storage.py`: Unit and integration tests
- `workflow_checkpointing.py`: Sample demonstrating Cosmos DB-backed workflow checkpoint/resume

Auth support:
- Managed identity / RBAC via Azure credential objects (DefaultAzureCredential, ManagedIdentityCredential, etc.)
- Key-based auth via account key string or AZURE_COSMOS_KEY env var
- Pre-created CosmosClient or ContainerProxy

Key design decisions:
- Partition key: `/workflow_name` for efficient per-workflow queries
- Serialization: reuses encode/decode_checkpoint_value for full Python object fidelity (hybrid JSON + pickle approach)
- Container auto-creation via create_container_if_not_exists

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
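Under these design decisions, a checkpoint maps onto a Cosmos document roughly as sketched below. Only the composite document id (`{workflow_name}_{checkpoint_id}`) and the `/workflow_name` partition key are taken from this PR; the helper names and the base64-pickle encoding here are illustrative stand-ins for the package's actual encode/decode helpers.

```python
import base64
import pickle
from typing import Any


def encode_value(value: Any) -> Any:
    """Illustrative stand-in for the hybrid JSON + pickle encoder:
    JSON-safe scalars pass through; everything else is pickled and base64-encoded."""
    if isinstance(value, (str, int, float, bool)) or value is None:
        return value
    return {"__pickled__": base64.b64encode(pickle.dumps(value)).decode("ascii")}


def to_cosmos_document(workflow_name: str, checkpoint_id: str, state: dict[str, Any]) -> dict[str, Any]:
    """Sketch of the document shape: composite id plus /workflow_name partition key."""
    return {
        "id": f"{workflow_name}_{checkpoint_id}",  # composite id, per the review comments
        "workflow_name": workflow_name,            # partition key path: /workflow_name
        "checkpoint_id": checkpoint_id,
        "value": {k: encode_value(v) for k, v in state.items()},
    }


doc = to_cosmos_document("demo-workflow", "cp-1", {"iteration": 3, "data": {1, 2}})
print(doc["id"])  # demo-workflow_cp-1
```

The partition key choice means every per-workflow operation (list, latest) is a single-partition query, while a lookup by bare `checkpoint_id` alone must fan out across partitions.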
Pull request overview
Adds a Cosmos DB (NoSQL) checkpoint storage backend to the Python agent-framework-azure-cosmos package to enable durable workflow pause/resume (feature-parity with the .NET Cosmos checkpoint store).
Changes:
- Introduces `CosmosCheckpointStorage` implementing workflow checkpoint persistence in Cosmos DB (auto-creates DB/container, partitions by `workflow_name`).
- Adds unit + integration tests covering the checkpoint storage behavior.
- Adds runnable samples + README updates showing Cosmos-backed workflow checkpointing (standalone and Azure AI Foundry).
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| python/packages/azure-cosmos/agent_framework_azure_cosmos/_checkpoint_storage.py | Implements Cosmos-backed checkpoint storage (save/load/list/delete/latest/ids). |
| python/packages/azure-cosmos/agent_framework_azure_cosmos/__init__.py | Exposes CosmosCheckpointStorage from the package. |
| python/packages/azure-cosmos/tests/test_cosmos_checkpoint_storage.py | Adds unit tests and an integration round-trip test for the new storage. |
| python/packages/azure-cosmos/samples/cosmos_workflow_checkpointing.py | Standalone workflow sample using Cosmos-backed checkpointing. |
| python/packages/azure-cosmos/samples/cosmos_workflow_checkpointing_foundry.py | Foundry multi-agent workflow sample using Cosmos checkpoint storage. |
| python/packages/azure-cosmos/samples/README.md | Documents the new samples. |
| python/packages/azure-cosmos/README.md | Documents CosmosCheckpointStorage usage and configuration. |
| python/packages/azure-cosmos/pyproject.toml | Extends the integration test task to include the new integration test. |
Python Test Coverage Report
Python Unit Test Overview
@markwallace-microsoft, please review the above PR.
moonbox3 left a comment
Automated Code Review
Reviewers: 4 | Confidence: 91%
✗ Correctness
The core CosmosCheckpointStorage implementation is well-structured and correctly implements the CheckpointStorage protocol. The unit tests are thorough and cover the key scenarios. However, there is one blocking correctness bug in the cosmos_checkpoint_foundry.py sample: the async with CosmosCheckpointStorage(...) block exits (closing the Cosmos client) before the storage is actually used for workflow runs and checkpoint queries. All subsequent calls to checkpoint_storage will operate on a closed client. Additionally, there is a minor inconsistency in the test helper _checkpoint_to_cosmos_document which uses a plain checkpoint_id as the document id rather than the composite workflow_name_checkpoint_id, but this doesn't affect test correctness since _document_to_checkpoint strips the id field.
✗ Security Reliability
The new `CosmosCheckpointStorage` implementation is well-structured and follows the same patterns as the existing `CosmosHistoryProvider`. All Cosmos DB queries use parameterized values (preventing injection), client lifecycle management is correct, and the pickle security risk is pre-existing and documented. However, the `cosmos_checkpoint_foundry.py` sample has a use-after-close bug where `checkpoint_storage` is used after the `async with CosmosCheckpointStorage(...)` block has exited and closed the underlying Cosmos client, which will cause runtime failures for users who try the sample.
✓ Test Coverage
The test file provides solid coverage of all six CheckpointStorage protocol methods (save, load, list_checkpoints, delete, get_latest, list_checkpoint_ids), initialization modes, context manager lifecycle, and a save/load round-trip. There are a few notable gaps: (1) `list_checkpoints` silently swallows decode failures with a `logger.warning`, but there's no test verifying that a malformed document is skipped rather than raising; (2) `_ensure_container_proxy` raises `RuntimeError` when `_cosmos_client` is None, but this path has no unit test; (3) the `__aexit__` behavior when `close()` raises and there's no original exception is untested (it should re-raise). These are minor gaps in an otherwise thorough test suite.
✗ Design Approach
The implementation of `CosmosCheckpointStorage` is well-structured and correctly follows the `CheckpointStorage` protocol. The core implementation file is clean, the test coverage is thorough, and the authentication pattern mirrors `CosmosHistoryProvider` appropriately. One blocking structural bug exists in the Foundry sample: the `async with CosmosCheckpointStorage(...)` block closes the storage before the workflow/agent code and all subsequent `checkpoint_storage` references execute, meaning users following this sample will hit errors at runtime. No other fundamental design issues exist: the cross-partition query in `load()` is an unavoidable consequence of the protocol signature (checkpoint_id without workflow_name), and the lazy container initialization without a lock is benign since both creation calls are idempotent.
Automated review by moonbox3's agents
python/packages/azure-cosmos/samples/cosmos_workflow_checkpointing_foundry.py (excerpt):

```python
        credential=cosmos_credential,
        database_name=cosmos_database_name,
        container_name=cosmos_container_name,
    ) as checkpoint_storage:
        # Create Azure AI Foundry agents
        client = AzureOpenAIResponsesClient(
            project_endpoint=project_endpoint,
            deployment_name=deployment_name,
            credential=azure_credential,
        )

        assistant = client.as_agent(
            name="assistant",
            instructions="You are a helpful assistant. Keep responses brief.",
        )

        reviewer = client.as_agent(
            name="reviewer",
            instructions="You are a reviewer. Provide a one-sentence summary of the assistant's response.",
        )

    # Build a sequential workflow and wrap it as an agent
    workflow = SequentialBuilder(participants=[assistant, reviewer]).build()
    agent = workflow.as_agent(name="FoundryCheckpointedAgent")

    # --- First run: execute with Cosmos DB checkpointing ---
    print("=== First Run ===\n")

    session = agent.create_session()
    query = "What are the benefits of renewable energy?"
    print(f"User: {query}")

    response = await agent.run(query, session=session, checkpoint_storage=checkpoint_storage)

    for msg in response.messages:
        speaker = msg.author_name or msg.role
        print(f"[{speaker}]: {msg.text}")

    # Show checkpoints persisted in Cosmos DB
    checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
    print(f"\nCheckpoints in Cosmos DB: {len(checkpoints)}")
    for i, cp in enumerate(checkpoints[:5], 1):
        print(f"  {i}. {cp.checkpoint_id} (iteration={cp.iteration_count})")

    # --- Second run: continue conversation with checkpoint history ---
    print("\n=== Second Run (continuing conversation) ===\n")

    query2 = "Can you elaborate on the economic benefits?"
    print(f"User: {query2}")

    response2 = await agent.run(query2, session=session, checkpoint_storage=checkpoint_storage)

    for msg in response2.messages:
        speaker = msg.author_name or msg.role
        print(f"[{speaker}]: {msg.text}")

    # Show total checkpoints
    all_checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
    print(f"\nTotal checkpoints after two runs: {len(all_checkpoints)}")

    # Get latest checkpoint
    latest = await checkpoint_storage.get_latest(workflow_name=workflow.name)
    if latest:
        print(f"Latest checkpoint: {latest.checkpoint_id}")
        print(f"  iteration_count: {latest.iteration_count}")
        print(f"  timestamp: {latest.timestamp}")


if __name__ == "__main__":
    asyncio.run(main())
```
Use-after-close bug: The async with CosmosCheckpointStorage(...) block ends after creating agents, but all subsequent code — workflow build, agent.run(), list_checkpoints(), get_latest() — uses checkpoint_storage after close() has already been called on the underlying Cosmos client. Move all workflow/agent logic inside the async with CosmosCheckpointStorage(...) block so the client remains open for the entire usage scope.
Suggested change:

```python
    async with CosmosCheckpointStorage(
        endpoint=cosmos_endpoint,
        credential=cosmos_credential,
        database_name=cosmos_database_name,
        container_name=cosmos_container_name,
    ) as checkpoint_storage:
        # Create Azure AI Foundry agents
        client = AzureOpenAIResponsesClient(
            project_endpoint=project_endpoint,
            deployment_name=deployment_name,
            credential=azure_credential,
        )
        assistant = client.as_agent(
            name="assistant",
            instructions="You are a helpful assistant. Keep responses brief.",
        )
        reviewer = client.as_agent(
            name="reviewer",
            instructions="You are a reviewer. Provide a one-sentence summary of the assistant's response.",
        )

        # Build a sequential workflow and wrap it as an agent
        workflow = SequentialBuilder(participants=[assistant, reviewer]).build()
        agent = workflow.as_agent(name="FoundryCheckpointedAgent")

        # --- First run: execute with Cosmos DB checkpointing ---
        print("=== First Run ===\n")

        session = agent.create_session()
        query = "What are the benefits of renewable energy?"
        print(f"User: {query}")

        response = await agent.run(query, session=session, checkpoint_storage=checkpoint_storage)

        for msg in response.messages:
            speaker = msg.author_name or msg.role
            print(f"[{speaker}]: {msg.text}")

        # Show checkpoints persisted in Cosmos DB
        checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
        print(f"\nCheckpoints in Cosmos DB: {len(checkpoints)}")
        for i, cp in enumerate(checkpoints[:5], 1):
            print(f"  {i}. {cp.checkpoint_id} (iteration={cp.iteration_count})")

        # --- Second run: continue conversation with checkpoint history ---
        print("\n=== Second Run (continuing conversation) ===\n")

        query2 = "Can you elaborate on the economic benefits?"
        print(f"User: {query2}")

        response2 = await agent.run(query2, session=session, checkpoint_storage=checkpoint_storage)

        for msg in response2.messages:
            speaker = msg.author_name or msg.role
            print(f"[{speaker}]: {msg.text}")

        # Show total checkpoints
        all_checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
        print(f"\nTotal checkpoints after two runs: {len(all_checkpoints)}")

        # Get latest checkpoint
        latest = await checkpoint_storage.get_latest(workflow_name=workflow.name)
        if latest:
            print(f"Latest checkpoint: {latest.checkpoint_id}")
            print(f"  iteration_count: {latest.iteration_count}")
            print(f"  timestamp: {latest.timestamp}")
```
```python
        # Cosmos system properties
        "_rid": "abc",
        "_self": "dbs/abc/colls/def/docs/ghi",
        "_etag": '"00000000-0000-0000-0000-000000000000"',
```
Minor: This uses checkpoint.checkpoint_id as the document id, but save() creates a composite ID via _make_document_id() (f"{workflow_name}_{checkpoint_id}"). This doesn't break tests since _document_to_checkpoint strips the id field, but using the composite format would make the fixture more realistic.
Suggested change:

```python
    doc: dict[str, Any] = {
        "id": f"{checkpoint.workflow_name}_{checkpoint.checkpoint_id}",
```
```python
    result = await storage.delete("cp-del")

    assert result is True
    mock_container.delete_item.assert_awaited_once_with(
```
Missing test: list_checkpoints silently skips documents that fail to decode (catches Exception and logs a warning). Add a test with a malformed document mixed with a valid one to verify the bad document is skipped and the valid one is returned.
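A sketch of that suggested test against a minimal stand-in for the decode-and-skip loop. The real test would feed a mocked container's query iterator to `CosmosCheckpointStorage.list_checkpoints`; the function and document shapes below are illustrative, not the package's API.

```python
import logging

logger = logging.getLogger("checkpoint_storage_sketch")


def decode_documents(docs: list[dict]) -> list[dict]:
    """Stand-in for the list_checkpoints decode loop: malformed docs are
    skipped with a warning instead of raising."""
    results = []
    for doc in docs:
        try:
            # The "decode" here simply requires the checkpoint_id field to exist.
            results.append({"checkpoint_id": doc["checkpoint_id"]})
        except KeyError:
            logger.warning("Skipping malformed checkpoint document: %s", doc.get("id"))
    return results


def test_malformed_document_is_skipped_not_raised():
    docs = [
        {"id": "wf_cp-1", "checkpoint_id": "cp-1"},  # valid
        {"id": "wf_cp-bad"},                         # malformed: missing checkpoint_id
        {"id": "wf_cp-2", "checkpoint_id": "cp-2"},  # valid
    ]
    decoded = decode_documents(docs)
    # The malformed document is dropped; both valid ones survive, in order.
    assert [c["checkpoint_id"] for c in decoded] == ["cp-1", "cp-2"]


test_malformed_document_is_skipped_not_raised()
```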
```python
    ])

    storage = CosmosCheckpointStorage(container_client=mock_container)
    ids = await storage.list_checkpoint_ids(workflow_name="test-workflow")
```
Missing test: The __aexit__ branch where close() raises and there is no original exception (so it re-raises) is untested. The existing test_context_manager_preserves_original_exception only covers the case where an inner exception suppresses the close error.
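A sketch of that missing test against a minimal stand-in context manager implementing the same `__aexit__` contract (a `close()` error is suppressed while an inner exception is already propagating, and re-raised otherwise). All names here are illustrative, not the package's API.

```python
import asyncio


class FailingCloseStorage:
    """Stand-in with the same __aexit__ contract as the checkpoint storage."""

    async def close(self) -> None:
        raise RuntimeError("close failed")

    async def __aenter__(self) -> "FailingCloseStorage":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        try:
            await self.close()
        except Exception:
            # Suppress the close error only when an inner exception is propagating.
            if exc_type is None:
                raise


async def check() -> tuple[str, str]:
    # Case 1: no inner exception -> the close() error must re-raise.
    try:
        async with FailingCloseStorage():
            pass
        first = "no error"
    except RuntimeError as e:
        first = str(e)

    # Case 2: inner exception -> it wins; the close() error is suppressed.
    try:
        async with FailingCloseStorage():
            raise ValueError("inner error")
    except ValueError as e:
        second = str(e)

    return first, second


first, second = asyncio.run(check())
print(first, "|", second)  # close failed | inner error
```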
Motivation and Context
The .NET implementation of the Agent Framework already ships a native CosmosCheckpointStore for workflow checkpointing, but the Python side only supports in-memory and file-based storage. Cosmos DB customers building agents on Azure AI Foundry have been asking for native Cosmos DB checkpoint support so they can durably pause and resume workflows across process restarts without writing custom storage adapters.
This PR adds CosmosCheckpointStorage to the existing agent-framework-azure-cosmos Python package, achieving feature parity with .NET and enabling Cosmos DB customers to use workflow checkpointing out-of-the-box.
Description
Core implementation (_checkpoint_storage.py):
Tests (test_cosmos_checkpoint_storage.py):
Samples (reorganized into history_provider/, checkpoint_storage/, and combined):
Contribution Checklist