diff --git a/lightx2v/deploy/server/__main__.py b/lightx2v/deploy/server/__main__.py index 9d5e95ea..a6d3ca57 100644 --- a/lightx2v/deploy/server/__main__.py +++ b/lightx2v/deploy/server/__main__.py @@ -331,7 +331,7 @@ async def prepare_subtasks(task_id): def format_task(task): - task["status"] = task["status"].name + task["status"] = task["status"].name if task["status"] != TaskStatus.REJECT else TaskStatus.FAILED.name task["model_cls"] = model_pipelines.outer_model_name(task["model_cls"]) @@ -430,6 +430,8 @@ async def api_v1_task_list(request: Request, user=Depends(verify_user_access)): query_params = {"user_id": user_id} if status_filter and status_filter != "ALL": + if status_filter.upper() == TaskStatus.REJECT.name: + status_filter = TaskStatus.FAILED.name query_params["status"] = TaskStatus[status_filter.upper()] total_tasks = await task_manager.list_tasks(count=True, **query_params) @@ -694,7 +696,7 @@ async def api_v1_worker_report(request: Request, valid=Depends(verify_worker_acc ret = await task_manager.finish_subtasks(task_id, status, worker_identity=identity, worker_name=worker_name, fail_msg=fail_msg, should_running=True) # not all subtasks finished, prepare new ready subtasks - if ret not in [TaskStatus.SUCCEED, TaskStatus.FAILED]: + if ret not in FinishedStatus: await prepare_subtasks(task_id) # all subtasks succeed, delete temp data @@ -710,6 +712,10 @@ async def api_v1_worker_report(request: Request, valid=Depends(verify_worker_acc elif ret == TaskStatus.FAILED: logger.warning(f"Task {task_id} failed") + elif ret == TaskStatus.CANCEL: + logger.warning(f"Task {task_id} cancel") + elif ret == TaskStatus.REJECT: + logger.warning(f"Task {task_id} reject") return {"msg": "ok"} diff --git a/lightx2v/deploy/server/monitor.py b/lightx2v/deploy/server/monitor.py index 591b1475..b709e62f 100644 --- a/lightx2v/deploy/server/monitor.py +++ b/lightx2v/deploy/server/monitor.py @@ -6,7 +6,7 @@ from loguru import logger from lightx2v.deploy.common.utils import class_try_catch_async -from lightx2v.deploy.task_manager import TaskStatus +from lightx2v.deploy.task_manager import ActiveStatus, FinishedStatus, TaskStatus class WorkerStatus(Enum): @@ -252,13 +252,12 @@ async def check_user_busy(self, user_id, active_new_task=False): if active_new_task: # check if user has too many active tasks - active_statuses = [TaskStatus.RUNNING, TaskStatus.PENDING, TaskStatus.CREATED] - active_tasks = await self.task_manager.list_tasks(status=active_statuses, user_id=user_id) + active_tasks = await self.task_manager.list_tasks(status=ActiveStatus, user_id=user_id) if len(active_tasks) >= self.user_max_active_tasks: return f"User {user_id} has too many active tasks, {len(active_tasks)} vs {self.user_max_active_tasks}" # check if user has too many daily tasks - daily_statuses = active_statuses + [TaskStatus.SUCCEED, TaskStatus.CANCEL, TaskStatus.FAILED] + daily_statuses = ActiveStatus + FinishedStatus daily_tasks = await self.task_manager.list_tasks(status=daily_statuses, user_id=user_id, start_created_t=cur_t - 86400, include_delete=True) if len(daily_tasks) >= self.user_max_daily_tasks: return f"User {user_id} has too many daily tasks, {len(daily_tasks)} vs {self.user_max_daily_tasks}" diff --git a/lightx2v/deploy/task_manager/__init__.py b/lightx2v/deploy/task_manager/__init__.py index 7cb6f6c5..31506365 100644 --- a/lightx2v/deploy/task_manager/__init__.py +++ b/lightx2v/deploy/task_manager/__init__.py @@ -14,10 +14,11 @@ class TaskStatus(Enum): SUCCEED = 4 FAILED = 5 CANCEL = 6 + REJECT = 7 ActiveStatus = [TaskStatus.CREATED, TaskStatus.PENDING, TaskStatus.RUNNING] -FinishedStatus = [TaskStatus.SUCCEED, TaskStatus.FAILED, TaskStatus.CANCEL] +FinishedStatus = [TaskStatus.SUCCEED, TaskStatus.FAILED, TaskStatus.CANCEL, TaskStatus.REJECT] class BaseTaskManager: diff --git a/lightx2v/deploy/task_manager/local_task_manager.py b/lightx2v/deploy/task_manager/local_task_manager.py index fcf1076e..89668115 100644 --- a/lightx2v/deploy/task_manager/local_task_manager.py +++ b/lightx2v/deploy/task_manager/local_task_manager.py @@ -182,7 +182,7 @@ async def run_subtasks(self, cands, worker_identity): task_id = cand["task_id"] worker_name = cand["worker_name"] task, subtasks = self.load(task_id) - if task["status"] in [TaskStatus.SUCCEED, TaskStatus.FAILED, TaskStatus.CANCEL]: + if task["status"] in FinishedStatus: continue for sub in subtasks: if sub["worker_name"] == worker_name: @@ -225,7 +225,7 @@ async def finish_subtasks(self, task_id, status, worker_identity=None, worker_na pre = subs[0]["worker_identity"] assert pre == worker_identity, f"worker identity not matched: {pre} vs {worker_identity}" - assert status in [TaskStatus.SUCCEED, TaskStatus.FAILED], f"invalid finish status: {status}" + assert status in FinishedStatus, f"invalid finish status: {status}" for sub in subs: if sub["status"] not in FinishedStatus: if should_running and sub["status"] != TaskStatus.RUNNING: @@ -240,6 +240,11 @@ async def finish_subtasks(self, task_id, status, worker_identity=None, worker_na self.metrics_commit(records) return TaskStatus.CANCEL + if task["status"] == TaskStatus.REJECT: + self.save(task, subtasks) + self.metrics_commit(records) + return TaskStatus.REJECT + running_subs = [] failed_sub = False for sub in subtasks: diff --git a/lightx2v/deploy/task_manager/sql_task_manager.py b/lightx2v/deploy/task_manager/sql_task_manager.py index d40fc079..ff1fa30e 100644 --- a/lightx2v/deploy/task_manager/sql_task_manager.py +++ b/lightx2v/deploy/task_manager/sql_task_manager.py @@ -622,7 +622,7 @@ async def run_subtasks(self, cands, worker_identity): worker_name = cand["worker_name"] task, subs = await self.load(conn, task_id, worker_name=worker_name) assert len(subs) == 1, f"task {task_id} has multiple subtasks: {subs} with worker_name: {worker_name}" - if task["status"] in [TaskStatus.SUCCEED, TaskStatus.FAILED, TaskStatus.CANCEL]: + if task["status"] in FinishedStatus: continue self.mark_subtask_change(records, subs[0], subs[0]["status"], TaskStatus.RUNNING) @@ -682,7 +682,7 @@ async def finish_subtasks(self, task_id, status, worker_identity=None, worker_na pre = subs[0]["worker_identity"] assert pre == worker_identity, f"worker identity not matched: {pre} vs {worker_identity}" - assert status in [TaskStatus.SUCCEED, TaskStatus.FAILED], f"invalid finish status: {status}" + assert status in FinishedStatus, f"invalid finish status: {status}" for sub in subs: if sub["status"] not in FinishedStatus: if should_running and sub["status"] != TaskStatus.RUNNING: @@ -703,6 +703,10 @@ async def finish_subtasks(self, task_id, status, worker_identity=None, worker_na self.metrics_commit(records) return TaskStatus.CANCEL + if task["status"] == TaskStatus.REJECT: + self.metrics_commit(records) + return TaskStatus.REJECT + running_subs = [] failed_sub = False for sub in subtasks: