Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions lightx2v/deploy/server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ async def prepare_subtasks(task_id):


def format_task(task):
    """Rewrite a task record's internal fields into their client-facing form, in place.

    ``task["status"]`` (a ``TaskStatus`` member) is replaced by its name.
    ``TaskStatus.REJECT`` is reported as ``"FAILED"``, so the REJECT state
    is never exposed through this formatter.
    ``task["model_cls"]`` is replaced by the value returned from
    ``model_pipelines.outer_model_name``.

    The dict is mutated; nothing is returned.
    """
    status = task["status"]
    # Convert exactly once: calling ``.name`` a second time on the already
    # stringified status would raise AttributeError.
    if status == TaskStatus.REJECT:
        status = TaskStatus.FAILED
    task["status"] = status.name
    task["model_cls"] = model_pipelines.outer_model_name(task["model_cls"])


Expand Down Expand Up @@ -430,6 +430,8 @@ async def api_v1_task_list(request: Request, user=Depends(verify_user_access)):

query_params = {"user_id": user_id}
if status_filter and status_filter != "ALL":
if status_filter.upper() == TaskStatus.REJECT.name:
status_filter = TaskStatus.FAILED.name
query_params["status"] = TaskStatus[status_filter.upper()]

total_tasks = await task_manager.list_tasks(count=True, **query_params)
Expand Down Expand Up @@ -694,7 +696,7 @@ async def api_v1_worker_report(request: Request, valid=Depends(verify_worker_acc
ret = await task_manager.finish_subtasks(task_id, status, worker_identity=identity, worker_name=worker_name, fail_msg=fail_msg, should_running=True)

# not all subtasks finished, prepare new ready subtasks
if ret not in [TaskStatus.SUCCEED, TaskStatus.FAILED]:
if ret not in FinishedStatus:
await prepare_subtasks(task_id)

# all subtasks succeed, delete temp data
Expand All @@ -710,6 +712,10 @@ async def api_v1_worker_report(request: Request, valid=Depends(verify_worker_acc

elif ret == TaskStatus.FAILED:
logger.warning(f"Task {task_id} failed")
elif ret == TaskStatus.CANCEL:
logger.warning(f"Task {task_id} cancel")
elif ret == TaskStatus.REJECT:
logger.warning(f"Task {task_id} reject")

return {"msg": "ok"}

Expand Down
7 changes: 3 additions & 4 deletions lightx2v/deploy/server/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from loguru import logger

from lightx2v.deploy.common.utils import class_try_catch_async
from lightx2v.deploy.task_manager import TaskStatus
from lightx2v.deploy.task_manager import ActiveStatus, FinishedStatus, TaskStatus


class WorkerStatus(Enum):
Expand Down Expand Up @@ -252,13 +252,12 @@ async def check_user_busy(self, user_id, active_new_task=False):

if active_new_task:
# check if user has too many active tasks
active_statuses = [TaskStatus.RUNNING, TaskStatus.PENDING, TaskStatus.CREATED]
active_tasks = await self.task_manager.list_tasks(status=active_statuses, user_id=user_id)
active_tasks = await self.task_manager.list_tasks(status=ActiveStatus, user_id=user_id)
if len(active_tasks) >= self.user_max_active_tasks:
return f"User {user_id} has too many active tasks, {len(active_tasks)} vs {self.user_max_active_tasks}"

# check if user has too many daily tasks
daily_statuses = active_statuses + [TaskStatus.SUCCEED, TaskStatus.CANCEL, TaskStatus.FAILED]
daily_statuses = ActiveStatus + FinishedStatus
daily_tasks = await self.task_manager.list_tasks(status=daily_statuses, user_id=user_id, start_created_t=cur_t - 86400, include_delete=True)
if len(daily_tasks) >= self.user_max_daily_tasks:
return f"User {user_id} has too many daily tasks, {len(daily_tasks)} vs {self.user_max_daily_tasks}"
Expand Down
3 changes: 2 additions & 1 deletion lightx2v/deploy/task_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ class TaskStatus(Enum):
SUCCEED = 4
FAILED = 5
CANCEL = 6
REJECT = 7


# Statuses of tasks that have been created but have not yet reached a final state.
ActiveStatus = [TaskStatus.CREATED, TaskStatus.PENDING, TaskStatus.RUNNING]
# Final statuses. Kept as a list (not a set/tuple) because callers concatenate
# it with ActiveStatus using ``+``.
FinishedStatus = [TaskStatus.SUCCEED, TaskStatus.FAILED, TaskStatus.CANCEL, TaskStatus.REJECT]


class BaseTaskManager:
Expand Down
9 changes: 7 additions & 2 deletions lightx2v/deploy/task_manager/local_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async def run_subtasks(self, cands, worker_identity):
task_id = cand["task_id"]
worker_name = cand["worker_name"]
task, subtasks = self.load(task_id)
if task["status"] in [TaskStatus.SUCCEED, TaskStatus.FAILED, TaskStatus.CANCEL]:
if task["status"] in FinishedStatus:
continue
for sub in subtasks:
if sub["worker_name"] == worker_name:
Expand Down Expand Up @@ -225,7 +225,7 @@ async def finish_subtasks(self, task_id, status, worker_identity=None, worker_na
pre = subs[0]["worker_identity"]
assert pre == worker_identity, f"worker identity not matched: {pre} vs {worker_identity}"

assert status in [TaskStatus.SUCCEED, TaskStatus.FAILED], f"invalid finish status: {status}"
assert status in FinishedStatus, f"invalid finish status: {status}"
for sub in subs:
if sub["status"] not in FinishedStatus:
if should_running and sub["status"] != TaskStatus.RUNNING:
Expand All @@ -240,6 +240,11 @@ async def finish_subtasks(self, task_id, status, worker_identity=None, worker_na
self.metrics_commit(records)
return TaskStatus.CANCEL

if task["status"] == TaskStatus.REJECT:
self.save(task, subtasks)
self.metrics_commit(records)
return TaskStatus.REJECT

running_subs = []
failed_sub = False
for sub in subtasks:
Expand Down
8 changes: 6 additions & 2 deletions lightx2v/deploy/task_manager/sql_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ async def run_subtasks(self, cands, worker_identity):
worker_name = cand["worker_name"]
task, subs = await self.load(conn, task_id, worker_name=worker_name)
assert len(subs) == 1, f"task {task_id} has multiple subtasks: {subs} with worker_name: {worker_name}"
if task["status"] in [TaskStatus.SUCCEED, TaskStatus.FAILED, TaskStatus.CANCEL]:
if task["status"] in FinishedStatus:
continue

self.mark_subtask_change(records, subs[0], subs[0]["status"], TaskStatus.RUNNING)
Expand Down Expand Up @@ -682,7 +682,7 @@ async def finish_subtasks(self, task_id, status, worker_identity=None, worker_na
pre = subs[0]["worker_identity"]
assert pre == worker_identity, f"worker identity not matched: {pre} vs {worker_identity}"

assert status in [TaskStatus.SUCCEED, TaskStatus.FAILED], f"invalid finish status: {status}"
assert status in FinishedStatus, f"invalid finish status: {status}"
for sub in subs:
if sub["status"] not in FinishedStatus:
if should_running and sub["status"] != TaskStatus.RUNNING:
Expand All @@ -703,6 +703,10 @@ async def finish_subtasks(self, task_id, status, worker_identity=None, worker_na
self.metrics_commit(records)
return TaskStatus.CANCEL

if task["status"] == TaskStatus.REJECT:
self.metrics_commit(records)
return TaskStatus.REJECT

running_subs = []
failed_sub = False
for sub in subtasks:
Expand Down