From c2e24de2646ab28f12028ad5b02be6fb776c8710 Mon Sep 17 00:00:00 2001 From: JosepSampe Date: Sat, 26 Jul 2025 13:55:36 +0200 Subject: [PATCH] [docs] Update docs --- CHANGELOG.md | 3 +- README.md | 16 +- docs/api_futures.md | 425 -------------------------------- docs/api_storage.md | 322 ------------------------ docs/index.rst | 119 ++++++--- docs/source/api_futures.rst | 45 +++- docs/source/data_processing.rst | 280 +++++++++++++++------ docs/user_guide.md | 40 --- lithops/retries.py | 128 +++++++++- 9 files changed, 465 insertions(+), 913 deletions(-) delete mode 100644 docs/api_futures.md delete mode 100644 docs/api_storage.md delete mode 100644 docs/user_guide.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e18f20b2..cfb0b355e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,8 @@ - ### Fixed -- +- [Localhost] Fix shutil.Error caused by existing __pycache__ directory when copying files in the runner + ## [v3.6.1] diff --git a/README.md b/README.md index 6ca619ee0..b874bf94b 100644 --- a/README.md +++ b/README.md @@ -141,25 +141,27 @@ You can find more usage examples in the [examples](/examples) folder. For documentation on using Lithops, see [latest release documentation](https://lithops-cloud.github.io/docs/) or [current github docs](docs/user_guide.md). If you are interested in contributing, see [CONTRIBUTING.md](./CONTRIBUTING.md). - ## Additional resources ### Blogs and Talks + +* [How to run Lithops over EC2 VMs using the new K8s backend](https://danielalecoll.medium.com/how-to-run-lithops-over-ec2-vms-using-the-new-k8s-backend-4b0a4377c4e9) * [Simplify the developer experience with OpenShift for Big Data processing by using Lithops framework](https://medium.com/@gvernik/simplify-the-developer-experience-with-openshift-for-big-data-processing-by-using-lithops-framework-d62a795b5e1c) * [Speed-up your Python applications using Lithops and Serverless Cloud resources](https://itnext.io/speed-up-your-python-applications-using-lithops-and-serverless-cloud-resources-a64beb008bb5) -* [Serverless Without Constraints](https://www.ibm.com/blog/serverless-without-constraints) +* [Serverless Without Constraints](https://www.ibm.com/cloud/blog/serverless-without-constraints) * [Lithops, a Multi-cloud Serverless Programming Framework](https://itnext.io/lithops-a-multi-cloud-serverless-programming-framework-fd97f0d5e9e4) * [CNCF Webinar - Toward Hybrid Cloud Serverless Transparency with Lithops Framework](https://www.youtube.com/watch?v=-uS-wi8CxBo) -* [Using Serverless to Run Your Python Code on 1000 Cores by Changing Two Lines of Code](https://www.ibm.com/blog/using-serverless-to-run-your-python-code-on-1000-cores-by-changing-two-lines-of-code) -* [Decoding dark molecular matter in spatial metabolomics with IBM Cloud Functions](https://www.ibm.com/blog/decoding-dark-molecular-matter-in-spatial-metabolomics-with-ibm-cloud-functions) +* [Using Serverless to Run Your Python Code on 1000 Cores by Changing Two Lines of Code](https://www.ibm.com/cloud/blog/using-serverless-to-run-your-python-code-on-1000-cores-by-changing-two-lines-of-code) +* [Decoding dark molecular matter in spatial metabolomics with IBM Cloud Functions](https://www.ibm.com/cloud/blog/decoding-dark-molecular-matter-in-spatial-metabolomics-with-ibm-cloud-functions) * [Your easy move to serverless computing and radically simplified data processing](https://www.slideshare.net/gvernik/your-easy-move-to-serverless-computing-and-radically-simplified-data-processing-238929020) Strata Data Conference, NY 2019. 
See video of Lithops usage [here](https://www.youtube.com/watch?v=EYa95KyYEtg&list=PLpR7f3Www9KCjYisaG7AMaR0C2GqLUh2G&index=3&t=0s) and the example of Monte Carlo [here](https://www.youtube.com/watch?v=vF5HI2q5VKw&list=PLpR7f3Www9KCjYisaG7AMaR0C2GqLUh2G&index=2&t=0s) * [Speed up data pre-processing with Lithops in deep learning](https://developer.ibm.com/patterns/speed-up-data-pre-processing-with-pywren-in-deep-learning/) -* [Predicting the future with Monte Carlo simulations over IBM Cloud Functions](https://www.ibm.com/blog/monte-carlo-simulations-with-ibm-cloud-functions) -* [Process large data sets at massive scale with Lithops over IBM Cloud Functions](https://www.ibm.com/blog/process-large-data-sets-massive-scale-pywren-ibm-cloud-functions) +* [Predicting the future with Monte Carlo simulations over IBM Cloud Functions](https://www.ibm.com/cloud/blog/monte-carlo-simulations-with-ibm-cloud-functions) +* [Process large data sets at massive scale with Lithops over IBM Cloud Functions](https://www.ibm.com/cloud/blog/process-large-data-sets-massive-scale-pywren-ibm-cloud-functions) * [Industrial project in Technion on Lithops](http://www.cs.technion.ac.il/~cs234313/projects_sites/W19/04/site/) ### Papers - +* [Serverful Functions: Leveraging Servers in Complex Serverless Workflows](https://dl.acm.org/doi/10.1145/3700824.3701095) - ACM Middleware Industrial Track 2024 +* [Transparent serverless execution of Python multiprocessing applications](https://dl.acm.org/doi/10.1016/j.future.2022.10.038) - Elsevier Future Generation Computer Systems 2023 * [Outsourcing Data Processing Jobs with Lithops](https://ieeexplore.ieee.org/document/9619947) - IEEE Transactions on Cloud Computing 2022 * [Towards Multicloud Access Transparency in Serverless Computing](https://www.computer.org/csdl/magazine/so/5555/01/09218932/1nMMkpZ8Ko8) - IEEE Software 2021 * [Primula: a Practical Shuffle/Sort Operator for Serverless Computing](https://dl.acm.org/doi/10.1145/3429357.3430522) - ACM/IFIP International Middleware Conference 2020. [See presentation here](https://www.youtube.com/watch?v=v698iu5YfWM) diff --git a/docs/api_futures.md b/docs/api_futures.md deleted file mode 100644 index 4eb0d4798..000000000 --- a/docs/api_futures.md +++ /dev/null @@ -1,425 +0,0 @@ -# Lithops Futures API Details - -## Executor -The primary object in Lithops is the executor. The standard way to get everything set up is to import `lithops`, and create an instance of one of the available modes of executions. - -Lithops is shipped with 3 modes of execution: **Localhost**, **Serverless** and **Standalone**. In this sense, each mode of execution has its own executor class: - -* `lithops.LocalhostExecutor()`: Executor that uses local processes to run jobs in the local machine. -* `lithops.ServerlessExecutor()`: Executor to run jobs in one of the available serverless compute backends. -* `lithops.StandaloneExecutor()`: Executor to run jobs in one of the available standalone compute backends. - -Additionally, Lithops includes a top-level function executor, which encompasses all three previous executors: - -* `lithops.FunctionExecutor()`: Generic executor that will use the configuration to determine its mode of execution, i.e., based on the configuration it will be **localhost**, **serverless** or **standalone**. - - -By default, the executor load the configuration from the config file. Alternatively, you can pass the configuration with a python dictionary. 
In any case, note that all the parameters set in the executor will overwrite those set in the configuration. - - -The available calls within an executor are: - -|API Call| Type | Description| -|---|---|---| -|[call_async()](api_futures.md#executorcall_async) | Async. | Method used to spawn one function activation | -|[map()](api_futures.md#executormap) | Async. | Method used to spawn multiple function activations | -|[map_reduce()](api_futures.md#executormap_reduce) | Async. | Method used to spawn multiple function activations with one (or multiple) reducers| -|[wait()](api_futures.md#executorwait) | Sync. | Wait for the function activations to complete. It blocks the local execution until all the function activations finished their execution (configurable)| -|[get_result()](api_futures.md#executorget_result) | Sync. | Method used to retrieve the results of all function activations. The results are returned within an ordered list, where each element of the list is the result of one activation| -|[plot()](api_futures.md#executorplot) | Sync. | Method used to create execution plots | -|[job_summary()](api_futures.md#jobsummary) | Sync. | Method used to create a summary file of the executed jobs. It includes times and money | -|[clean()](api_futures.md#executorclean) | Async. | Method used to clean the temporary data generated by Lithops| - - -**LocalhostExecutor(\*\*kwargs)** - -Initialize and return Localhost executor object. - -|Parameter | Default | Description| -|---|---|---| -|config | None | Settings passed in here will override those in lithops_config| -|runtime | None | Name of the docker image to run the functions | -|workers | cpu_count | Max number of parallel workers | -|storage | localhost | Storage backend to store temp data| -|monitoring | storage | Monitoring system implementation. One of: storage, rabbitmq | -|log_level | INFO | Log level printing (INFO, DEBUG, ...). Set it to None to hide all logs. If this is param is set, all logging params in config are disabled| - -Usage: - -```python -import lithops -fexec = lithops.LocalhostExecutor() -``` - -**ServerlessExecutor(\*\*kwargs)** - -Initialize and return a Serverless executor object. - -|Parameter | Default | Description| -|---|---|---| -|config | None | Settings passed in here will override those in lithops_config| -|backend | ibm_cf | Serverless compute backend to run the functions| -|runtime | None | Name of the docker image to run the functions | -|runtime_memory | 256 | Memory (in MB) to use to run the functions | -|storage | ibm_cos | Storage backend to store temp data| -|workers | *depends of the backend* | Max number of parallel workers | -|monitoring | storage | Monitoring system implementation. One of: storage, rabbitmq | -|remote_invoker | False | Spawn a function that will perform the actual job invocation (True/False) | -|log_level | INFO | Log level printing (INFO, DEBUG, ...). Set it to None to hide all logs. If this is param is set, all logging params in config are disabled| - -Usage: - -```python -import lithops -fexec = lithops.ServerlessExecutor() -``` - -**StandaloneExecutor(\*\*kwargs)** - -Initialize and return an Standalone executor object. - -|Parameter | Default | Description| -|---|---|---| -|config | None | Settings passed in here will override those in lithops_config| -|backend | ibm_vpc | Standalone compute backend to run the functions| -|runtime | python3 | Name of the runtime to run the functions. 
It can be a docker image or *python3* | -|workers | cpu_count | Max number of parallel workers | -|storage | ibm_cos | Storage backend to store temp data| -|monitoring | storage | Monitoring system implementation. One of: storage, rabbitmq | -|log_level | INFO | Log level printing (INFO, DEBUG, ...). Set it to None to hide all logs. If this is param is set, all logging params in config are disabled| - -Usage: - -```python -import lithops -fexec = lithops.StandaloneExecutor() -``` - -**FunctionExecutor(\*\*kwargs)** - -Initialize and return a generic function executor. - -|Parameter | Default | Description| -|---|---|---| -|mode | serverless | Execution mode. One of: localhost, serverless or standalone| -|config | None | Settings passed in here will override those in lithops_config| -|backend | None | Compute backend to run the functions| -|runtime | None | Name of the runtime to run the functions. | -|runtime_memory | None | Memory (in MB) to use to run the functions | -|workers | None | Max number of parallel workers | -|storage | ibm_cos | Storage backend to store temp data| -|monitoring | storage | Monitoring system implementation. One of: storage, rabbitmq | -|remote_invoker | False | Spawn a function that will perform the actual job invocation (True/False) | -|log_level | INFO | Log level printing (INFO, DEBUG, ...). Set it to None to hide all logs. If this is param is set, all logging params in config are disabled| - -Usage: - -```python -import lithops -fexec = lithops.FunctionExecutor() -``` - - -## Executor.call_async() - -Spawn only one function activation. - -**call_async**(func, data, \*\*kwargs) - -|Parameter | Default |Description| -|---|---|---| -|func | |The function to map over the data | -|data | |A single value of data | -|extra_env| None |Additional environment variables for CF environment| -|runtime_memory| 256 |Memory (in MB) to use to run the functions| -|timeout| 600 |Max time per function activation (seconds)| -|include_modules| [] |Explicitly pickle these dependencies. All required dependencies are pickled if default empty list. No one dependency is pickled if it is explicitly set to None | -|exclude_modules| [] |Explicitly keep these modules from pickled dependencies. It is not taken into account if you set include_modules | - -* **Returns**: One future for each job (Futures are also internally stored by Lithops). - -* **Usage**: - - ```python - future = fexec.call_async(foo, data) - ``` - -* **Code example**: [call_async.py](../examples/call_async.py) - -## Executor.map() - -Spawn multiple function activations based on the items of an input list. - -**map**(map_function, map_iterdata, \*\*kwargs) - -|Parameter| Default |Description| -|---|---|---| -|map_function | |The function to map over the data | -|map_iterdata | |An iterable of input data (e.g python list) | -|chunksize | 1 | Split map_iteradata in chunks of this size. Lithops spawns 1 worker per resulting chunk | -|worker_processes | 1 | Number of concurrent/parallel processes in each worker| -|extra_args| None | Additional arguments to pass to each map_function activation | -|extra_env| None |Additional environment variables for CF environment | -|runtime_memory| 256 |Memory (in MB) to use to run the functions | -|timeout| 600 |Max time per function activation (seconds) | -|include_modules| [] |Explicitly pickle these dependencies. All required dependencies are pickled if default empty list. 
No one dependency is pickled if it is explicitly set to None | -|exclude_modules| [] |Explicitly keep these modules from pickled dependencies. It is not taken into account if you set include_modules | -|obj_chunk_size| None | Used for data_processing. Chunk size to split each object in bytes. Must be >= 1MiB. 'None' for processing the whole file in one function activation| -|obj_chunk_number| None | Used for data_processing. Number of chunks to split each object. 'None' for processing the whole file in one function activation. chunk_n has prevalence over chunk_size if both parameters are set| -|obj_newline| '\n' | New line character for keeping line integrity of partitions. 'None' for disabling line integrity logic and get partitions of the exact same size in the functions| - -* **Returns**: A list with size len(map_iterdata) of futures for each job (Futures are also internally stored by Lithops). - -* **Usage**: - - ```python - iterdata = [1, 2, 3, 4] - futures = fexec.map(foo, iterdata) - ``` - -* **Code example**: [map.py](../examples/map.py) - -## Executor.map_reduce() - -Spawn multiple *map_function* activations, based on the items of an input list, eventually spawning one (or multiple) *reduce_function* activations over the results of the map phase. - -**map_reduce**(map_function, map_iterdata, reduce_function, \*\*kwargs) - -|Parameter| Default |Description| -|---|---|---| -|map_function| |The function to map over the data | -|map_iterdata | |An iterable of input data (e.g python list)| -|chunksize | 1 | Split map_iteradata in chunks of this size. Lithops spawns 1 worker per resulting chunk | -|worker_processes | 1 | Number of concurrent/parallel processes in each worker| -|extra_args| None | Additional arguments to pass to each map_function activation | -|reduce_function| |The function to map over the results of map_function | -|spawn_reducer| 20 | Percentage of done map functions before spawning the reduce function. By default the reducer is spawned when 20% of the map activations are done. | -|extra_env| None | Additional environment variables for CF environment| -|map_runtime_memory| 256 | Memory (in MB) to use to run the map_function| -|reduce_runtime_memory| 256| Memory (in MB) to use to run the reduce_function| -|timeout| 600 | Max time per function activation (seconds)| -|include_modules| [] |Explicitly pickle these dependencies. All required dependencies are pickled if default empty list. No one dependency is pickled if it is explicitly set to None | -|exclude_modules| [] |Explicitly keep these modules from pickled dependencies. It is not taken into account if you set include_modules | -|obj_chunk_size| None | Used for data_processing. Chunk size to split each object in bytes. Must be >= 1MiB. 'None' for processing the whole file in one function activation| -|obj_chunk_number| None | Used for data_processing. Number of chunks to split each object. 'None' for processing the whole file in one function activation. chunk_n has prevalence over chunk_size if both parameters are set| -|obj_newline| '\n' | New line character for keeping line integrity of partitions. 'None' for disabling line integrity logic and get partitions of the exact same size in the functions| -|obj_reduce_by_key| False| Used for data_processing. Set one reducer per object after running the partitioner (reduce-by-key) | - - -* **Returns**: A list with size len(map_iterdata) of futures for each job (Futures are also internally stored by Lithops). 
- -* **Usage**: - - ```python - iterdata = [1, 2, 3, 4] - futures = fexec.map_reduce(foo, iterdata, bar) - ``` - -* **Code example**: [map_reduce.py](../examples/map_reduce.py) - - -## Executor.wait() - -Waits for the function activations to finish. - -**wait**(\*\*kwargs) - -|Parameter| Default |Description| -|---|---|---| -|fs| None | List of futures to wait. If None, Lithops uses the internally stored futures | -|throw_except | True | Re-raise exception if call raised| -|return_when| ALL_COMPLETED | One of 'ALL_COMPLETED', 'ANY_COMPLETED', 'ALWAYS' | -|download_results| False | Whether or not download the results while monitoring activations | -|timeout| None | Timeout of waiting for results (in seconds)| -|THREADPOOL_SIZE| 128 | Number of threads to use waiting for results| -|WAIT_DUR_SEC| 1 | Time interval between each check (seconds) if no rabbitmq_monitor activated | -|show_progressbar| True | whether or not to show the progress bar | - - -* **Returns**: `(fs_done, fs_notdone)` where `fs_done` is a list of futures that have completed and `fs_notdone` is a list of futures that have not completed. - -* **Usage**: - - ```python - iterdata = [1, 2, 3, 4] - futures = fexec.map(foo, iterdata) - fexec.wait() - ``` - -* **Code example**: [wait.py](../examples/wait.py) - -## Executor.get_result() - -Gets the results from all the function activations. It internally makes use of the `Executor.wait()` method. - -**get_result**(\*\*kwargs) - -|Parameter| Default |Description| -|---|---|---| -|fs| None | List of futures to get the results. If None, Lithops uses the internally stored futures | -|throw_except | True | Re-raise exception if call raised| -|timeout| None | Timeout of waiting for results (in seconds)| -|THREADPOOL_SIZE| 128 | Number of threads to use waiting for results| -|WAIT_DUR_SEC| 1 | Time interval between each check (seconds) if no rabbitmq_monitor activated | -|show_progressbar| True | whether or not to show the progress bar | - -* **Returns**: The results are returned within an ordered list, where each element of the list is the result of one activation. - -* **Usage**: - - ```python - iterdata = [1, 2, 3, 4] - futures = fexec.map(foo, iterdata) - results = fexec.get_result() - ``` - -* **Code example**: [call_async.py](../examples/call_async.py), [map.py](../examples/map.py), [map_reduce.py](../examples/map_reduce.py) - -## Executor.plot() - -Creates 2 detailed execution plots: A timeline plot and a histogram plot. - -**plot**(\*\*kwargs) - -|Parameter| Default |Description| -|---|---|---| -|fs| None | List of futures to plot. If None, Lithops uses the internally stored futures| -|dst| None | Path to destination file, either absolute or relative. If set, you must specify the path + the file prefix (see example below), then lithops will generate the *prefix*_histogram.png and *prefix*_timeline.png files. If None, Lithops will create a new folder called *plots* in the current directory and use the current timestamp as file *prefix* | - -* **Returns**: *Nothing*. It stores 2 different plots in the selected `dst` path. - -* **Usage**: - - ```python - iterdata = [1, 2, 3, 4] - fexec.map(foo, iterdata) - results = fexec.get_result() # or fexec.wait() - # The next command will generate test_timeline.png and test_histogram.png in ~/lithops_plots - fexec.plot(dst='~/lithops_plots/test') - ``` - -* **Example**: - -

-  *(example timeline and histogram plot images)*

- -## Executor.clean() - -Cleans the temporary data generated by Lithops in IBM COS. This process runs asynchronously to the main execution since Lithops starts another process to do the task. If `data_cleaner=True` (default), this method is executed automatically after calling `get_result()`. - -**clean**(\*\*kwargs) - -|Parameter| Default |Description| -|---|---|---| -|fs| None | List of futures to clean temp data. If None, Lithops uses the internally stored futures | -|cs| None | List of cloudobjects to clean | -|clean_cloudobjects| True | Clean or not the cloudobjects generated in the executor | -|spawn_cleaner| True | Spawn cleaner process. If false it stores the data to be cleaned in a tmp dir | - -* **Returns**: *Nothing*. - -* **Usage**: - - ```python - iterdata = [1, 2, 3, 4] - futures = fexec.map(foo, iterdata) - results = fexec.get_result() - fexec.clean() - ``` - -* **Code example**: [map.py](../examples/map.py) - - -# Function chaining - -Function chaining is a pattern where multiple functions are called on the same executor consecutively. Using the same `lithops.FunctionExecutor` object reference, multiple functions can be invoked. It increases the readability of the code and means less redundancy. This means we chain multiple functions together with the same element reference. It’s not necessary to attach the `lithops.FunctionExecutor` reference multiple times for each function call. - -This patter is specially useful when the output of one invocation is the input of another invocation. In this case, Lithops does not download the intermediate results to the local client, instead, the intermediate results are directly read from the next function. - -It currently works with the Futures API, and you can chain the `map()`, `map_reuce()`, `wait()` and `get_result()` methods. Note that the returning value of one function must match the signature of the next function when chaining multiple `map()` calls. View the next examples: - - -Getting the result from a single `map()` call: - -```python -import lithops - -def my_func1(x): - return x*2 - -iterdata = [1, 2, 3] - -fexec = lithops.FunctionExecutor() -res = fexec.map(my_func1, iterdata).get_result() -print(res) -``` - - -Chain multiple map() calls and get the final result: - -```python -import lithops - - -def my_func1(x): - return x*2, 5 - -def my_func2(x, y): - return x+y - -iterdata = [1, 2, 3] - -fexec = lithops.FunctionExecutor() -res = fexec.map(my_func1, iterdata).map(my_func2).get_result() -print(res) -``` - -There is no limit in the number of map() calls that can be chained: - -```python -def my_func1(x): - return x+2, 5 - - -def my_func2(x, y): - return x+y, 5, 2 - - -def my_func3(x, y, z): - return x+y+z - - -iterdata = [1, 2, 3] - -fexec = lithops.FunctionExecutor() -res = fexec.map(my_func1, iterdata).map(my_func2).map(my_func3).get_result() -print(res) -``` - -Alternatively, you can pass the `futures` generated in a `map()` or `map_reduce()` call to the `iterdata` parameter with the same effect. Not that in this case you will only get the results of the last `map()` execution. 
Results of intermediate `map()`s are never downloaded: - -```python -def my_func1(x): - return x+2, 5 - - -def my_func2(x, y): - return x+y, 5, 2 - - -def my_func3(x, y, z): - return x+y+z - - -iterdata = [1, 2, 3] - -fexec = lithops.FunctionExecutor() -futures1 = fexec.map(my_func1, iterdata) -futures2 = fexec.map(my_func2, futures1) -futures3 = fexec.map(my_func3, futures2) -final_result = fexec.get_result() - -print(final_result) -``` \ No newline at end of file diff --git a/docs/api_storage.md b/docs/api_storage.md deleted file mode 100644 index 5d126201c..000000000 --- a/docs/api_storage.md +++ /dev/null @@ -1,322 +0,0 @@ -# Lithops Storage API Details - -Lithops allows to create a **Storage** instance and abstract away the backend implementation details. The standard way to get a Storage object set up is to import the lithops `Storage` class and create an instance. - - -**Storage(\*\*kwargs)** - -Initialize and return a Storage object. - -|Parameter | Default | Description| -|---|---|---| -|config | None | Lithops configuration dictionary | -|backend | None | Name of the backend | - - - -By default, the configuration is loaded from the lithops config file, so there is no need to provide any parameter to create a Storage instance: - -```python -from lithops import Storage - -storage = Storage() -``` - -Alternatively, you can pass the lithops configuration through a dictionary. In this case, it will load the storage backend set in the `storage` key of the `lithops` section: - -```python -from lithops import Storage - -config = {'lithops' : {'storage' : 'ibm_cos'}, - 'ibm_cos': {'region': 'REGION', 'api_key': 'API_KEY'}} - -storage = Storage(config=config) -``` - -In case you have multiple storage set in your configuration, you can force the storage backend by using the `backend` parameter: - -```python -from lithops import Storage - -storage = Storage(backend='redis') # this will create a redis Storage instance -``` - -or: - -```python -from lithops import Storage - -config = {'lithops' : {'storage' : 'ibm_cos'}, - 'ibm_cos': {'region': 'REGION', 'api_key': 'API_KEY'}} - 'redis': {'host': 'HOST', 'port':'PORT'}} - - -storage = Storage(config=config) # this will create an ibm_cos Storage instance -storage = Storage(config=config, backend='redis') # this will create a redis Storage instance -``` - -## Storage API Calls - -### `Storage.put_object()` - -Adds an object to a bucket of the storage backend. - -**put_object**(bucket, key, data) - -|Parameter | Description| -|---|---| -|bucket | Name of the bucket (String)| -|key | Name of the object (String)| -|data| Object data (bytes/string or seekable file-like object)| - -* **Usage**: - - ```python - storage = Storage() - # Bytes/string data - storage.put_object('my_bucket', 'test.txt', 'Hello World') - ``` - - ```python - storage = Storage() - # Seekable file-like object - with open('/tmp/my_big_file.csv', 'rb') as fl: - storage.put_object('my_bucket', 'my_big_file.csv', fl) - ``` - - -### `Storage.get_object()` - -Retrieves objects from the storage backend. - -**get_object**(bucket, key, \*\*kwargs) - -|Parameter | Description| -|---|---| -|bucket | Name of the bucket (String)| -|key | Name of the object (String)| -|stream | Get the object data or a file-like object (True/False) | -|extra_get_args | Extra get arguments to be passed to the underlying backend implementation (dict). 
For example, to specify the byte-range to read: `extra_get_args={'Range': 'bytes=0-100'}`| - -* **Usage**: - - ```python - storage = Storage() - data = storage.get_object('my_bucket', 'test.txt') - ``` - - -### `Storage.head_object()` -The HEAD operation retrieves metadata from an object without returning the object itself. This operation is useful if you're only interested in an object's metadata. - -**head_object**(bucket, key) - -|Parameter | Description| -|---|---| -|bucket | Name of the bucket (String)| -|key | Name of the object (String)| - -* **Usage**: - - ```python - storage = Storage() - obj_metadata = storage.head_object('my_bucket', 'test.txt') - ``` - - -### `Storage.delete_object()` - -Removes objects from the storage backend - -**delete_object**(bucket, key) - -|Parameter | Description| -|---|---| -|bucket | Name of the bucket (String)| -|key | Name of the object (String)| - -* **Usage**: - - ```python - storage = Storage() - storage.delete_object('my_bucket', 'test.txt') - ``` - -### `Storage.delete_objects()` - -This operation enables you to delete multiple objects from a bucket using a single HTTP request. If you know the object keys that you want to delete, then this operation provides a suitable alternative to sending individual delete requests, reducing per-request overhead. - -**delete_objects**(bucket, key_list) - -|Parameter | Description| -|---|---| -|bucket | Name of the bucket (String)| -|key_list | Name of the objects (list)| - -* **Usage**: - - ```python - storage = Storage() - storage.delete_objects('my_bucket', ['test1.txt', 'test2.txt']) - ``` - - -### `Storage.head_bucket()` - -This operation is useful to determine if a bucket exists and you have permission to access it. The operation returns a 200 OK if the bucket exists and you have permission to access it. Otherwise, the operation might return responses such as 404 Not Found and 403 Forbidden . - -**head_bucket**(bucket) - -|Parameter | Description| -|---|---| -|bucket | Name of the bucket (String)| - -* **Usage**: - - ```python - storage = Storage() - storage.head_bucket('my_bucket') - ``` - - -### `Storage.list_objects()` - -Returns all of the objects in a bucket. For each object, the list contains the name of the object (key) and the size. - -**list_objects**(bucket, \*\*kwargs) - -|Parameter | Description| -|---|---| -|bucket | Name of the bucket (String)| -|prefix | key prefix for filtering (String)| - -* **Usage**: - - ```python - storage = Storage() - storage.list_objects('my_bucket', prefix='temp/') - ``` - - -### `Storage.list_keys()` - -Similar to list_objects(), it returns all of the objects in a bucket. For each object, the list contains only the names of the objects (keys). - -**list_keys**(bucket, \*\*kwargs) - -|Parameter | Description| -|---|---| -|bucket | Name of the bucket (String)| -|prefix | key prefix for filtering (String)| - -* **Usage**: - - ```python - storage = Storage() - storage.list_keys('my_bucket') - ``` - - -### `Storage.get_client()` -Returns the underlying storage backend client. For example, if `Storage` is an instance built on top of AWS S3, it returns a boto3 client. - -**get_client**() - -* **Usage**: - - ```python - storage = Storage() - boto3_client = storage.get_client() - ``` - -### `Storage.put_cloudobject()` - -Adds objects to a bucket of the storage backend. Returns a **cloudobject** that is a reference to the object. 
- -**put_cloudobject**(body, \*\*kwargs) - -|Parameter | Description| -|---|---| -|body| Object data (bytes/string or seekable file-like object)| -|bucket | Name of the bucket (String). By default it uses the `storage_bucket`| -|key | Name of the object (String). By default it creates a random key| - -If `bucket` paramter is not provided, it will use the `storage_bucket` set in the lithops config. If `key` is not provided, it will create a random temporary key. - -* **Usage**: - - ```python - storage = Storage() - # Bytes/string - cobj = storage.put_cloudobject('Hello World!') - ``` - - ```python - storage = Storage() - # Seekable file-like object - with open('/tmp/my_big_file.csv', 'rb') as fl: - cobj = storage.put_cloudobject(fl) - ``` - - -### `Storage.get_cloudobject()` - -Retrieves CloudObjects from a bucket of the storage backend. - -**get_cloudobject**(cloudobject, \*\*kwargs) - - -|Parameter | Description| -|---|---| -|cloudobject| CloudObject Instance| -|stream | Get the object data or a file-like object (True/False) | - - -* **Usage**: - - ```python - storage = Storage() - cobj = storage.put_cloudobject('Hello World!', 'my-bucket', 'test.txt') - data = storage.get_cloudobject(cobj) - ``` - - -### `Storage.delete_cloudobject()` - -Removes CloudObjects from a bucket of the storage backend. - -**delete_cloudobject**(cloudobject) - - -|Parameter | Description| -|---|---| -|cloudobject| CloudObject Instance| - - -* **Usage**: - - ```python - storage = Storage() - cobj = storage.put_cloudobject('Hello World!', 'test.txt') - storage.delete_cloudobject(cobj) - ``` - -### `Storage.delete_cloudobjects()` - -This operation enables you to delete multiple objects from a bucket using a single HTTP request. If you know the object keys that you want to delete, then this operation provides a suitable alternative to sending individual delete requests, reducing per-request overhead. - -**delete_cloudobject**(cloudobjects, \*\*kwargs) - - -|Parameter | Description| -|---|---| -|cloudobjects| CloudObject Instances (list)| - - -* **Usage**: - - ```python - storage = Storage() - cobj1 = storage.put_cloudobject('Hello World!', 'test1.txt') - cobj2 = storage.put_cloudobject('Hello World!', 'test2.txt') - storage.delete_cloudobjects([cobj1, cobj2]) \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index 1ef6d8512..b427d607e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,64 +1,98 @@ -What is Lithops? -**************** +Welcome to Lithops! +******************** -**Lithops is a Python multi-cloud serverless computing framework. It allows to run unmodified local python code at massive scale in the main serverless computing platforms.** +**Lithops is a Python multi-cloud serverless computing framework** that empowers you to **run unmodified Python code at massive scale** on leading serverless platforms and beyond. -Lithops delivers the user’s code into the cloud without requiring knowledge of how it is deployed and run. -Moreover, its multicloud-agnostic architecture ensures portability across cloud providers, overcoming vendor lock-in. +Whether you're processing terabytes of data or launching thousands of parallel tasks, Lithops lets you **focus on your code, not infrastructure**. It brings simplicity, performance, and flexibility to cloud-native computing. ------------- -**Lithops provides great value for data-intensive applications like Big Data analytics and embarrassingly parallel jobs.** +Why Lithops? 
+============ -It is specially suited for highly-parallel programs with little or no need for communication between processes. +Serverless computing makes it easy to run code in the cloud β€” but scaling data-intensive workloads across clouds is hard. Lithops solves this by providing: -Examples of applications that run with Lithops include Monte Carlo simulations, deep learning and machine learning processes, metabolomics computations, and geospatial -analytics, to name a few. +- βœ… **Zero-configuration scale-out**: Run your Python functions on thousands of cloud workers with no infrastructure management. +- 🌍 **True multi-cloud portability**: Move seamlessly between AWS, GCP, Azure, IBM Cloud, etc... +- πŸ’‘ **Developer-first experience**: Write standard Python code, including NumPy, pandas, and scikit-learn β€” no cloud-specific boilerplate required. +- 🧠 **Optimized for big data and AI**: Efficiently process massive datasets stored in object storage services with automatic partitioning. ------------- -**Lithops facilitates consuming data from object storage (like AWS S3, GCP Storage or IBM Cloud Object Storage) by providing automatic partitioning and data discovery for common data formats like CSV.** +What You Can Build +=================== -Lithops abstracts away the underlying cloud-specific APIs for accessing storage and provides an intuitive and easy to use interface to process high volumes of data. +Lithops is ideal for **highly parallel, data-heavy workloads**. These include: +- πŸ” Monte Carlo simulations +- 🧬 Metabolomics and genomics pipelines +- πŸ—ΊοΈ Geospatial analytics +- 🧠 Deep learning and hyperparameter tuning +- πŸ“Š Big Data ETL and analytics workflows -Use any Cloud -------------- -**Lithops provides an extensible backend architecture that is designed to work with different compute and storage services available on Cloud providers and on-premise backends.** +If your problem can be broken down into many small, independent tasks, Lithops will help you solve it at scale β€” fast. -In this sense, you can code your application in Python and run it unmodified wherever your data is located at: IBM Cloud, AWS, Azure, Google Cloud and Alibaba Aliyun... +Key Features +============ + +Compute Anywhere +---------------- +**Lithops features a modular and extensible backend architecture**, allowing you to run workloads across: + +- Serverless functions +- Cloud VMs and Kubernetes clusters +- On-premise compute resources + +No matter where your data lives, Lithops can execute your code right next to it. .. image:: source/images/multicloud.jpg :alt: Available backends :align: center -| -Quick Start ------------ +Object Storage Made Easy +------------------------- + +**Seamlessly process large-scale data stored in object storage.** + +Lithops simplifies working with data lakes and object storage by providing: + +- πŸ” **Automatic data discovery**: Detects and lists files across nested directories. +- πŸ“‚ **Transparent data partitioning**: Splits large files (e.g., CSV, Parquet, JSON) into chunks for efficient parallel processing. +- 🧰 **Unified, Pythonic API**: Interact with your data using a single interface, regardless of where it's stored. + +You write simple Python code β€” Lithops handles the complexity of parallel I/O, data distribution, and storage backends under the hood. + + +Get Started Quickly +==================== + +To start using Lithops: + +1. Install via pip: -Lithops is available for Python 3.6 and up. Install it using ``pip``: + .. code-block:: bash -.. 
code-block:: + pip install lithops - pip install -U lithops +2. Configure your cloud credentials (see the full guide in :doc:`/config`) -You're ready to execute a simple example! +3. Write and run your first parallel job: -.. code:: python + .. code-block:: python - from lithops import FunctionExecutor + import lithops - def hello(name): - return 'Hello {}!'.format(name) + def my_function(x): + return x * 2 - with FunctionExecutor() as fexec: - fut = fexec.call_async(hello, 'World') - print(fut.result()) + fexec = lithops.FunctionExecutor() + fexec.map(my_function, range(10)) + print(fexec.get_result()) + +You're now running massively parallel workloads with just a few lines of code! Success stories ---------------- +=============== * `Metaspace Metabolomics Platform `_ is running in production in AWS with hundreds of users. MetaSpace is using Lithops over Lambda Functions and EC2 VMs to access metabolomics data in Amazon S3. @@ -77,8 +111,9 @@ Success stories reaching tens of thousands of concurrent functions. LithopsHPC is now being used in the neardata.eu project for extreme data analytics of genomics pipelines. + Blogs and Talks ---------------- +=============== * `Simplify the developer experience with OpenShift for Big Data processing by using Lithops framework `_ @@ -117,8 +152,9 @@ Blogs and Talks * `Industrial project in Technion on Lithops `_ + Papers ------- +====== * `Serverful Functions: Leveraging Servers in Complex Serverless Workflows `_ - ACM Middleware Industrial Track 2024 @@ -144,6 +180,21 @@ Papers `_ - ACM/IFIP International Middleware Conference 2018 +Join the Community +================== + +Lithops is an open-source project, actively maintained and supported by a community of contributors and users. You can: + +- πŸ’¬ Join the discussion on `GitHub Discussions `_ +- 🐞 Report issues or contribute on `GitHub `_ +- πŸ“– Read more in the full documentation + + +--- + +**Start writing scalable cloud applications β€” with Lithops.** + + .. toctree:: :hidden: diff --git a/docs/source/api_futures.rst b/docs/source/api_futures.rst index a2cee0c5f..e374a2651 100644 --- a/docs/source/api_futures.rst +++ b/docs/source/api_futures.rst @@ -3,19 +3,43 @@ Lithops Futures API =================== -The primary object in Lithops is the executor. The standard way to get everything set up is to import `lithops`, and create an instance of one of the available modes of executions. +The core abstraction in Lithops is the **executor**, responsible for orchestrating the execution of your functions across different environments. -Lithops is shipped with 3 modes of execution: **Localhost**, **Serverless** and **Standalone**. In this sense, each mode of execution has its own executor class: +To get started, you typically import `lithops` and create an executor instance to run your code. Lithops provides a flexible set of executors to suit different needs. -* `lithops.LocalhostExecutor()`: Executor that uses local processes to run jobs in the local machine. -* `lithops.ServerlessExecutor()`: Executor to run jobs in one of the available serverless compute backends. -* `lithops.StandaloneExecutor()`: Executor to run jobs in one of the available standalone compute backends. +### Primary Executors -Additionally, Lithops includes a top-level function executor, which encompasses all three previous executors: +* **FunctionExecutor** (`lithops.FunctionExecutor()`): + The main, generic executor that automatically selects its execution mode based on the provided configuration. 
+ This lets you write your code once and run it seamlessly on localhost, serverless, or standalone backends without changing your code. -* `lithops.FunctionExecutor()`: Generic executor that will use the configuration to determine its mode of execution, i.e., based on the configuration it will be **localhost**, **serverless** or **standalone**. +* **RetryingFunctionExecutor** (`lithops.RetryingFunctionExecutor()`): + A robust wrapper around `FunctionExecutor` that transparently handles retries on failed tasks. + It supports all features of `FunctionExecutor` with added automatic retry logic, improving fault tolerance and reliability for unstable or transient failure-prone environments. + +### Secondary Executors + +For more specialized use cases, Lithops also provides explicit executors for each execution mode: + +* **LocalhostExecutor** (`lithops.LocalhostExecutor()`): + Runs jobs locally using multiple processes on your machine. Ideal for development, debugging, or small-scale workloads. + +* **ServerlessExecutor** (`lithops.ServerlessExecutor()`): + Executes jobs on serverless compute platforms, managing scaling and deployment automatically. Best for massively parallel, ephemeral workloads. + +* **StandaloneExecutor** (`lithops.StandaloneExecutor()`): + Runs jobs on standalone compute backends such as clusters or virtual machines, suitable for long-running or resource-heavy tasks. + +--- + +### Configuration and Initialization + +By default, executors load configuration from the Lithops configuration file (e.g., `lithops_config.yaml`). You can also supply configuration parameters programmatically via a Python dictionary when creating an executor instance. Parameters passed explicitly override those in the config file, allowing for flexible customization on the fly. + +--- + +This layered executor design lets Lithops provide a powerful, unified API for parallel function execution β€” from local development to multi-cloud production deployments with fault tolerance and retries built-in. -By default, the executor load the configuration from the config file. Alternatively, you can pass the configuration with a python dictionary. In any case, note that all the parameters set in the executor will overwrite those set in the configuration. Futures API Reference --------------------- @@ -24,3 +48,8 @@ Futures API Reference :members: :undoc-members: :show-inheritance: + +.. autoclass:: lithops.retries.RetryingFunctionExecutor + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/data_processing.rst b/docs/source/data_processing.rst index 1fa0d48a1..4ddfb9479 100644 --- a/docs/source/data_processing.rst +++ b/docs/source/data_processing.rst @@ -1,156 +1,292 @@ .. _data-processing: -Processing data from the Cloud -=========================================== +Processing Data from the Cloud +============================== -Lithops has built-in logic for processing data objects from public URLs and object storage services. This logic is automatically activated with the reseverd parameter named **obj**. When you write in the parameters of a function the parameter name **obj**, you are telling to Lithops that you want to process objects located in an object storage service, public urls, or localhost files. +Lithops provides built-in support for reading and processing data from **object storage**, **public URLs**, and **local files**. This functionality is automatically enabled when your function includes a reserved parameter named **obj**. 
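As a minimal sketch of the idea (the bucket name below is hypothetical), declaring ``obj`` and passing a bucket to ``map()`` is enough for each activation to receive one object; the sections below detail the available attributes and the accepted input formats:

.. code-block:: python

    import lithops

    def my_function(obj):
        # Each activation receives one object (or chunk) from the bucket
        return len(obj.data_stream.read())

    fexec = lithops.FunctionExecutor()
    fexec.map(my_function, 'my-bucket')
    print(fexec.get_result())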
-Additionally, the built-in data-processing logic integrates a **data partitioner** system that allows to automatically split the dataset in smallest chunks. Splitting a file into smaller chunks permit to leverage the parallelism provided by the compute backends to process the data. We designed the partitioner within the ``map()`` and ``map_reduce()`` API calls, an it is configurable by specifying the *size of the chunk*, or the *number of chunks* to split each file. The current implementation of the data partitioner supports to split files that contain multiple lines (or rows) ended by '\n', for example, a .txt book or a common .csv file among others. More data-types will be supported in future releases. +When you define a function with the parameter `obj`, Lithops knows to pass in a special object representing a file (or a chunk of a file) from an external data source. This allows you to write scalable data processing workflows with minimal boilerplate. +Data Partitioning +----------------- -Cloud Object Storage --------------------- -For processing data from a cloud object storage service, the input data must be either a list of buckets, a list of buckets with object prefix, or a list of data objects. If you set the *size of the chunk* or the *number of chunks*, the partitioner is activated inside Lithops and it is responsible to split the objects into smaller chunks, eventually running one function activation for each generated chunk. If *size of the chunk* and *number of chunks* are not set, chunk is an entire object, so one function activation is executed for each individual object. +Lithops includes an integrated **data partitioner** that allows you to automatically split large datasets into smaller, more manageable chunks. This partitioning enables massive parallelism across the compute backend, accelerating data processing tasks. -The **obj** parameter is a python class from where you can access all the information related to the object (or chunk) that the function is processing. For example, consider the following function that shows all the available attributes in **obj** when you are processing objects from an object store: +Partitioning is supported directly within the :meth:`map()` and :meth:`map_reduce()` APIs and can be controlled via: +- **`obj_chunk_size`**: The size (in bytes) of each chunk to split the object into. +- **`obj_chunk_number`**: The total number of chunks to split each object into. -.. code:: python +Currently, the partitioner supports **text-based files** where rows are separated by newline characters (`\n`), such as `.txt` and `.csv`. Support for additional data types is planned in future releases. + +Cloud Object Storage Integration +-------------------------------- + +When processing data from cloud object storage, your input must be one of the following: + +1. A single bucket or a list of buckets +2. A bucket prefix (e.g., a folder path) +3. A list of specific object keys + +Based on your configuration: + +- If `obj_chunk_size` or `obj_chunk_number` is set, **each object is automatically split into smaller chunks**, and Lithops runs one function activation per chunk. +- If chunking is not configured, Lithops runs one function activation per full object. + +Accessing Object Metadata +-------------------------- + +Inside your function, the `obj` parameter gives you access to metadata and data for the current chunk being processed. + +Example: + +.. 
code-block:: python def my_map_function(obj): - print(obj.bucket) - print(obj.key) - print(obj.part) - print(obj.data_byte_range) - print(obj.chunk_size) - - data = obj.data_stream.read() + print(obj.bucket) # Bucket name + print(obj.key) # Object key + print(obj.part) # Chunk number + print(obj.data_byte_range) # Byte range for this chunk + print(obj.chunk_size) # Chunk size in bytes + + data = obj.data_stream.read() # Read the data for this chunk + +Accepted Input Formats +----------------------- -The allowed inputs of a function can be: +Lithops accepts **only one type** of input format per execution. Do not mix formats in the same list. The supported formats are: -- Input data is a bucket or a list of buckets. See an example in [map_reduce_cos_bucket.py](../../examples/map_reduce_cos_bucket.py): +- **Buckets**: One or more buckets + *(See: `map_reduce_cos_bucket.py <../../examples/map_reduce_cos_bucket.py>`_)* -.. code:: python + .. code-block:: python - iterdata = 'bucket1' + iterdata = ['my-bucket-1', 'my-bucket-2'] -- Input data is a bucket(s) with object prefix. See an example in [map_cos_prefix.py](../../examples/map_cos_prefix.py): +- **Object Prefixes**: Folder-like paths ending with `/` + *(See: `map_cos_prefix.py <../../examples/map_cos_prefix.py>`_)* -.. code:: python + .. code-block:: python - iterdata = ['bucket1/images/', 'bucket1/videos/'] + iterdata = ['my-bucket/data/csvs/', 'my-bucket/logs/'] -Notice that you must write the end slash (/) to inform partitioner you are providing an object prefix. + ⚠️ Prefixes must end with a `/` to indicate to the partitioner that you're specifying a folder-like path. -- Input data is a list of object keys. See an example in [map_reduce_cos_key.py](../../examples/map_reduce_cos_key.py): +- **Object Keys**: Specific file paths + *(See: `map_reduce_cos_key.py <../../examples/map_reduce_cos_key.py>`_)* -.. code:: python + .. code-block:: python - iterdata = ['bucket1/object1', 'bucket1/object2', 'bucket1/object3'] + iterdata = ['my-bucket/file1.csv', 'my-bucket/file2.csv'] -Notice that *iterdata* must be only one of the previous 3 types. Intermingled types are not allowed. For example, you cannot set in the same *iterdata* a bucket and some object keys: +❌ **Mixing formats is not allowed**: -.. code:: python +.. code-block:: python - iterdata = ['bucket1', 'bucket1/object2', 'bucket1/object3'] # Not allowed + # This will raise an error + iterdata = ['my-bucket', 'my-bucket/file2.csv'] -Once iterdata is defined, you can execute Lithops as usual, either using *map()* or *map_reduce()* calls. If you need to split the files in smaller chunks, you can set (optionally) the *obj_chunk_size* or *obj_chunk_number* parameters. +Putting It All Together +------------------------ -.. code:: python +Once you've defined your input and function, you can run Lithops as usual with optional chunking: + +.. code-block:: python import lithops - object_chunksize = 4*1024**2 # 4MB + object_chunksize = 4 * 1024 ** 2 # 4 MB per chunk fexec = lithops.FunctionExecutor() fexec.map_reduce(my_map_function, iterdata, obj_chunk_size=object_chunksize) result = fexec.get_result() -Processing data from public URLs --------------------------------- -For processing data from public URLs, the input data must be either a single URL or a list of URLs. 
As in the previous case, if you set the *size of the chunk* or the *number of chunks*, the partitioner is activated inside Lithops and it is responsible to split the objects into smaller chunks, as long as the remote storage server allows requests in chunks (ranges). If range requests are not allowed in the remote storage server, each URL is treated as a single object. -The **obj** parameter is a python class from where you can access all the information related to the object (or chunk) that the function is processing. For example, consider the following function that shows all the available attributes in **obj** when you are processing URLs: +Processing Data from Public URLs +================================ + +Lithops also supports processing data directly from **public URLs**. The input can be a single URL or a list of URLs. +If you set the `obj_chunk_size` or `obj_chunk_number`, Lithops activates its internal partitioner to split each file into smaller chunksβ€”**provided that the remote server supports HTTP range requests**. If range requests are not supported, each URL is processed as a single object. -.. code:: python +As with other backends, the special **`obj`** parameter gives you access to metadata and the content of the chunk being processed. + +Example: + +.. code-block:: python import lithops def my_map_function(obj): - print(obj.url) - print(obj.part) - print(obj.data_byte_range) - print(obj.chunk_size) + print(obj.url) # Full URL of the object + print(obj.part) # Chunk number + print(obj.data_byte_range) # Byte range for this chunk + print(obj.chunk_size) # Size of this chunk (in bytes) data = obj.data_stream.read() for line in data.splitlines(): - # Do some process - return partial_intersting_data + # Process each line + pass + + return partial_result def my_reduce_function(results): - for partial_intersting_data in results: - # Do some process + for partial_result in results: + # Aggregate results + pass + return final_result - iterdata = ['http://myurl/my_file_1.csv', 'http://myurl/my_file_2.csv'] - object_chunk_number= 2 + iterdata = ['http://example.com/file1.csv', 'http://example.com/file2.csv'] + chunk_number = 2 fexec = lithops.FunctionExecutor() fexec.map_reduce(my_map_function, iterdata, my_reduce_function, - obj_chunk_number=object_chunk_number) + obj_chunk_number=chunk_number) result = fexec.get_result() -See a complete example in `map_reduce_url.py `_ +πŸ“„ See the full example in: +`map_reduce_url.py `_ + + +Processing Data from Localhost Files +==================================== +.. note:: This feature is only available when using the **localhost backend**. -Processing data from localhost files ------------------------------------- +Lithops can also process files stored on the local filesystem. The input can be: -.. note:: This is only allowed when running Lithops with the localhost backend +- A single file path +- A list of file paths +- A directory path +- A list of directory paths -For processing data from localhost files, the input data must be either a directory path, a list of directory paths, a file path a list of file paths. As in the previous cases, if you set the *size of the chunk* or the *number of chunks*, the partitioner is activated inside Lithops and it is responsible to split the objects into smaller chunks, eventually spawning one function for each generated chunk. If *size of the chunk* and *number of chunks* are not set, chunk is an entire object, so one function activation is executed for each individual object. 
+As in other cases, if you set `obj_chunk_size` or `obj_chunk_number`, the file(s) will be split into chunks and processed in parallel. If not set, each file is processed as a single object. -The **obj** parameter is a python class from where you can access all the information related to the object (or chunk) that the function is processing. For example, consider the following function that shows all the available attributes in **obj** when you are processing localhost files: +The **`obj`** parameter again exposes the metadata and content of the chunk. -.. code:: python +Example: + +.. code-block:: python import lithops def my_map_function(obj): - print(obj.path) - print(obj.part) - print(obj.data_byte_range) - print(obj.chunk_size) + print(obj.path) # Full local file path + print(obj.part) # Chunk number + print(obj.data_byte_range) # Byte range for this chunk + print(obj.chunk_size) # Size of this chunk (in bytes) data = obj.data_stream.read() for line in data.splitlines(): - # Do some process - return partial_intersting_data + # Process each line + pass + + return partial_result def my_reduce_function(results): - for partial_intersting_data in results: - # Do some process + for partial_result in results: + # Aggregate results + pass + return final_result - iterdata = ['/home/user/data/my_file_1.csv', '/home/user/data/my_file_2.csv'] - object_chunk_number= 2 + iterdata = ['/home/user/file1.csv', '/home/user/file2.csv'] + chunk_number = 2 fexec = lithops.FunctionExecutor() fexec.map_reduce(my_map_function, iterdata, my_reduce_function, - obj_chunk_number=object_chunk_number) + obj_chunk_number=chunk_number) result = fexec.get_result() -See a complete example in `map_reduce_localhost.py `_. +πŸ“„ See the full example in: +`map_reduce_localhost.py `_ -Reducer granularity +Reducer Granularity ------------------- -When using the ``map_reduce()`` API call with ``obj_chunk_size`` or ``obj_chunk_number``, by default there will be only one reducer for all the object chunks from all the objects. Alternatively, you can spawn one reducer for each object by setting the parameter ``obj_reduce_by_key=True``. -.. code:: python +When using the :meth:`map_reduce()` API along with `obj_chunk_size` or `obj_chunk_number`, Lithops defaults to using **a single reducer** to aggregate results across **all chunks and objects**. + +If you'd prefer to reduce results **per original object** (e.g., one reducer per file), you can set the parameter `obj_reduce_by_key=True`. + +Example: + +.. code-block:: python fexec.map_reduce(my_map_function, bucket_name, my_reduce_function, - obj_chunk_size=obj_chunk_size, obj_reduce_by_key=True) + obj_chunk_size=obj_chunk_size, + obj_reduce_by_key=True) + + +Elastic Data Processing and Cloud-Optimized Formats +=================================================== + +Lithops is especially powerful for **massively parallel data processing**. When the input to `map()` or `map_reduce()` is a **storage bucket** or a collection of large files, Lithops will automatically: + +- Launch one function per file, or +- Partition large files into chunks and assign each chunk to a different function + +This behavior enables **elastic scaling** that fully utilizes the underlying compute backend. 
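As an illustrative sketch of this elastic behavior (the bucket name is again hypothetical), setting ``obj_chunk_size`` on a bucket-wide ``map()`` call spawns one activation per chunk of every object the bucket contains:

.. code-block:: python

    import lithops

    def count_lines(obj):
        # 'obj' holds one chunk of a larger object
        return len(obj.data_stream.read().splitlines())

    fexec = lithops.FunctionExecutor()
    # One function activation per ~32 MB chunk of each object in the bucket
    fexec.map(count_lines, 'my-bucket', obj_chunk_size=32 * 1024 ** 2)
    print(sum(fexec.get_result()))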
+ +Cloud-Optimized Formats +------------------------ + +Lithops is ideally suited for processing **cloud-optimized data formats** such as: + +- **ZARR** +- **COG** (Cloud Optimized GeoTIFF) +- **COPC** (Cloud Optimized Point Clouds) +- **FlatGeobuf** + +These formats are designed to support **random access via HTTP range requests**, making them a perfect match for cloud object storage and serverless computing. + +By leveraging HTTP range primitives, Lithops enables fast and scalable parallel processing, distributing the workload across many concurrent function activations, each fetching only the data it needs. This approach takes full advantage of the **high aggregate bandwidth** provided by modern object storage systems. + +Partitioning Non-Optimized Formats with Dataplug +------------------------------------------------- + +Thanks to the `DATAPLUG `_ library, Lithops also supports **on-the-fly partitioning** of data formats that are **not cloud-optimized**. Supported formats include: + +- Genomics: **FASTA**, **FASTQ**, **FASTQ.GZ** +- Metabolomics: **imzML** +- Geospatial: **LIDAR (.laz)** + +Dataplug wraps these formats into cloud-native interfaces and exposes partitioning strategies that Lithops can consume directly. + +Example: Parallel Processing of a Cloud-Hosted LIDAR File +---------------------------------------------------------- + +In the example below, we use Dataplug to wrap a COPC (Cloud Optimized Point Cloud) file stored in S3, partition it into spatial chunks, and process each chunk in parallel using Lithops: + +.. code-block:: python + + from dataplug import CloudObject + from dataplug.formats.geospatial.copc import CloudOptimizedPointCloud, square_split_strategy + import laspy + import lithops + + # Function to process each LiDAR slice + def process_lidar_slice(data_slice): + las_data = data_slice.get() + lidar_file = laspy.open(las_data) + ... + + # Load the COPC file from S3 using Dataplug + co = CloudObject.from_s3( + CloudOptimizedPointCloud, + "s3://geospatial/copc/CA_YosemiteNP_2019/USGS_LPC_CA_YosemiteNP_2019_D19_11SKB6892.laz", + s3_config=local_minio,  # S3 client configuration (endpoint/credentials) defined elsewhere + ) + + # Partition the point cloud into 9 spatial chunks + slices = co.partition(square_split_strategy, num_chunks=9) + + # Process slices in parallel using Lithops + with lithops.FunctionExecutor() as executor: + futures = executor.map(process_lidar_slice, slices) + results = executor.get_result(futures) + +This enables truly **elastic and serverless geospatial processing pipelines**, with no infrastructure overhead and full cloud-native efficiency. diff --git a/docs/user_guide.md b/docs/user_guide.md deleted file mode 100644 index 7982f864a..000000000 --- a/docs/user_guide.md +++ /dev/null @@ -1,40 +0,0 @@ -# User Guide - -1. [Lithops design overview](source/design.rst) - -1. [Supported Clouds](source/supported_clouds.rst) - -1. [Execution Modes](source/execution_modes.rst) - -1. High-level Compute and Storage APIs - - [Futures API](api_futures.md) - - [Multiprocessing API](source/api_multiprocessing.rst) - - [Storage API](api_storage.md) - - [Storage OS API](source/api_storage_os.rst) - -1. [Lithops Monitoring](source/monitoring.rst) - -1.
[Functions design and parameters](source/functions.md) - - [Reserved parameters](source/functions.md#reserved-parameters) - - [Parameters format for a *single* call](source/functions.md#parameters-in-the-call_async-method) - - [Parameters format for a *map* call](source/functions.md#parameters-in-the-map-and-map_reduce-methods) - - [Common parameters across functions](source/functions.md#common-parameters-across-functions-invocations) - -1. [Distributed shared objects across function activations](source/dso.rst) - -1. [Distributed Scikit-learn / Joblib](source/sklearn_joblib.rst) - -1. [Lithops for big data analytics](source/data_processing.rst) - - [Processing data from a cloud object store](source/data_processing.rst#processing-data-from-a-cloud-object-storage-service) - - [Processing data from public URLs](source/data_processing.rst#processing-data-from-public-urls) - - [Processing data from localhost files](source/data_processing.rst#processing-data-from-localhost-files) - -1. [Run Lithops on Jupyter notebooks](../examples/hello_world.ipynb) - -1. [Execute Airflow workflows using Lithops](https://github.com/lithops-cloud/airflow-plugin) - -1. [Lithops end-to-end Applications](https://github.com/lithops-cloud/applications) - -1. [Build and manage custom runtimes to run the functions](../runtime/) - -1. [Command Line Tool](source/cli.rst) \ No newline at end of file diff --git a/lithops/retries.py b/lithops/retries.py index 20af85254..f22b676af 100644 --- a/lithops/retries.py +++ b/lithops/retries.py @@ -29,7 +29,11 @@ class RetryingFuture: """ - A wrapper around `ResponseFuture` that takes care of retries. + A wrapper around `ResponseFuture` that adds retry capabilities. + + This class is used internally by Lithops to handle retry logic for + failed function executions. It allows retrying a map function with + the same input upon failure, up to a specified number of times. """ def __init__( @@ -40,6 +44,15 @@ def __init__( retries: Optional[int] = None, **kwargs ): + """ + Initialize a RetryingFuture. + + :param response_future: The initial ResponseFuture object. + :param map_function: The function to retry on failure. + :param input: The input data for the map function. + :param retries: Maximum number of retry attempts. + :param kwargs: Additional arguments to pass to the map function. + """ self.response_future = response_future self.map_function = map_function self.input = input @@ -49,12 +62,25 @@ def __init__( self.cancelled = False def _inc_failure_count(self): + """ + Increment the internal failure counter. + """ self.failure_count += 1 def _should_retry(self): + """ + Determine whether another retry attempt should be made. + + :return: True if retry is allowed, False otherwise. + """ return not self.cancelled and self.failure_count <= self.retries def _retry(self, function_executor: FunctionExecutor): + """ + Re-submit the map function using the provided FunctionExecutor. + + :param function_executor: An instance of FunctionExecutor to resubmit the job. + """ inputs = [self.input] futures_list = function_executor.map( self.map_function, inputs, **self.map_kwargs @@ -62,23 +88,45 @@ def _retry(self, function_executor: FunctionExecutor): self.response_future = futures_list[0] def cancel(self): - # cancelling will prevent any further retries, but won't affect any running tasks + """ + Cancel any future retries. This does not cancel any running tasks. + """ self.cancelled = True @property def done(self): + """ + Check if the function execution is complete. 
+ + :return: True if the execution is done, False otherwise. + """ return self.response_future.done @property def error(self): + """ + Get the error from the function execution, if any. + + :return: An exception or error message if an error occurred. + """ return self.response_future.error @property def _exception(self): + """ + Get the exception tuple (type, value, traceback) from the execution. + + :return: Exception tuple. + """ return self.response_future._exception @property def stats(self): + """ + Get execution statistics. + + :return: A dictionary containing performance and usage metrics. + """ return self.response_future.stats def status( @@ -87,6 +135,14 @@ def status( internal_storage: Any = None, check_only: bool = False, ): + """ + Return the current status of the function execution. + + :param throw_except: Whether to raise any captured exception. + :param internal_storage: Optional internal storage reference. + :param check_only: If True, only checks status without updating. + :return: Execution status string. + """ stat = self.response_future.status( throw_except=throw_except, internal_storage=internal_storage, @@ -97,6 +153,13 @@ def status( return stat def result(self, throw_except: bool = True, internal_storage: Any = None): + """ + Get the result of the function execution. + + :param throw_except: Whether to raise any captured exception. + :param internal_storage: Optional internal storage reference. + :return: The result of the executed function. + """ res = self.response_future.result( throw_except=throw_except, internal_storage=internal_storage ) @@ -107,17 +170,30 @@ def result(self, throw_except: bool = True, internal_storage: Any = None): class RetryingFunctionExecutor: """ - A wrapper around `FunctionExecutor` that supports retries. + A wrapper around `FunctionExecutor` that adds automatic retry capabilities to function invocations. + This class allows spawning multiple function activations and handling failures by retrying them + according to the configured retry policy. + + It provides the same interface as `FunctionExecutor` for compatibility, with an extra `retries` parameter + in `map()` to control the number of retries per invocation. + + :param executor: An instance of FunctionExecutor (e.g., Localhost, Serverless, or Standalone) """ def __init__(self, executor: FunctionExecutor): self.executor = executor def __enter__(self): + """ + Context manager entry. Delegates to the inner FunctionExecutor. + """ self.executor.__enter__() return self def __exit__(self, exc_type, exc_value, traceback): + """ + Context manager exit. Delegates to the inner FunctionExecutor. + """ self.executor.__exit__(exc_type, exc_value, traceback) def map( @@ -136,6 +212,25 @@ def map( exclude_modules: Optional[List[str]] = [], retries: Optional[int] = None, ) -> List[RetryingFuture]: + """ + Spawn multiple function activations with automatic retry on failure. + + :param map_function: The function to map over the data. + :param map_iterdata: An iterable of input data (e.g., Python list). + :param chunksize: Split map_iterdata in chunks of this size. One worker per chunk. + :param extra_args: Additional arguments to pass to each function. + :param extra_env: Additional environment variables for the function environment. + :param runtime_memory: Memory (in MB) to allocate per function activation. + :param obj_chunk_size: For file processing. Split each object into chunks of this size (in bytes). + :param obj_chunk_number: For file processing. Number of chunks to split each object into. 
+ :param obj_newline: Newline character for line integrity in file partitioning. + :param timeout: Max time per function activation (in seconds). + :param include_modules: Explicitly pickle these dependencies. + :param exclude_modules: Explicitly exclude these modules from pickling. + :param retries: Number of retries for each function activation upon failure. + + :return: A list of RetryingFuture objects, one for each function activation. + """ futures_list = self.executor.map( map_function, map_iterdata, @@ -181,6 +276,20 @@ def wait( wait_dur_sec: Optional[int] = WAIT_DUR_SEC, show_progressbar: Optional[bool] = True, ) -> Tuple[List[RetryingFuture], List[RetryingFuture]]: + """ + Wait for a set of futures to complete, retrying any that fail. + + :param fs: List of RetryingFuture objects to wait on. + :param throw_except: Raise exceptions encountered during execution. + :param return_when: Completion policy. One of: ALWAYS, ANY_COMPLETED, or ALL_COMPLETED. + :param download_results: Whether to download results after completion. + :param timeout: Maximum wait time (in seconds). + :param threadpool_size: Number of threads used for polling. + :param wait_dur_sec: Polling interval (in seconds). + :param show_progressbar: Show progress bar for the wait operation. + + :return: A tuple (done, pending) of lists of RetryingFutures. + """ lookup = {f.response_future: f for f in fs} while True: @@ -205,7 +314,6 @@ def wait( retrying_future._inc_failure_count() if retrying_future._should_retry(): retrying_future._retry(self.executor) - # put back into pending since we are retrying this input retrying_pending.append(retrying_future) lookup[retrying_future.response_future] = retrying_future else: @@ -230,4 +338,16 @@ def clean( clean_fn: Optional[bool] = False, force: Optional[bool] = False ): + """ + Cleans up temporary files and objects related to this executor, including: + - Function packages + - Serialized input/output data + - Cloud objects (if specified) + + :param fs: List of futures to clean. + :param cs: List of cloudobjects to clean. + :param clean_cloudobjects: Whether to delete all cloudobjects created with this executor. + :param clean_fn: Whether to delete cached functions. + :param force: Force cleanup even for unfinished futures. + """ self.executor.clean(fs, cs, clean_cloudobjects, clean_fn, force)
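As a brief illustration of the retry API documented in this patch (a sketch only; ``flaky_double``, its failure rate, and the chosen retry count are made up for the example), ``RetryingFunctionExecutor`` wraps a regular ``FunctionExecutor``, and ``map()`` plus ``wait()`` transparently re-submit failed activations:

.. code-block:: python

    import lithops
    from lithops.retries import RetryingFunctionExecutor

    def flaky_double(x):
        # Illustrative transient failure: roughly 30% of activations raise
        # and are re-submitted by the retry wrapper, up to `retries` times.
        import random
        if random.random() < 0.3:
            raise RuntimeError('transient failure')
        return 2 * x

    with RetryingFunctionExecutor(lithops.FunctionExecutor()) as retrying:
        futures = retrying.map(flaky_double, list(range(10)), retries=3)
        done, pending = retrying.wait(futures, throw_except=False)
        print([f.result(throw_except=False) for f in done])

In this sketch, each ``RetryingFuture`` returned in ``done`` either holds a successful result or has exhausted its retry budget; calling ``cancel()`` on a future stops further retries without affecting activations that are already running.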