1+ import json
12import logging
23import os
3- import pickle
44import shutil
55import subprocess
66import sys
7- import tempfile
8- import zipfile
97from pathlib import Path
8+ from typing import Dict
109
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("osparc-python-main")


# Names of the environment variables holding the input and output folders.
ENVIRONS = ["INPUT_FOLDER", "OUTPUT_FOLDER"]
try:
    INPUT_FOLDER, OUTPUT_FOLDER = [Path(os.environ[v]) for v in ENVIRONS]
except KeyError as err:
    # f-string so the message lists the variable names; chain the KeyError
    # so the traceback shows which variable was actually missing.
    raise ValueError(f"Required env vars {ENVIRONS} were not set") from err

# NOTE: sync with schema in metadata!!
NUM_INPUTS = 5
NUM_OUTPUTS = 4
OUTPUT_SUBFOLDER_ENV_TEMPLATE = "OUTPUT_{}"
OUTPUT_SUBFOLDER_TEMPLATE = "output_{}"
OUTPUT_FILE_TEMPLATE = "output_{}.zip"
2326
24- def copy (src , dest ):
25- try :
26- src , dest = str (src ), str (dest )
27- shutil .copytree (
28- src , dest , ignore = shutil .ignore_patterns ("*.zip" , "__pycache__" , ".*" )
29- )
30- except OSError as err :
31- # If the error was caused because the source wasn't a directory
32- if err .errno == shutil .errno .ENOTDIR :
33- shutil .copy (src , dest )
34- else :
35- logger .error ("Directory not copied. Error: %s" , err )
36-
37-
38- def clean_dir (dirpath : Path ):
39- for root , dirs , files in os .walk (dirpath ):
40- for f in files :
41- os .unlink (os .path .join (root , f ))
42- for d in dirs :
43- shutil .rmtree (os .path .join (root , d ))
44-
45-
46- def run_cmd (cmd : str ):
47- subprocess .run (cmd .split (), shell = False , check = True , cwd = input_dir )
48- # TODO: deal with stdout, log? and error??
4927
50-
51- def unzip_dir (parent : Path ):
52- for filepath in list (parent .rglob ("*.zip" )):
53- if zipfile .is_zipfile (filepath ):
54- with zipfile .ZipFile (filepath ) as zf :
55- zf .extractall (filepath .parent )
56-
57-
58- def zipdir (dirpath : Path , ziph : zipfile .ZipFile ):
59- """ Zips directory and archives files relative to dirpath
60- """
61- for root , dirs , files in os .walk (dirpath ):
62- for filename in files :
63- filepath = os .path .join (root , filename )
64- ziph .write (filepath , arcname = os .path .relpath (filepath , dirpath ))
65- dirs [:] = [name for name in dirs if not name .startswith ("." )]
66-
67-
68- def ensure_main_entrypoint (code_dir : Path ) -> Path :
28+ def _find_user_code_entrypoint (code_dir : Path ) -> Path :
29+ logger .info ("Searching for script main entrypoint ..." )
6930 code_files = list (code_dir .rglob ("*.py" ))
7031
7132 if not code_files :
@@ -79,10 +40,12 @@ def ensure_main_entrypoint(code_dir: Path) -> Path:
7940 raise ValueError (f"Many entrypoints found: { code_files } " )
8041
8142 main_py = code_files [0 ]
43+ logger .info ("Found %s as main entrypoint" , main_py )
8244 return main_py
8345
8446
85- def ensure_requirements (code_dir : Path ) -> Path :
47+ def _ensure_pip_requirements (code_dir : Path ) -> Path :
48+ logger .info ("Searching for requirements file ..." )
8649 requirements = list (code_dir .rglob ("requirements.txt" ))
8750 if len (requirements ) > 1 :
8851 raise ValueError (f"Many requirements found: { requirements } " )
@@ -91,40 +54,63 @@ def ensure_requirements(code_dir: Path) -> Path:
9154 # deduce requirements using pipreqs
9255 logger .info ("Not found. Recreating requirements ..." )
9356 requirements = code_dir / "requirements.txt"
94- run_cmd (f"pipreqs --savepath={ requirements } --force { code_dir } " )
57+ subprocess .run (
58+ f"pipreqs --savepath={ requirements } --force { code_dir } " .split (),
59+ shell = False ,
60+ check = True ,
61+ cwd = INPUT_FOLDER ,
62+ )
9563
9664 # TODO log subprocess.run
9765
9866 else :
9967 requirements = requirements [0 ]
68+ logger .info (f"Found: { requirements } " )
10069 return requirements
10170
10271
103- def setup ():
104- logger .info ("Cleaning output ..." )
105- clean_dir (output_dir )
106-
107- # TODO: snapshot_before = list(input_dir.rglob("*"))
108-
109- # NOTE The inputs defined in ${INPUT_FOLDER}/inputs.json are available as env variables by their key in capital letters
110- # For example: input_1 -> $INPUT_1
111- #
112-
113- logger .info ("Processing input ..." )
114- unzip_dir (input_dir )
72+ # TODO: Next version of integration will take care of this and maybe the ENVs as well
def _ensure_output_subfolders_exist() -> Dict[str, str]:
    """Create one subfolder per output and map its env name to its path.

    Returns:
        Mapping from the environment variable name built with
        ``OUTPUT_SUBFOLDER_ENV_TEMPLATE`` to the path (as a string) of the
        subfolder built with ``OUTPUT_SUBFOLDER_TEMPLATE`` under
        ``OUTPUT_FOLDER``, for every ``n`` in ``1..NUM_OUTPUTS``.
    """
    output_envs = {}
    for n in range(1, NUM_OUTPUTS + 1):
        # Build the name from the module-level template instead of repeating
        # the literal, so the naming scheme is defined in a single place.
        output_sub_folder_env = OUTPUT_SUBFOLDER_ENV_TEMPLATE.format(n)
        output_sub_folder = OUTPUT_FOLDER / OUTPUT_SUBFOLDER_TEMPLATE.format(n)
        # NOTE: exist_ok for forward compatibility in case they are already created
        output_sub_folder.mkdir(parents=True, exist_ok=True)
        output_envs[output_sub_folder_env] = f"{output_sub_folder}"
    logger.info(
        "Output ENVs available: %s",
        json.dumps(output_envs, indent=2),
    )
    return output_envs
86+
87+
def _ensure_input_environment() -> Dict[str, str]:
    """Collect the ``INPUT_<n>`` environment variables for ``n`` in ``1..NUM_INPUTS``.

    Returns:
        Mapping from each variable name to its value in ``os.environ``.

    Raises:
        KeyError: if one of the variables is not set.
    """
    input_envs = {}
    for index in range(1, NUM_INPUTS + 1):
        env_name = f"INPUT_{index}"
        input_envs[env_name] = os.environ[env_name]

    pretty_envs = json.dumps(input_envs, indent=2)
    logger.info("Input ENVs available: %s", pretty_envs)
    return input_envs
11597
116- # logger.info("Copying input to output ...")
117- # copy(input_dir, code_dir)
11898
119- logger .info ("Searching main entrypoint ..." )
120- main_py = ensure_main_entrypoint (input_dir )
121- logger .info ("Found %s as main entrypoint" , main_py )
99+ def setup ():
100+ input_envs = _ensure_input_environment ()
101+ output_envs = _ensure_output_subfolders_exist ()
102+ logger .info ("Available data:" )
103+ os .system ("ls -tlah" )
122104
123- logger . info ( "Searching requirements ..." )
124- requirements_txt = ensure_requirements ( input_dir )
105+ user_code_entrypoint = _find_user_code_entrypoint ( INPUT_FOLDER )
106+ requirements_txt = _ensure_pip_requirements ( INPUT_FOLDER )
125107
126108 logger .info ("Preparing launch script ..." )
127109 venv_dir = Path .home () / ".venv"
110+ bash_input_env_export = [f"export { env } ={ path } " for env , path in input_envs .items ()]
111+ bash_output_env_export = [
112+ f"export { env } ='{ path } '" for env , path in output_envs .items ()
113+ ]
128114 script = [
129115 "#!/bin/sh" ,
130116 "set -o errexit" ,
@@ -134,40 +120,30 @@ def setup():
134120 f'python3 -m venv --system-site-packages --symlinks --upgrade "{ venv_dir } "' ,
135121 f'"{ venv_dir } /bin/pip" install -U pip wheel setuptools' ,
136122 f'"{ venv_dir } /bin/pip" install -r "{ requirements_txt } "' ,
137- f'echo "Executing code { main_py .name } ..."' ,
138- f'"{ venv_dir } /bin/python3" "{ main_py } "' ,
123+ "\n " .join (bash_input_env_export ),
124+ "\n " .join (bash_output_env_export ),
125+ f'echo "Executing code { user_code_entrypoint .name } ..."' ,
126+ f'"{ venv_dir } /bin/python3" "{ user_code_entrypoint } "' ,
139127 'echo "DONE ..."' ,
140128 ]
141129 main_script_path = Path ("main.sh" )
142- with main_script_path .open ("w" ) as fp :
143- for line in script :
144- print (f"{ line } \n " , file = fp )
145-
146- # # TODO: take snapshot
147- # logger.info("Creating virtual environment ...")
148- # run_cmd("python3 -m venv --system-site-packages --symlinks --upgrade .venv")
149- # run_cmd(".venv/bin/pip install -U pip wheel setuptools")
150- # run_cmd(f".venv/bin/pip install -r {requirements}")
151-
152- # # TODO: take snapshot
153- # logger.info("Executing code ...")
154- # run_cmd(f".venv/bin/python3 {main_py}")
130+ main_script_path .write_text ("\n " .join (script ))
155131
156132
def teardown():
    """Zip every output subfolder into its own archive under ``OUTPUT_FOLDER``.

    For each ``n`` in ``1..NUM_OUTPUTS`` the subfolder named by
    ``OUTPUT_SUBFOLDER_TEMPLATE`` is archived into the file named by
    ``OUTPUT_FILE_TEMPLATE``; both are located directly in ``OUTPUT_FOLDER``.
    """
    logger.info("Zipping output...")
    for n in range(1, NUM_OUTPUTS + 1):
        # Use the shared template rather than a repeated literal so the
        # folder zipped here always matches the module-level naming scheme.
        output_path = OUTPUT_FOLDER / OUTPUT_SUBFOLDER_TEMPLATE.format(n)
        archive_file_path = OUTPUT_FOLDER / OUTPUT_FILE_TEMPLATE.format(n)
        logger.info("Zipping %s into %s...", output_path, archive_file_path)
        shutil.make_archive(
            # make_archive appends the ".zip" extension itself, so the base
            # name is the archive path without its suffix.
            f"{(archive_file_path.parent / archive_file_path.stem)}",
            format="zip",
            root_dir=output_path,
            logger=logger,
        )
        logger.info("Zipping %s into %s done ", output_path, archive_file_path)
    logger.info("Zipping done.")
171147
172148
173149if __name__ == "__main__" :
0 commit comments