diff --git a/tests/test_cli_run.py b/tests/test_cli_run.py
deleted file mode 100644
index 61ecb40..0000000
--- a/tests/test_cli_run.py
+++ /dev/null
@@ -1,200 +0,0 @@
-"""This does not cover all CLI parameters defined in caper/caper_args.py.
-Google Cloud Platform is tested in test_cli_server_client_gcp.py.
-However, other clouds (AWS) and HPC backends (SLURM/SGE/PBS) are not tested.
-
-In this testing module, 'caper run' is tested with a local backend.
-
-See test_cli_server_client_gcp.py for 'caper server/submit/...'.
-We will use the gcp (Google Cloud Platform) backend to test server-client
-functions.
-"""
-import json
-import os
-
-import pytest
-from autouri import GCSURI
-
-from caper.cli import main as cli_main
-from caper.cromwell_metadata import CromwellMetadata
-from caper.wdl_parser import WDLParser
-
-from .example_wdl import make_directory_with_wdls
-
-
-def test_wrong_subcmd():
-    cmd = ['wrong_subcmd']
-    with pytest.raises(SystemExit):
-        cli_main(cmd)
-
-
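-# Each parameter pair below is mutually exclusive: a task can run in either a
-# docker or a singularity container, and --soft-glob-output (symlink-based
-# globbing) presumably cannot be combined with --docker since symlinks made
-# outside a container do not resolve inside it. 'caper run' should therefore
-# raise ValueError for every combination.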
-@pytest.mark.parametrize(
-    'cmd',
-    [
-        ['--docker', '--singularity'],
-        ['--docker', 'ubuntu:latest', '--singularity'],
-        ['--docker', '--singularity', 'docker://ubuntu:latest'],
-        ['--docker', 'ubuntu:latest', '--singularity', 'docker://ubuntu:latest'],
-        ['--docker', '--soft-glob-output'],
-        ['--docker', 'ubuntu:latest', '--soft-glob-output'],
-    ],
-)
-def test_mutually_exclusive_params(tmp_path, cmd):
-    make_directory_with_wdls(str(tmp_path))
-
-    cmd = ['run', str(tmp_path / 'main.wdl')] + cmd
-    with pytest.raises(ValueError):
-        cli_main(cmd)
-
-
-@pytest.mark.integration
-def test_run(tmp_path, cromwell, womtool, debug_caper):
-    """Test most local parameters (run only) here."""
-    make_directory_with_wdls(str(tmp_path))
-    wdl = tmp_path / 'main.wdl'
-    inputs = tmp_path / 'inputs.json'
-    p = WDLParser(str(wdl))
-    imports = p.zip_subworkflows(str(tmp_path / 'imports.zip'))
-
-    cmd = ['run']
-    cmd += [str(wdl)]
-    cmd += ['--tmp-dir', str(tmp_path / 'tmp_dir')]
-    # 'local' (instead of the correct 'Local' with a capital L) should also work.
-    cmd += ['--backend', 'local']
-    cmd += ['--cromwell-stdout', str(tmp_path / 'cromwell_stdout.o')]
-    cmd += ['--db', 'file']
-    cmd += ['--db-timeout', '500000']
-    cmd += ['--file-db', str(tmp_path / 'file_db_prefix')]
-    cmd += ['--max-concurrent-tasks', '2']
-    cmd += ['--max-concurrent-workflows', '2']
-    cmd += ['--disable-call-caching']
-    cmd += ['--soft-glob-output']
-    cmd += ['--local-hash-strat', 'path']
-    cmd += ['--local-out-dir', str(tmp_path / 'out_dir')]
-    cmd += ['--inputs', str(inputs)]
-    cmd += ['--imports', str(imports)]
-    cmd += ['--ignore-womtool']
-    cmd += ['--cromwell', cromwell]
-    cmd += ['--womtool', womtool]
-    cmd += ['--java-heap-womtool', '2G']
-    cmd += ['--java-heap-run', '2G']
-    cmd += ['--max-retries', '1']
-    cmd += ['--metadata-output', str(tmp_path / 'metadata.json')]
-    if debug_caper:
-        cmd += ['--debug']
-
-    cli_main(cmd)
-
-    assert (tmp_path / 'tmp_dir').exists()
-    assert (tmp_path / 'file_db_prefix.lobs').exists()
-    assert (tmp_path / 'metadata.json').exists()
-    assert (tmp_path / 'cromwell_stdout.o').exists()
-
-    # test cleanup() on local storage
-    cm = CromwellMetadata(str(tmp_path / 'metadata.json'))
-    # check that the metadata JSON and the workflowRoot dir exist
-    root_out_dir = cm.data['workflowRoot']
-    assert os.path.exists(root_out_dir) and os.path.isdir(root_out_dir)
-
-    # dry-run should not delete anything
-    cm.cleanup(dry_run=True)
-    assert os.path.exists(root_out_dir)
-
-    cm.cleanup(dry_run=False)
-    assert not os.path.exists(root_out_dir)
-
-
-@pytest.mark.google_cloud
-@pytest.mark.integration
-def test_run_gcp_with_life_sciences_api(
-    tmp_path,
-    gcs_root,
-    ci_prefix,
-    cromwell,
-    womtool,
-    gcp_prj,
-    gcp_service_account_key_json,
-    debug_caper,
-):
-    """Test 'caper run' with the Google Cloud Life Sciences API."""
-    out_gcs_bucket = os.path.join(gcs_root, 'caper_out', ci_prefix)
-    tmp_gcs_bucket = os.path.join(gcs_root, 'caper_tmp')
-
-    # prepare WDLs, input JSON and imports to be submitted
-    make_directory_with_wdls(str(tmp_path))
-    wdl = tmp_path / 'main.wdl'
-    inputs = tmp_path / 'inputs.json'
-    metadata = tmp_path / 'metadata.json'
-
-    cmd = ['run', str(wdl)]
-    cmd += ['--inputs', str(inputs)]
-    cmd += ['-m', str(metadata)]
-    if gcp_service_account_key_json:
-        cmd += ['--gcp-service-account-key-json', gcp_service_account_key_json]
-    cmd += ['--gcp-region', 'us-central1']
-    cmd += ['--gcp-zones', 'us-west1-a,us-west1-b']
-    cmd += ['--gcp-prj', gcp_prj]
-    cmd += ['--memory-retry-error-keys', 'Killed']
-    cmd += ['--memory-retry-multiplier', '1.5']
-    cmd += ['--tmp-dir', str(tmp_path / 'tmp_dir')]
-    cmd += ['--backend', 'gcp']
-    cmd += ['--gcp-out-dir', out_gcs_bucket]
-    cmd += ['--gcp-loc-dir', tmp_gcs_bucket]
-    cmd += ['--cromwell-stdout', str(tmp_path / 'cromwell_stdout.o')]
-    # test with a file-type DB
-    cmd += ['--db', 'file']
-    cmd += ['--db-timeout', '500000']
-    cmd += ['--file-db', str(tmp_path / 'file_db_prefix')]
-    cmd += ['--max-concurrent-tasks', '2']
-    cmd += ['--max-concurrent-workflows', '2']
-    cmd += ['--disable-call-caching']
-    cmd += ['--cromwell', cromwell]
-    cmd += ['--womtool', womtool]
-    cmd += ['--java-heap-run', '4G']
-    cmd += ['--docker', 'ubuntu:latest']
-    if debug_caper:
-        cmd += ['--debug']
-    print(' '.join(cmd))
-
-    cli_main(cmd)
-    m_dict = json.loads(metadata.read_text())
-
-    assert m_dict['status'] == 'Succeeded'
-
-    # test CromwellMetadata.gcp_monitor() here since it's for gcp only and
-    # this function is one of the two test functions run on a gcp backend.
-    # task main.t1 sleeps for 10 seconds so that monitoring_script has time
-    # to write monitoring data to the `monitoringLog` file.
-    cm = CromwellMetadata(m_dict)
-    monitor_data = cm.gcp_monitor()
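-    # Hedged sketch of one monitor_data record, inferred from the assertions
-    # below (field names as used in this test; values illustrative):
-    #   {
-    #       'task_name': 'main.t1',
-    #       'instance': {'cpu': 2, 'mem': 4 * 1024 ** 3, 'disk': 20 * 1024 ** 3},
-    #       'stats': {'max': {'cpu_pct': 85.0, 'mem': ..., 'disk': ...}},
-    #   }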
-    for data in monitor_data:
-        instance_cpu = data['instance']['cpu']
-        instance_mem = data['instance']['mem']
-        instance_disk = data['instance']['disk']
-        assert instance_cpu >= 1
-        assert instance_mem >= 1024 * 1024 * 1024
-        assert instance_disk >= 10 * 1024 * 1024 * 1024
-
-        max_cpu_percent = data['stats']['max']['cpu_pct']
-        max_mem = data['stats']['max']['mem']
-        max_disk = data['stats']['max']['disk']
-        if max_cpu_percent or data['task_name'] == 'main.t1':
-            assert max_cpu_percent <= 100.0
-        if max_mem or data['task_name'] == 'main.t1':
-            assert max_mem <= instance_mem
-        if max_disk or data['task_name'] == 'main.t1':
-            assert max_disk <= instance_disk
-
-    # test cleanup on gcp backend (gs://)
-    root_out_dir = cm.data['workflowRoot']
-
-    # remote metadata JSON file on the workflow's root output dir
-    remote_metadata_json_file = os.path.join(root_out_dir, 'metadata.json')
-    assert GCSURI(remote_metadata_json_file).exists
-
-    # dry-run should not delete anything
-    cm.cleanup(dry_run=True)
-    assert GCSURI(remote_metadata_json_file).exists
-
-    cm.cleanup(dry_run=False)
-    assert not GCSURI(remote_metadata_json_file).exists
diff --git a/tests/test_cli_server_client_gcp.py b/tests/test_cli_server_client_gcp.py
deleted file mode 100644
index c2b42e5..0000000
--- a/tests/test_cli_server_client_gcp.py
+++ /dev/null
@@ -1,143 +0,0 @@
-"""This does not cover all CLI parameters defined in caper/caper_args.py.
-The gcp (Google Cloud Platform) backend is tested here with server/client
-functions.
-"""
-import os
-import time
-
-import pytest
-from autouri import AutoURI
-
-from caper.cli import main as cli_main
-from caper.cromwell_rest_api import CromwellRestAPI
-from caper.wdl_parser import WDLParser
-
-from .example_wdl import make_directory_with_wdls
-
-TIMEOUT_SERVER_SPIN_UP = 500
-TIMEOUT_SERVER_RUN_WORKFLOW = 960
-
-
-@pytest.mark.google_cloud
-@pytest.mark.integration
-def test_server_client(
-    tmp_path,
-    gcs_root,
-    ci_prefix,
-    cromwell,
-    womtool,
-    gcp_prj,
-    gcp_service_account_key_json,
-    debug_caper,
-):
-    """Test server/client functionality."""
-    # server command line
-    server_port = 8015
-
-    out_gcs_bucket = os.path.join(gcs_root, 'caper_out', ci_prefix)
-    tmp_gcs_bucket = os.path.join(gcs_root, 'caper_tmp')
-
-    cmd = ['server']
-    cmd += ['--local-loc-dir', str(tmp_path / 'tmp_dir')]
-    cmd += ['--backend', 'gcp']
-    if gcp_service_account_key_json:
-        cmd += ['--gcp-service-account-key-json', gcp_service_account_key_json]
-    cmd += ['--gcp-prj', gcp_prj]
-    cmd += ['--gcp-zones', 'us-west1-a,us-west1-b']
-    cmd += ['--gcp-out-dir', out_gcs_bucket]
-    cmd += ['--gcp-loc-dir', tmp_gcs_bucket]
-    cmd += ['--cromwell-stdout', str(tmp_path / 'cromwell_stdout.o')]
-    cmd += ['--db', 'in-memory']
-    cmd += ['--db-timeout', '500000']
-    cmd += ['--file-db', str(tmp_path / 'file_db_prefix')]
-    cmd += ['--max-concurrent-tasks', '2']
-    cmd += ['--max-concurrent-workflows', '2']
-    cmd += ['--disable-call-caching']
-    cmd += ['--local-hash-strat', 'path']
-    cmd += ['--local-out-dir', str(tmp_path / 'out_dir')]
-    cmd += ['--cromwell', cromwell]
-    cmd += ['--java-heap-server', '8G']
-    cmd += ['--port', str(server_port)]
-    if debug_caper:
-        cmd += ['--debug']
-    print(' '.join(cmd))
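-
-    # cli_main(..., nonblocking_server=True) returns a thread-like handle,
-    # presumably an NBSubprocThread (see tests/test_nb_subproc_thread.py):
-    # its status stays None until the Cromwell server is ready to accept
-    # submissions, and it exposes stop()/join(), used in the finally block.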
-    th = None  # so that the finally block below never sees an unbound name
-    try:
-        th = cli_main(cmd, nonblocking_server=True)
-
-        # wait until the server is ready to take submissions
-        t_start = time.time()
-        while th.status is None:
-            time.sleep(1)
-            if time.time() - t_start > TIMEOUT_SERVER_SPIN_UP:
-                raise TimeoutError('Timed out waiting for Cromwell server spin-up.')
-
-        # prepare WDLs, input JSON and imports to be submitted
-        make_directory_with_wdls(str(tmp_path))
-        wdl = tmp_path / 'main.wdl'
-        inputs = tmp_path / 'inputs.json'
-        p = WDLParser(str(wdl))
-        imports = p.zip_subworkflows(str(tmp_path / 'imports.zip'))
-
-        # test "submit" with on_hold
-        cmd = ['submit', str(wdl)]
-        if gcp_service_account_key_json:
-            cmd += ['--gcp-service-account-key-json', gcp_service_account_key_json]
-        cmd += ['--port', str(server_port)]
-        cmd += ['--inputs', str(inputs)]
-        cmd += ['--imports', str(imports)]
-        cmd += ['--gcp-zones', 'us-west1-a,us-west1-b']
-        cmd += ['--gcp-loc-dir', tmp_gcs_bucket]
-        cmd += ['--ignore-womtool']
-        cmd += ['--java-heap-womtool', '2G']
-        cmd += ['--max-retries', '1']
-        cmd += ['--docker', 'ubuntu:latest']
-        cmd += ['--backend', 'gcp']
-        cmd += ['--hold']
-        if debug_caper:
-            cmd += ['--debug']
-        cli_main(cmd)
-
-        time.sleep(10)
-
-        # find the workflow ID
-        cra = CromwellRestAPI(hostname='localhost', port=server_port)
-        workflow_id = cra.find(['*'])[0]['id']
-
-        m = cra.get_metadata([workflow_id])[0]
-        assert m['status'] == 'On Hold'
-
-        # unhold it
-        cmd = ['unhold', workflow_id]
-        cmd += ['--port', str(server_port)]
-        cli_main(cmd)
-
-        time.sleep(5)
-
-        m = cra.get_metadata([workflow_id])[0]
-        assert m['status'] in ('Submitted', 'Running')
-
-        t_start = time.time()
-        while True:
-            time.sleep(5)
-            m = cra.get_metadata([workflow_id])[0]
-            workflow_root = m.get('workflowRoot')
-            if workflow_root:
-                metadata_json_file = os.path.join(workflow_root, 'metadata.json')
-            else:
-                metadata_json_file = None
-            print('polling: ', workflow_id, m['status'], metadata_json_file)
-
-            if m['status'] in ('Failed', 'Succeeded'):
-                if AutoURI(metadata_json_file).exists:
-                    break
-            elif metadata_json_file:
-                assert not AutoURI(metadata_json_file).exists
-
-            if time.time() - t_start > TIMEOUT_SERVER_RUN_WORKFLOW:
-                raise TimeoutError('Timed out waiting for the workflow to finish.')
-
-    finally:
-        # all done, so stop the server
-        if th:
-            th.stop()
-            th.join()
diff --git a/tests/test_nb_subproc_thread.py b/tests/test_nb_subproc_thread.py
deleted file mode 100644
index 412a3d0..0000000
--- a/tests/test_nb_subproc_thread.py
+++ /dev/null
@@ -1,117 +0,0 @@
-import os
-import time
-
-import pytest
-
-from caper.nb_subproc_thread import NBSubprocThread
-
-SH_CONTENTS = """#!/bin/bash
-
-echoerr() { echo "$@" 1>&2; }
-
-NUM=$1
-if [ -z "$NUM" ]
-then
-  NUM=10
-fi
-
-echo $NUM
-
-# NUM kitties (1 kitty per sec)
-for i in $(seq 1 $NUM)
-do
-  echo "hello kitty $i-1. (STDOUT)"
-  sleep 0.25
-  echoerr "hello kitty $i-1. (STDERR)"
-  sleep 0.25
-  echoerr "hello kitty $i-2. (STDERR)"
-  sleep 0.25
-  echo "hello kitty $i-2. (STDOUT)"
-  sleep 0.25
-done
-
-exit 10
-"""
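-
-# The script above prints NUM first, then four interleaved stdout/stderr
-# "hello kitty" lines per iteration (four 0.25 s sleeps, i.e. one kitty per
-# second), and always exits with return code 10, so the tests below can
-# verify stream capture and non-zero return codes.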
(STDOUT)" - sleep 0.25 -done - -exit 10 -""" - - -def on_stdout(stdout): - print('captured stdout:', stdout) - assert stdout.endswith('\n') - - -def on_stderr(stderr): - print('captured stderr:', stderr) - assert stderr.endswith('\n') - - -def on_poll(): - print('polling') - - -def on_finish(): - return 'done' - - -def test_nb_subproc_thread(tmp_path): - sh = tmp_path / 'test.sh' - sh.write_text(SH_CONTENTS) - - th = NBSubprocThread( - args=['bash', str(sh)], - on_poll=on_poll, - on_stdout=on_stdout, - on_stderr=on_stderr, - on_finish=on_finish, - poll_interval=0.1, - ) - assert th.returnvalue is None - assert not th.is_alive() - th.start() - assert th.is_alive() - # rc is None while running - assert th.returncode is None - th.join() - assert th.returncode == 10 - assert th.returnvalue == 'done' - - -def test_nb_subproc_thread_stopped(tmp_path): - sh = tmp_path / 'test.sh' - sh.write_text(SH_CONTENTS) - - th = NBSubprocThread(args=['bash', str(sh)], on_stdout=on_stdout) - th.start() - time.sleep(2) - assert th.is_alive() - th.stop(wait=True) - assert not th.is_alive() - # rc should be is None if terminated - assert th.returncode is not None - # subprocess is terminated until it reaches kitty 4 (4 sec > 2 sec). - assert 'hello kitty 4' not in th.stderr - - -def test_nb_subproc_thread_nonzero_rc(): - for rc in range(10): - th = NBSubprocThread( - args=['bash', '-c', 'exit {rc}'.format(rc=rc)], on_stderr=on_stderr - ) - th.start() - th.join() - assert th.returncode == rc - - -@pytest.mark.parametrize('test_app,expected_rc', [('cat', 1), ('ls', 2), ('java', 1)]) -def test_nb_subproc_thread_nonzero_rc_for_real_apps(test_app, expected_rc): - test_str = 'asdfasf-10190212-zxcv' - if os.path.exists(test_str): - raise ValueError('Test string should not be an existing file.') - - th = NBSubprocThread( - args=[test_app, test_str], on_stdout=on_stdout, on_stderr=on_stderr - ) - th.start() - th.join() - assert th.returncode == expected_rc - assert test_str in th.stderr - assert th.stdout == '' diff --git a/tests/test_resource_analysis.py b/tests/test_resource_analysis.py deleted file mode 100644 index 44ffa47..0000000 --- a/tests/test_resource_analysis.py +++ /dev/null @@ -1,86 +0,0 @@ -"""Test is based on a metadata JSON file generated from -running atac-seq-pipeline v1.8.0 with the following input JSON. 
diff --git a/tests/test_resource_analysis.py b/tests/test_resource_analysis.py
deleted file mode 100644
index 44ffa47..0000000
--- a/tests/test_resource_analysis.py
+++ /dev/null
@@ -1,86 +0,0 @@
-"""Tests are based on a metadata JSON file generated by running
-atac-seq-pipeline v1.8.0 with the following input JSON:
-gs://encode-pipeline-test-samples/encode-atac-seq-pipeline/ENCSR356KRQ_subsampled_caper.json
-"""
-
-import pytest
-
-from caper.resource_analysis import LinearResourceAnalysis, ResourceAnalysis
-
-
-def test_resource_analysis_abstract_class(gcp_res_analysis_metadata):
-    with pytest.raises(TypeError):
-        # an abstract base class cannot be instantiated
-        ResourceAnalysis()
-
-
-def test_resource_analysis_analyze_task(gcp_res_analysis_metadata):
-    analysis = LinearResourceAnalysis()
-    analysis.collect_resource_data([gcp_res_analysis_metadata])
-
-    result_align1 = analysis.analyze_task(
-        'atac.align',
-        in_file_vars=['fastqs_R1'],
-        reduce_in_file_vars=None,
-        target_resources=['stats.max.mem', 'stats.mean.cpu_pct'],
-    )
-    assert result_align1['x'] == {'fastqs_R1': [15643136, 18963919]}
-    assert 'stats.mean.cpu_pct' in result_align1['y']
-    assert 'stats.max.mem' in result_align1['y']
-    assert 'stats.max.disk' not in result_align1['y']
-    assert list(result_align1['y'].keys()) == list(result_align1['coeffs'].keys())
-    assert result_align1['coeffs']['stats.mean.cpu_pct'][0][0] == pytest.approx(
-        1.6844513715565233e-06
-    )
-    assert result_align1['coeffs']['stats.mean.cpu_pct'][1] == pytest.approx(
-        42.28561239506905
-    )
-    assert result_align1['coeffs']['stats.max.mem'][0][0] == pytest.approx(
-        48.91222341236991
-    )
-    assert result_align1['coeffs']['stats.max.mem'][1] == pytest.approx(
-        124314029.09791338
-    )
-
-    result_align2 = analysis.analyze_task(
-        'atac.align', in_file_vars=['fastqs_R2'], reduce_in_file_vars=sum
-    )
-    assert result_align2['x'] == {'sum(fastqs_R2)': [16495088, 20184668]}
-    assert 'stats.mean.cpu_pct' not in result_align2['y']
-    assert 'stats.max.mem' in result_align2['y']
-    assert 'stats.max.disk' in result_align2['y']
-    assert list(result_align2['y'].keys()) == list(result_align2['coeffs'].keys())
-
-    result_align_star = analysis.analyze_task('atac.align*', reduce_in_file_vars=max)
-    assert result_align_star['x'] == {
-        'max(chrsz,fastqs_R1,fastqs_R2,idx_tar,tmp_fastqs)': [
-            32138224,
-            39148587,
-            3749246230,
-            3749246230,
-        ]
-    }
-
-
-def test_resource_analysis_analyze(gcp_res_analysis_metadata):
-    """Test the analyze() method, which analyzes all tasks defined in in_file_vars."""
-    analysis = LinearResourceAnalysis()
-    analysis.collect_resource_data([gcp_res_analysis_metadata])
-
-    result = analysis.analyze(
-        in_file_vars={
-            'atac.align*': ['fastqs_R1', 'fastqs_R2'],
-            'atac.filter*': ['bam'],
-        }
-    )
-    assert len(result) == 2
-    assert result['atac.align*']['x'] == {
-        'sum(fastqs_R1,fastqs_R2)': [32138224, 39148587, 32138224, 39148587]
-    }
-    assert result['atac.filter*']['x'] == {
-        'sum(bam)': [61315022, 76789196, 61315022, 76789196]
-    }
-
-    result_all = analysis.analyze()
-    # 38 tasks in total
-    assert len(result_all) == 38
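-
-# NOTE (interpretation, hedged): LinearResourceAnalysis appears to fit one
-# linear model y = a * x + b per target resource metric, with x the (reduced)
-# input file size in bytes; in test_resource_analysis_analyze_task() above,
-# coeffs[metric][0][0] is the slope a and coeffs[metric][1] is the intercept b.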