From e316cdf8fb5dfc39899f3034848096c77d64d859 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Mon, 8 Jul 2024 11:48:51 +0200 Subject: [PATCH 01/12] chore(ci): update ruff, again Signed-off-by: Yves Bastide --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 89ccd73d..bf892f12 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,7 +13,7 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.5.0 + rev: v0.5.1 hooks: # Run the linter. - id: ruff From 44172cacd57b00d813167c531477119b407b8588 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Mon, 8 Jul 2024 11:52:05 +0200 Subject: [PATCH 02/12] chore(ci): update pre-commit-hooks Signed-off-by: Yves Bastide --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index bf892f12..d495c09b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ # See https://pre-commit.com/hooks.html for more hooks repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 + rev: v4.6.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer From e04e5b491469eb30a684409a1111f6ea673aa7d1 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Sun, 30 Jun 2024 17:19:49 +0200 Subject: [PATCH 03/12] docs: update pirate/README.md Signed-off-by: Yves Bastide --- examples/pirate/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/pirate/README.md b/examples/pirate/README.md index a02a6831..c3ab4293 100644 --- a/examples/pirate/README.md +++ b/examples/pirate/README.md @@ -7,7 +7,7 @@ It can be executed like this: ``` export AWS_DEFAULT_REGION=eu-west-1 export SWF_DOMAIN=TestDomain -simpleflow standalone \ +PYTHONPATH="$PWD" simpleflow standalone \ --nb-deciders 1 \ --nb-workers 2 \ --input '{"kwargs":{"money_needed": 120}}' \ From f20f53d3fba74b688e544878ab0b276f70bdfe36 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Sun, 30 Jun 2024 17:30:57 +0200 Subject: [PATCH 04/12] feat: ConnectedSWFObject: remove boto3_client Remove unused `boto3_client` argument. Signed-off-by: Yves Bastide --- simpleflow/swf/mapper/core.py | 46 ++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/simpleflow/swf/mapper/core.py b/simpleflow/swf/mapper/core.py index 594c0d91..474ba8bf 100644 --- a/simpleflow/swf/mapper/core.py +++ b/simpleflow/swf/mapper/core.py @@ -37,18 +37,20 @@ class ConnectedSWFObject: delay=retry.exponential, on_exceptions=(TypeError, NoCredentialsError), ) - def __init__(self, *args, **kwargs): - self.region = SETTINGS.get("region") or kwargs.get("region") or DEFAULT_AWS_REGION + def __init__( + self, + *, + region: str | None = None, + ): + self.region = SETTINGS.get("region") or region or DEFAULT_AWS_REGION # Use settings-provided keys if available, otherwise pass empty # dictionary to boto SWF client, which will use its default credentials # chain provider. cred_keys = ["aws_access_key_id", "aws_secret_access_key"] - creds_ = {k: SETTINGS[k] for k in cred_keys if SETTINGS.get(k, None)} + creds_: dict[str, Any] = {k: SETTINGS[k] for k in cred_keys if SETTINGS.get(k, None)} - self.boto3_client = kwargs.pop("boto3_client", None) - if not self.boto3_client: - # raises EndpointConnectionError if region is wrong - self.boto3_client = get_or_create_boto3_client(region_name=self.region, service_name="swf", **creds_) + # raises EndpointConnectionError if region is wrong + self.boto3_client = get_or_create_boto3_client(region_name=self.region, service_name="swf", **creds_) logger.debug(f"initiated connection to region={self.region}") @@ -70,7 +72,7 @@ def list_open_workflow_executions( "domain": domain, "startTimeFilter": { "oldestDate": datetime.fromtimestamp(oldest_date), - "latestDate": datetime.fromtimestamp(latest_date) if latest_date is not None else None, + "latestDate": (datetime.fromtimestamp(latest_date) if latest_date is not None else None), }, "nextPageToken": next_page_token, "maximumPageSize": maximum_page_size, @@ -120,12 +122,12 @@ def list_closed_workflow_executions( if start_oldest_date is not None: kwargs["startTimeFilter"] = { "oldestDate": datetime.fromtimestamp(start_oldest_date), - "latestDate": datetime.fromtimestamp(start_latest_date) if start_latest_date is not None else None, + "latestDate": (datetime.fromtimestamp(start_latest_date) if start_latest_date is not None else None), } if close_oldest_date is not None: kwargs["closeTimeFilter"] = { "oldestDate": datetime.fromtimestamp(close_oldest_date), - "latestDate": datetime.fromtimestamp(close_latest_date) if close_latest_date is not None else None, + "latestDate": (datetime.fromtimestamp(close_latest_date) if close_latest_date is not None else None), } if close_status: kwargs["closeStatusFilter"] = { @@ -485,11 +487,13 @@ def list_activity_types( reverse_order: bool | None = None, ): kwargs = { - "activityType": { - "name": name, - } - if name - else None, + "activityType": ( + { + "name": name, + } + if name + else None + ), "maximumPageSize": maximum_page_size, "nextPageToken": next_page_token, "reverseOrder": reverse_order, @@ -537,11 +541,13 @@ def start_workflow_execution( task_start_to_close_timeout: str | None = None, ): kwargs = { - "taskList": { - "name": task_list, - } - if task_list - else None, + "taskList": ( + { + "name": task_list, + } + if task_list + else None + ), "childPolicy": child_policy, "executionStartToCloseTimeout": execution_start_to_close_timeout, "input": input if input is not None else "", From 8002d27514b5f9a3bd69b6c4e970b3ee92beb6dd Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Sun, 30 Jun 2024 17:32:22 +0200 Subject: [PATCH 05/12] style: fix typo Signed-off-by: Yves Bastide --- simpleflow/swf/mapper/settings.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/simpleflow/swf/mapper/settings.py b/simpleflow/swf/mapper/settings.py index 153287a3..55e080b5 100644 --- a/simpleflow/swf/mapper/settings.py +++ b/simpleflow/swf/mapper/settings.py @@ -99,9 +99,6 @@ def from_home(path: str | os.PathLike = ".swf") -> dict[str, str]: """Retrieves settings from home environment If HOME environment is applicable, search $HOME/path. - - :rtype: dict - """ if "HOME" in os.environ: swf_path = os.path.join(os.environ["HOME"], path) @@ -116,7 +113,7 @@ def get(path: str | os.PathLike = ".swf") -> dict[str, str]: First, it will try to retrieve settings from a *path* in the user's home directory. Other it tries to load the settings from the environment. - If both return an empty dict, it will also return a empty dict. + If both return an empty dict, it will also return an empty dict. """ return from_home(path) or from_env() From 91c78175295ed612640f10b27878bc85c302a916 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Sun, 30 Jun 2024 18:01:37 +0200 Subject: [PATCH 06/12] feat: add proxy Signed-off-by: Yves Bastide --- simpleflow/command.py | 12 +++ simpleflow/swf/process/proxy/__init__.py | 0 simpleflow/swf/process/proxy/command.py | 117 +++++++++++++++++++++++ 3 files changed, 129 insertions(+) create mode 100644 simpleflow/swf/process/proxy/__init__.py create mode 100644 simpleflow/swf/process/proxy/command.py diff --git a/simpleflow/command.py b/simpleflow/command.py index 040f09b2..f618836c 100644 --- a/simpleflow/command.py +++ b/simpleflow/command.py @@ -21,6 +21,7 @@ from simpleflow.settings import print_settings from simpleflow.swf import helpers from simpleflow.swf.process import decider, worker +from simpleflow.swf.process.proxy.command import start_proxy from simpleflow.swf.stats import pretty from simpleflow.swf.task import ActivityTask from simpleflow.swf.utils import get_workflow_execution, set_workflow_class_name @@ -784,6 +785,17 @@ def section(title): print(f"{key}={value}") +@click.option("--address", "-a", required=False, default="::1", help="Address to bind.") +@click.option("--port", "-p", required=False, type=int, default=4242, help="Port to bind.") +@cli.command("proxy.start", help="Start a proxy process to handle worker tasks.") +def proxy_start(address: str, port: int): + proxy = os.environ.get("SWF_PROXY") + if proxy: + address, sport = proxy.rsplit(":", 1) + port = int(sport) + start_proxy(address=address, port=port) + + @click.argument("locations", nargs=-1) @cli.command( "binaries.download", diff --git a/simpleflow/swf/process/proxy/__init__.py b/simpleflow/swf/process/proxy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/simpleflow/swf/process/proxy/command.py b/simpleflow/swf/process/proxy/command.py new file mode 100644 index 00000000..88980f8e --- /dev/null +++ b/simpleflow/swf/process/proxy/command.py @@ -0,0 +1,117 @@ +""" +Adapted (much simplified) from https://github.com/inaz2/proxy2. +""" + +from __future__ import annotations + +import itertools +import select +import socket +import ssl +import sys +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler, HTTPServer + +import botocore.endpoint + +import simpleflow + + +class ProxyHTTPServer(HTTPServer): + address_family = socket.AF_INET6 + + def handle_error(self, request, client_address): + # suppress socket/ssl related errors + cls, e = sys.exc_info()[:2] + if cls is socket.error or cls is ssl.SSLError: + pass + else: + super().handle_error(request, client_address) + + +class ProxyRequestHandler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + server_version = "Simpleflow Proxy/" + simpleflow.__version__ + error_content_type = "application/json" + error_message_format = '{"code": %(code)d, "message": "%(message)s", "explain": "%(explain)s"}\n' + timeout = botocore.endpoint.DEFAULT_TIMEOUT + 1 + + def log_error(self, format, *args): + # suppress "Request timed out: timeout('timed out',)" + if isinstance(args[0], socket.timeout): + return + + self.log_message(format, *args) + + # https://en.wikipedia.org/wiki/List_of_Unicode_characters#Control_codes + _control_char_table = str.maketrans({c: rf"\x{c:02x}" for c in itertools.chain(range(0x20), range(0x7F, 0xA0))}) + _control_char_table[ord("\\")] = r"\\" + + def log_message(self, format, *args): + """Log an arbitrary message. + Copy-pasted from gh-100001 in case a vulnerable version of Python is used. + + This is used by all other logging functions. Override + it if you have specific logging wishes. + + The first argument, FORMAT, is a format string for the + message to be logged. If the format string contains + any % escapes requiring parameters, they should be + specified as subsequent arguments (it's just like + printf!). + + The client ip and current date/time are prefixed to + every message. + + Unicode control characters are replaced with escaped hex + before writing the output to stderr. + + """ + + message = format % args + sys.stderr.write( + f"{self.address_string()} - - [{self.log_date_time_string()}]" + f" {message.translate(self._control_char_table)}\n" + ) + + def do_CONNECT(self): + address = self.path.split(":", 1) + parsed_address = (address[0], int(address[1]) or 443) + try: + s = socket.create_connection(parsed_address, timeout=self.timeout) + except Exception: + self.send_error(HTTPStatus.BAD_GATEWAY) + return + self.send_response(HTTPStatus.OK, "Connection Established") + self.end_headers() + + self.proxy_connection(s) + + def proxy_connection(self, server_connection: socket.socket) -> None: + conns = (self.connection, server_connection) + self.close_connection = False + while not self.close_connection: + rlist, wlist, xlist = select.select(conns, [], conns, self.timeout) + if xlist or not rlist: + break + for r in rlist: + other = conns[1] if r is conns[0] else conns[0] + data = r.recv(8192) + if not data: + self.close_connection = True + break + other.sendall(data) + + def do_GET(self) -> None: + if self.path == "/status": + self.send_response(HTTPStatus.OK) + else: + self.send_error(HTTPStatus.NOT_FOUND) + + +def start_proxy(address: str = "::1", port: int = 4242) -> None: + server_address = (address, port) + httpd = ProxyHTTPServer(server_address, ProxyRequestHandler) + sa = httpd.socket.getsockname() + print(f"Serving HTTP Proxy on {sa[0]}:{sa[1]}") + httpd.serve_forever() From 75d2bc142ea559a11c8e4501d6c3b63a40676c86 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Sun, 30 Jun 2024 18:04:48 +0200 Subject: [PATCH 07/12] style: cleanups And remove no-ops --log-level Signed-off-by: Yves Bastide --- simpleflow/command.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/simpleflow/command.py b/simpleflow/command.py index f618836c..a6b05567 100644 --- a/simpleflow/command.py +++ b/simpleflow/command.py @@ -43,7 +43,7 @@ def comma_separated_list(value): @click.group() @click.option("--format") -@click.option("--header/--no-header", default=False) +@click.option("--header/--no-header", default=True) @click.option( "--color", type=click.Choice([log.ColorModes.AUTO, log.ColorModes.ALWAYS, log.ColorModes.NEVER]), @@ -395,14 +395,11 @@ def task_info(ctx, domain, workflow_id, task_id, details): @click.option("--nb-processes", "-N", type=int) -@click.option("--log-level", "-l") @click.option("--task-list", "-t") @click.option("--domain", "-d", envvar="SWF_DOMAIN", required=True, help="SWF Domain") @click.argument("workflows", nargs=-1, required=False) @cli.command("decider.start", help="Start a decider process to manage workflow executions.") -def start_decider(workflows, domain, task_list, log_level, nb_processes): - if log_level: - logger.warning("Deprecated: --log-level will be removed, use LOG_LEVEL environment variable instead") +def start_decider(workflows, domain, task_list, nb_processes): decider.command.start( workflows, domain, @@ -427,14 +424,12 @@ def start_decider(workflows, domain, task_list, log_level, nb_processes): help="Heartbeat interval in seconds (0 to disable heartbeating).", ) @click.option("--nb-processes", "-N", type=int) -@click.option("--log-level", "-l") @click.option("--task-list", "-t") @click.option("--domain", "-d", envvar="SWF_DOMAIN", required=True, help="SWF Domain") @cli.command("worker.start", help="Start a worker process to handle activity tasks.") def start_worker( domain, task_list, - log_level, nb_processes, heartbeat, one_task, @@ -442,9 +437,6 @@ def start_worker( middleware_pre_execution, middleware_post_execution, ): - if log_level: - logger.warning("Deprecated: --log-level will be removed, use LOG_LEVEL environment variable instead") - if not task_list and not poll_data: raise ValueError("Please provide a --task-list or some data via --poll-data") From edfca5affa502b7d3b570f2a4ac97020b0463a43 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Sun, 30 Jun 2024 18:07:12 +0200 Subject: [PATCH 08/12] feat: worker.start: add --proxy, -x Signed-off-by: Yves Bastide --- simpleflow/boto3_utils.py | 29 +++++++++++++++++++++++- simpleflow/command.py | 5 ++++ simpleflow/swf/mapper/actors/core.py | 4 ++-- simpleflow/swf/mapper/actors/worker.py | 4 ++-- simpleflow/swf/mapper/core.py | 4 ++++ simpleflow/swf/process/poller.py | 4 ++-- simpleflow/swf/process/worker/base.py | 25 +++++++++++++++----- simpleflow/swf/process/worker/command.py | 4 ++++ 8 files changed, 66 insertions(+), 13 deletions(-) diff --git a/simpleflow/boto3_utils.py b/simpleflow/boto3_utils.py index 658492ca..5c03f4c7 100644 --- a/simpleflow/boto3_utils.py +++ b/simpleflow/boto3_utils.py @@ -5,18 +5,45 @@ from typing import Any import boto3 +from botocore import config from simpleflow.utils import json_dumps _client_var: ContextVar[dict] = ContextVar("boto3_clients") +def clean_config(cfg: config.Config) -> dict[str, Any]: + rc = {} + for k in vars(cfg): + if k.startswith("_"): + continue + v = getattr(cfg, k) + if callable(v) or v is None: + continue + rc[k] = v + return rc + + +def clean_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]: + """ + We don't know how (or want) to serialize botocore.config.Config instances; + they currently only contain POD. + """ + rc = {} + for k, v in kwargs.items(): + if isinstance(v, config.Config): + v = clean_config(v) + rc[k] = v + return rc + + def get_or_create_boto3_client(*, region_name: str | None, service_name: str, **kwargs: Any): d = { "region_name": region_name, "service_name": service_name, } - d.update(kwargs) + cleaned_kwargs = clean_kwargs(kwargs) + d.update(cleaned_kwargs) key = hashlib.sha1(json_dumps(d).encode()).hexdigest() boto3_clients = _client_var.get({}) if not boto3_clients: diff --git a/simpleflow/command.py b/simpleflow/command.py index a6b05567..ed9c3357 100644 --- a/simpleflow/command.py +++ b/simpleflow/command.py @@ -409,6 +409,7 @@ def start_decider(workflows, domain, task_list, nb_processes): ) +@click.option("--proxy", "-x", envvar="SWF_PROXY", required=False, help="Proxy URL.") @click.option("--middleware-pre-execution", required=False, multiple=True) @click.option("--middleware-post-execution", required=False, multiple=True) @click.option( @@ -436,9 +437,12 @@ def start_worker( poll_data, middleware_pre_execution, middleware_post_execution, + proxy, ): if not task_list and not poll_data: raise ValueError("Please provide a --task-list or some data via --poll-data") + if poll_data and proxy: + raise ValueError("Please provide either --poll-data or --proxy, not both") middlewares = { "pre": middleware_pre_execution, @@ -453,6 +457,7 @@ def start_worker( heartbeat=heartbeat, one_task=one_task, poll_data=poll_data, + proxy=proxy, ) diff --git a/simpleflow/swf/mapper/actors/core.py b/simpleflow/swf/mapper/actors/core.py index fc2b1100..9826fed1 100644 --- a/simpleflow/swf/mapper/actors/core.py +++ b/simpleflow/swf/mapper/actors/core.py @@ -18,8 +18,8 @@ class Actor(ConnectedSWFObject): :ivar task_list: task list the Actor should watch for tasks on """ - def __init__(self, domain: Domain, task_list: str) -> None: - super().__init__() + def __init__(self, domain: Domain, task_list: str, **kwargs) -> None: + super().__init__(**kwargs) self._set_domain(domain) self.task_list = task_list diff --git a/simpleflow/swf/mapper/actors/worker.py b/simpleflow/swf/mapper/actors/worker.py index bbae3980..e312f8a3 100644 --- a/simpleflow/swf/mapper/actors/worker.py +++ b/simpleflow/swf/mapper/actors/worker.py @@ -41,8 +41,8 @@ class ActivityWorker(Actor): The form of this identity is user defined. """ - def __init__(self, domain: Domain, task_list: str, identity: str | None = None): - super().__init__(domain, task_list) + def __init__(self, domain: Domain, task_list: str, identity: str | None = None, **kwargs): + super().__init__(domain, task_list, **kwargs) self._identity = identity diff --git a/simpleflow/swf/mapper/core.py b/simpleflow/swf/mapper/core.py index 474ba8bf..0c65a8e3 100644 --- a/simpleflow/swf/mapper/core.py +++ b/simpleflow/swf/mapper/core.py @@ -9,6 +9,7 @@ from typing import Any import boto3 +from botocore import config from botocore.exceptions import NoCredentialsError # NB: import logger directly from simpleflow so we benefit from the logging @@ -41,6 +42,7 @@ def __init__( self, *, region: str | None = None, + proxy: str | None = None, ): self.region = SETTINGS.get("region") or region or DEFAULT_AWS_REGION # Use settings-provided keys if available, otherwise pass empty @@ -49,6 +51,8 @@ def __init__( cred_keys = ["aws_access_key_id", "aws_secret_access_key"] creds_: dict[str, Any] = {k: SETTINGS[k] for k in cred_keys if SETTINGS.get(k, None)} + if proxy: + creds_["config"] = config.Config(proxies={"https": proxy}) # raises EndpointConnectionError if region is wrong self.boto3_client = get_or_create_boto3_client(region_name=self.region, service_name="swf", **creds_) diff --git a/simpleflow/swf/process/poller.py b/simpleflow/swf/process/poller.py index 42cb87a1..f5582c06 100644 --- a/simpleflow/swf/process/poller.py +++ b/simpleflow/swf/process/poller.py @@ -21,11 +21,11 @@ class Poller(simpleflow.swf.mapper.actors.Actor, NamedMixin): """Multi-processing implementation of a SWF actor.""" - def __init__(self, domain: Domain, task_list: str | None = None) -> None: + def __init__(self, domain: Domain, task_list: str | None = None, **kwargs) -> None: self.is_alive = False self._named_mixin_properties = ["task_list"] - super().__init__(domain, task_list) + super().__init__(domain, task_list, **kwargs) def __repr__(self): return f"{self.__class__.__name__}(domain={self.domain.name}, task_list={self.task_list})" diff --git a/simpleflow/swf/process/worker/base.py b/simpleflow/swf/process/worker/base.py index 376ddae0..ace8ee54 100644 --- a/simpleflow/swf/process/worker/base.py +++ b/simpleflow/swf/process/worker/base.py @@ -50,10 +50,10 @@ def __init__( middlewares: dict[str, list[str]] | None = None, heartbeat: int = 60, poll_data: str | None = None, + proxy: str | None = None, ) -> None: """ :param middlewares: Paths to middleware functions to execute before and after any Activity - :param process_mode: Whether to process locally (default) """ self.nb_retries = 3 # heartbeat=0 is a special value to disable heartbeating. We want to @@ -63,7 +63,7 @@ def __init__( self.middlewares = middlewares self.poll_data = poll_data - super().__init__(domain, task_list) + super().__init__(domain, task_list, proxy=proxy) @property def name(self): @@ -96,7 +96,7 @@ def process(self, response: Response) -> None: """ Process a simpleflow.swf.mapper.actors.ActivityWorker poll response. """ - token = response.task_token + token: str = response.task_token task = response.activity_task spawn(self, token, task, self.middlewares, self._heartbeat) @@ -107,7 +107,11 @@ def complete(self, token: str, result: str | None = None) -> None: # noinspection PyMethodOverriding @with_state("failing") def fail( - self, token: str, task: ActivityTask, reason: str | None = None, details: str | None = None + self, + token: str, + task: ActivityTask, + reason: str | None = None, + details: str | None = None, ) -> dict[str, Any] | None: """ Fail the activity, log and ignore exceptions. @@ -132,7 +136,11 @@ def dispatch(self, task: ActivityTask) -> Activity: return self._dispatcher.dispatch_activity(name) def process( - self, poller: ActivityPoller, token: str, task: ActivityTask, middlewares: dict[str, list[str]] | None = None + self, + poller: ActivityPoller, + token: str, + task: ActivityTask, + middlewares: dict[str, list[str]] | None = None, ) -> Any: logger.debug("ActivityWorker.process()") try: @@ -180,7 +188,12 @@ def process( poller.fail_with_retry(token, task, reason) -def process_task(poller, token: str, task: ActivityTask, middlewares: dict[str, list[str]] | None = None) -> None: +def process_task( + poller: ActivityPoller, + token: str, + task: ActivityTask, + middlewares: dict[str, list[str]] | None = None, +) -> None: logger.debug("process_task()") format.JUMBO_FIELDS_MEMORY_CACHE.clear() worker = ActivityWorker() diff --git a/simpleflow/swf/process/worker/command.py b/simpleflow/swf/process/worker/command.py index f4b25771..b7430852 100644 --- a/simpleflow/swf/process/worker/command.py +++ b/simpleflow/swf/process/worker/command.py @@ -11,6 +11,7 @@ def make_worker_poller( middlewares: dict[str, list[str]] | None, heartbeat: int, poll_data: str, + proxy: str | None = None, ) -> ActivityPoller: """ Make a worker poller for the domain and task list. @@ -22,6 +23,7 @@ def make_worker_poller( middlewares=middlewares, heartbeat=heartbeat, poll_data=poll_data, + proxy=proxy, ) @@ -33,6 +35,7 @@ def start( heartbeat: int = 60, one_task: bool = False, poll_data: str | None = None, + proxy: str | None = None, ): """ Start a worker for the given domain and task_list. @@ -48,6 +51,7 @@ def start( middlewares=middlewares, heartbeat=heartbeat, poll_data=poll_data, + proxy=proxy, ) if poll_data: From d37f282fd798c5d090b7e516c01773b4d431520f Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Sun, 30 Jun 2024 18:09:03 +0200 Subject: [PATCH 09/12] misc. Signed-off-by: Yves Bastide --- simpleflow/swf/process/worker/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/simpleflow/swf/process/worker/base.py b/simpleflow/swf/process/worker/base.py index ace8ee54..e67662bf 100644 --- a/simpleflow/swf/process/worker/base.py +++ b/simpleflow/swf/process/worker/base.py @@ -150,6 +150,7 @@ def process( kwargs = input.get("kwargs", {}) context = sanitize_activity_context(task.context) context["domain_name"] = poller.domain.name + context["task_list"] = poller.task_list if input.get("meta", {}).get("binaries"): download_binaries(input["meta"]["binaries"]) result = ActivityTask( @@ -284,7 +285,7 @@ def worker_alive(): except simpleflow.swf.mapper.exceptions.RateLimitExceededError as error: # ignore rate limit errors: high chances the next heartbeat will be # ok anyway, so it would be stupid to break the task for that - logger.warning( + logger.info( f'got a "ThrottlingException / Rate exceeded" when heartbeating for task {task.activity_type.name}:' f" {error}" ) From 8d23671011e197d5036a21349b7e24a65beb931f Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Sun, 30 Jun 2024 23:24:43 +0200 Subject: [PATCH 10/12] docs: add/update Signed-off-by: Yves Bastide --- docs/mkdocs.yml | 1 + docs/src/features/canvas.md | 14 ++++++++---- docs/src/features/command_line.md | 11 +++++---- docs/src/features/continue_as_new.md | 5 ++-- docs/src/features/program_tasks.md | 2 +- docs/src/features/proxy.md | 34 ++++++++++++++++++++++++++++ docs/src/features/swf_layer.md | 8 +++---- docs/src/features/task_lists.md | 5 ++-- 8 files changed, 61 insertions(+), 19 deletions(-) create mode 100644 docs/src/features/proxy.md diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 5ed5fdaf..fd7c548e 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -76,6 +76,7 @@ nav: - Error Handling: features/error_handling.md - Continue As New: features/continue_as_new.md - Middleware: features/middleware.md + - Proxy: features/proxy.md - Development: development.md - Contributing: contributing.md - License: license.md diff --git a/docs/src/features/canvas.md b/docs/src/features/canvas.md index 6865d91a..baf5c177 100644 --- a/docs/src/features/canvas.md +++ b/docs/src/features/canvas.md @@ -139,12 +139,12 @@ from simpleflow.canvas import Chain, FuncGroup, Group @activity.with_attributes(task_list="quickstart", version="example") def partition_data(data_location): # Partition a list of things to do into parallelizable sub-parts - pass + ... @activity.with_attributes(task_list="quickstart", version="example") def execute_on_sub_part(sub_part): - pass + ... class AWorkflow(Workflow): @@ -169,5 +169,11 @@ return an empty Group. Since this has been a long-standing policy, a new `_allow_none` argument relaxes this constraint. !!! warning - This is a new experimental option: a better one might be to enforce - that nothing is returned. + This is a new experimental option: a better one may be to allow + a `None` return value. + +### Overriding future classes + +Both the `Group` and `Canvas` instances delegate their work to a +`GroupFuture` (resp. `ChainFuture`) instance by default. +Passing a `future_class` argument allows overriding this. diff --git a/docs/src/features/command_line.md b/docs/src/features/command_line.md index 9d06f468..bb668cb1 100644 --- a/docs/src/features/command_line.md +++ b/docs/src/features/command_line.md @@ -9,13 +9,14 @@ List Workflow Executions ------------------------ $ simpleflow workflow.list TestDomain + Workflow ID Workflow Type Status basic-example-1438722273 basic OPEN Workflow Execution Status ------------------------- - $ simpleflow --header workflow.info TestDomain basic-example-1438722273 + $ simpleflow workflow.info TestDomain basic-example-1438722273 domain workflow_type.name workflow_type.version task_list workflow_id run_id tag_list execution_time input TestDomain basic example basic-example-1438722273 22QFVi362TnCh6BdoFgkQFlocunh24zEOemo1L12Yl5Go= 1.70 {'args': [1], 'kwargs': {}} @@ -25,11 +26,11 @@ Tasks Status You can check the status of the workflow execution with: - $ simpleflow --header workflow.tasks DOMAIN WORKFLOW_ID [RUN_ID] --nb-tasks 3 - $ simpleflow --header workflow.tasks TestDomain basic-example-1438722273 + $ simpleflow workflow.tasks DOMAIN WORKFLOW_ID [RUN_ID] --nb-tasks 3 + $ simpleflow workflow.tasks TestDomain basic-example-1438722273 Tasks Last State Last State Time Scheduled Time examples.basic.increment scheduled 2015-08-04 23:04:34.510000 2015-08-04 23:04:34.510000 - $ simpleflow --header workflow.tasks TestDomain basic-example-1438722273 + $ simpleflow workflow.tasks TestDomain basic-example-1438722273 Tasks Last State Last State Time Scheduled Time examples.basic.double completed 2015-08-04 23:06:19.200000 2015-08-04 23:06:17.738000 examples.basic.delay completed 2015-08-04 23:08:18.402000 2015-08-04 23:06:17.738000 @@ -41,7 +42,7 @@ Profiling You can profile the execution of the workflow with: - $ simpleflow --header workflow.profile TestDomain basic-example-1438722273 + $ simpleflow workflow.profile TestDomain basic-example-1438722273 Task Last State Scheduled Time Scheduled Start Time Running End Percentage of total time activity-examples.basic.double-1 completed 2015-08-04 23:06 0.07 2015-08-04 23:06 1.39 2015-08-04 23:06 1.15 activity-examples.basic.increment-1 completed 2015-08-04 23:04 102.20 2015-08-04 23:06 0.79 2015-08-04 23:06 0.65 diff --git a/docs/src/features/continue_as_new.md b/docs/src/features/continue_as_new.md index b1ac8c34..d51f4732 100644 --- a/docs/src/features/continue_as_new.md +++ b/docs/src/features/continue_as_new.md @@ -1,7 +1,8 @@ # Continue As New -In long-running workflow executions, the history can hit the 25,000-events hard SWF limit. This causes execution -termination. To prevent this, the workflow can itself close the current execution and start another one by submitting +In long-running workflow executions, the history can hit the 25,000-events hard SWF limit. +This causes execution termination. +To prevent this, the workflow can itself close the current execution and start another one by submitting `self.continue_as_new(*args, **kwargs)`: it is then restarted with a new run ID and an empty history. See `examples/continue_as_new.py` for a demonstration of this pattern: diff --git a/docs/src/features/program_tasks.md b/docs/src/features/program_tasks.md index 36712216..83d0d2e1 100644 --- a/docs/src/features/program_tasks.md +++ b/docs/src/features/program_tasks.md @@ -1,7 +1,7 @@ Execution of Tasks as Programs ============================== -The `simpleflow.execute` module allows to define functions that will be +The `simpleflow.execute` module allows defining functions that will be executed as a program. There are two modes: diff --git a/docs/src/features/proxy.md b/docs/src/features/proxy.md new file mode 100644 index 00000000..36ab92c3 --- /dev/null +++ b/docs/src/features/proxy.md @@ -0,0 +1,34 @@ +# Worker Proxy Support + +When deploying a fleet of workers on an instance, their number +may cause the maximum number of connections to SWF to be reached +and let some of these workers unable to fetch activities. + +A proxy can be used to prevent this. Simpleflow can now use one, +and provides a simplistic single-threaded proxy. + +## Starting the proxy + +The `simpleflow proxy.start` command starts an HTTP proxy server. +It processes the CONNECT proxy method; the rest of the connection +is encrypted as before (it doesn't handle HTTPS to MITM the +communication). + +## Using the proxy + +The `simpleflow worker.start` command accepts a new `-x, --proxy` argument. + +## Example + +```shell +# Running the `pirate` example with multiple processes. +# Starting the decider, then the workflow, are left as exercises. +[screen 1] $ simpleflow proxy.start +Serving HTTP Proxy on ::1:4242 +[screen 2] $ SWF_PROXY=localhost:4242 PYTHONPATH=$PWD SWF_DOMAIN=TestDomain \ + simpleflow worker.start -N 1 -t pirate +``` + +## Environment setting + +Both commands honor a new `SWF_PROXY` environment variable. diff --git a/docs/src/features/swf_layer.md b/docs/src/features/swf_layer.md index a0ac4c5c..83dd6c94 100644 --- a/docs/src/features/swf_layer.md +++ b/docs/src/features/swf_layer.md @@ -22,7 +22,7 @@ Settings -------- !!! bug - The informations in this "Settings" section may be outdated, they need some love. + The information in this "Settings" section may be outdated, it needs some love. Optional: @@ -84,7 +84,7 @@ local and remote objects. retention_period=60 ) -# a Domain model local instance has been created, but nothing has been +# A Domain model local instance has been created, but nothing has been # sent to amazon. To do so, you have to save it. >>> D.save() ``` @@ -133,8 +133,8 @@ ModelDiff() ### QuerySets -Models can be retrieved and instantiated via querysets. To continue over the django comparison, -they’re behaving like django managers. +Models can be retrieved and instantiated via querysets. To continue over the Django comparison, +they’re behaving like Django managers. ```python # As querying for models needs a valid connection to amazon service, diff --git a/docs/src/features/task_lists.md b/docs/src/features/task_lists.md index dbdc74df..37548757 100644 --- a/docs/src/features/task_lists.md +++ b/docs/src/features/task_lists.md @@ -1,7 +1,6 @@ # Task Lists -Task lists are often used to route different tasks to specific groups -of workers. +Task lists are used to route different tasks to specific groups of workers. The decider and activity task lists are distinct, even if they have the same name. For SWF activities, the task list is typically specified with `@activity.with_attributes`: @@ -56,7 +55,7 @@ class MyWorkflow(Workflow): ... @classmethod - def get_task_list(cls, task_list, *args, **kwargs): + def get_task_list(cls, *args, task_list, **kwargs): return task_list def run(self, x, task_list, *args, **kwargs): From 57ef6bddda8c3ba31cc140c3d65e72bd75683395 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Mon, 8 Jul 2024 18:42:42 +0200 Subject: [PATCH 11/12] typing Signed-off-by: Yves Bastide --- simpleflow/swf/mapper/querysets/workflow.py | 29 +++++++++------------ 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/simpleflow/swf/mapper/querysets/workflow.py b/simpleflow/swf/mapper/querysets/workflow.py index e91de752..dbcb602c 100644 --- a/simpleflow/swf/mapper/querysets/workflow.py +++ b/simpleflow/swf/mapper/querysets/workflow.py @@ -401,7 +401,7 @@ class WorkflowExecutionQuerySet(BaseWorkflowQuerySet): _infos = "executionInfo" _infos_plural = "executionInfos" - def _is_valid_status_param(self, status, param): + def _is_valid_status_param(self, status: str, param: str) -> bool: statuses = { WorkflowExecution.STATUS_OPEN: {"oldest_date", "latest_date"}, WorkflowExecution.STATUS_CLOSED: { @@ -414,10 +414,10 @@ def _is_valid_status_param(self, status, param): } return param in statuses.get(status, set()) - def _validate_status_parameters(self, status, params): + def _validate_status_parameters(self, status: str, params: list[str]) -> list[str]: return [param for param in params if not self._is_valid_status_param(status, param)] - def list_workflow_executions(self, status, *args, **kwargs): + def list_workflow_executions(self, status: str, *args, **kwargs): statuses = { WorkflowExecution.STATUS_OPEN: "open", WorkflowExecution.STATUS_CLOSED: "closed", @@ -436,7 +436,7 @@ def list_workflow_executions(self, status, *args, **kwargs): method = f"list_{statuses[status]}_workflow_executions" return getattr(self, method)(*args, **kwargs) - def get_workflow_type(self, execution_info): + def get_workflow_type(self, execution_info: dict[str, Any]) -> WorkflowType: workflow_type = execution_info["workflowType"] workflow_type_qs = WorkflowTypeQuerySet(self.domain) @@ -467,7 +467,7 @@ def to_WorkflowExecution(self, domain: Domain, execution_info: dict[str, Any], * **kwargs, ) - def get(self, workflow_id, run_id, *args, **kwargs): + def get(self, workflow_id: str, run_id: str, *args, **kwargs): """ """ try: response = self.describe_workflow_execution(self.domain.name, run_id, workflow_id) @@ -496,11 +496,11 @@ def get(self, workflow_id, run_id, *args, **kwargs): def filter( self, - status=WorkflowExecution.STATUS_OPEN, - tag=None, - workflow_id=None, - workflow_type_name=None, - workflow_type_version=None, + status: str = WorkflowExecution.STATUS_OPEN, + tag: str | None = None, + workflow_id: str | None = None, + workflow_type_name: str | None = None, + workflow_type_version: str | None = None, *args, **kwargs, ): @@ -510,21 +510,16 @@ def filter( Valid values are: * ``simpleflow.swf.mapper.models.WorkflowExecution.STATUS_OPEN`` * ``simpleflow.swf.mapper.models.WorkflowExecution.STATUS_CLOSED`` - :type status: string :param tag: workflow executions containing the tag will be kept - :type tag: String :param workflow_id: workflow executions attached to the id will be kept - :type workflow_id: String :param workflow_type_name: workflow executions attached to the workflow type with provided name will be kept - :type workflow_type_name: String :param workflow_type_version: workflow executions attached to the workflow type of the provided version will be kept - :type workflow_type_version: String **Be aware that** querying over status allows the usage of statuses specific kwargs @@ -618,8 +613,8 @@ def _list(self, *args, **kwargs): def all( self, - status=WorkflowExecution.STATUS_OPEN, - start_oldest_date=MAX_WORKFLOW_AGE, + status: str = WorkflowExecution.STATUS_OPEN, + start_oldest_date: int = MAX_WORKFLOW_AGE, *args, **kwargs, ): From 157ade4399ec4b34da01cc83c7960edef8d40699 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Mon, 8 Jul 2024 18:43:18 +0200 Subject: [PATCH 12/12] =?UTF-8?q?fix:=20tagFilter.name=20=E2=86=92=20.tag?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Yves Bastide --- simpleflow/swf/mapper/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/simpleflow/swf/mapper/core.py b/simpleflow/swf/mapper/core.py index 0c65a8e3..1af8ac95 100644 --- a/simpleflow/swf/mapper/core.py +++ b/simpleflow/swf/mapper/core.py @@ -89,7 +89,7 @@ def list_open_workflow_executions( } if tag: kwargs["tagFilter"] = { - "name": tag, + "tag": tag, } if workflow_id: kwargs["executionFilter"] = { @@ -144,7 +144,7 @@ def list_closed_workflow_executions( } if tag: kwargs["tagFilter"] = { - "name": tag, + "tag": tag, } if workflow_id: kwargs["executionFilter"] = {