diff --git a/specifyweb/backend/accounts/views.py b/specifyweb/backend/accounts/views.py index 6dc56aab016..22f1e477401 100644 --- a/specifyweb/backend/accounts/views.py +++ b/specifyweb/backend/accounts/views.py @@ -1,5 +1,4 @@ import base64 -from datetime import timedelta import hashlib import hmac import json @@ -17,7 +16,7 @@ from django.db.models import Max from django.shortcuts import render from django.template.response import TemplateResponse -from django.utils import crypto, timezone +from django.utils import crypto from django.utils.http import url_has_allowed_host_and_scheme, urlencode from django.views.decorators.cache import never_cache from typing import cast @@ -295,7 +294,7 @@ def choose_collection(request) -> http.HttpResponse: id to the user if one is provided. """ from specifyweb.backend.context.views import set_collection_cookie, users_collections_for_sp7 - from specifyweb.backend.setup_tool.api import is_config_task_running + from specifyweb.backend.setup_tool.api import filter_ready_collections_for_config_tasks from specifyweb.specify.api.serializers import obj_to_data, toJson @@ -320,14 +319,7 @@ def choose_collection(request) -> http.HttpResponse: ) available_collections = users_collections_for_sp7(request.specify_user.id) - if is_config_task_running(): - # If config tasks are running, filter out newly created collections - fifteen_minutes_ago = timezone.now() - timedelta(minutes=15) - available_collections = [ - c - for c in available_collections - if c.timestampcreated is None or c.timestampcreated <= fifteen_minutes_ago - ] + available_collections = filter_ready_collections_for_config_tasks(available_collections) if len(available_collections) == 1: set_collection_cookie(redirect_resp, available_collections[0].id) @@ -588,4 +580,3 @@ def set_admin_status(request, userid): else: user.clear_admin() return http.HttpResponse('false', content_type='text/plain') - diff --git a/specifyweb/backend/context/views.py b/specifyweb/backend/context/views.py index 373d599f3f6..87861acc126 100644 --- a/specifyweb/backend/context/views.py +++ b/specifyweb/backend/context/views.py @@ -5,7 +5,6 @@ import json import os import re -from datetime import timedelta from typing import List from django.conf import settings @@ -38,9 +37,12 @@ from .remote_prefs import get_remote_prefs from .schema_localization import get_schema_languages, get_schema_localization from .viewsets import get_views -from specifyweb.backend.setup_tool.api import get_config_progress, is_config_task_running - - +from specifyweb.backend.setup_tool.api import ( + get_config_progress, + filter_ready_collections_for_config_tasks, + filter_ready_disciplines_for_config_tasks, +) + def set_collection_cookie(response, collection_id): # pragma: no cover response.set_cookie('collection', str(collection_id), max_age=365*24*60*60) @@ -616,8 +618,7 @@ def viewsets(request): file not in FORM_RESOURCE_EXCLUDED_LST, all_files)) return HttpResponse(json.dumps(viewsets), content_type="application/json") - - + def view_helper(request, limit): if 'collectionid' in request.GET: # Allow a URL parameter to override the logged in collection. @@ -644,21 +645,13 @@ def remote_prefs(request): @require_http_methods(['GET', 'HEAD']) def get_server_time(request): return JsonResponse({"server_time": timezone.now().isoformat()}) - - + def _filter_collections_not_ready_for_config_task(collections): - if not is_config_task_running(): - return collections - - # If config tasks are running, filter out newly created collections. - fifteen_minutes_ago = timezone.now() - timedelta(minutes=15) - return [ - c - for c in collections - if c.timestampcreated is None or c.timestampcreated <= fifteen_minutes_ago - ] - - + return filter_ready_collections_for_config_tasks(collections) + +def _filter_disciplines_not_ready_for_config_task(disciplines): + return filter_ready_disciplines_for_config_tasks(disciplines) + def _build_system_data(*, filter_not_ready_collections: bool): institution = Institution.objects.get() divisions = list(Division.objects.all()) @@ -666,6 +659,7 @@ def _build_system_data(*, filter_not_ready_collections: bool): collections = list(Collection.objects.all()) if filter_not_ready_collections: + disciplines = _filter_disciplines_not_ready_for_config_task(disciplines) collections = _filter_collections_not_ready_for_config_task(collections) discipline_map = {} @@ -806,8 +800,7 @@ def get_endpoint_tags(endpoint): list = [endpoint[method]['tags'] for method in methods] return [item for sublist in list for item in sublist] # flatten the list - - + def get_tags(endpoints): tag_names = [get_endpoint_tags(endpoint) for endpoint in endpoints.values()] @@ -934,8 +927,7 @@ def get_endpoints( prefix + path, preparams + params ) - - + def generate_openapi_for_endpoints(all_endpoints=False): # pragma: no cover """Returns a JSON description of endpoints. diff --git a/specifyweb/backend/setup_tool/api.py b/specifyweb/backend/setup_tool/api.py index 60a63ab8a91..e06ba640f65 100644 --- a/specifyweb/backend/setup_tool/api.py +++ b/specifyweb/backend/setup_tool/api.py @@ -5,11 +5,9 @@ import json from typing import Optional -from datetime import timedelta from django.http import (JsonResponse) from django.db.models import Max from django.db import transaction -from django.utils import timezone from specifyweb.backend.permissions.models import UserPolicy @@ -32,6 +30,10 @@ ) from specifyweb.celery_tasks import MissingWorkerError, get_running_worker_task_names from specifyweb.backend.setup_tool.tree_defaults import start_default_tree_from_configuration, update_tree_scoping +from specifyweb.backend.setup_tool.task_tracking import ( + is_collection_ready_for_config_tasks, + is_discipline_ready_for_config_tasks, +) from specifyweb.specify.models import Institution, Discipline from specifyweb.backend.businessrules.uniqueness_rules import apply_default_uniqueness_rules from specifyweb.specify.management.commands.run_key_migration_functions import fix_cots, fix_schema_config @@ -41,7 +43,6 @@ APP_VERSION = "7" SCHEMA_VERSION = "2.10" -CONFIG_TASK_COLLECTION_BLOCK_WINDOW = timedelta(minutes=15) class SetupError(Exception): """Raised by any setup tasks.""" @@ -352,7 +353,7 @@ def create_collection(data, run_fix_schema_config_async: bool = True): # Create picklists create_default_picklists(new_collection, discipline.type) if run_fix_schema_config_async: - queue_fix_schema_config_background() + queue_fix_schema_config_background(collection_id=new_collection.id) else: fix_schema_config() @@ -527,6 +528,34 @@ def get_config_resource_progress(running_task_names: Optional[list[str]] = None) active_task_names = set(running_task_names or []) return _get_config_resource_progress_from_active_names(active_task_names) +def is_collection_busy_for_config_tasks( + collection_id: int, + discipline_id: Optional[int] = None, +) -> bool: + if discipline_id is None: + collection = models.Collection.objects.filter(id=collection_id).only("discipline_id").first() + if collection is None: + return False + discipline_id = collection.discipline_id + return not is_collection_ready_for_config_tasks(collection_id, discipline_id) + +def is_discipline_busy_for_config_tasks(discipline_id: int) -> bool: + return not is_discipline_ready_for_config_tasks(discipline_id) + +def filter_ready_collections_for_config_tasks(collections: list) -> list: + return [ + collection + for collection in collections + if not is_collection_busy_for_config_tasks(collection.id, collection.discipline_id) + ] + +def filter_ready_disciplines_for_config_tasks(disciplines: list) -> list: + return [ + discipline + for discipline in disciplines + if not is_discipline_busy_for_config_tasks(discipline.id) + ] + def get_config_progress(collection_id: Optional[int] = None) -> dict: """Returns a dict of the status of config/setup related background tasks""" try: @@ -534,9 +563,11 @@ def get_config_progress(collection_id: Optional[int] = None) -> dict: except MissingWorkerError: running_task_names = [] - busy = is_config_task_running(running_task_names) - if busy and collection_id is not None: - busy = _is_new_collection_in_time_window(collection_id) + busy = ( + is_config_task_running(running_task_names) + if collection_id is None + else is_collection_busy_for_config_tasks(collection_id) + ) last_error = None completed_resources = get_config_resource_progress(running_task_names) @@ -545,15 +576,3 @@ def get_config_progress(collection_id: Optional[int] = None) -> dict: "last_error": last_error, "busy": busy, } - -def _is_new_collection_in_time_window(collection_id: int) -> bool: - """Return True when a newly created collection is still in the new collection time window.""" - from specifyweb.specify.models import Collection - - collection = Collection.objects.filter(id=collection_id).only("timestampcreated").first() - if collection is None: - return False - if collection.timestampcreated is None: - return False - - return collection.timestampcreated > timezone.now() - CONFIG_TASK_COLLECTION_BLOCK_WINDOW diff --git a/specifyweb/backend/setup_tool/redis.py b/specifyweb/backend/setup_tool/redis.py index 1a70c241a9c..259effa029e 100644 --- a/specifyweb/backend/setup_tool/redis.py +++ b/specifyweb/backend/setup_tool/redis.py @@ -2,5 +2,10 @@ # Also defined separately in setup_tool/apps.py ACTIVE_TASK_REDIS_KEY = "specify:{database}:setup:active_task_id" ACTIVE_TASK_TTL = 60*60*2 # setup should be less than 2 hours + # Keep track of last error. -LAST_ERROR_REDIS_KEY = "specify:{database}:setup:last_error" \ No newline at end of file +LAST_ERROR_REDIS_KEY = "specify:{database}:setup:last_error" + +# Track async setup/config tasks by resource scope. +COLLECTION_TASKS_REDIS_KEY = "specify:{database}:setup:collection:{collection_id}:tasks" +DISCIPLINE_TASKS_REDIS_KEY = "specify:{database}:setup:discipline:{discipline_id}:tasks" diff --git a/specifyweb/backend/setup_tool/schema_defaults.py b/specifyweb/backend/setup_tool/schema_defaults.py index e5e50fbb4d2..791ac736ac5 100644 --- a/specifyweb/backend/setup_tool/schema_defaults.py +++ b/specifyweb/backend/setup_tool/schema_defaults.py @@ -2,6 +2,7 @@ from specifyweb.specify.migration_utils.update_schema_config import update_table_schema_config_with_defaults from specifyweb.celery_tasks import app from .utils import load_json_from_file +from .task_tracking import queue_discipline_background_task, finish_discipline_background_task from specifyweb.specify.models import Discipline from django.db import transaction from celery.exceptions import MaxRetriesExceededError @@ -68,11 +69,15 @@ def queue_apply_schema_defaults_background(discipline_id: int) -> str: task_id = str(uuid4()) # Dispatch only after the discipline row is committed so workers can read it. - transaction.on_commit( - lambda: apply_schema_defaults_task.apply_async( + def enqueue() -> None: + async_result = apply_schema_defaults_task.apply_async( args=[discipline_id], task_id=task_id, ) + queue_discipline_background_task(discipline_id, async_result.id) + + transaction.on_commit( + enqueue ) return task_id @@ -93,5 +98,9 @@ def apply_schema_defaults_task(self, discipline_id: int): discipline_id, SCHEMA_DEFAULTS_MISSING_DISCIPLINE_MAX_RETRIES, ) + finish_discipline_background_task(discipline_id, self.request.id) return - apply_schema_defaults(discipline) + try: + apply_schema_defaults(discipline) + finally: + finish_discipline_background_task(discipline_id, self.request.id) diff --git a/specifyweb/backend/setup_tool/setup_tasks.py b/specifyweb/backend/setup_tool/setup_tasks.py index bc6bf0dd1d9..3618acbdc55 100644 --- a/specifyweb/backend/setup_tool/setup_tasks.py +++ b/specifyweb/backend/setup_tool/setup_tasks.py @@ -14,6 +14,10 @@ from specifyweb.celery_tasks import is_worker_alive, MissingWorkerError from specifyweb.backend.redis_cache.store import set_string, get_string from specifyweb.backend.setup_tool.redis import ACTIVE_TASK_REDIS_KEY, ACTIVE_TASK_TTL, LAST_ERROR_REDIS_KEY +from specifyweb.backend.setup_tool.task_tracking import ( + queue_collection_background_task, + finish_collection_background_task, +) from uuid import uuid4 import logging @@ -38,9 +42,12 @@ def setup_database_background(data: dict) -> str: return task.id -def queue_fix_schema_config_background() -> str: +def queue_fix_schema_config_background(collection_id: Optional[int] = None) -> str: """Queue fix_schema_config to run asynchronously and return the task id""" - task = fix_schema_config_task.apply_async() + args = [collection_id] if collection_id is not None else [] + task = fix_schema_config_task.apply_async(args=args) + if collection_id is not None: + queue_collection_background_task(collection_id, task.id) return task.id def get_active_setup_task() -> Tuple[Optional[AsyncResult], bool]: @@ -171,9 +178,13 @@ def update_progress(): raise @app.task(bind=True) -def fix_schema_config_task(self): +def fix_schema_config_task(self, collection_id: Optional[int] = None): """Run schema config migration fixups in a background worker""" - fix_schema_config() + try: + fix_schema_config() + finally: + if collection_id is not None: + finish_collection_background_task(collection_id, self.request.id) def get_last_setup_error() -> Optional[str]: err = get_string(LAST_ERROR_REDIS_KEY) @@ -258,4 +269,4 @@ def create_discipline_and_trees_task(data: dict): )) except Exception as e: logger.exception(f'Error creating discipline: {e}') - raise \ No newline at end of file + raise diff --git a/specifyweb/backend/setup_tool/task_tracking.py b/specifyweb/backend/setup_tool/task_tracking.py new file mode 100644 index 00000000000..a7190612b87 --- /dev/null +++ b/specifyweb/backend/setup_tool/task_tracking.py @@ -0,0 +1,130 @@ +from typing import Optional +import logging + +from specifyweb.backend.redis_cache.store import add_to_set, delete_key, remove_from_set, set_members +from specifyweb.celery_tasks import CELERY_TASK_STATE, app +from specifyweb.backend.setup_tool.redis import ( + COLLECTION_TASKS_REDIS_KEY, + DISCIPLINE_TASKS_REDIS_KEY, +) + +logger = logging.getLogger(__name__) + +ACTIVE_TASK_STATES = frozenset( + { + CELERY_TASK_STATE.PENDING, + CELERY_TASK_STATE.RECEIVED, + CELERY_TASK_STATE.STARTED, + CELERY_TASK_STATE.RETRY, + "PROGRESS", + "RUNNING", + } +) + +TERMINAL_TASK_STATES = frozenset( + { + CELERY_TASK_STATE.SUCCESS, + CELERY_TASK_STATE.FAILURE, + CELERY_TASK_STATE.REVOKED, + } +) + +def _collection_tasks_key(collection_id: int) -> str: + return COLLECTION_TASKS_REDIS_KEY.replace("{collection_id}", str(collection_id)) + +def _discipline_tasks_key(discipline_id: int) -> str: + return DISCIPLINE_TASKS_REDIS_KEY.replace("{discipline_id}", str(discipline_id)) + +def _remove_task_ids_and_delete_empty_key(key: str, *task_ids: str) -> None: + remove_from_set(key, *task_ids) + if len(set_members(key)) == 0: + delete_key(key) + +def queue_collection_background_task(collection_id: int, task_id: str) -> None: + try: + add_to_set(_collection_tasks_key(collection_id), task_id) + except Exception: + logger.warning( + "Failed to track collection task %s for collection %s.", + task_id, + collection_id, + ) + +def finish_collection_background_task(collection_id: int, task_id: str) -> None: + try: + _remove_task_ids_and_delete_empty_key(_collection_tasks_key(collection_id), task_id) + except Exception: + logger.warning( + "Failed to clear tracked collection task %s for collection %s.", + task_id, + collection_id, + ) + +def queue_discipline_background_task(discipline_id: int, task_id: str) -> None: + try: + add_to_set(_discipline_tasks_key(discipline_id), task_id) + except Exception: + logger.warning( + "Failed to track discipline task %s for discipline %s.", + task_id, + discipline_id, + ) + +def finish_discipline_background_task(discipline_id: int, task_id: str) -> None: + try: + _remove_task_ids_and_delete_empty_key(_discipline_tasks_key(discipline_id), task_id) + except Exception: + logger.warning( + "Failed to clear tracked discipline task %s for discipline %s.", + task_id, + discipline_id, + ) + +def _active_task_ids_from_redis_key(key: str) -> set[str]: + try: + task_ids = set_members(key) + if not task_ids: + return set() + + active_task_ids: set[str] = set() + finished_task_ids: list[str] = [] + for task_id in task_ids: + task_state = app.AsyncResult(task_id).state + if task_state in ACTIVE_TASK_STATES: + active_task_ids.add(task_id) + continue + if task_state in TERMINAL_TASK_STATES: + finished_task_ids.append(task_id) + continue + # Unknown states should block readiness until they transition. + active_task_ids.add(task_id) + + if finished_task_ids: + _remove_task_ids_and_delete_empty_key(key, *finished_task_ids) + + return active_task_ids + except Exception: + logger.warning("Failed to read task tracking key %s.", key) + return set() + +def get_active_collection_background_tasks(collection_id: int) -> set[str]: + return _active_task_ids_from_redis_key(_collection_tasks_key(collection_id)) + +def get_active_discipline_background_tasks(discipline_id: int) -> set[str]: + return _active_task_ids_from_redis_key(_discipline_tasks_key(discipline_id)) + +def has_collection_background_tasks(collection_id: int) -> bool: + return len(get_active_collection_background_tasks(collection_id)) > 0 + +def has_discipline_background_tasks(discipline_id: int) -> bool: + return len(get_active_discipline_background_tasks(discipline_id)) > 0 + +def is_collection_ready_for_config_tasks(collection_id: int, discipline_id: Optional[int] = None) -> bool: + if has_collection_background_tasks(collection_id): + return False + if discipline_id is not None and has_discipline_background_tasks(discipline_id): + return False + return True + +def is_discipline_ready_for_config_tasks(discipline_id: int) -> bool: + return not has_discipline_background_tasks(discipline_id)