Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions treeherder/etl/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@ def _remove_existing_jobs(data):
# - no backwards transitions (running -> pending, pending/running -> unscheduled)
# Allowed: unscheduled -> pending/running/completed, pending -> running/completed, running -> completed
if (
current_state == "completed"
or (job["state"] == "pending" and current_state == "running")
or (job["state"] == "unscheduled" and current_state in ("pending", "running"))
current_state == Job.JobState.COMPLETED
or (job["state"] == Job.JobState.PENDING and current_state == Job.JobState.RUNNING)
or (
job["state"] == Job.JobState.UNSCHEDULED
and current_state in (Job.JobState.PENDING, Job.JobState.RUNNING)
)
):
continue
new_data.append(datum)
Expand Down Expand Up @@ -513,5 +516,5 @@ def store_job_data(repository, original_data):
if superseded_job_guid_placeholders:
for job_guid, superseded_by_guid in superseded_job_guid_placeholders:
Job.objects.filter(guid=superseded_by_guid).update(
result="superseded", state="completed"
result="superseded", state=Job.JobState.COMPLETED
)
31 changes: 17 additions & 14 deletions treeherder/etl/taskcluster_pulse/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from treeherder.etl.schema import get_json_schema
from treeherder.etl.taskcluster_pulse.parse_route import parse_route
from treeherder.model.models import Job

env = environ.Env()
logger = logging.getLogger(__name__)
Expand All @@ -20,12 +21,12 @@

# Build a mapping from exchange name to task status
EXCHANGE_EVENT_MAP = {
"exchange/taskcluster-queue/v1/task-defined": "unscheduled",
"exchange/taskcluster-queue/v1/task-pending": "pending",
"exchange/taskcluster-queue/v1/task-running": "running",
"exchange/taskcluster-queue/v1/task-completed": "completed",
"exchange/taskcluster-queue/v1/task-failed": "failed",
"exchange/taskcluster-queue/v1/task-exception": "exception",
"exchange/taskcluster-queue/v1/task-defined": Job.JobState.UNSCHEDULED,
"exchange/taskcluster-queue/v1/task-pending": Job.JobState.PENDING,
"exchange/taskcluster-queue/v1/task-running": Job.JobState.RUNNING,
"exchange/taskcluster-queue/v1/task-completed": Job.JobState.COMPLETED,
"exchange/taskcluster-queue/v1/task-failed": Job.JobState.FAILED,
"exchange/taskcluster-queue/v1/task-exception": Job.JobState.EXCEPTION,
}


Expand All @@ -36,13 +37,15 @@ class PulseHandlerError(Exception):


def state_from_run(job_run):
return "completed" if job_run["state"] in ("exception", "failed") else job_run["state"]
return (
Job.JobState.COMPLETED if job_run["state"] in ("exception", "failed") else job_run["state"]
)


def result_from_run(job_run):
run_to_result = {
"completed": "success",
"failed": "fail",
Job.JobState.COMPLETED: "success",
Job.JobState.FAILED: "fail",
}
state = job_run["state"]
if state in list(run_to_result.keys()):
Expand Down Expand Up @@ -202,15 +205,15 @@ async def handle_message(message, task_definition=None):

if not task_type:
raise Exception("Unknown exchange: {exchange}".format(exchange=message["exchange"]))
elif task_type == "unscheduled":
elif task_type == Job.JobState.UNSCHEDULED:
jobs.append(handle_task_defined(parsed_route, task, message))
elif task_type == "pending":
elif task_type == Job.JobState.PENDING:
jobs.append(handle_task_pending(parsed_route, task, message))
elif task_type == "running":
elif task_type == Job.JobState.RUNNING:
jobs.append(handle_task_running(parsed_route, task, message))
elif task_type in ("completed", "failed"):
elif task_type in (Job.JobState.COMPLETED, Job.JobState.FAILED):
jobs.append(await handle_task_completed(parsed_route, task, message, session))
elif task_type == "exception":
elif task_type == Job.JobState.EXCEPTION:
jobs.append(await handle_task_exception(parsed_route, task, message, session))

return jobs
Expand Down
29 changes: 29 additions & 0 deletions treeherder/model/migrations/0049_alter_job_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 5.1.15 on 2025-12-18 12:39

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("model", "0048_alter_failureline_action"),
]

operations = [
migrations.AlterField(
model_name="job",
name="state",
field=models.CharField(
choices=[
("pending", "Pending"),
("running", "Running"),
("completed", "Completed"),
("failed", "Failed"),
("exception", "Exception"),
("unscheduled", "Unscheduled"),
],
default="pending",
max_length=25,
),
),
]
12 changes: 11 additions & 1 deletion treeherder/model/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,16 @@ class Job(models.Model):
This class represents a build or test job in Treeherder
"""

class JobState(models.TextChoices):
"""A representation of Job State."""

PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
EXCEPTION = "exception"
UNSCHEDULED = "unscheduled"

failures = FailuresQuerySet.as_manager()
objects = JobManager()

Expand Down Expand Up @@ -560,7 +570,7 @@ class Job(models.Model):
who = models.CharField(max_length=50)
reason = models.CharField(max_length=125)
result = models.CharField(max_length=25)
state = models.CharField(max_length=25)
state = models.CharField(max_length=25, choices=JobState.choices, default=JobState.PENDING)

submit_time = models.DateTimeField()
start_time = models.DateTimeField()
Expand Down