From 7083571debccb1942a412c02e6a8caff51cea67e Mon Sep 17 00:00:00 2001 From: MariusWirtz Date: Mon, 8 Sep 2025 17:32:37 +0200 Subject: [PATCH] Allow passing timeout & cancel_at_timeout on task Solves #92 --- rushti.py | 61 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/rushti.py b/rushti.py index c9b1cee..98058c6 100644 --- a/rushti.py +++ b/rushti.py @@ -74,6 +74,7 @@ UNIQUE_STRING = uuid.uuid4().hex[:8].upper() TRUE_VALUES = ["1", "y", "yes", "true", "t"] +TIMEOUT_PARAMS = ["timeout", "cancel_at_timeout"] if not os.path.isfile(LOGGING_CONFIG): raise ValueError("{config} does not exist".format(config=LOGGING_CONFIG)) @@ -178,18 +179,20 @@ def extract_task_or_wait_from_line(line: str) -> Union[Task, Wait]: return extract_task_from_line(line, task_class=Task) + def extract_task_from_line_type_opt(line: str) -> OptimizedTask: return extract_task_from_line(line, task_class=OptimizedTask) + def extract_task_from_line(line: str, task_class: Union[Type[Task], Type[OptimizedTask]]) -> Union[Task, OptimizedTask]: line_arguments = parse_line_arguments(line) - + if task_class == OptimizedTask: task_id = line_arguments.pop("id") predecessors = line_arguments.pop("predecessors", []) require_predecessor_success = line_arguments.pop("require_predecessor_success", False) succeed_on_minor_errors = line_arguments.pop("succeed_on_minor_errors", False) - + return OptimizedTask( task_id=task_id, instance_name=line_arguments.pop("instance"), @@ -205,19 +208,20 @@ def extract_task_from_line(line: str, task_class: Union[Type[Task], Type[Optimiz process_name=line_arguments.pop("process"), parameters=line_arguments) + def parse_line_arguments(line: str) -> Dict[str, Any]: line_arguments = {} - + # Use shlex to split the line with posix=True for proper escaping parts = shlex.split(line, posix=True) - + for part in parts: if '=' not in part: continue - + # Split on the first '=' to get argument and value argument, value = part.split('=', 1) - + # Handle specific keys with logic key_lower = argument.lower() if key_lower in ["process", "instance", "id"]: @@ -235,6 +239,7 @@ def parse_line_arguments(line: str) -> Dict[str, Any]: return line_arguments + def expand_task( tm1_services: Dict[str, TM1Service], task: Union[Task, OptimizedTask]) -> List[Union[Task, OptimizedTask]]: @@ -260,9 +265,10 @@ def expand_task( result.append(OptimizedTask(task.id, task.instance_name, task.process_name, parameters=expanded_params, predecessors=task.predecessors, require_predecessor_success=task.require_predecessor_success, - succeed_on_minor_errors = task.succeed_on_minor_errors)) + succeed_on_minor_errors=task.succeed_on_minor_errors)) elif isinstance(task, Task): - result.append(Task(task.instance_name, task.process_name, parameters=expanded_params, succeed_on_minor_errors=task.succeed_on_minor_errors)) + result.append(Task(task.instance_name, task.process_name, parameters=expanded_params, + succeed_on_minor_errors=task.succeed_on_minor_errors)) return result @@ -478,26 +484,41 @@ def get_ordered_tasks_and_waits( tm1_services) +def str_to_bool(value): + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.strip().lower() in TRUE_VALUES + return False + + def execute_process_with_retries(tm1: TM1Service, task: Task, retries: int): for attempt in range(retries + 1): try: + # timeout must be passed as numeric + timeout = task.parameters.get("timeout", None) + if timeout: + cancel_at_timeout = task.parameters.get("cancel_at_timeout", False) + task.parameters["timeout"] = int(timeout) + task.parameters["cancel_at_timeout"] = str_to_bool(cancel_at_timeout) + # Execute the process and unpack results success, status, error_log_file = tm1.processes.execute_with_return( - process_name=task.process_name, + process_name=task.process_name, **task.parameters) # Handle minor errors if not success and task.succeed_on_minor_errors and status == 'HasMinorErrors': success = True msg = MSG_PROCESS_HAS_MINOR_ERRORS.format( - process=task.process_name, - parameters=task.parameters, - status=status, - retries=retries, - instance=task.instance_name, - error_file=error_log_file) + process=task.process_name, + parameters=task.parameters, + status=status, + retries=retries, + instance=task.instance_name, + error_file=error_log_file) logging.warning(msg) - + if success: return success, status, error_log_file, attempt @@ -650,7 +671,12 @@ def validate_tasks(tasks: List[Task], tm1_services: Dict[str, TM1Service]) -> bo process_params = [param['Name'] for param in tm1.processes.get(task.process_name).parameters] # check for missing parameter names - missing_params = [param for param in task_params if param not in process_params] + missing_params = [ + param + for param + in task_params + if param not in process_params and param not in TIMEOUT_PARAMS + ] if len(missing_params) > 0: msg = MSG_PROCESS_PARAMS_INCORRECT.format( process=task.process_name, @@ -895,4 +921,3 @@ def exit_rushti( start_time=start, end_time=end, elapsed_time=duration) - \ No newline at end of file