
Python: Add Cosmos DB NoSQL Checkpoint Storage for Python Workflows#4916

Open
aayush3011 wants to merge 8 commits into microsoft:main from aayush3011:cosmos_checkpointer

Conversation


aayush3011 commented Mar 25, 2026

Motivation and Context

The .NET implementation of the Agent Framework already ships a native CosmosCheckpointStore for workflow checkpointing, but the Python side only supports in-memory and file-based storage. Cosmos DB customers building agents on Azure AI Foundry have been asking for native Cosmos DB checkpoint support so they can durably pause and resume workflows across process restarts without writing custom storage adapters.

This PR adds CosmosCheckpointStorage to the existing agent-framework-azure-cosmos Python package, achieving feature parity with .NET and enabling Cosmos DB customers to use workflow checkpointing out-of-the-box.

Description

Core implementation (_checkpoint_storage.py):

  • CosmosCheckpointStorage implementing the CheckpointStorage protocol with all 6 methods (save, load, list_checkpoints, delete, get_latest, list_checkpoint_ids)
  • Authentication: supports both managed identity / RBAC (DefaultAzureCredential, ManagedIdentityCredential) and key-based auth, following the same pattern as CosmosHistoryProvider
  • Auto-creation of database and container on first use via create_database_if_not_exists
  • Partition key /workflow_name for efficient per-workflow queries
  • Reuses existing encode/decode_checkpoint_value for full Python object serialization fidelity

Tests (test_cosmos_checkpoint_storage.py):

  • 26 unit tests covering all protocol methods, auth modes, error handling, and save/load round-trip
  • Integration test validated against live Cosmos DB

Samples (reorganized into history_provider/, checkpoint_storage/, and combined):

  • history_provider/cosmos_history_basic.py — basic multi-turn conversation
  • history_provider/cosmos_history_conversation_persistence.py — persist/resume conversations across restarts
  • history_provider/cosmos_history_messages.py — message CRUD operations
  • history_provider/cosmos_history_sessions.py — multi-session/tenant management
  • checkpoint_storage/cosmos_checkpoint_workflow.py — standalone workflow checkpoint/resume
  • checkpoint_storage/cosmos_checkpoint_foundry.py — Azure AI Foundry agents + checkpointing
  • cosmos_e2e_foundry.py — both history + checkpointing in one Foundry app

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.

Aayush Kataria and others added 2 commits March 25, 2026 14:13
Add native Cosmos DB NoSQL support for workflow checkpoint storage in the
Python agent-framework-azure-cosmos package, achieving parity with the
existing .NET CosmosCheckpointStore.

New files:
- _checkpoint_storage.py: CosmosCheckpointStorage implementing the
  CheckpointStorage protocol with 6 methods (save, load, list_checkpoints,
  delete, get_latest, list_checkpoint_ids)
- test_cosmos_checkpoint_storage.py: Unit and integration tests
- workflow_checkpointing.py: Sample demonstrating Cosmos DB-backed
  workflow checkpoint/resume

Auth support:
- Managed identity / RBAC via Azure credential objects
  (DefaultAzureCredential, ManagedIdentityCredential, etc.)
- Key-based auth via account key string or AZURE_COSMOS_KEY env var
- Pre-created CosmosClient or ContainerProxy

Key design decisions:
- Partition key: /workflow_name for efficient per-workflow queries
- Serialization: Reuses encode/decode_checkpoint_value for full Python
  object fidelity (hybrid JSON + pickle approach)
- Container auto-creation via create_container_if_not_exists
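Putting the partition-key decision together with the composite document id mentioned in the review comments (f"{workflow_name}_{checkpoint_id}"), the stored document shape can be sketched as below. The "value" field name is a hypothetical stand-in for the encoded payload:

```python
from typing import Any


def make_document_id(workflow_name: str, checkpoint_id: str) -> str:
    # Composite id format noted in the review comments.
    return f"{workflow_name}_{checkpoint_id}"


def checkpoint_to_document(
    workflow_name: str, checkpoint_id: str, encoded_value: str
) -> dict[str, Any]:
    # Sketch of the stored document shape; the partition key path is
    # /workflow_name, so each document carries a top-level "workflow_name".
    return {
        "id": make_document_id(workflow_name, checkpoint_id),
        "workflow_name": workflow_name,  # partition key value
        "checkpoint_id": checkpoint_id,
        "value": encoded_value,  # hypothetical field for the encoded payload
    }


doc = checkpoint_to_document("order-workflow", "cp-42", "<encoded>")
print(doc["id"])  # order-workflow_cp-42
```

Because the partition key is the workflow name, per-workflow operations (list, get latest) can target a single partition instead of fanning out across the container.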

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings March 25, 2026 22:16
@markwallace-microsoft markwallace-microsoft added documentation Improvements or additions to documentation python labels Mar 25, 2026
@github-actions github-actions bot changed the title Add Cosmos DB NoSQL Checkpoint Storage for Python Workflows Python: Add Cosmos DB NoSQL Checkpoint Storage for Python Workflows Mar 25, 2026

Copilot AI left a comment


Pull request overview

Adds a Cosmos DB (NoSQL) checkpoint storage backend to the Python agent-framework-azure-cosmos package to enable durable workflow pause/resume (feature-parity with the .NET Cosmos checkpoint store).

Changes:

  • Introduces CosmosCheckpointStorage implementing workflow checkpoint persistence in Cosmos DB (auto-creates DB/container, partitions by workflow_name).
  • Adds unit + integration tests covering the checkpoint storage behavior.
  • Adds runnable samples + README updates showing Cosmos-backed workflow checkpointing (standalone and Azure AI Foundry).

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.

Show a summary per file:

  • python/packages/azure-cosmos/agent_framework_azure_cosmos/_checkpoint_storage.py — Implements Cosmos-backed checkpoint storage (save/load/list/delete/latest/ids).
  • python/packages/azure-cosmos/agent_framework_azure_cosmos/__init__.py — Exposes CosmosCheckpointStorage from the package.
  • python/packages/azure-cosmos/tests/test_cosmos_checkpoint_storage.py — Adds unit tests and an integration round-trip test for the new storage.
  • python/packages/azure-cosmos/samples/cosmos_workflow_checkpointing.py — Standalone workflow sample using Cosmos-backed checkpointing.
  • python/packages/azure-cosmos/samples/cosmos_workflow_checkpointing_foundry.py — Foundry multi-agent workflow sample using Cosmos checkpoint storage.
  • python/packages/azure-cosmos/samples/README.md — Documents the new samples.
  • python/packages/azure-cosmos/README.md — Documents CosmosCheckpointStorage usage and configuration.
  • python/packages/azure-cosmos/pyproject.toml — Extends the integration test task to include the new integration test.


markwallace-microsoft commented Mar 26, 2026

Python Test Coverage

Python Test Coverage Report

File                                                       Stmts  Miss  Cover  Missing
packages/azure-cosmos/agent_framework_azure_cosmos/
   _checkpoint_storage.py                                  135    4     97%    263–264, 389, 396
TOTAL                                                      28147  3416  87%

Python Unit Test Overview

Tests: 5481 | Skipped: 20 💤 | Failures: 0 ❌ | Errors: 0 🔥 | Time: 1m 25s ⏱️

aayush3011 (Author) commented:

@markwallace-microsoft, please review the above PR.


moonbox3 left a comment


Automated Code Review

Reviewers: 4 | Confidence: 91%

✗ Correctness

The core CosmosCheckpointStorage implementation is well-structured and correctly implements the CheckpointStorage protocol. The unit tests are thorough and cover the key scenarios. However, there is one blocking correctness bug in the cosmos_checkpoint_foundry.py sample: the async with CosmosCheckpointStorage(...) block exits (closing the Cosmos client) before the storage is actually used for workflow runs and checkpoint queries. All subsequent calls to checkpoint_storage will operate on a closed client. Additionally, there is a minor inconsistency in the test helper _checkpoint_to_cosmos_document which uses a plain checkpoint_id as the document id rather than the composite workflow_name_checkpoint_id, but this doesn't affect test correctness since _document_to_checkpoint strips the id field.

✗ Security & Reliability

The new CosmosCheckpointStorage implementation is well-structured and follows the same patterns as the existing CosmosHistoryProvider. All Cosmos DB queries use parameterized values (preventing injection), client lifecycle management is correct, and the pickle security risk is pre-existing and documented. However, the cosmos_checkpoint_foundry.py sample has a use-after-close bug where checkpoint_storage is used after the async with CosmosCheckpointStorage(...) block has exited and closed the underlying Cosmos client, which will cause runtime failures for users who try the sample.

✓ Test Coverage

The test file provides solid coverage of all six CheckpointStorage protocol methods (save, load, list_checkpoints, delete, get_latest, list_checkpoint_ids), initialization modes, context manager lifecycle, and a save/load round-trip. There are a few notable gaps: (1) list_checkpoints silently swallows decode failures with a logger.warning but there's no test verifying that a malformed document is skipped rather than raising; (2) _ensure_container_proxy raises RuntimeError when _cosmos_client is None but this path has no unit test; (3) the __aexit__ behavior when close() raises and there's no original exception is untested (it should re-raise). These are minor gaps in an otherwise thorough test suite.

✗ Design Approach

The implementation of CosmosCheckpointStorage is well-structured and correctly follows the CheckpointStorage protocol. The core implementation file is clean, the test coverage is thorough, and the authentication pattern mirrors CosmosHistoryProvider appropriately. One blocking structural bug exists in the Foundry sample: the async with CosmosCheckpointStorage(...) block closes the storage before the workflow/agent code and all subsequent checkpoint_storage references execute, meaning users following this sample will hit errors at runtime. No other fundamental design issues exist — the cross-partition query in load() is an unavoidable consequence of the protocol signature (checkpoint_id without workflow_name), and the lazy container initialization without a lock is benign since both creation calls are idempotent.
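The cross-partition point above follows from the protocol signature: load() receives only a checkpoint_id, so the query cannot target a single /workflow_name partition. A hedged sketch of how such a parameterized query might be built (the exact SQL and field names are assumptions, but the parameterized style matches what the review verified):

```python
def build_load_query(checkpoint_id: str) -> tuple[str, list[dict[str, str]]]:
    # load() only receives checkpoint_id, so the query cannot target a single
    # /workflow_name partition and must run cross-partition. The value is
    # passed as a parameter, never interpolated into the SQL text.
    query = "SELECT * FROM c WHERE c.checkpoint_id = @checkpoint_id"
    parameters = [{"name": "@checkpoint_id", "value": checkpoint_id}]
    return query, parameters


query, params = build_load_query("cp-9")
print(query)
```

By contrast, list_checkpoints and get_latest know the workflow_name and can scope the query to one partition.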


Automated review by moonbox3's agents

Comment on lines +73 to +142
(indentation reconstructed from the scrape; only the agent-creation block sits inside the async with, which is the bug this comment flags)

```python
    credential=cosmos_credential,
    database_name=cosmos_database_name,
    container_name=cosmos_container_name,
) as checkpoint_storage:
    # Create Azure AI Foundry agents
    client = AzureOpenAIResponsesClient(
        project_endpoint=project_endpoint,
        deployment_name=deployment_name,
        credential=azure_credential,
    )

    assistant = client.as_agent(
        name="assistant",
        instructions="You are a helpful assistant. Keep responses brief.",
    )

    reviewer = client.as_agent(
        name="reviewer",
        instructions="You are a reviewer. Provide a one-sentence summary of the assistant's response.",
    )

# Build a sequential workflow and wrap it as an agent
workflow = SequentialBuilder(participants=[assistant, reviewer]).build()
agent = workflow.as_agent(name="FoundryCheckpointedAgent")

# --- First run: execute with Cosmos DB checkpointing ---
print("=== First Run ===\n")

session = agent.create_session()
query = "What are the benefits of renewable energy?"
print(f"User: {query}")

response = await agent.run(query, session=session, checkpoint_storage=checkpoint_storage)

for msg in response.messages:
    speaker = msg.author_name or msg.role
    print(f"[{speaker}]: {msg.text}")

# Show checkpoints persisted in Cosmos DB
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
print(f"\nCheckpoints in Cosmos DB: {len(checkpoints)}")
for i, cp in enumerate(checkpoints[:5], 1):
    print(f"  {i}. {cp.checkpoint_id} (iteration={cp.iteration_count})")

# --- Second run: continue conversation with checkpoint history ---
print("\n=== Second Run (continuing conversation) ===\n")

query2 = "Can you elaborate on the economic benefits?"
print(f"User: {query2}")

response2 = await agent.run(query2, session=session, checkpoint_storage=checkpoint_storage)

for msg in response2.messages:
    speaker = msg.author_name or msg.role
    print(f"[{speaker}]: {msg.text}")

# Show total checkpoints
all_checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
print(f"\nTotal checkpoints after two runs: {len(all_checkpoints)}")

# Get latest checkpoint
latest = await checkpoint_storage.get_latest(workflow_name=workflow.name)
if latest:
    print(f"Latest checkpoint: {latest.checkpoint_id}")
    print(f"  iteration_count: {latest.iteration_count}")
    print(f"  timestamp: {latest.timestamp}")


if __name__ == "__main__":
    asyncio.run(main())
```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use-after-close bug: The async with CosmosCheckpointStorage(...) block ends after creating agents, but all subsequent code — workflow build, agent.run(), list_checkpoints(), get_latest() — uses checkpoint_storage after close() has already been called on the underlying Cosmos client. Move all workflow/agent logic inside the async with CosmosCheckpointStorage(...) block so the client remains open for the entire usage scope.

Suggested change (only the corrected version is shown; the removed lines duplicate the excerpt above — all workflow and checkpoint logic moves inside the async with block):

```python
async with CosmosCheckpointStorage(
    endpoint=cosmos_endpoint,
    credential=cosmos_credential,
    database_name=cosmos_database_name,
    container_name=cosmos_container_name,
) as checkpoint_storage:
    # Create Azure AI Foundry agents
    client = AzureOpenAIResponsesClient(
        project_endpoint=project_endpoint,
        deployment_name=deployment_name,
        credential=azure_credential,
    )
    assistant = client.as_agent(
        name="assistant",
        instructions="You are a helpful assistant. Keep responses brief.",
    )
    reviewer = client.as_agent(
        name="reviewer",
        instructions="You are a reviewer. Provide a one-sentence summary of the assistant's response.",
    )
    # Build a sequential workflow and wrap it as an agent
    workflow = SequentialBuilder(participants=[assistant, reviewer]).build()
    agent = workflow.as_agent(name="FoundryCheckpointedAgent")
    # --- First run: execute with Cosmos DB checkpointing ---
    print("=== First Run ===\n")
    session = agent.create_session()
    query = "What are the benefits of renewable energy?"
    print(f"User: {query}")
    response = await agent.run(query, session=session, checkpoint_storage=checkpoint_storage)
    for msg in response.messages:
        speaker = msg.author_name or msg.role
        print(f"[{speaker}]: {msg.text}")
    # Show checkpoints persisted in Cosmos DB
    checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
    print(f"\nCheckpoints in Cosmos DB: {len(checkpoints)}")
    for i, cp in enumerate(checkpoints[:5], 1):
        print(f"  {i}. {cp.checkpoint_id} (iteration={cp.iteration_count})")
    # --- Second run: continue conversation with checkpoint history ---
    print("\n=== Second Run (continuing conversation) ===\n")
    query2 = "Can you elaborate on the economic benefits?"
    print(f"User: {query2}")
    response2 = await agent.run(query2, session=session, checkpoint_storage=checkpoint_storage)
    for msg in response2.messages:
        speaker = msg.author_name or msg.role
        print(f"[{speaker}]: {msg.text}")
    # Show total checkpoints
    all_checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
    print(f"\nTotal checkpoints after two runs: {len(all_checkpoints)}")
    # Get latest checkpoint
    latest = await checkpoint_storage.get_latest(workflow_name=workflow.name)
    if latest:
        print(f"Latest checkpoint: {latest.checkpoint_id}")
        print(f"  iteration_count: {latest.iteration_count}")
        print(f"  timestamp: {latest.timestamp}")
```

Comment context (test fixture document):

```python
        # Cosmos system properties
        "_rid": "abc",
        "_self": "dbs/abc/colls/def/docs/ghi",
        "_etag": '"00000000-0000-0000-0000-000000000000"',
```


Minor: This uses checkpoint.checkpoint_id as the document id, but save() creates a composite ID via _make_document_id() (f"{workflow_name}_{checkpoint_id}"). This doesn't break tests since _document_to_checkpoint strips the id field, but using the composite format would make the fixture more realistic.

Suggested change

```python
doc: dict[str, Any] = {
    "id": f"{checkpoint.workflow_name}_{checkpoint.checkpoint_id}",
```

Comment context:

```python
    result = await storage.delete("cp-del")

    assert result is True
    mock_container.delete_item.assert_awaited_once_with(
```


Missing test: list_checkpoints silently skips documents that fail to decode (catches Exception and logs a warning). Add a test with a malformed document mixed with a valid one to verify the bad document is skipped and the valid one is returned.
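The skip-on-decode-failure behavior this comment wants covered can be sketched in isolation. The `decode_documents` helper below is a hypothetical stand-in for the decode loop inside list_checkpoints, not the actual implementation:

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def decode_documents(docs: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # Sketch of the behavior under test: a malformed document is logged
    # and skipped rather than raising out of list_checkpoints.
    decoded: list[dict[str, Any]] = []
    for doc in docs:
        try:
            # Stand-in for decode_checkpoint_value; a missing field fails here.
            decoded.append({"checkpoint_id": doc["checkpoint_id"]})
        except Exception:
            logger.warning("Skipping malformed checkpoint document: %s", doc.get("id"))
    return decoded


good = {"id": "wf_cp-1", "checkpoint_id": "cp-1"}
bad = {"id": "wf_cp-2"}  # missing checkpoint_id -> decode fails
result = decode_documents([good, bad])
assert [c["checkpoint_id"] for c in result] == ["cp-1"]
```

A unit test along these lines would feed one valid and one malformed mock document and assert only the valid checkpoint is returned.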

Comment context:

```python
    ])

    storage = CosmosCheckpointStorage(container_client=mock_container)
    ids = await storage.list_checkpoint_ids(workflow_name="test-workflow")
```


Missing test: The __aexit__ branch where close() raises and there is no original exception (so it re-raises) is untested. The existing test_context_manager_preserves_original_exception only covers the case where an inner exception suppresses the close error.
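The __aexit__ contract in question can be demonstrated with a minimal stand-in class (the class below is illustrative, not the real CosmosCheckpointStorage): a close() failure propagates only when no exception is already in flight, otherwise the original exception wins.

```python
import asyncio


class ClosingStorage:
    # Minimal context-manager sketch of the __aexit__ behavior described above.
    def __init__(self, fail_on_close: bool = False) -> None:
        self.fail_on_close = fail_on_close

    async def close(self) -> None:
        if self.fail_on_close:
            raise RuntimeError("close failed")

    async def __aenter__(self) -> "ClosingStorage":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        try:
            await self.close()
        except Exception:
            if exc is None:
                raise  # no original exception: re-raise the close error
            # otherwise suppress the close error, preserving the original


async def check() -> tuple[str, str]:
    # Case 1: clean body, failing close -> the RuntimeError propagates.
    first = ""
    try:
        async with ClosingStorage(fail_on_close=True):
            pass
    except RuntimeError as e:
        first = str(e)
    # Case 2: body raises, close also fails -> the original ValueError wins.
    second = ""
    try:
        async with ClosingStorage(fail_on_close=True):
            raise ValueError("original")
    except ValueError as e:
        second = str(e)
    return first, second


print(asyncio.run(check()))  # ('close failed', 'original')
```

Case 1 is the untested branch this comment refers to; case 2 mirrors the existing test_context_manager_preserves_original_exception.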
