Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions alphatrion/storage/sqlstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,25 +779,41 @@ def delete_experiments(self, experiment_ids: list[uuid.UUID]) -> int:
Also deletes all associated runs.
Returns the number of experiments successfully deleted.
Caller must ensure user has permission to delete these experiments.

Status transitions on deletion:
- PENDING experiments -> ABORTED
- RUNNING experiments -> CANCELLED
- Other statuses remain unchanged
"""
from sqlalchemy import case

session = self._session()
# Delete the experiments
# if experiment is running, skip deletion for that experiment
filtered_exps = (
session.query(Experiment.uuid)
.filter(
Experiment.uuid.in_(experiment_ids),
Experiment.is_del == 0,
Experiment.status != Status.RUNNING,
)
.all()
)
filtered_exp_ids = [exp_id for (exp_id,) in filtered_exps] # unpack tuples

# Update status based on current state: PENDING -> ABORTED, RUNNING -> CANCELLED
deleted_count = (
session.query(Experiment)
.filter(Experiment.uuid.in_(filtered_exp_ids))
.update({Experiment.is_del: 1}, synchronize_session=False)
.update(
{
Experiment.is_del: 1,
Experiment.status: case(
(Experiment.status == Status.PENDING, Status.ABORTED),
(Experiment.status == Status.RUNNING, Status.CANCELLED),
else_=Experiment.status,
),
},
synchronize_session=False,
)
)
# Delete all runs associated with these experiments
session.query(Run).filter(Run.experiment_id.in_(filtered_exp_ids)).update(
Expand Down
127 changes: 85 additions & 42 deletions tests/integration/server/test_graphql_mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,10 +946,10 @@ def test_delete_running_experiment_fails(
assert exp.status == Status.RUNNING


def test_delete_experiments_skips_running(
def test_delete_experiments_with_running_and_pending(
execute_graphql, test_org_id, test_user_id, test_team_id
):
"""Test that batch delete skips running experiments"""
"""Test that batch delete handles running (CANCELLED) and pending (ABORTED) experiments"""
runtime.init()
metadb = runtime.storage_runtime().metadb

Expand All @@ -961,8 +961,6 @@ def test_delete_experiments_skips_running(
name="Completed Experiment",
)
metadb.update_experiment(
org_id=test_org_id,
team_id=test_team_id,
experiment_id=exp_id_1,
status=Status.COMPLETED,
)
Expand All @@ -974,8 +972,6 @@ def test_delete_experiments_skips_running(
name="Running Experiment",
)
metadb.update_experiment(
org_id=test_org_id,
team_id=test_team_id,
experiment_id=exp_id_2,
status=Status.RUNNING,
)
Expand All @@ -984,12 +980,21 @@ def test_delete_experiments_skips_running(
org_id=test_org_id,
team_id=test_team_id,
user_id=test_user_id,
name="Failed Experiment",
name="Pending Experiment",
)
metadb.update_experiment(
experiment_id=exp_id_3,
status=Status.PENDING,
)

exp_id_4 = metadb.create_experiment(
org_id=test_org_id,
team_id=test_team_id,
experiment_id=exp_id_3,
user_id=test_user_id,
name="Failed Experiment",
)
metadb.update_experiment(
experiment_id=exp_id_4,
status=Status.FAILED,
)

Expand All @@ -1012,16 +1017,23 @@ def test_delete_experiments_skips_running(
user_id=test_user_id,
experiment_id=exp_id_3,
)
run_id_4 = metadb.create_run(
org_id=test_org_id,
team_id=test_team_id,
user_id=test_user_id,
experiment_id=exp_id_4,
)

# Verify all experiments exist
assert metadb.get_experiment(experiment_id=exp_id_1) is not None
assert metadb.get_experiment(experiment_id=exp_id_2) is not None
assert metadb.get_experiment(experiment_id=exp_id_3) is not None
assert metadb.get_experiment(experiment_id=exp_id_4) is not None

# Try to batch delete all experiments
# Batch delete all experiments
mutation = f"""
mutation {{
deleteExperiments(experimentIds: ["{exp_id_1}", "{exp_id_2}", "{exp_id_3}"])
deleteExperiments(experimentIds: ["{exp_id_1}", "{exp_id_2}", "{exp_id_3}", "{exp_id_4}"])
}}
"""
response = execute_graphql(
Expand All @@ -1030,44 +1042,55 @@ def test_delete_experiments_skips_running(
user_id=test_user_id,
)
assert response.errors is None
# Should only delete 2 experiments (skipped the running one)
assert response.data["deleteExperiments"] == 2

# Verify running experiment still exists
exp_2 = metadb.get_experiment(experiment_id=exp_id_2)
assert exp_2 is not None
assert exp_2.status == Status.RUNNING
# Should delete all 4 experiments
assert response.data["deleteExperiments"] == 4

# Verify non-running experiments are deleted
# Verify all experiments are deleted (get_experiment filters out deleted ones)
assert metadb.get_experiment(experiment_id=exp_id_1) is None
assert metadb.get_experiment(experiment_id=exp_id_2) is None
assert metadb.get_experiment(experiment_id=exp_id_3) is None

# Verify runs of deleted experiments are also deleted
assert metadb.get_experiment(experiment_id=exp_id_4) is None

# Verify status changes by querying directly from database
from alphatrion.storage.sql_models import Experiment

with metadb._session() as session:
exp_1 = session.query(Experiment).filter(Experiment.uuid == exp_id_1).first()
exp_2 = session.query(Experiment).filter(Experiment.uuid == exp_id_2).first()
exp_3 = session.query(Experiment).filter(Experiment.uuid == exp_id_3).first()
exp_4 = session.query(Experiment).filter(Experiment.uuid == exp_id_4).first()

# COMPLETED stays COMPLETED
assert exp_1.status == Status.COMPLETED
# RUNNING becomes CANCELLED
assert exp_2.status == Status.CANCELLED
# PENDING becomes ABORTED
assert exp_3.status == Status.ABORTED
# FAILED stays FAILED
assert exp_4.status == Status.FAILED

# Verify all runs are deleted
assert metadb.get_run(run_id=run_id_1) is None
assert metadb.get_run(run_id=run_id_2) is None
assert metadb.get_run(run_id=run_id_3) is None
assert metadb.get_run(run_id=run_id_4) is None

# Verify run of running experiment still exists
run_2 = metadb.get_run(run_id=run_id_2)
assert run_2 is not None


def test_delete_experiments_all_running(
def test_delete_experiments_all_running_and_pending(
execute_graphql, test_org_id, test_user_id, test_team_id
):
"""Test that batch delete returns 0 when all experiments are running"""
"""Test that batch delete handles all running and pending experiments correctly"""
runtime.init()
metadb = runtime.storage_runtime().metadb

# Create multiple running experiments
# Create running and pending experiments
exp_id_1 = metadb.create_experiment(
org_id=test_org_id,
team_id=test_team_id,
user_id=test_user_id,
name="Running Experiment 1",
)
metadb.update_experiment(
org_id=test_org_id,
team_id=test_team_id,
experiment_id=exp_id_1,
status=Status.RUNNING,
)
Expand All @@ -1079,16 +1102,25 @@ def test_delete_experiments_all_running(
name="Running Experiment 2",
)
metadb.update_experiment(
org_id=test_org_id,
team_id=test_team_id,
experiment_id=exp_id_2,
status=Status.RUNNING,
)

# Try to batch delete all running experiments
exp_id_3 = metadb.create_experiment(
org_id=test_org_id,
team_id=test_team_id,
user_id=test_user_id,
name="Pending Experiment 1",
)
metadb.update_experiment(
experiment_id=exp_id_3,
status=Status.PENDING,
)

# Batch delete all experiments
mutation = f"""
mutation {{
deleteExperiments(experimentIds: ["{exp_id_1}", "{exp_id_2}"])
deleteExperiments(experimentIds: ["{exp_id_1}", "{exp_id_2}", "{exp_id_3}"])
}}
"""
response = execute_graphql(
Expand All @@ -1097,16 +1129,27 @@ def test_delete_experiments_all_running(
user_id=test_user_id,
)
assert response.errors is None
# Should delete 0 experiments (all are running)
assert response.data["deleteExperiments"] == 0
# Should delete all 3 experiments
assert response.data["deleteExperiments"] == 3

# Verify all experiments are deleted
assert metadb.get_experiment(experiment_id=exp_id_1) is None
assert metadb.get_experiment(experiment_id=exp_id_2) is None
assert metadb.get_experiment(experiment_id=exp_id_3) is None

# Verify status changes by querying directly from database
from alphatrion.storage.sql_models import Experiment

with metadb._session() as session:
exp_1 = session.query(Experiment).filter(Experiment.uuid == exp_id_1).first()
exp_2 = session.query(Experiment).filter(Experiment.uuid == exp_id_2).first()
exp_3 = session.query(Experiment).filter(Experiment.uuid == exp_id_3).first()

# Verify all experiments still exist
exp_1 = metadb.get_experiment(experiment_id=exp_id_1)
exp_2 = metadb.get_experiment(experiment_id=exp_id_2)
assert exp_1 is not None
assert exp_2 is not None
assert exp_1.status == Status.RUNNING
assert exp_2.status == Status.RUNNING
# RUNNING experiments become CANCELLED
assert exp_1.status == Status.CANCELLED
assert exp_2.status == Status.CANCELLED
# PENDING experiment becomes ABORTED
assert exp_3.status == Status.ABORTED


def test_create_experiment_mutation(
Expand Down
12 changes: 9 additions & 3 deletions tests/integration/test_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,9 @@ async def versioned_workflow():
agent_spans = [s for s in spans if s[1] == "agent"]
tool_spans = [s for s in spans if s[1] == "tool"]

assert len(agent_spans) >= 1, f"Expected at least 1 agent span, got {len(agent_spans)}"
assert len(agent_spans) >= 1, (
f"Expected at least 1 agent span, got {len(agent_spans)}"
)
assert len(tool_spans) >= 1, f"Expected at least 1 tool span, got {len(tool_spans)}"

# Check that function names appear in span names
Expand All @@ -717,8 +719,12 @@ async def versioned_workflow():
agent_version = agent_spans[0][2]
tool_version = tool_spans[0][2]

assert agent_version == "3", f"Agent version not tracked, expected '3', got '{agent_version}'"
assert tool_version == "2", f"Tool version not tracked, expected '2', got '{tool_version}'"
assert agent_version == "3", (
f"Agent version not tracked, expected '3', got '{agent_version}'"
)
assert tool_version == "2", (
f"Tool version not tracked, expected '2', got '{tool_version}'"
)


@pytest.mark.asyncio
Expand Down
Loading
Loading