diff --git a/CHANGELOG.md b/CHANGELOG.md index cfb0b355e..c58b84100 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Fixed - [Localhost] Fix shutil.Error caused by existing __pycache__ directory when copying files in the runner +- [Executor] Make retry count configurable in RetryingFunctionExecutor ## [v3.6.1] diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8baf2f6f4..bfd81394a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -16,12 +16,12 @@ To contribute a patch: 1. Break your work into small, single-purpose patches if possible. It's much harder to merge in a large change with a lot of disjoint features. 2. Submit the patch as a GitHub pull request against the master branch. -3. Make sure that your code passes the functional tests. See the [Functional testing](#functional-testing) section below. +3. Make sure that your code passes the tests. 4. Make sure that your code passes the linter. Install `flake8` with `pip3 install flake8` and run the next command until you don't see any linitng error: ```bash flake8 lithops --count --max-line-length=180 --statistics --ignore W605,W503 ``` -6. Add new unit tests for your code. +5. Add new tests for your code. Testing diff --git a/config/README.md b/config/README.md index bea697740..dbf4fd1d7 100644 --- a/config/README.md +++ b/config/README.md @@ -144,18 +144,19 @@ if __name__ == '__main__': ## Summary of configuration keys for Lithops -|Group|Key|Default|Mandatory|Additional info| -|---|---|---|---|---| -|lithops | backend | aws_lambda | no | Compute backend implementation. `localhost` is the default if no config or config file is provided| -|lithops | storage | aws_s3 | no | Storage backend implementation. `localhost` is the default if no config or config file is provided| -|lithops | data_cleaner | True | no |If set to True, then the cleaner will automatically delete all the temporary data that was written into `storage_bucket/lithops.jobs`| -|lithops | monitoring | storage | no | Monitoring system implementation. One of: **storage** or **rabbitmq** | -|lithops | monitoring_interval | 2 | no | Monitoring check interval in seconds in case of **storage** monitoring | -|lithops | data_limit | 4 | no | Max (iter)data size (in MB). Set to False for unlimited size | -|lithops | execution_timeout | 1800 | no | Functions will be automatically killed if they exceed this execution time (in seconds). Alternatively, it can be set in the `call_async()`, `map()` or `map_reduce()` calls using the `timeout` parameter.| -|lithops | include_modules | [] | no | Explicitly pickle these dependencies. All required dependencies are pickled if default empty list. No one dependency is pickled if it is explicitly set to None | -|lithops | exclude_modules | [] | no | Explicitly keep these modules from pickled dependencies. It is not taken into account if you set include_modules | -|lithops | log_level | INFO |no | Logging level. One of: WARNING, INFO, DEBUG, ERROR, CRITICAL, Set to None to disable logging | -|lithops | log_format | "%(asctime)s [%(levelname)s] %(name)s -- %(message)s" |no | Logging format string | -|lithops | log_stream | ext://sys.stderr |no | Logging stream. eg.: ext://sys.stderr, ext://sys.stdout| -|lithops | log_filename | |no | Path to a file. log_filename has preference over log_stream. 
| +| Group | Key | Default | Mandatory | Additional info | +|---------|---------------------|--------------|-----------|--------------------------------------------------------------------------------------------------| +| lithops | backend | aws_lambda | no | Compute backend implementation. `localhost` is the default if no config or config file is provided. | +| lithops | storage | aws_s3 | no | Storage backend implementation. `localhost` is the default if no config or config file is provided. | +| lithops | data_cleaner | True | no | If True, automatically deletes temporary data written to `storage_bucket/lithops.jobs`. | +| lithops | monitoring | storage | no | Monitoring system implementation. Options: **storage** or **rabbitmq**. | +| lithops | monitoring_interval | 2 | no | Interval in seconds for monitoring checks when using **storage** monitoring. | +| lithops | data_limit | 4 | no | Maximum size (in MB) for iterator data chunks. Set to False for unlimited size. | +| lithops | execution_timeout | 1800 | no | Maximum execution time in seconds for functions. Functions exceeding this time are terminated. Can also be set per call via the `timeout` parameter. | +| lithops | include_modules | [] | no | List of dependencies to explicitly include for pickling. If empty, all required dependencies are included. If set to None, no dependencies are included. | +| lithops | exclude_modules | [] | no | List of dependencies to exclude from pickling. Ignored if `include_modules` is set. | +| lithops | log_level | INFO | no | Logging level. Options: WARNING, INFO, DEBUG, ERROR, CRITICAL. Set to None to disable logging. | +| lithops | log_format | "%(asctime)s [%(levelname)s] %(name)s -- %(message)s" | no | Format string for log messages. | +| lithops | log_stream | ext://sys.stderr | no | Logging output stream, e.g., ext://sys.stderr or ext://sys.stdout. | +| lithops | log_filename | (empty) | no | File path for logging output. Overrides `log_stream` if set. | +| lithops | retries | 0 | no | Number of retries for failed function invocations when using the `RetryingFunctionExecutor`. Default is 0. Can be overridden per API call. | diff --git a/docs/index.rst b/docs/index.rst index b427d607e..762d48d15 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -73,7 +73,7 @@ To start using Lithops: pip install lithops -2. Configure your cloud credentials (see the full guide in :doc:`/config`) +2. Configure your cloud credentials (see the :doc:`Configuration Guide `) 3. Write and run your first parallel job: diff --git a/docs/source/api_futures.rst b/docs/source/api_futures.rst index e374a2651..b42999795 100644 --- a/docs/source/api_futures.rst +++ b/docs/source/api_futures.rst @@ -7,7 +7,8 @@ The core abstraction in Lithops is the **executor**, responsible for orchestrati To get started, you typically import `lithops` and create an executor instance to run your code. Lithops provides a flexible set of executors to suit different needs. -### Primary Executors +Primary Executors +----------------- * **FunctionExecutor** (`lithops.FunctionExecutor()`): The main, generic executor that automatically selects its execution mode based on the provided configuration. @@ -17,7 +18,8 @@ To get started, you typically import `lithops` and create an executor instance t A robust wrapper around `FunctionExecutor` that transparently handles retries on failed tasks. 
It supports all features of `FunctionExecutor` with added automatic retry logic, improving fault tolerance and reliability for unstable or transient failure-prone environments. -### Secondary Executors +Secondary Executors +------------------- For more specialized use cases, Lithops also provides explicit executors for each execution mode: @@ -30,14 +32,12 @@ For more specialized use cases, Lithops also provides explicit executors for eac * **StandaloneExecutor** (`lithops.StandaloneExecutor()`): Runs jobs on standalone compute backends such as clusters or virtual machines, suitable for long-running or resource-heavy tasks. ---- -### Configuration and Initialization +Configuration and Initialization +================================ By default, executors load configuration from the Lithops configuration file (e.g., `lithops_config.yaml`). You can also supply configuration parameters programmatically via a Python dictionary when creating an executor instance. Parameters passed explicitly override those in the config file, allowing for flexible customization on the fly. ---- - This layered executor design lets Lithops provide a powerful, unified API for parallel function execution — from local development to multi-cloud production deployments with fault tolerance and retries built-in. diff --git a/docs/source/comparing_lithops.rst b/docs/source/comparing_lithops.rst index 64bd843b7..07565ce7b 100644 --- a/docs/source/comparing_lithops.rst +++ b/docs/source/comparing_lithops.rst @@ -1,77 +1,61 @@ -Comparing Lithops with other distributed computing frameworks +Comparing Lithops with Other Distributed Computing Frameworks ============================================================= -In a nutshell, Lithops differs from other distributed computing frameworks in that Lithops leverages serverless -functions to compute massively parallel computations. +Lithops introduces a novel approach to distributed computing by leveraging **serverless functions** for massively parallel computations. Unlike traditional frameworks that require managing a cluster of nodes, Lithops utilizes Function-as-a-Service (FaaS) platforms to dynamically scale execution resources — down to zero when idle and massively up when needed. -In addition, Lithops provides a simple and easy-to-use interface to access and process data stored in Object Storage -from your serverless functions. - -Moreover, Lithops abstract design allows seamlessly portability between clouds and FaaS services, avoiding vendor -lock-in. +In addition, Lithops offers a simple and consistent programming interface to transparently process data stored in **Object Storage** from within serverless functions. Its **modular and cloud-agnostic architecture** enables seamless portability across different cloud providers and FaaS platforms, effectively avoiding vendor lock-in. PyWren ------ -.. image:: https://www.faasification.com/assets/img/tools/pywren-logo-big.png - :align: center - :width: 250 +`PyWren `_ is the precursor to Lithops. Initially designed to run exclusively on AWS Lambda using a Conda runtime and supporting only Python 2.7, it served as a proof of concept for using serverless functions in scientific computing. +In 2018, the Lithops team forked PyWren to adapt it for **IBM Cloud Functions**, which offered a Docker-based runtime. This evolution also introduced support for **Object Storage as a primary data source** and opened the door to more advanced use cases such as Big Data analytics. -`PyWren `_ is Lithops' "father" project. 
PyWren was only designed to run in AWS Lambda with a -Conda environment and only supported Python 2.7. In 2018, Lithops' creators forked PyWren and adapted it to IBM Cloud -Functions, which, in contrast, uses a Docker runtime. The authors also explored new usages for PyWren, like processing Big Data from -Object Storage. Then, on September 2020, IBM PyWren authors decided that the project had evolved enough to no longer be -considered a simple fork of PyWren for IBM cloud and became Lithops. With this change, the project would no longer be -tied to the old PyWren model and could move to more modern features such as mulit-cloud support or the transparent -multiprocessing interface. +By September 2020, the IBM PyWren fork had diverged significantly. The maintainers rebranded the project as **Lithops**, reflecting its broader goals — including multi-cloud compatibility, improved developer experience, and support for modern Python environments and distributed computing patterns. -You can read more about PyWren IBM Cloud at the Middleware'18 industry paper `Serverless Data Analytics in the IBM Cloud `_. +For more details, refer to the Middleware'18 industry paper: +`Serverless Data Analytics in the IBM Cloud `_. Ray and Dask ------------ -.. image:: https://warehouse-camo.ingress.cmh1.psfhosted.org/98ae79911b7a91517ba16ef2dc7dc3b972214820/68747470733a2f2f6769746875622e636f6d2f7261792d70726f6a6563742f7261792f7261772f6d61737465722f646f632f736f757263652f696d616765732f7261795f6865616465725f6c6f676f2e706e67 - :align: center +.. image:: https://github.com/ray-project/ray/raw/master/doc/source/images/ray_logo.png :width: 250 - .. image:: https://docs.dask.org/en/stable/_images/dask_horizontal.svg - :align: center :width: 250 -In comparison with Lithops, both `Ray `_ and `Dask `_ leverage a cluster of nodes for distributed computing, while Lithops -mainly leverages serverless functions. This restraint makes Ray much less flexible than Lithops in terms of scalability. +`Ray `_ and `Dask `_ are distributed computing frameworks designed to operate on a **predefined cluster of nodes** (typically virtual machines). In contrast, Lithops relies on **serverless runtimes**, which allows for *elastic and fine-grained scaling* — including scaling to zero — with no idle infrastructure costs. -Although Dask and Ray can scale and adapt the resources to the amount of computation needed, they don't scale to zero since -they must keep a "head node" or "master" that controls the cluster and must be kept up. +While Ray and Dask provide dynamic task scheduling and can autoscale within an IaaS environment, they always require a **centralized "head node" or controller** to manage the cluster, making them less suitable for ephemeral and cost-efficient cloud-native computing. -In any case, the capacity and scalability of Ray or Dask in IaaS using virtual machines is not comparable to that of serverless functions. +Additionally, the performance and elasticity of Ray and Dask in IaaS environments are not directly comparable to Lithops' **fully serverless model**, which benefits from the near-infinite parallelism offered by cloud functions. PySpark ------- .. image:: https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/Apache_Spark_logo.svg/2560px-Apache_Spark_logo.svg.png - :align: center :width: 250 +`PySpark `_ is the Python interface for Apache Spark, a well-established distributed computing engine. 
Spark is typically deployed on a **static cluster of machines**, either on-premises or in cloud environments using HDFS or cloud-native file systems. -Much like Ray or Dask, PySpark is a distributed computing framework that uses cluster technologies. PySpark provides Python bindings for Spark. -Spark is designed to work with a fixed-size node cluster, and it is typically used to process data from on-prem HDFS -and analyze it using SparkSQL and Spark DataFrame. - +PySpark is optimized for **batch analytics** using DataFrames and SparkSQL, but it lacks native integration with FaaS models. Its operational model is not inherently elastic and requires continuous management of a Spark cluster, which may not align with modern, fully managed, or serverless computing paradigms. Serverless Framework -------------------- .. image:: https://cdn.diegooo.com/media/20210606183353/serverless-framework-icon.png - :align: center :width: 250 +`Serverless Framework `_ is a deployment toolchain designed primarily for **building and deploying serverless web applications**, especially on AWS, GCP, and Azure. It is widely used to manage HTTP APIs, event-driven services, and infrastructure-as-code (IaC) for cloud-native apps. -Serverless Framework is a tool to develop serverless applications (mainly NodeJS) and deploy them seemlessly on AWS, GCP -or Azure. +Although both Lithops and Serverless Framework leverage **serverless functions**, their objectives are fundamentally different: + +- **Serverless Framework** focuses on application deployment (e.g., microservices, REST APIs). +- **Lithops** targets **parallel and data-intensive workloads**, enabling large-scale execution of Python functions over scientific datasets, data lakes, and unstructured data in object storage. + +Summary +------- -Although both Serverless Framework and Lithops use serverless functions, their objective is completely different: -Serverless Framework aims to provide an easy-to-use tool to develop applications related to web services, like HTTP APIs, -while Lithops aims to develop applications related to highly parallel scientific computation and Big Data processing. +Lithops stands out as a **cloud-native, serverless-first framework** purpose-built for **parallel computing, data analytics, and scientific workloads**. By abstracting away infrastructure management and providing built-in object storage integration, it delivers a unique balance of **simplicity**, **performance**, and **multi-cloud compatibility** — distinguishing it from traditional cluster-based frameworks and generic serverless tools alike. diff --git a/docs/source/compute_config/kubernetes_rabbitmq.md b/docs/source/compute_config/kubernetes_rabbitmq.md index c74c27326..33db5533c 100644 --- a/docs/source/compute_config/kubernetes_rabbitmq.md +++ b/docs/source/compute_config/kubernetes_rabbitmq.md @@ -4,7 +4,7 @@ All of these changes are **ideal** for pipelines where launching **hundreds of parallel tasks as quickly as possible** is a critical requirement, in a fixed size heterogeneous cluster. -### Changes of K8s RabbitMQ +## Changes of K8s RabbitMQ * **Utilization of RabbitMQ:** Within this architecture, RabbitMQ is employed to launch group invocations in a single call, avoiding the need for multiple calls for each function execution. Additionally, it enables data exchange between the client and running pods, bypassing the Storage Backend as an intermediary, which is slower. This accelerates and streamlines communication significantly. 
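To make the architecture described above more concrete, here is a brief sketch (not part of the patch) of how a RabbitMQ-backed Kubernetes run could be selected programmatically. The `k8s` backend name and the `rabbitmq` section with `amqp_url` follow the usual Lithops configuration layout but are assumptions here; check the Kubernetes backend documentation for the exact keys this architecture requires.

```python
# Minimal sketch (assumptions noted above): select the Kubernetes compute backend
# and RabbitMQ instead of storage-based monitoring via a programmatic config dict.
import lithops

config = {
    'lithops': {
        'backend': 'k8s',          # assumed Kubernetes backend name
        'monitoring': 'rabbitmq',  # RabbitMQ-based monitoring, as documented above
    },
    'rabbitmq': {
        'amqp_url': 'amqp://user:password@rabbitmq-host:5672/'  # hypothetical endpoint
    },
}


def double(x):
    return x * 2


with lithops.FunctionExecutor(config=config) as fexec:
    futures = fexec.map(double, range(225))   # e.g. the 225-task benchmark below
    print(len(fexec.get_result(futures)))
```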
@@ -74,25 +74,25 @@ All of these tests consist of running 225 functions on a 2-node cluster, each wi
 
 In this scenario, it is evident that the invocation time is consistently reduced by a factor of **up to 5x** on cold start and **up to 7x** on warm start. This represents a significant enhancement for parallel function execution.
 
-#### Plot 1: Kubernetes K8s original.
+- Plot 1: Kubernetes K8s original.
 
 *Elapsed time = 16,9 sec.*
 
 ![Kubernetes K8s original plot](../images/plots_kubernetes/k8s_original_histogram.png)
 
-#### Plot 2: Kubernetes K8s original with master on Warm Start.
+- Plot 2: Kubernetes K8s original with master on Warm Start.
 
 *Elapsed time = 8,1 sec.*
 
 ![Kubernetes K8s original with Warm Start plot](../images/plots_kubernetes/k8s_original_warm_start_histogram.png)
 
-#### Plot 3: Kubernetes K8s RabbitMQ.
+- Plot 3: Kubernetes K8s RabbitMQ.
 
 *Elapsed time = 8 sec.*
 
 ![Kubernetes K8s RabbitMQ plot](../images/plots_kubernetes/rabbitmq_histogram.png)
 
-#### Plot 4: Kubernetes K8s RabbitMQ with workers on Warm Start.
+- Plot 4: Kubernetes K8s RabbitMQ with workers on Warm Start.
 
 *Elapsed time = 5,9 sec.*
 
diff --git a/docs/source/contributing.rst b/docs/source/contributing.rst
index d0bb84ec4..01aa4e074 100644
--- a/docs/source/contributing.rst
+++ b/docs/source/contributing.rst
@@ -18,24 +18,39 @@ To contribute a patch
 
 1. Break your work into small, single-purpose patches if possible. It's much harder to merge in a large change with a lot of disjoint features.
 2. Submit the patch as a GitHub pull request against the master branch.
-3. Make sure that your code passes the unit tests.
-4. Make sure that your code passes the linter.
-5. Add new unit tests for your code.
-
-
-Unit testing
-------------
-
-To test that all is working as expected, run either:
-
-.. code::
-
-    $ lithops test
-
-
-.. code::
-
-    $ python3 -m lithops.tests.tests_main
-
-
-Please follow the guidelines in :ref:`testing` for more details.
\ No newline at end of file
+3. Make sure that your code passes the tests.
+4. Make sure that your code passes the linter. Install ``flake8`` with ``pip3 install flake8`` and run the following command until no linting errors are reported::
+
+      flake8 lithops --count --max-line-length=180 --statistics --ignore W605,W503
+
+5. Add new tests for your code.
+
+
+Testing
+-------
+
+To test that all is working as expected, install ``pytest``, navigate to the tests folder ``lithops/tests/``, and execute::
+
+    pytest -v
+
+If you made changes to a specific backend, please run the tests on that backend.
+For example, if you made changes to the AWS Lambda backend, execute the tests with::
+
+    pytest -v --backend aws_lambda --storage aws_s3
+
+You can list all the available tests using::
+
+    pytest --collect-only
+
+To run a specific test or group of tests, use the ``-k`` option, for example::
+
+    pytest -v --backend localhost --storage localhost -k test_map
+
+To view all Lithops logs during the tests in DEBUG mode, execute::
+
+    pytest -o log_cli=true --log-cli-level=DEBUG --backend localhost --storage localhost
diff --git a/docs/source/lithops_config_keys.csv b/docs/source/lithops_config_keys.csv
index caddcb99c..330453304 100644
--- a/docs/source/lithops_config_keys.csv
+++ b/docs/source/lithops_config_keys.csv
@@ -1,14 +1,15 @@
 Group;Key;Default;Mandatory;Additional info
-lithops;backend;``aws_lambda``;no;Compute backend implementation. AWS Lambda is the default.
-lithops;storage;``aws_s3``;no;Storage backend implementation.
AWS S3 is the default. -lithops;data_cleaner;``True``;no;If set to True, then the cleaner will automatically delete all the temporary data that was written into `storage_bucket/lithops.jobs`. -lithops;monitoring;``storage``;no;Monitoring system implementation. One of: **storage** or **rabbitmq**. -lithops;monitoring_interval;``2``;no;Monitoring check interval in seconds in case of **storage** monitoring. -lithops;data_limit;``4``;no;Max (iter)data size (in MB). Set to False for unlimited size. -lithops;execution_timeout;``1800``;no;Functions will be automatically killed if they exceed this execution time (in seconds). Alternatively, it can be set in the `call_async()`, `map()` or `map_reduce()` calls using the `timeout` parameter. -lithops;include_modules;``[]``;no;Explicitly pickle these dependencies. All required dependencies are pickled if default empty list. No one dependency is pickled if it is explicitly set to None. -lithops;exclude_modules;``[]``;no;Explicitly keep these modules from pickled dependencies. It is not taken into account if you set include_modules. -lithops;log_level;``INFO``;no;Logging level. One of: WARNING, INFO, DEBUG, ERROR, CRITICAL, Set to None to disable logging. -lithops;log_format;``%(asctime)s [%(levelname)s] %(name)s -- %(message)``;no; Logging format string. -lithops;log_stream;``ext://sys.stderr``;no;Logging stream. eg.: ext://sys.stderr, ext://sys.stdout -lithops;log_filename;```` ;no;Path to a file. log_filename has preference over log_stream. \ No newline at end of file +lithops;backend;`aws_lambda`;no;Compute backend implementation. Default is AWS Lambda. +lithops;storage;`aws_s3`;no;Storage backend implementation. Default is AWS S3. +lithops;data_cleaner;`True`;no;If True, automatically deletes temporary data written to `storage_bucket/lithops.jobs`. +lithops;monitoring;`storage`;no;Monitoring system implementation. Options: **storage** or **rabbitmq**. +lithops;monitoring_interval;`2`;no;Interval in seconds for monitoring checks (used if monitoring is set to **storage**). +lithops;data_limit;`4`;no;Maximum size (in MB) for iterator data chunks. Set to False for unlimited size. +lithops;execution_timeout;`1800`;no;Maximum execution time for functions in seconds. Functions exceeding this time are killed. Can also be set per call using the `timeout` parameter. +lithops;include_modules;`[]`;no;List of dependencies to explicitly include for pickling. If empty, all required dependencies are included. If set to None, no dependencies are included. +lithops;exclude_modules;`[]`;no;List of dependencies to explicitly exclude from pickling. Ignored if `include_modules` is set. +lithops;log_level;`INFO`;no;Logging level. Options: WARNING, INFO, DEBUG, ERROR, CRITICAL. Set to None to disable logging. +lithops;log_format;`%(asctime)s [%(levelname)s] %(name)s -- %(message)s`;no;Format string for log messages. +lithops;log_stream;`ext://sys.stderr`;no;Logging output stream, e.g., ext://sys.stderr or ext://sys.stdout. +lithops;log_filename;``;no;File path for logging output. Takes precedence over `log_stream` if set. +lithops;retries;`0`;no;Number of retries for failed function invocations when using the `RetryingFunctionExecutor`. Default is 0. Can be overridden per API call. 
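As a cross-reference for the table above, the same keys can be supplied programmatically through the dictionary accepted by the executors instead of the YAML configuration file. A small illustrative sketch, using the localhost backends so it runs without cloud credentials (values are examples only):

```python
# Illustrative sketch: a few of the documented keys expressed as a programmatic
# configuration dictionary (values are examples, not recommendations).
import lithops

config = {
    'lithops': {
        'backend': 'localhost',    # compute backend
        'storage': 'localhost',    # storage backend
        'log_level': 'DEBUG',      # logging level
        'execution_timeout': 600,  # seconds; still overridable per call via `timeout`
        'retries': 2,              # default retry count used by RetryingFunctionExecutor
    }
}


def square(x):
    return x * x


with lithops.FunctionExecutor(config=config) as fexec:
    futures = fexec.map(square, range(4))
    print(fexec.get_result(futures))
```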
diff --git a/examples/retry.py b/examples/retry.py new file mode 100644 index 000000000..c6aac1e43 --- /dev/null +++ b/examples/retry.py @@ -0,0 +1,46 @@ +import lithops +from lithops.storage import Storage +from lithops import RetryingFunctionExecutor + +# Dictionary of known failures: how many times each input should fail before succeeding +# This must be available to each function at runtime, so hardcoded or passed in as data +FAILURE_MAP = { + 0: 1, # fail once + 1: 2, # fail twice + 2: 0, # succeed immediately + 3: 3, # fail three times (requires at least retries=3) +} + +bucket = 'storage' + + +def my_retry_function(x): + storage = Storage() + + key = f"retries-demo/input-{x}" + try: + count = int(storage.get_object(bucket, key)) + except Exception: + count = 0 + + print(f"[Input {x}] Attempt #{count + 1}") + + if count < FAILURE_MAP.get(x, 0): + # Store updated count before failing + storage.put_object(bucket, key, str(count + 1)) + raise RuntimeError(f"Deliberate failure for input {x}, attempt {count + 1}") + + return x + 100 + + +if __name__ == "__main__": + iterdata = [0, 1, 2, 3] + + with lithops.FunctionExecutor() as fexec: + with RetryingFunctionExecutor(fexec) as retry_exec: + futures = retry_exec.map(my_retry_function, iterdata) + done, not_done = retry_exec.wait(futures, throw_except=False) + outputs = set(f.result() for f in done) + + Storage().delete_objects(bucket, [f"retries-demo/input-{x}" for x in iterdata]) + print("Final results:", outputs) diff --git a/lithops/retries.py b/lithops/retries.py index f22b676af..57e7e480e 100644 --- a/lithops/retries.py +++ b/lithops/retries.py @@ -182,6 +182,7 @@ class RetryingFunctionExecutor: def __init__(self, executor: FunctionExecutor): self.executor = executor + self.config = executor.config def __enter__(self): """ @@ -231,6 +232,13 @@ def map( :return: A list of RetryingFuture objects, one for each function activation. """ + + retries_to_use = ( + retries + if retries is not None + else self.config.get('lithops', {}).get('retries', 0) + ) + futures_list = self.executor.map( map_function, map_iterdata, @@ -250,7 +258,7 @@ def map( f, map_function=map_function, input=i, - retries=retries, + retries=retries_to_use, chunksize=chunksize, extra_args=extra_args, extra_env=extra_env,
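The change above makes the retry count fall back to the `lithops.retries` configuration value whenever `map()` is called without an explicit `retries` argument. A minimal usage sketch of that precedence (localhost backends assumed so the snippet runs without cloud credentials):

```python
# Sketch of the precedence implemented in RetryingFunctionExecutor.map():
# an explicit `retries` argument wins, otherwise the 'lithops'/'retries' config
# value is used, and if neither is set the count falls back to 0.
import lithops
from lithops import RetryingFunctionExecutor

config = {'lithops': {'backend': 'localhost', 'storage': 'localhost', 'retries': 2}}


def increment(x):
    return x + 1


with lithops.FunctionExecutor(config=config) as fexec:
    with RetryingFunctionExecutor(fexec) as retry_exec:
        futures = retry_exec.map(increment, [1, 2, 3])           # retried up to 2 times (config value)
        futures += retry_exec.map(increment, [4, 5], retries=5)  # explicit argument overrides the config
        done, _ = retry_exec.wait(futures, throw_except=False)
        print(sorted(f.result() for f in done))
```

Note that `examples/retry.py` above relies on the same mechanism: input 3 is set to fail three times, so the demo only fully succeeds when the effective retry count is at least 3, either through the `lithops.retries` key or an explicit `retries=` argument to `map()`.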