diff --git a/docs/How-to-run-CLI-Usage.md b/docs/How-to-run-CLI-Usage.md index e76f2eb..b7edb47 100644 --- a/docs/How-to-run-CLI-Usage.md +++ b/docs/How-to-run-CLI-Usage.md @@ -22,6 +22,8 @@ The following main commands are currently implemented: - [`discovery`](./How-to-run-discover-measured-patterns.md): discover measured patterns within a project source code - [`manual-discovery`](./How-to-run-manual-discovery.md): execute discovery rules (normally associated to patterns) within a project source code - reporting: create reports about SAST measurement and/or pattern discovery (**TODO**) +- [`checkdiscoveryrules`](./How-to-run-checkdiscoveryrules.md): check/test the discovery rules of the pattern instances against the pattern instances themselves +- [`patternrepair`](./How-to-run-patternrepair.md): helps you keep your pattern catalogue tidy and repair broken patterns The following are under-investigation: diff --git a/docs/How-to-run-Measure-SAST-tools-over-patterns.md b/docs/How-to-run-Measure-SAST-tools-over-patterns.md index 57bf5dd..43c66bf 100644 --- a/docs/How-to-run-Measure-SAST-tools-over-patterns.md +++ b/docs/How-to-run-Measure-SAST-tools-over-patterns.md @@ -52,6 +52,10 @@ Instead of specifying certain pattern ids, you can use `-a`. ] ``` +The value in `result` is a boolean: `true` means the tool's output matches the expected result, while `false` indicates a mismatch. +For example, if the pattern is expected not to contain a vulnerability (`'expectation': false`) and the tool reports 'no vulnerability', then `result` will be `true`. +If the pattern is expected to contain a vulnerability (`'expectation': true`) and the tool does not detect it, `result` will be `false`.
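The `result` logic described above can be sketched in a couple of lines (a minimal illustration only; the function and parameter names are hypothetical and not part of the framework):

```python
# Illustration of the documented `result` semantics (hypothetical names,
# not framework code): `result` is true exactly when the tool's finding
# matches the expectation.
def measurement_result(expectation: bool, tool_found_vulnerability: bool) -> bool:
    return tool_found_vulnerability == expectation

# expectation false, tool reports no vulnerability -> result is true
assert measurement_result(expectation=False, tool_found_vulnerability=False)
# expectation true, tool misses the vulnerability -> result is false
assert not measurement_result(expectation=True, tool_found_vulnerability=False)
```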
+ ## Example Here is a simple example that will measure patterns 1, 2, 4 and 7 from the PHP catalog with 3 workers: diff --git a/docs/How-to-run-checkdiscoveryrules.md b/docs/How-to-run-checkdiscoveryrules.md new file mode 100644 index 0000000..48c2e2e --- /dev/null +++ b/docs/How-to-run-checkdiscoveryrules.md @@ -0,0 +1,43 @@ +# How to run: Checkdiscoveryrules + +## Overview + +This command allows you to run the discovery rule on the pattern instance itself. + +## Command line + +To check discovery rules on your pattern run: + +```bash +tpframework checkdiscoveryrules --help usage: tpframework [OPTIONS] COMMAND checkdiscoveryrules [-h] (--print | --export EXPORTFILE) -l LANGUAGE (-p PATTERN_ID [PATTERN_ID ...] | --pattern-range RANGE_START-RANGE_END | -a) + [--tp-lib TP_LIB_DIR] [-s NUMBER] [--output-dir OUTPUT_DIR] + +options: + -h, --help show this help message and exit + --print Print measurements on stdout. + --export EXPORTFILE Export measurements to the specified csv file. + -l LANGUAGE, --language LANGUAGE + Programming language targeted + -p PATTERN_ID [PATTERN_ID ...], --patterns PATTERN_ID [PATTERN_ID ...] + Specify pattern(s) ID(s) to test for discovery + --pattern-range RANGE_START-RANGE_END + Specify pattern ID range separated by`-` (ex. 10-50) + -a, --all-patterns Test discovery for all available patterns + --tp-lib TP_LIB_DIR Absolute path to alternative pattern library, default resolves to `./testability_patterns` + -s NUMBER, --timeout NUMBER + Timeout for CPG generation + --output-dir OUTPUT_DIR + Absolute path to the folder where outcomes (e.g., log file, export file if any) will be stored, default resolves to `./out` +``` + +## Example + +Here is a simple example that will run checkdiscoveryrules on the first PHP pattern and print the results to the command line. +`tpframework checkdiscoveryrules -p 1 -l php --print` + +Note: The minimum requirements for this command are a pattern, a language, and either `--print` or `--export`.
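The invocation above can also be scripted, e.g. to check several patterns or languages in a row. A minimal sketch using only the documented flags (it assumes `tpframework` is on your `PATH`; the helper names are hypothetical):

```python
import subprocess

def build_check_command(pattern_id: int, language: str) -> list[str]:
    """Assemble the documented checkdiscoveryrules invocation (print mode)."""
    return ["tpframework", "checkdiscoveryrules",
            "-p", str(pattern_id), "-l", language, "--print"]

def check_pattern(pattern_id: int, language: str) -> int:
    # With --print the results go to stdout; the exit code signals success.
    return subprocess.run(build_check_command(pattern_id, language)).returncode
```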
+ +## Required fields in instance `json` metadata + +The explanation for the instance `json` metadata can be found [here](https://github.com/testable-eu/sast-testability-patterns/blob/master/docs/testability-patterns-structure.md) \ No newline at end of file diff --git a/docs/How-to-run-patternrepair.md b/docs/How-to-run-patternrepair.md new file mode 100644 index 0000000..418b6cc --- /dev/null +++ b/docs/How-to-run-patternrepair.md @@ -0,0 +1,62 @@ +# How to run: Patternrepair + +## Overview + +This command helps you review your testability catalogue and keep it tidy. +It can also help you repair broken patterns. + +## Command line + +To start repairing/reviewing your patterns, run: + +```bash +tpframework patternrepair --help +usage: tpframework [OPTIONS] COMMAND patternrepair [-h] -l LANGUAGE (-p PATTERN_ID [PATTERN_ID ...] | --pattern-range RANGE_START-RANGE_END | -a) [--tp-lib TP_LIB_DIR] + [--output-dir OUTPUT_DIR] [--masking-file MASKING_FILE] [--measurement-results MEASUREMENT_DIR] + [--checkdiscoveryrules-results CHECKDISCOVERYRULES_FILE] [--skip-readme] + +options: + -h, --help show this help message and exit + -l LANGUAGE, --language LANGUAGE + Programming language targeted + -p PATTERN_ID [PATTERN_ID ...], --patterns PATTERN_ID [PATTERN_ID ...] + Specify pattern(s) ID(s) to test for discovery + --pattern-range RANGE_START-RANGE_END + Specify pattern ID range separated by`-` (ex.
10-50) + -a, --all-patterns Test discovery for all available patterns + --tp-lib TP_LIB_DIR Absolute path to alternative pattern library, default resolves to `./testability_patterns` + --output-dir OUTPUT_DIR + Absolute path to the folder where outcomes (e.g., log file, export file if any) will be stored, default resolves to `./out` + --masking-file MASKING_FILE + Absolute path to a JSON file containing a mapping used to mask the names of measurement tools that should be kept secret, default is None + --measurement-results MEASUREMENT_DIR + Absolute path to the folder where measurement results are stored, default resolves to `./measurements` + --checkdiscoveryrules-results CHECKDISCOVERYRULES_FILE + Absolute path to the csv file, where the results of the `checkdiscoveryrules` command are stored, default resolves to `./checkdiscoveryrules.csv` + --skip-readme If set, the README generation is skipped. +``` + +Note: At the moment only `patternrepair` for PHP is supported. To support your own language, write a class that inherits from `InstanceRepair`. + +The `patternrepair` command enforces the pattern structure as described [here](https://github.com/testable-eu/sast-testability-patterns/blob/master/docs/testability-patterns-structure.md). +To do so, it is separated into different steps: + +- `PatternRepair`: This will check the pattern JSON file and correct the references to the instance json files. +- `InstanceRepair`: This will check and correct the instance json file for each instance. At the moment, only PHP patterns are supported. + - It generates opcode for every PHP file. + - It checks for the comments `// source` and `// sink` in the file in order to fill in the source and sink line in the corresponding instance json file. +- `READMEGenerator`: This creates a README file for a pattern based on the JSON files. If you want to skip the generation of the README file, use the `--skip-readme` flag.
As the README includes results of `measure` and `checkdiscoveryrules`, valid file paths for these must be provided when generating a README file. + +## Example + +Note: The minimum requirements for this command are a pattern and a language. + +### Example 1 + +Here is a simple example that will run patternrepair on the first PHP pattern without generating a new README file for that pattern. +`tpframework patternrepair -p 1 -l php --skip-readme` + +### Example 2 + +Here is an example that repairs all PHP patterns and generates a new README for each pattern. +`tpframework patternrepair -a -l php --measurement-results ./your_measurement_results --checkdiscoveryrules-results ./your_results.csv` diff --git a/qualitytests/patternrepair/__init__.py b/qualitytests/patternrepair/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/qualitytests/patternrepair/test_markdown.py b/qualitytests/patternrepair/test_markdown.py new file mode 100644 index 0000000..616e699 --- /dev/null +++ b/qualitytests/patternrepair/test_markdown.py @@ -0,0 +1,80 @@ +import pytest + +from qualitytests.qualitytests_utils import join_resources_path + +from pattern_repair.README_markdown_elements import * +from pattern_repair.README_generator import READMEGenerator + + +class TestMarkdownPatternRepair: + + def test_markdown_code(self): + code = MarkdownCode('\n\nMore\n\n\nHello\n\n\n' == coll.to_markdown() + + def test_markdown_string(self): + s = MarkdownString("Test") + assert "\nTest\n" == s.to_markdown() + + def test_markdown_link(self): + link = MarkdownLink("Test", MarkdownHeading("Heading 1", 3)) + assert "[Test](#heading-1)" == link.to_markdown() + + def test_markdown_table(self): + test_content = {"0::column1": ["value1", "value1.1"], "column2": ["value2"]} + tab = MarkdownTable(test_content) + expected_tab = "\n| column1 | column2 |\n" + expected_tab += "|-----------|-----------|\n" + expected_tab += "| value1 | value2 |\n" + expected_tab += "| value1.1 | |\n" + 
assert expected_tab == tab.to_markdown() + + def test_markdown_document(self): + coll = MarkdownCollapsible([MarkdownString("Hello")], MarkdownString("More")) + doc = MarkdownDocument([coll]) + assert '
\n\nMore\n\nHello\n\n
\n' == doc.to_markdown() + + def test_README_generation_one_instance(self): + path_to_test_pattern = join_resources_path("sample_patlib/PHP/2_global_variables") + path_to_tplib = join_resources_path("sample_patlib") + instance_jsons = [path_to_test_pattern / "1_instance_2_global_variables" / "1_instance_2_global_variables.json"] + md_doc = READMEGenerator(path_to_test_pattern, 'php', path_to_tplib, instance_jsons)._generate_README_elements() + + assert 14 == len(md_doc.content) + assert isinstance(md_doc.content[0], MarkdownComment) + assert isinstance(md_doc.content[1], MarkdownHeading) # Global Variables + assert isinstance(md_doc.content[2], MarkdownString) # Tags: ... + assert isinstance(md_doc.content[3], MarkdownString) # Version: ... + assert isinstance(md_doc.content[4], MarkdownHeading) # Description + assert isinstance(md_doc.content[5], MarkdownString) # + assert isinstance(md_doc.content[6], MarkdownHeading) # Overview + assert isinstance(md_doc.content[7], MarkdownTable) # + assert isinstance(md_doc.content[8], MarkdownHeading) # Instance 1 + assert isinstance(md_doc.content[9], MarkdownHeading) # Code + assert isinstance(md_doc.content[10], MarkdownCode) # + assert isinstance(md_doc.content[11], MarkdownHeading) # Instance Properties + assert isinstance(md_doc.content[12], MarkdownTable) # + assert isinstance(md_doc.content[13], MarkdownCollapsible) # More + + assert 2 == len(md_doc.content[13].content) + assert isinstance(md_doc.content[13].content[0], MarkdownCollapsible) # Compile + assert 1 == len(md_doc.content[13].content[0].content) + assert isinstance(md_doc.content[13].content[0].content[0], MarkdownCode) # + + assert isinstance(md_doc.content[13].content[1], MarkdownCollapsible) # Discovery + assert 3 == len(md_doc.content[13].content[1].content) + assert isinstance(md_doc.content[13].content[1].content[0], MarkdownString) # + assert isinstance(md_doc.content[13].content[1].content[1], MarkdownCode) # + assert 
isinstance(md_doc.content[13].content[1].content[2], MarkdownTable) # \ No newline at end of file diff --git a/qualitytests/patternrepair/test_pattern_repair.py b/qualitytests/patternrepair/test_pattern_repair.py new file mode 100644 index 0000000..d64e1f8 --- /dev/null +++ b/qualitytests/patternrepair/test_pattern_repair.py @@ -0,0 +1,61 @@ +import pytest +import os +import shutil +from pathlib import Path + +from pattern_repair.pattern_repair import PatternRepair +from pattern_repair.PHP.instance_repair_php import InstanceRepairPHP + +from qualitytests.qualitytests_utils import join_resources_path + +@pytest.fixture(autouse=True) +def run_around_tests(): + # Code that will run before the test + path_to_test_pattern = join_resources_path("sample_patlib/PHP/5_pattern_to_repair") + path_to_save = join_resources_path("sample_patlib/PHP/5_pattern_to_repair_copy") + # copy the directory, to save it + shutil.copytree(path_to_test_pattern, path_to_save) + + # A test function will be run at this point + yield + + # Code that will run after the test + # restore the saved pattern + shutil.rmtree(path_to_test_pattern) + os.rename(path_to_save, path_to_test_pattern) + assert os.path.exists(path_to_test_pattern) + +class TestPatternRepair: + def test_repair_test_pattern_assert_files_exist(self): + path_to_test_pattern = join_resources_path("sample_patlib/PHP/5_pattern_to_repair") + instance_path = path_to_test_pattern / "1_instance_5_pattern_to_repair" + assert os.path.exists(instance_path) + + PatternRepair(path_to_test_pattern, "PHP", join_resources_path("sample_patlib")).repair(True) + + expected_pattern_json = path_to_test_pattern / "5_pattern_to_repair.json" + assert expected_pattern_json.is_file() + expected_instance_json = instance_path / "1_instance_5_pattern_to_repair.json" + assert expected_instance_json.is_file() + expected_instance_php = instance_path / "1_instance_5_pattern_to_repair.php" + assert expected_instance_php.is_file() + expected_instance_bash = 
instance_path / "1_instance_5_pattern_to_repair.bash" + assert expected_instance_bash.is_file() + expected_instance_sc = instance_path / "1_instance_5_pattern_to_repair.sc" + assert expected_instance_sc.is_file() + expected_docs_dir = path_to_test_pattern / "docs" + assert expected_docs_dir.is_dir() + expected_description = expected_docs_dir / "description.md" + assert expected_description.is_file() + expected_README_file = path_to_test_pattern / "README.md" + assert expected_README_file.is_file() + + def test_finding_source_and_sink_line(self): + path_to_test_pattern = join_resources_path("sample_patlib/PHP/5_pattern_to_repair") + instance_repair = InstanceRepairPHP("PHP", path_to_test_pattern, "", join_resources_path("sample_pathlib")) + + path_to_php_file = path_to_test_pattern / "1_instance_5_pattern_to_repair" / "test.php" + + source, sink = instance_repair._get_source_and_sink_for_file(path_to_php_file) + assert 2 == source + assert 3 == sink \ No newline at end of file diff --git a/qualitytests/patternrepair/test_pattern_repair_utils.py b/qualitytests/patternrepair/test_pattern_repair_utils.py new file mode 100644 index 0000000..0a9de3d --- /dev/null +++ b/qualitytests/patternrepair/test_pattern_repair_utils.py @@ -0,0 +1,75 @@ +import pytest +from unittest.mock import patch + +from qualitytests.qualitytests_utils import join_resources_path + +from core.exceptions import PatternDoesNotExists +from pattern_repair.utils import ( + assert_pattern_valid, compare_dicts, + get_dict_keys, get_instance_name, + get_files_with_ending, get_language_by_file_ending, + list_instances_jsons, repair_keys_of_json + ) + +class TestPatternRepairUtils: + def test_assert_pattern_valid(self): + path_to_non_existing_pattern = join_resources_path("100_non_existing") + with pytest.raises(PatternDoesNotExists) as e_info: + assert_pattern_valid(path_to_non_existing_pattern) + assert "Specified Pattern `100_non_existing` does not exists." 
in str(e_info.value) + + def test_compare_dicts(self): + o_dict = {"key1": 1, "key2": 3, "key3": 2} + n_dict = {"key1": 1, "key3": 3, "key4": 42} + assert {'key3': 2} == compare_dicts(o_dict, n_dict) + + def test_get_dict_keys(self): + d = { + "key1": { + "key1.1": 0, + "key1.2": {"key1.2.1": 0} + }, + "key2": 42 + } + assert set(["key1:key1.1", "key1:key1.2:key1.2.1", "key2"]) == set(get_dict_keys(d)) + + def test_get_instance_name(self): + path_to_pattern = join_resources_path("sample_patlib/PHP/5_pattern_to_repair") + path_to_instance = path_to_pattern / "1_instance_5_pattern_to_repair" + + assert "1 Instance", get_instance_name(path_to_instance) + + def test_get_files_with_ending(self): + path_to_pattern = join_resources_path("sample_patlib/PHP/3_global_array") + assert [] == get_files_with_ending(path_to_pattern, ".php") + expected_instance_1_php_file = str(path_to_pattern / "1_instance_3_global_array" / "1_instance_3_global_array.php") + expected_instance_2_php_file = str(path_to_pattern / "2_instance_3_global_array" / "2_instance_3_global_array.php") + assert set([expected_instance_1_php_file, expected_instance_2_php_file]) == set(get_files_with_ending(path_to_pattern, ".php", True)) + + def test_get_language_by_file_ending(self): + assert "python" == get_language_by_file_ending("test.py") + assert "php" == get_language_by_file_ending("test.php") + assert "javascript" == get_language_by_file_ending("test.js") + assert "java" == get_language_by_file_ending("test.java") + assert "scala" == get_language_by_file_ending("test.sc") + assert "bash" == get_language_by_file_ending("test.bash") + + with pytest.raises(NotImplementedError) as e_info: + get_language_by_file_ending("") + assert "The ending of the given filename is not yet supported" in str(e_info.value) + + def test_list_instance_jsons(self): + path_to_pattern = join_resources_path("sample_patlib/PHP/3_global_array") + expected_instance_1_json_file = str(path_to_pattern / "1_instance_3_global_array" / 
"1_instance_3_global_array.json") + expected_instance_2_json_file = str(path_to_pattern / "2_instance_3_global_array" / "2_instance_3_global_array.json") + assert set([expected_instance_1_json_file, expected_instance_2_json_file]) == set(list_instances_jsons(path_to_pattern)) + + def test_repair_keys_of_json(self): + json_dict_tested = {"a": 42, "b": {"b.0": 1}} + json_dict_ground_truth = {"a": 42, "b": {"b.0": 1, "b.1": 1}, "c": 42, "d": 36} + with patch("pattern_repair.utils.read_json") as read_json_mock, \ + patch("pattern_repair.utils.write_json") as write_json_mock: + read_json_mock.side_effect = [json_dict_tested, json_dict_ground_truth] + + repair_keys_of_json("", "", ["d"]) + write_json_mock.assert_called_once_with("", {"a": 42, "b": {"b.0": 1, "b.1": ""}, "c": ""}) diff --git a/qualitytests/resources/sample_patlib/PHP/2_global_variables/2_global_variables.json b/qualitytests/resources/sample_patlib/PHP/2_global_variables/2_global_variables.json index c905791..da72bf5 100644 --- a/qualitytests/resources/sample_patlib/PHP/2_global_variables/2_global_variables.json +++ b/qualitytests/resources/sample_patlib/PHP/2_global_variables/2_global_variables.json @@ -5,5 +5,6 @@ "tags": ["sast", "php", "php_v7.4.9"], "instances": [ "./1_instance_2_global_variables/1_instance_2_global_variables.json" - ] + ], + "version": "v0" } \ No newline at end of file diff --git a/qualitytests/resources/sample_patlib/PHP/3_global_array/3_global_array.json b/qualitytests/resources/sample_patlib/PHP/3_global_array/3_global_array.json index c71f100..518489e 100644 --- a/qualitytests/resources/sample_patlib/PHP/3_global_array/3_global_array.json +++ b/qualitytests/resources/sample_patlib/PHP/3_global_array/3_global_array.json @@ -1,9 +1,11 @@ { + "description": "", "name": "Global Array", "family": "code_pattern_php", "tags": ["sast", "php", "php_v7.4.9"], "instances": [ "./1_instance_3_global_array/1_instance_3_global_array.json", 
"./2_instance_3_global_array/2_instance_3_global_array.json" - ] + ], + "version": "v0" } \ No newline at end of file diff --git a/qualitytests/resources/sample_patlib/PHP/5_pattern_to_repair/1_instance_5_pattern_to_repair/test.php b/qualitytests/resources/sample_patlib/PHP/5_pattern_to_repair/1_instance_5_pattern_to_repair/test.php new file mode 100644 index 0000000..e5b689b --- /dev/null +++ b/qualitytests/resources/sample_patlib/PHP/5_pattern_to_repair/1_instance_5_pattern_to_repair/test.php @@ -0,0 +1,3 @@ + Path: if not output_dir: output_dir: str = str(config.RESULT_DIR) try: @@ -542,6 +631,19 @@ def parse_output_dir(output_dir: str): exit(1) +def parse_dir_or_file(path_to_file_or_dir: str, + default_path: str = config.RESULT_DIR, + name: str = "Output directory") -> Path: + if not path_to_file_or_dir: + path_to_file_or_dir: str = str(default_path) + try: + path_to_file_or_dir_as_path: Path = Path(path_to_file_or_dir).resolve() + return path_to_file_or_dir_as_path + except Exception as e: + print(f"{name} is wrong: {path_to_file_or_dir}") + exit(1) + + def parse_tool_list(tools: list[str]): if not tools: return config.SAST_TOOLS_ENABLED diff --git a/tp_framework/core/errors.py b/tp_framework/core/errors.py index 8d0e99b..a3984a1 100644 --- a/tp_framework/core/errors.py +++ b/tp_framework/core/errors.py @@ -84,4 +84,17 @@ def discoveryRuleParsingResultError(): def unexpectedException(e): - return f"Unexpected exception triggered: {e}." \ No newline at end of file + return f"Unexpected exception triggered: {e}." + +# Pattern Repair + +def measurementResultsDirDoesNotExist(): + return "The directory with the measurements does not exist." + + +def fileDoesNotExist(): + return "The file you provided for does not exist or is the wrong file type." + + +def templateDirDoesNotExist(not_exisitng_dir_or_file): + return f"Your tplib does not have {not_exisitng_dir_or_file}." 
\ No newline at end of file diff --git a/tp_framework/core/exceptions.py b/tp_framework/core/exceptions.py index 281221e..ff7cc79 100644 --- a/tp_framework/core/exceptions.py +++ b/tp_framework/core/exceptions.py @@ -121,4 +121,23 @@ def __init__(self, stderr=None): self.message = stderr else: self.message = errors.discoveryRuleParsingResultError() + super().__init__(self.message) + +# Pattern Repair + +class MeasurementResultsDoNotExist(Exception): + def __init__(self, message=errors.measurementResultsDirDoesNotExist()): + self.message = message + super().__init__(self.message) + + +class FileDoesNotExist(Exception): + def __init__(self, message=errors.fileDoesNotExist()): + self.message = message + super().__init__(self.message) + + +class TemplateDoesNotExist(Exception): + def __init__(self, message=errors.templateDirDoesNotExist('template')) -> None: + self.message = message super().__init__(self.message) \ No newline at end of file diff --git a/tp_framework/core/utils.py b/tp_framework/core/utils.py index 5725b3f..9f12c5b 100644 --- a/tp_framework/core/utils.py +++ b/tp_framework/core/utils.py @@ -17,7 +17,8 @@ import config from core import pattern, instance from core.exceptions import PatternDoesNotExists, LanguageTPLibDoesNotExist, TPLibDoesNotExist, InvalidSastTools, \ - DiscoveryMethodNotSupported, TargetDirDoesNotExist, InvalidSastTool, PatternFolderNotFound, InstanceDoesNotExists + DiscoveryMethodNotSupported, TargetDirDoesNotExist, InvalidSastTool, PatternFolderNotFound, InstanceDoesNotExists, \ + MeasurementResultsDoNotExist, FileDoesNotExist from core import errors @@ -46,6 +47,7 @@ def list_tpi_paths_by_tp_id(language: str, pattern_id: int, tp_lib_dir: Path) -> def get_tpi_id_from_jsonpath(jp: Path) -> int: return get_id_from_name(jp.parent.name) + def get_pattern_dir_from_id(pattern_id: int, language: str, tp_lib_dir: Path) -> Path: tp_lib_dir_lang_dir: Path = tp_lib_dir / language if tp_lib_dir_lang_dir.is_dir(): @@ -196,6 +198,24 @@ def 
get_discovery_rules(discovery_rule_list: list[str], discovery_rule_ext: str) return list(discovery_rules_to_run) +################################################################################ +# Pattern Repair +# + +def check_measurement_results_exist(measurement_dir: Path): + if not measurement_dir.is_dir(): + e = MeasurementResultsDoNotExist() + logger.error(get_exception_message(e)) + raise e + + +def check_file_exist(file_path: Path, file_suffix = ".csv"): + if not file_path.is_file() or not file_path.suffix == file_suffix: + e = FileDoesNotExist(file_path) + logger.error(get_exception_message(e)) + raise e + + ################################################################################ # Others # @@ -291,7 +311,6 @@ def add_loggers(output_dir_path: Path, filename: str=None, console=True): loggermgr.add_console_logger() - def get_operation_build_name_and_dir(op: str, src_dir: Path | None, language: str, output_dir: Path): now = datetime.now() if not src_dir: @@ -351,4 +370,4 @@ def get_file_hash(fpath, bigfile=False): else: while chunk := f.read(8192): hash.update(chunk) - return hash.hexdigest() \ No newline at end of file + return hash.hexdigest() diff --git a/tp_framework/pattern_repair/PHP/__init__.py b/tp_framework/pattern_repair/PHP/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tp_framework/pattern_repair/PHP/generate_opcode.py b/tp_framework/pattern_repair/PHP/generate_opcode.py new file mode 100644 index 0000000..54e9078 --- /dev/null +++ b/tp_framework/pattern_repair/PHP/generate_opcode.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 + +""" +This script can be used to generate opcode for PHP patterns +""" +import os +import logging +import time + +from pattern_repair.utils import get_files_with_ending, read_json, write_json +from core import loggermgr + +logger = logging.getLogger(loggermgr.logger_name(__name__)) + + +class PHPOpcodeGenerator: + """This class encapsulates the opcode generation for PHP files""" + + def 
__init__( + self, pattern_instance_path: str, path_to_testability_patterns: str + ) -> None: + self.pattern_instance_path = pattern_instance_path + self.path_to_testability_patterns = path_to_testability_patterns + + def _adjust_json_file(self, bash_file_name): + """Adapts the instance's JSON file so that 'compile': 'binary' points to the new opcode file. + + Args: + bash_file_name (str): path to the generated opcode (bash) file. + """ + json_files_paths = get_files_with_ending(self.pattern_instance_path, ".json") + if not len(json_files_paths) == 1: + logger.error( + f"Expected one JSON file for {self.pattern_instance_path} got {len(json_files_paths)}" + ) + exit(1) + result_dict = read_json(json_files_paths[0]) + result_dict["compile"][ + "binary" + ] = f".{os.sep}{os.path.relpath(bash_file_name, self.pattern_instance_path)}" + write_json(json_files_paths[0], result_dict) + + def _mask_line(self, input_line: str, php_file: str) -> str: + """Masks the opcode line in which the path to the php file is written. + If `php_file` cannot be found in `input_line`, the `input_line` is returned unchanged. + + Args: + input_line (str): any line from bash code file. + php_file (str): path of php file. + + Returns: + str: masked line; everything before the testability pattern lib is cut and replaced with `/.../`. + """ + if php_file not in input_line: + return input_line + line_prefix = input_line.split(os.sep)[0] + line_suffix = input_line[input_line.rfind(".php") + 4 :] + actual_filepath = input_line.replace(line_prefix, "").replace(line_suffix, "") + new_path = f"{os.sep}...{os.sep}{os.path.relpath(actual_filepath, self.path_to_testability_patterns)}" + return line_prefix + new_path + line_suffix + + def _make_optcode_from_php_file(self, php_file_path: str) -> str: + """Generates opcode for a php file. + + Args: + php_file_path (str): Path to PHP file + + Returns: + str: File path to the corresponding file containing the opcode.
+ """ + # define necessary paths + php_file_path = os.path.abspath(php_file_path) + bash_file_path = f'{php_file_path.strip("ph")}bash' + + # opcache will only compile and cache files older than the script execution start (https://www.php.net/manual/en/function.opcache-compile-file.php) + # therefor we have to modify the time the php file was created + one_minute_ago = time.time() - 60 + os.utime(php_file_path, (one_minute_ago, one_minute_ago)) + + # Generate the bash file + os.system( + f"php -d zend_extension=opcache -d opcache.enable_cli=1 -d opcache.opt_debug_level=0x10000 --syntax-check {php_file_path} 2> {bash_file_path} 1>/dev/null" + ) + + # Sanitize the opcode: on some systems, there is an error included in the bash file + with open(bash_file_path, "r") as file: + result = file.readlines() + for idx, line in enumerate(result): + if line.startswith("$_main"): + result = result[max(idx - 1, 0) :] + break + # mask the path to file + final_lines = [self._mask_line(line, php_file_path) for line in result] + with open(bash_file_path, "w") as file: + file.writelines(final_lines) + return bash_file_path + + def generate_opcode_for_pattern_instance(self) -> str: + """Generates the opcode for a pattern instance, and adjusts the JSON file accordingly. + + Returns: + str: file path to the generated opcode. 
+ """ + php_files_paths = get_files_with_ending( + self.pattern_instance_path, ".php", recursive=True + ) + if not php_files_paths: + logger.warning( + f"Expected one PHP file for {self.pattern_instance_path}, found {len(php_files_paths)}" + ) + return [] + bash_files = [] + for php_file_path in php_files_paths: + bash_files += [self._make_optcode_from_php_file(php_file_path)] + if len(php_files_paths) == 1: + self._adjust_json_file(bash_files[-1]) + return bash_files diff --git a/tp_framework/pattern_repair/PHP/instance_repair_php.py b/tp_framework/pattern_repair/PHP/instance_repair_php.py new file mode 100644 index 0000000..30fe34a --- /dev/null +++ b/tp_framework/pattern_repair/PHP/instance_repair_php.py @@ -0,0 +1,157 @@ +import os +import logging + +from pattern_repair.instance_repair import InstanceRepair +from pattern_repair.PHP.generate_opcode import PHPOpcodeGenerator +from pattern_repair.utils import read_json, write_json, get_files_with_ending + +from core import loggermgr + +logger = logging.getLogger(loggermgr.logger_name(__name__)) + + +class InstanceRepairPHP(InstanceRepair): + def _get_source_and_sink_for_file(self, path_to_file: str) -> tuple: + """Looks for '// source' and '// sink' in a file and returns the line numbers of these lines (index starting at 1) + + Args: + path_to_file (str): path to the file source and sink should be found in. + + Returns: + tuple: (source_line, sink_line) if one does not exists, it returns None for that. + """ + with open(path_to_file, "r") as fp: + file_lines = fp.readlines() + sink = None + source = None + for idx, line in enumerate(file_lines): + if "// sink" in line: + sink = idx + 1 + if "// source" in line: + source = idx + 1 + return (source, sink) + + def _repair_json_field_with_path( + self, instance_dict: dict, file_ending: str, keyword1: str, keyword2: str + ) -> dict: + """Checks if the path in the JSON, identified by keyword1 and keyword2 is path to a valid file. 
+ + Args: + instance_dict (dict): Dict of instance + file_ending (str): fileending of the wanted file + keyword1 (str): Keyword for first level in `instance_dict` + keyword2 (str): Keyword in second level in `instance_dict` + + Returns: + dict: Dict of instance + """ + self._find_and_rename_file(file_ending) + expected_path = f".{os.sep}{self.instance_name}.{file_ending}" + abs_expected_path = os.path.abspath( + os.path.join(self.instance_path, expected_path) + ) + if os.path.isfile(abs_expected_path): + instance_dict[keyword1][keyword2] = expected_path + else: + # check if the path inserted in the field is actually valid + if os.path.isfile( + os.path.join(self.instance_path, instance_dict[keyword1][keyword2] if instance_dict[keyword1][keyword2] else '') + ): + return instance_dict + logger.warning( + f"Could not verify {file_ending} filepath for instance {self.instance_name}" + ) + return instance_dict + + def _repair_json_expectation(self, instance_dict: dict) -> dict: + """Corrects 'expectation:source_file', 'expectation:sink_file', 'expectation:source_line', 'expectation:sink_line' + + Args: + instance_dict (dict): Dict of instance + + Returns: + dict: Dict of instance + """ + # get paths from the JSON file + path_to_source_file = instance_dict["expectation"]["source_file"] + abs_path_to_source_file = os.path.join(self.instance_path, path_to_source_file) + path_to_sink_file = instance_dict["expectation"]["sink_file"] + abs_path_to_sink_file = os.path.join(self.instance_path, path_to_sink_file) + path_to_php_file = instance_dict["code"]["path"] + abs_path_to_php_file = os.path.join(self.instance_path, path_to_php_file) + + if not path_to_php_file or not os.path.isfile(abs_path_to_php_file): + logger.warning(f'Could not verify "expectation" for {self.instance_name}') + return instance_dict + + if not os.path.isfile(abs_path_to_sink_file): + abs_path_to_sink_file = abs_path_to_php_file + path_to_sink_file = path_to_php_file + logger.info(f"Changing sink file path 
to {path_to_php_file}") + if not os.path.isfile(abs_path_to_source_file): + abs_path_to_source_file = abs_path_to_php_file + path_to_source_file = path_to_php_file + logger.info(f"Changing source file path to {path_to_php_file}") + source0, sink0 = self._get_source_and_sink_for_file(abs_path_to_sink_file) + source1, sink1 = self._get_source_and_sink_for_file(abs_path_to_source_file) + + # set values in instance dict + instance_dict["expectation"]["source_file"] = path_to_source_file + instance_dict["expectation"]["source_line"] = source0 if source0 else source1 + instance_dict["expectation"]["sink_file"] = path_to_sink_file + instance_dict["expectation"]["sink_line"] = sink0 if sink0 else sink1 + if not (bool(source0) or bool(source1)): + logger.warning(f"Could not verify source files for {self.instance_name}") + if not (bool(sink0) or bool(sink1)): + logger.warning(f"Could not verify sink files for {self.instance_name}") + return instance_dict + + def _repair_opcode(self): + """Generates opcode and checks if it is empty.""" + # remove old bash files first, before generating new ones + all_bash_files = get_files_with_ending(self.instance_path, ".bash", recursive=True) + for bash_file in all_bash_files: + os.remove(bash_file) + bash_file_paths = PHPOpcodeGenerator( + self.instance_path, self.path_to_testability_patterns + ).generate_opcode_for_pattern_instance() + for bash_file_path in bash_file_paths: + if not bash_file_path or not os.stat(bash_file_path).st_size: + logger.warning(f"Bash file {bash_file_path} is empty") + + def _repair_instance_json(self) -> None: + """Repairs JSON of instance""" + # make sure file exists and has all the right fields + super().repair_instance_json() + instance_dict = read_json(self.instance_json_file) + # make sure bash filepath is correct + instance_dict = self._repair_json_field_with_path( + instance_dict, "bash", "compile", "binary" + ) + # make sure PHP filepath is correct + instance_dict = self._repair_json_field_with_path( + 
instance_dict, "php", "code", "path" + ) + # make sure discovery filepath is correct + instance_dict = self._repair_json_field_with_path( + instance_dict, "sc", "discovery", "rule" + ) + # make sure expectations is correct + instance_dict = self._repair_json_expectation(instance_dict) + write_json(self.instance_json_file, instance_dict) + + def _repair_num_files(self) -> None: + """Checks how many php and bash files are there.""" + all_bash_files = get_files_with_ending(self.instance_path, ".bash") + all_php_files = get_files_with_ending(self.instance_path, ".php") + if len(all_bash_files) != len(all_php_files): + logger.warning( + f"Expected same number of .bash and .php files, but got {len(all_php_files)} PHP files and {len(all_bash_files)} BASH files" + ) + + def repair(self): + super().repair_instance_json() + super().repair() + self._find_and_rename_file("php") + self._repair_opcode() + self._repair_instance_json() diff --git a/tp_framework/pattern_repair/README_generator.py b/tp_framework/pattern_repair/README_generator.py new file mode 100644 index 0000000..14ffa2e --- /dev/null +++ b/tp_framework/pattern_repair/README_generator.py @@ -0,0 +1,155 @@ +import logging + +from os import path +from pathlib import Path + +from pattern_repair.README_markdown_elements import * +from pattern_repair.utils import ( + read_json, + read_csv_to_dict, + read_file, + translate_bool, + get_instance_name, +) +from pattern_repair.README_instance_generator import InstanceREADMEGenerator + +from core.utils import check_lang_tp_lib_path, get_id_from_name +from core import loggermgr + +logger = logging.getLogger(loggermgr.logger_name(__name__)) + + +class READMEGenerator: + def __init__( + self, + path_to_pattern: str, + language: str, + tp_lib_path: str, + instance_jsons: list[str], + discovery_rule_results: str = "", + measurement_results: str = "", + masking_file: str = "", + ) -> None: + check_lang_tp_lib_path(Path(path.join(tp_lib_path, language.upper()))) + + 
self.pattern_path = path_to_pattern + self.pattern_dict = read_json( + path.join(path_to_pattern, f"{path.basename(path_to_pattern)}.json") + ) + self.language = language.upper() + self.log_prefix = "Generating README: " + self.discovery_rule_results = None + self.measurement_results = measurement_results + self.masking_file = masking_file + self.instance_jsons = instance_jsons + + if not path.isfile(discovery_rule_results): + logger.warning( + f"{self.log_prefix}Cannot locate discovery rule results in {discovery_rule_results}" + ) + else: + self.discovery_rule_results = read_csv_to_dict(discovery_rule_results) + + self.readme_structure = [ + self._comment, + self._heading, + self._tags, + self._pattern_description, + self._pattern_metadata, + self._instances, + ] + + def _comment(self) -> list: + """Generates a comment for the top of the README file.""" + return [ + MarkdownComment( + "This file is automatically generated. If you wish to make any changes, please use the JSON files and regenerate this file using the tpframework."
+ ) + ] + + def _heading(self) -> list: + """Generates the heading for the README file.""" + return [MarkdownHeading(self.pattern_dict["name"], 1)] + + def _pattern_description(self) -> list: + """Generates the description for the pattern.""" + desc = self.pattern_dict["description"] + if path.isfile(path.join(self.pattern_path, desc)): + desc = read_file(path.join(self.pattern_path, desc)) + return [MarkdownHeading("Description", 2), MarkdownString(desc)] + + def _tags(self) -> list: + """Generates pattern tags.""" + return [ + MarkdownString(f'Tags: {", ".join(self.pattern_dict["tags"])}'), + MarkdownString(f'Version: {self.pattern_dict["version"]}'), + ] + + def _pattern_metadata(self) -> list: + """Generates a table of pattern metadata: the instances, whether a discovery rule exists, the discovery method, and whether the discovery rule is successful on the instance.""" + discovery_rule_exists = [] + instance_names = [] + discovery_rule_successful = [] + discovery_method = [] + for instance_path_json in self.instance_jsons: + instance_dict = read_json(instance_path_json) + instance_path = path.dirname(instance_path_json) + + instance_name = get_instance_name(path.basename(instance_path)) + instance_names += [ + MarkdownLink(instance_name, MarkdownHeading(instance_name, 2)) + ] + + discovery_file = path.join( + instance_path, instance_dict["discovery"]["rule"] + ) + discovery_rule_exists += [translate_bool(path.isfile(discovery_file))] + + pattern_id, instance_id = get_id_from_name( + path.basename(self.pattern_path) + ), get_id_from_name(path.basename(instance_path)) + if self.discovery_rule_results: + discovery_rule_successful += [ + self.discovery_rule_results.get(self.language, {}) + .get(str(pattern_id), {}) + .get(str(instance_id), "") + ] + if not discovery_rule_successful[-1]: + logger.warning(f'{self.log_prefix}Could not find discovery rule result for {instance_name}. Assuming "error"') + discovery_rule_successful[-1] = "error" + + discovery_method += [instance_dict["discovery"]["method"]] + + metadata_dict = { + "0::Instances": instance_names, + "1::has discovery rule": discovery_rule_exists, + "2::discovery method": discovery_method, + "3::rule successful": discovery_rule_successful, + } + if not self.discovery_rule_results: + metadata_dict.pop("3::rule successful") + + return [MarkdownHeading("Overview", 2), MarkdownTable(metadata_dict)] + + def _instances(self) -> list: + """Generates the README elements for all instances.""" + return InstanceREADMEGenerator( + self.pattern_path, + self.language, + self.measurement_results, + self.instance_jsons, + masking_file=self.masking_file, + ).generate_md() + + def _generate_README_elements(self) -> MarkdownDocument: + md_elements = [] + for f in self.readme_structure: + md_elements += f() + return MarkdownDocument(md_elements) + + def generate_README(self) -> str: + """Entrypoint for generating a README file for that pattern.
+ + Returns: + str: The generated README file following `self.readme_structure` + """ + return self._generate_README_elements().to_markdown() diff --git a/tp_framework/pattern_repair/README_instance_generator.py b/tp_framework/pattern_repair/README_instance_generator.py new file mode 100644 index 0000000..3e3dca8 --- /dev/null +++ b/tp_framework/pattern_repair/README_instance_generator.py @@ -0,0 +1,383 @@ +import logging +import re +from os import path +from datetime import datetime + +from pattern_repair.utils import ( + read_json, + get_dict_keys, + translate_bool, + get_language_by_file_ending, + get_instance_name, + get_files_with_ending, + read_file, +) +from pattern_repair.README_markdown_elements import * + +from core import loggermgr + +logger = logging.getLogger(loggermgr.logger_name(__name__)) + + +class InstanceREADMEGenerator: + def __init__( + self, + path_to_pattern: str, + language: str, + path_to_pattern_measurements: str, + instance_jsons: list[str], + level: int = 2, + masking_file: str = "mask.json", + ) -> None: + self.language = language.upper() + self.log_prefix = "Generating README: " + self.pattern_path = path_to_pattern + self.level = level + self.pattern_measurements = ( + path_to_pattern_measurements if path_to_pattern_measurements else "" + ) + + self.instances_jsons = instance_jsons + self.has_multiple_instances = len(self.instances_jsons) > 1 + self.instance_dicts = [read_json(i_path) for i_path in self.instances_jsons] + + self.current_instance = None + self.current_instance_dict = None + self.current_instance_dict_keys = None + + self.instance_structure = [ + self._instance_name, + self._instance_description, + self._instance_code, + self._instance_properties, + self._instance_more, + ] + self.instance_more_structure = [ + self._compile, + self._discovery, + self._measurement, + self._remediation, + ] + + self.mask = {} + if masking_file and path.isfile(masking_file): + self.mask = read_json(masking_file) + elif masking_file: + 
logger.info(f"Could not find the provided masking file: {masking_file}") + + def _instance_name(self) -> list: + """Generates the Markdown heading for the current instance.""" + return [MarkdownHeading(get_instance_name(self.current_instance), self.level)] + + def _instance_description(self) -> list: + """Generates the description for the current instance.""" + desc = ( + self.current_instance_dict["description"] + if "description" in self.current_instance_dict_keys + else "" + ) + content = self._get_file_content_if_exists(desc, debug_name="description") + return [MarkdownString(content)] if content else [] + + def _instance_code(self) -> list: + """Generates the instance code for the current instance.""" + heading = MarkdownHeading("Code", self.level + 1) + code = ( + self.current_instance_dict["code"]["path"] + if "code:path" in self.current_instance_dict_keys + else "" + ) + source = ( + self.current_instance_dict["expectation"]["source_file"] + if "expectation:source_file" in self.current_instance_dict_keys + else "" + ) + sink = ( + self.current_instance_dict["expectation"]["sink_file"] + if "expectation:sink_file" in self.current_instance_dict_keys + else "" + ) + if source == sink: + content = self._get_file_content_if_exists(code, debug_name="code") + return [heading, MarkdownCode(content, self.language)] if content else [] + source_content = self._get_file_content_if_exists( + source, debug_name="source_file" + ) + sink_content = self._get_file_content_if_exists(sink, debug_name="sink_file") + return [ + heading, + MarkdownHeading("Source File", self.level + 2), + MarkdownCode(source_content, self.language), + MarkdownHeading("Sink File", self.level + 2), + MarkdownCode(sink_content, self.language), + ] + + def _instance_properties(self) -> list: + """Generates the table of instance properties.""" + properties_dict = { + "category": [self.current_instance_dict["properties"]["category"]], + "feature_vs_internal_api": [ +
self.current_instance_dict["properties"]["feature_vs_internal_api"] + ], + "input_sanitizer": [ + translate_bool( + self.current_instance_dict["properties"]["input_sanitizer"] + ) + ], + "source_and_sink": [ + translate_bool( + self.current_instance_dict["properties"]["source_and_sink"] + ) + ], + "negative_test_case": [ + translate_bool( + self.current_instance_dict["properties"]["negative_test_case"] + ) + ], + } + return [ + MarkdownHeading("Instance Properties", self.level + 1), + MarkdownTable(properties_dict), + ] + + def _instance_more(self) -> list: + """Generates the 'more' section for an instance.""" + ret = [] + for f in self.instance_more_structure: + ret += f() + return [MarkdownCollapsible(ret, MarkdownString("More"))] + + def _compile(self) -> list: + """Generates the compile section for an instance.""" + compile = ( + self.current_instance_dict["compile"]["binary"] + if "compile:binary" in self.current_instance_dict_keys + else "" + ) + content = self._get_file_content_if_exists(compile, "compile") + binary = MarkdownCode(content, get_language_by_file_ending(compile)) + return ( + [MarkdownCollapsible([binary], MarkdownHeading("Compile", self.level + 1))] + if content + else [] + ) + + def _discovery(self) -> list: + """Generates the 'discovery' section for an instance.""" + desc = ( + self.current_instance_dict["discovery"]["notes"] + if "discovery:notes" in self.current_instance_dict_keys + else "" + ) + desc = MarkdownString(self._get_file_content_if_exists(desc, "discovery notes")) + rule_path = ( + self.current_instance_dict["discovery"]["rule"] + if "discovery:rule" in self.current_instance_dict_keys + else "" + ) + rule = self._get_file_content_if_exists(rule_path, "discovery rule") + # get only necessary content + rule = re.sub("@main def main\(name .*{$", "", rule, flags=re.M) + rule = re.sub("importCpg.*$", "", rule, flags=re.M) + rule = re.sub("println\(.*\)$", "", rule, flags=re.M) + rule = re.sub("delete;.*$", "", rule, flags=re.M) + 
rule = re.sub(".*}.*$", "", rule) + rule = "\n".join([l.strip() for l in rule.split("\n")]) + rule = ( + MarkdownCode(rule, get_language_by_file_ending(rule_path)) + if rule_path + else MarkdownString("No discovery rule yet.") + ) + discovery_table = { + "discovery method": [self.current_instance_dict["discovery"]["method"]], + "expected accuracy": [ + self.current_instance_dict["discovery"]["rule_accuracy"] + ], + } + discovery_table = MarkdownTable(discovery_table) + return [ + MarkdownCollapsible( + [desc, rule, discovery_table], + MarkdownHeading("Discovery", self.level + 1), + ) + ] + + def _measurement(self) -> list: + """Generates the 'measurement' section for an instance.""" + if not path.isdir(self.pattern_measurements): + logger.warning( + f"{self.log_prefix}Could not generate measurement table, because {self.pattern_measurements} does not exist" + ) + return [] + instance_measurements = path.join( + self.pattern_measurements, path.basename(self.current_instance) + ) + measurement_table = {} + has_measurement = False + dates = [] + ground_truth = self.current_instance_dict["expectation"]["expectation"] + for json_file in get_files_with_ending(instance_measurements, ".json"): + current_json = read_json(json_file) + for c_dict in current_json: + has_measurement = True + tool = f'1::{self.mask[c_dict["tool"].lower()] if c_dict["tool"].lower() in self.mask.keys() else c_dict["tool"]}' + date = datetime.strptime(c_dict["date"], "%Y-%m-%d %H:%M:%S").strftime( + "%d %b %Y" + ) + dates += [date] + sast_tool_result = translate_bool(not (c_dict["result"] ^ ground_truth)) + try: + measurement_table[tool] += [(sast_tool_result, date)] + measurement_table[tool] = sorted( + measurement_table[tool], + key=lambda tup: datetime.strptime(tup[1], "%d %b %Y"), + ) + except KeyError: + measurement_table[tool] = [(sast_tool_result, date)] + if not has_measurement: + return [] + measurement_table, sorted_dates = self._format_measurements( + measurement_table, dates + ) + 
measurement_table["0::Tool"] = sorted_dates + measurement_table["2::Ground Truth"] = [translate_bool(ground_truth)] * len( + sorted_dates + ) + return [ + MarkdownCollapsible( + [MarkdownTable(measurement_table)], + MarkdownHeading("Measurement", self.level + 1), + is_open=True, + ) + ] + + def _remediation(self) -> list: + """Generates the 'remediation' section for an instance.""" + note = ( + self.current_instance_dict["remediation"]["notes"] + if "remediation:notes" in self.current_instance_dict_keys + else "" + ) + note = MarkdownString( + self._get_file_content_if_exists(note, "remediation note") + ) + transformation = ( + self.current_instance_dict["remediation"]["transformation"] + if "remediation:transformation" in self.current_instance_dict_keys + else "" + ) + transformation = MarkdownString( + self._get_file_content_if_exists(transformation, "transformation") + ) + modeling_rule = ( + self.current_instance_dict["remediation"]["modeling_rule"] + if "remediation:modeling_rule" in self.current_instance_dict_keys + else "" + ) + modeling_rule = MarkdownString( + self._get_file_content_if_exists(modeling_rule, "modeling rule") + ) + if any([note, transformation, modeling_rule]): + note = [ + note + if note + else MarkdownString( + "Can you think of a transformation, that makes this tarpit less challenging for SAST tools?" + ) + ] + transformation = ( + [MarkdownHeading("Transformation", self.level + 2), transformation] + if transformation + else [] + ) + modeling_rule = ( + [MarkdownHeading("Modeling Rule", self.level + 2), modeling_rule] + if modeling_rule + else [] + ) + return [ + MarkdownCollapsible( + note + transformation + modeling_rule, + MarkdownHeading("Remediation", self.level + 1), + ) + ] + return [] + + def _get_file_content_if_exists( + self, path_to_file: str, debug_name: str = "" + ) -> str: + """If the `path_to_file` is a valid filepath within the current instance, this will return the content of that file. 
+ Provide a `debug_name` if you want a unique logging warning. + + Args: + path_to_file (str): path to a file within the current instance. + debug_name (str, optional): Name, that is used in the debug output. Defaults to ''. + + Returns: + str: content of the file or empty string. + """ + content = path_to_file if path_to_file else "" + if path.isfile(path.join(self.current_instance, content)): + content = read_file(path.join(self.current_instance, content)) + if not content: + logger.warning( + f"{self.log_prefix}Could not find {debug_name} for instance {path.basename(self.current_instance)}" + ) + return "" + return content + + def _format_measurements(self, tool_measurement_dict: dict, dates: list) -> tuple: + """Formats the measurements in the wanted table format: + + | | Tool1 | Tool2 | + |--------+--------+--------| + | Date1 | yes | no | + + Args: + tool_measurement_dict (dict): dict containing measurement results and date as a list of tuple for each tool. + dates (list): a list of measurement dates. 
+ + Returns: + tuple(dict, list): dict of all tools and their measurement results (one column) and a list of sorted measurement dates (first column) + """ + dates_sorted = sorted(list(set(dates))) + formatted_measurement_table = {} + for tool, measurements in tool_measurement_dict.items(): + formatted_measurements = [] + current_measurement = measurements.pop(0) + for date in dates_sorted: + if current_measurement[1] == date: + formatted_measurements += [current_measurement[0]] + if len(measurements): + current_measurement = measurements.pop(0) + else: + break + else: + formatted_measurements += [""] + formatted_measurement_table[tool] = formatted_measurements + return formatted_measurement_table, dates_sorted + + def generate_md(self) -> list: + """Entrypoint for generating Markdown for an instance, + + Returns: + list: a list of Markdown elements following the structure in `self.instance_structure` + """ + ret = [] + for idx, _ in enumerate(self.instances_jsons): + self.current_instance = path.dirname(self.instances_jsons[idx]) + self.current_instance_dict = self.instance_dicts[idx] + self.current_instance_dict_keys = get_dict_keys(self.current_instance_dict) + + instance_md_elements = [] + for f in self.instance_structure: + instance_md_elements += f() + if self.has_multiple_instances: + ret += [ + MarkdownCollapsible( + instance_md_elements[1:], instance_md_elements[0], idx == 0 + ) + ] + else: + ret = instance_md_elements + return ret diff --git a/tp_framework/pattern_repair/README_markdown_elements.py b/tp_framework/pattern_repair/README_markdown_elements.py new file mode 100644 index 0000000..260cc7b --- /dev/null +++ b/tp_framework/pattern_repair/README_markdown_elements.py @@ -0,0 +1,186 @@ +from tabulate import tabulate + + +class MarkdownElement: + """Super class for all MarkdownElements used within generating README files for a testability pattern.""" + + def __init__(self, content: str): + self.content = content.strip() + + def linkable(self) -> str: + 
"""Makes it possible for a markdown Element to be used within a link. + + Returns: + str: a string representation, that can be used in a markdown link. + """ + raise NotImplementedError + + def to_markdown(self): + raise NotImplementedError + + def strip(self): + return self.to_markdown().strip() + + def __bool__(self): + return bool(self.content) + + +class MarkdownCode(MarkdownElement): + """A markdown code block. + Syntax: + + ``` + self.content + ``` + + """ + + def __init__(self, content, code_type): + super().__init__(content) + self.code_type = code_type + + def to_markdown(self) -> str: + return f"\n```{self.code_type.lower()}\n{self.content}\n```\n" + + +class MarkdownComment(MarkdownElement): + """A markdown comment + Syntax: + + [//]: # () + + """ + + def to_markdown(self): + self.content = self.content.replace("\\r\\n", " ") + return f"\n[//]: # ({self.content})\n" + + +class MarkdownHeading(MarkdownElement): + """A markdown heading, `self.level` indicates the number of '#' + Syntax example: + + # + + """ + + def __init__(self, content, level: int): + super().__init__(content) + self.level = int(level) + assert self.level >= 1 + + def to_markdown(self) -> str: + return f'\n{"#" * self.level} {self.content}\n' + + def linkable(self) -> str: + return f'#{self.content.replace(" " , "-").lower()}' + + +class MarkdownCollapsible(MarkdownElement): + """A markdown collapsible element. + Syntax example: + +
+ <details> + <summary> + heading + </summary> + content + </details> +
+ """ + + def __init__(self, content: list, heading: MarkdownElement, is_open: bool = False): + self.content = content + self.is_open = is_open + self.heading = heading + + def to_markdown(self) -> str: + final = f'\n<details{" open" if self.is_open else ""}>' + heading = ( + self.heading.to_markdown().strip() + if not isinstance(self.heading, MarkdownHeading) + else self.heading.to_markdown() + ) + final += f"\n\n{heading}\n\n" + for element in self.content: + final += element.to_markdown() + final += f"\n</details>\n" + return final + + +class MarkdownString(MarkdownElement): + """Representation of a string; it is surrounded by newlines.""" + + def to_markdown(self) -> str: + return f"\n{self.content}\n" + + +class MarkdownLink(MarkdownElement): + """A markdown link. + Syntax: + + [self.content](self.link) + + """ + + def __init__(self, content: str | MarkdownElement, link: MarkdownElement): + super().__init__(content) + assert isinstance( + link, MarkdownElement + ), "The link of a MarkdownLink must be a MarkdownElement." + self.link = link.linkable() + + def to_markdown(self): + return f"[{self.content.strip()}]({self.link.strip()})" + + +class MarkdownTable(MarkdownElement): + """A markdown table + Syntax: + + | | | + |---|---| + | | | + + The content must be provided as a dict, where the value for each key is a list. + The key becomes the column header and the list contains the values for that column. + Columns are sorted alphabetically; to control the order yourself, prefix a header with a sort key and `::` (e.g. `0::Instances`). The prefix is stripped from the rendered header.
+ """ + + def __init__(self, content: dict): + assert isinstance( + content, dict + ), "content for Markdown table must be provided as dict" + assert all( + [isinstance(v, list) for v in content.values()] + ), "content for Markdowntable must have lists as values" + self.headings = sorted(content.keys(), key=lambda x: x.lower()) + num_rows = max([len(v) for v in content.values()]) + self.lines = [ + [None for _ in range(len(self.headings))] for _ in range(num_rows) + ] + for column_idx, key in enumerate(self.headings): + for row_index, v in enumerate(content[key]): + self.lines[row_index][column_idx] = v.strip() if v else "" + + def to_markdown(self): + return f'\n{tabulate(self.lines, [h.split("::")[-1] if "::" in h else h for h in self.headings], "github")}\n' + + +class MarkdownDocument(MarkdownElement): + """A central point, where all markdown elements are collected into one single markdown document.""" + + def __init__(self, content: list) -> None: + self.content = content + + def to_markdown(self) -> str: + final = "" + for element in self.content: + assert isinstance(element, MarkdownElement) + final += element.to_markdown() + import re + + final = re.sub("\n\n\n*", "\n\n", final) + return ( + f"{final.strip()}\n" # GitHub markdown likes a newline at the end of files + ) diff --git a/tp_framework/pattern_repair/__init__.py b/tp_framework/pattern_repair/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tp_framework/pattern_repair/instance_repair.py b/tp_framework/pattern_repair/instance_repair.py new file mode 100644 index 0000000..98ccb8f --- /dev/null +++ b/tp_framework/pattern_repair/instance_repair.py @@ -0,0 +1,220 @@ +import logging +import os +import re +import shutil + +from pattern_repair.utils import ( + assert_pattern_valid, + repair_keys_of_json, + read_json, + write_json, + list_instances_jsons, + INSTANCE_JSON_NOT_MANDATORY_KEYS, + get_template_instance_json_path, + get_template_instance_discovery_rule_path, + 
get_files_with_ending +) + +from core.utils import get_id_from_name + +from core import loggermgr + +logger = logging.getLogger(loggermgr.logger_name(__name__)) + + +class InstanceRepair: + """Super class for all language specific `InstanceRepair`.""" + + def __init__( + self, + language: str, + path_to_pattern: str, + instance_json_path: str, + path_to_tp_lib: str, + ) -> None: + assert_pattern_valid(path_to_pattern) + + self.language = language + self.pattern_path = path_to_pattern + self.pattern_name = os.path.basename(self.pattern_path) + self.pattern_id = get_id_from_name(self.pattern_name) + self.instance_path = os.path.dirname(instance_json_path) + self.instance_name = os.path.basename(self.instance_path) + self.instance_json_file = instance_json_path + self.path_to_testability_patterns = path_to_tp_lib + + def _adjust_variable_number_in_discovery_rule( + self, path_to_discovery_file: str + ) -> None: + """Adjusts the scala discovery file. + + Args: + path_to_discovery_file (str): path to discovery file + """ + pattern_number = int(os.path.basename(self.pattern_path).split("_")[0]) + with open(path_to_discovery_file, "r") as fp: + result = fp.readlines() + + # assume, that a scala files end with + # println() + # delete; + try: + println_line = result[ + result.index(list(filter(lambda line: "delete;" in line, result))[0]) + - 1 + ] + except IndexError: + logger.warning( + f'Could not find "delete;" in {os.path.relpath(path_to_discovery_file, self.instance_path)}' + ) + return + try: + real_number = re.search(r"println\(x(\d+)\)", println_line).group(1) + except AttributeError: + logger.warning( + f"Could not find the pattern number in {os.path.relpath(path_to_discovery_file, self.instance_path)}" + ) + return + # determine the name for the rule in scala file + # if there is more than one instance, it should be _i + # if this rule is for multiple patterns, it should be _iall + rule_name = ( + f'{self.pattern_name.lower()}_i{self.instance_name.split("_")[0]}' 
+ if len(list_instances_jsons(self.pattern_path)) > 1 + and os.path.abspath(os.path.dirname(path_to_discovery_file)) + != os.path.abspath(self.pattern_path) + else f"{self.pattern_name}_iall" + ) + # make sure the number and the pattern name + new_rule = [] + for line in result: + new_line = line.replace(f"x{real_number}", f"x{pattern_number}") + new_rule += [ + re.sub( + f"({self.pattern_name}_i(\d+|all)|ID_pattern_name_i1)", + rule_name, + new_line, + ) + ] + + diff = [line for line in new_rule if line not in result] + if diff: + logger.info( + f"Changed lines in Scala rule for instance {self.instance_name}:\n{[line.strip() for line in diff]}" + ) + with open(path_to_discovery_file, "w") as fp: + fp.writelines(new_rule) + + def _check_rule_accuracy(self): + """Checks that there is a rule accuracy given if there is a rule given""" + instance_dict = read_json(self.instance_json_file) + if ( + instance_dict["discovery"]["rule"] + and not instance_dict["discovery"]["rule_accuracy"] + ): + logger.warning( + f"There is a rule, but no rule accuracy given for {self.instance_name}" + ) + + def _find_and_rename_file(self, file_ending: str): + """Checks if there is already an existing file with the expected name '_instance_.'. + If not, it gets all files with that fileending in the instance directory. If there is only one, and it is in the instance_path, + it will be renamed into the expected filename. + + Args: + file_ending (str): Ending of the files (without the `.` e.g. 
`txt`) + """ + expected_abs_filepath = os.path.join(self.instance_path, f"{self.instance_name}.{file_ending}") + if os.path.isfile(expected_abs_filepath): + return + # list all files with that fileending in the instance + files_with_this_ending = get_files_with_ending(self.instance_path, f".{file_ending}", recursive=True) + if len(files_with_this_ending) == 1 and os.path.exists(os.path.join(self.instance_path, os.path.basename(files_with_this_ending[0]))): + # There is only one file with the file ending in the instance_path directory + os.rename(files_with_this_ending[0], expected_abs_filepath) + if files_with_this_ending[0] != expected_abs_filepath: + logger.info(f"Renamed file from {files_with_this_ending[0]} to {expected_abs_filepath}") + + def _repair_description(self) -> None: + """Checks if 'description' is given in an instance dict, removes the key, when it is empty.""" + instance_dict = read_json(self.instance_json_file) + if "description" not in instance_dict.keys(): + logger.warning( + f"Instance description for {self.instance_name} does not exist." + ) + return + if not instance_dict["description"]: + instance_dict.pop("description") + logger.warning( + f"Instance description for {self.instance_name} is empty, deleting it." 
+ ) + write_json(self.instance_json_file, instance_dict) + + def _repair_discovery_rule(self) -> None: + """Repairs the discovery rule of a pattern instance""" + self._find_and_rename_file("sc") + instance_dict = read_json(self.instance_json_file) + path_to_discovery_rule = os.path.join( + self.instance_path, f"{self.instance_name}.sc" + ) + expected_file = ( + f".{os.sep}{os.path.relpath(path_to_discovery_rule, self.instance_path)}" + ) + real = ( + instance_dict["discovery"]["rule"] + if instance_dict["discovery"]["rule"] + else "" + ) + real_path = os.path.join(self.instance_path, real) + # check if there is already a path to a discovery rule given, and if this path is valid + if os.path.isfile(real_path): + if expected_file == real: + # the file path is correct, just check the structure of the file + self._repair_discovery_rule_structure(real_path) + return + else: + self._repair_discovery_rule_structure(real_path) + return + # given value is not a real file, so check if there is nevertheless a discovery rule with the expected name + if not os.path.isfile(path_to_discovery_rule): + logger.info( + f"Could not find discovery rule for {self.instance_name}, added sc file" + ) + logger.warning(f"Please adjust discovery rule of {self.instance_name}") + shutil.copy( + get_template_instance_discovery_rule_path( + self.path_to_testability_patterns + ), + path_to_discovery_rule, + ) + # adapt scala file + self._repair_discovery_rule_structure(path_to_discovery_rule) + # adapt JSON file + instance_dict["discovery"]["rule"] = expected_file + write_json(self.instance_json_file, instance_dict) + + def _repair_discovery_rule_structure(self, path_to_discovery_file: str) -> None: + self._adjust_variable_number_in_discovery_rule(path_to_discovery_file) + self._check_rule_accuracy() + + def repair_instance_json(self) -> None: + """Repairs the instance JSON of the pattern. 
+        Meaning, it makes sure that the JSON file is there,
+        has all necessary keys, and that the description points to a markdown file containing the description."""
+        if not os.path.isfile(self.instance_json_file):
+            logger.info(
+                f"Could not find instance JSON for {self.instance_name}, copying template"
+            )
+            shutil.copy(
+                get_template_instance_json_path(self.path_to_testability_patterns),
+                self.instance_json_file,
+            )
+        repair_keys_of_json(
+            self.instance_json_file,
+            get_template_instance_json_path(self.path_to_testability_patterns),
+            INSTANCE_JSON_NOT_MANDATORY_KEYS,
+        )
+        self._repair_description()
+
+    def repair(self) -> None:
+        self._repair_discovery_rule()
diff --git a/tp_framework/pattern_repair/pattern_repair.py b/tp_framework/pattern_repair/pattern_repair.py
new file mode 100644
index 0000000..941aede
--- /dev/null
+++ b/tp_framework/pattern_repair/pattern_repair.py
@@ -0,0 +1,180 @@
+import os
+import shutil
+import logging
+
+from copy import deepcopy
+from pathlib import Path
+
+from pattern_repair.utils import (
+    assert_pattern_valid,
+    repair_keys_of_json,
+    get_template_pattern_json_path,
+    read_json,
+    write_json,
+    compare_dicts,
+    get_files_with_ending,
+    list_instances_jsons,
+)
+from pattern_repair.README_generator import READMEGenerator
+# This import is required so that the language-specific instance repair class can be looked up via globals()
+from pattern_repair.PHP.instance_repair_php import InstanceRepairPHP
+
+from core.utils import check_lang_tp_lib_path, get_id_from_name
+from core import loggermgr
+
+logger = logging.getLogger(loggermgr.logger_name(__name__))
+
+
+class PatternRepair:
+    def __init__(
+        self,
+        path_to_pattern: Path,
+        language: str,
+        tp_lib_path: Path,
+        discovery_rule_results: str = "",
+        masking_file: str = "",
+        all_measurement_results: str = "",
+    ) -> None:
+        check_lang_tp_lib_path(Path(os.path.join(tp_lib_path, language.upper())))
+        assert_pattern_valid(path_to_pattern)
+
+        # user defined constants
+        self.pattern_path = path_to_pattern
+        self.pattern_name = os.path.basename(self.pattern_path)
+        self.pattern_id = get_id_from_name(self.pattern_name)
+        self.language = language
+        self.pattern_json_file = None
+        self.discovery_rule_results = discovery_rule_results
+        self.masking_file = masking_file
+        self.all_measurement_results = all_measurement_results
+        self.tp_lib_path = tp_lib_path
+
+        # get repair class for the specific language
+        try:
+            self.instance_repair_class = globals()[f"InstanceRepair{language.upper()}"]
+        except KeyError:
+            logger.error(
+                f"InstanceRepair{language.upper()} could not be found, maybe it is not imported?"
+            )
+            exit(1)
+
+    def _find_instances_json(self) -> list:
+        """Gets all pattern instance JSONs as relative paths.
+
+        Returns:
+            list: list of relative paths to JSON files.
+        """
+        pattern_instances = list_instances_jsons(self.pattern_path)
+        if not pattern_instances:
+            return []
+        # get the relative path for instances
+        pattern_instances_rel_path = [
+            f".{os.sep}{os.path.relpath(str(pattern_instance_path), self.pattern_path)}"
+            for pattern_instance_path in pattern_instances
+        ]
+        return pattern_instances_rel_path
+
+    def _repair_documentation(self) -> None:
+        """Makes sure the pattern description is in `./docs/description.md` and the field in the JSON file points to that markdown file."""
+        # make sure ./docs/description.md exists
+        docs_directory = os.path.join(self.pattern_path, "docs")
+        description_file_path = os.path.join(docs_directory, "description.md")
+        os.makedirs(docs_directory, exist_ok=True)
+        open(description_file_path, "a").close()
+
+        # check the "description" field in the pattern JSON file
+        json_dict = read_json(self.pattern_json_file)
+        description_in_json = json_dict["description"]
+        rel_path_to_description = (
+            f".{os.sep}{os.path.relpath(description_file_path, self.pattern_path)}"
+        )
+        if rel_path_to_description == description_in_json:
+            # the description_in_json is already the right path
+            if not os.stat(description_file_path).st_size:
+                logger.info(f"Description for {self.pattern_name} is missing")
+            return
+
+        # set the description field to point to ./docs/description.md
+        json_dict["description"] = rel_path_to_description
+        with open(description_file_path, "r") as fp:
+            original_description = fp.read().splitlines()
+        original_description += [description_in_json]
+        with open(description_file_path, "w") as fp:
+            fp.write("\n".join(original_description))
+        write_json(self.pattern_json_file, json_dict)
+        if description_in_json:
+            logger.info(f"Updated description path in the pattern JSON to {rel_path_to_description}")
+        else:
+            logger.info(f"Description for pattern {self.pattern_name} is missing")
+
+    def _repair_instances(self) -> None:
+        """Repairs the instances of the pattern, using the language-specific instance repair class."""
+        all_instances = list_instances_jsons(self.pattern_path)
+        if not all_instances:
+            logger.error(f"Pattern {self.pattern_name} has no instances")
+            exit(1)
+        for instance_json in all_instances:
+            self.instance_repair_class(
+                self.language, self.pattern_path, instance_json, self.tp_lib_path
+            ).repair()
+
+    def _repair_pattern_json(self) -> None:
+        """Repairs the JSON file of the pattern."""
+        # check if the pattern JSON file exists, if not copy the template
+        pattern_json = os.path.join(self.pattern_path, f"{self.pattern_name}.json")
+        self.pattern_json_file = pattern_json
+        if not os.path.isfile(pattern_json):
+            logger.info("Could not find pattern JSON, copying the template")
+            shutil.copy(get_template_pattern_json_path(self.tp_lib_path), pattern_json)
+        repair_keys_of_json(
+            self.pattern_json_file, get_template_pattern_json_path(self.tp_lib_path)
+        )
+
+        # get the content of the pattern JSON
+        pattern_dict = read_json(pattern_json)
+
+        # adapt the fields (name, family, tags, instances) of the pattern_dict
+        new_pattern_dict = deepcopy(pattern_dict)
+        new_pattern_dict["name"] = " ".join(self.pattern_name.split("_")[1:]).title()
+
new_pattern_dict["family"] = f"code_pattern_{self.language.lower()}" + if "LANG" in new_pattern_dict["tags"]: + new_pattern_dict["tags"] = ["sast", self.language.lower()] + new_pattern_dict["instances"] = self._find_instances_json() + new_pattern_dict["version"] = ( + new_pattern_dict["version"] if new_pattern_dict["version"] else "v0.draft" + ) + + # compare with original dict and if something has changed write the new dict to file + dict_diff = compare_dicts(pattern_dict, new_pattern_dict) + if dict_diff: + write_json(pattern_json, new_pattern_dict) + self._repair_documentation() + + def _repair_pattern_README(self) -> None: + """Repairs the README file of the pattern""" + all_md_files = get_files_with_ending(self.pattern_path, ".md") + if len(all_md_files) == 1: + os.rename(all_md_files[0], os.path.join(self.pattern_path, "README.md")) + pattern_measurement = os.path.join( + self.all_measurement_results, self.pattern_name + ) + instance_jsons = list_instances_jsons(self.pattern_path) + new_readme = READMEGenerator( + self.pattern_path, + self.language, + self.tp_lib_path, + instance_jsons, + self.discovery_rule_results, + pattern_measurement, + self.masking_file, + ).generate_README() + with open(os.path.join(self.pattern_path, "README.md"), "w") as file: + file.write(new_readme) + + def repair(self, should_include_readme: bool = True): + self._repair_pattern_json() + self._repair_instances() + if should_include_readme: + self._repair_pattern_README() diff --git a/tp_framework/pattern_repair/pattern_repair_interface.py b/tp_framework/pattern_repair/pattern_repair_interface.py new file mode 100644 index 0000000..1660432 --- /dev/null +++ b/tp_framework/pattern_repair/pattern_repair_interface.py @@ -0,0 +1,48 @@ +from pathlib import Path + +from core import utils +from core.pattern import get_pattern_path_by_pattern_id +from pattern_repair.pattern_repair import PatternRepair + + +def repair_patterns( + language: str, + pattern_ids: list[int], + include_README: 
bool,
+    checkdiscoveryrule_results: Path,
+    measurement_results: Path,
+    masking_file: Path,
+    tp_lib_path: Path,
+    output_dir: Path,
+) -> None:
+    """Interface that starts a pattern repair.
+
+    Args:
+        language (str): language of the targeted patterns
+        pattern_ids (list[int]): list of pattern ids
+        include_README (bool): when set, README generation is skipped
+        checkdiscoveryrule_results (Path): results of a `checkdiscoveryrules` run with tp-framework, for all patterns to repair
+        measurement_results (Path): results of a `measure` run with tp-framework, for all patterns to repair
+        masking_file (Path): file that can be used to mask the names of tools, if they should be kept secret
+        tp_lib_path (Path): path to the testability pattern library
+        output_dir (Path): output dir for any written data
+    """
+    print("Pattern Repair started...")
+    should_include_readme = not include_README
+    utils.check_tp_lib(tp_lib_path)
+    if should_include_readme:
+        utils.check_file_exist(checkdiscoveryrule_results)
+        if masking_file:
+            utils.check_file_exist(masking_file, ".json")
+        utils.check_measurement_results_exist(measurement_results)
+    output_dir.mkdir(exist_ok=True, parents=True)
+    utils.add_loggers(output_dir)
+
+    for pattern_id in pattern_ids:
+        pattern_path = get_pattern_path_by_pattern_id(language, pattern_id, tp_lib_path)
+        PatternRepair(
+            pattern_path,
+            language,
+            tp_lib_path,
+            checkdiscoveryrule_results,
+            masking_file,
+            measurement_results,
+        ).repair(should_include_readme)
diff --git a/tp_framework/pattern_repair/utils.py b/tp_framework/pattern_repair/utils.py
new file mode 100644
index 0000000..dd29130
--- /dev/null
+++ b/tp_framework/pattern_repair/utils.py
@@ -0,0 +1,298 @@
+import csv
+import json
+import logging
+
+from collections import defaultdict
+from os import path, listdir, walk
+from pathlib import Path
+
+from core.errors import templateDirDoesNotExist
+from core.exceptions import TemplateDoesNotExist, PatternDoesNotExists, FileDoesNotExist
+from core.utils import get_exception_message
+from core import loggermgr
+
+logger = logging.getLogger(loggermgr.logger_name(__name__))
+
+INSTANCE_JSON_NOT_MANDATORY_KEYS = ["description", "reporting"]
+
+
+def assert_pattern_valid(path_to_pattern: Path) -> None:
+    """Asserts that a pattern is a valid directory.
+
+    Args:
+        path_to_pattern (Path): absolute path to a pattern
+
+    Raises:
+        PatternDoesNotExists: when the pattern does not exist.
+    """
+    if not Path(path_to_pattern).is_dir():
+        e = PatternDoesNotExists(path.basename(path_to_pattern))
+        logger.error(get_exception_message(e))
+        raise e
+
+
+def compare_dicts(old_dict, new_dict) -> dict:
+    """Returns the entries of `old_dict` whose values differ from those in `new_dict`."""
+    return {
+        k: old_dict[k] for k in old_dict if k in new_dict and old_dict[k] != new_dict[k]
+    }
+
+
+def get_dict_keys(d: dict) -> list:
+    """Returns a list of keys in a multidimensional dict.
+    The key names are separated by `:`, i.e. `level1_key:level2_key`.
+
+    Args:
+        d (dict): a multidimensional dict
+
+    Returns:
+        list: all keys from all dict levels
+    """
+    all_keys = []
+    current_keys = d.keys()
+    for k in current_keys:
+        if isinstance(d[k], dict):
+            sub_keys = get_dict_keys(d[k])
+            all_keys += [f"{k}:{sk}" for sk in sub_keys]
+        else:
+            all_keys += [k]
+    return all_keys
+
+
+def get_instance_name(path_to_instance) -> str:
+    return " ".join(path.basename(path_to_instance).split("_")[:2]).title()
+
+
+def get_files_with_ending(
+    path_to_dir: str, file_ending: str, recursive: bool = False
+) -> list:
+    """Returns all files with a certain ending. Be sure to include the `.` when passing the `file_ending` argument, i.e. `file_ending='.txt'`.
+
+    Args:
+        path_to_dir (str): directory from which the files should be listed.
+        file_ending (str): the ending of the files that should be filtered for.
+        recursive (bool, optional): whether the directory should be traversed recursively. Defaults to False.
+
+    Returns:
+        list: all paths to files in the directory that have the `file_ending`.
+    """
+    matches = []
+    for root, _, filenames in walk(path_to_dir):
+        for filename in filter(lambda f: f.endswith(file_ending), filenames):
+            matches.append(path.join(root, filename))
+    return (
+        matches
+        if recursive
+        else sorted(
+            [
+                path.join(path_to_dir, f)
+                for f in filter(
+                    lambda filename: Path(filename).suffix == file_ending,
+                    listdir(path_to_dir),
+                )
+            ]
+        )
+    )
+
+
+def get_template_dir_path(tp_lib_path) -> str:
+    template_path = path.join(tp_lib_path, "pattern_template", "ID_pattern_name")
+    if not path.isdir(template_path):
+        e = TemplateDoesNotExist(templateDirDoesNotExist(template_path))
+        logger.error(get_exception_message(e))
+        raise e
+    return template_path
+
+
+def get_template_pattern_json_path(tp_lib_path) -> str:
+    template_pattern_json_path = path.join(
+        get_template_dir_path(tp_lib_path), "ID_pattern_name.json"
+    )
+    if not path.isfile(template_pattern_json_path):
+        e = TemplateDoesNotExist(templateDirDoesNotExist(template_pattern_json_path))
+        logger.error(get_exception_message(e))
+        raise e
+    return template_pattern_json_path
+
+
+def get_template_instance_path(tp_lib_path) -> str:
+    template_instance_path = path.join(
+        get_template_dir_path(tp_lib_path), "IID_instance_ID_pattern_name"
+    )
+    if not path.isdir(template_instance_path):
+        e = TemplateDoesNotExist(templateDirDoesNotExist(template_instance_path))
+        logger.error(get_exception_message(e))
+        raise e
+    return template_instance_path
+
+
+def get_template_instance_json_path(tp_lib_path) -> str:
+    template_instance_json_path = path.join(
+        get_template_instance_path(tp_lib_path), "IID_instance_ID_pattern_name.json"
+    )
+    if not path.isfile(template_instance_json_path):
+        e = TemplateDoesNotExist(templateDirDoesNotExist(template_instance_json_path))
+        logger.error(get_exception_message(e))
+        raise e
+    return template_instance_json_path
+
+
+def get_template_instance_discovery_rule_path(tp_lib_path) -> str:
+    template_instance_discovery_rule_path = path.join(
+
get_template_instance_path(tp_lib_path), "pattern_discovery_rule.sc"
+    )
+    if not path.isfile(template_instance_discovery_rule_path):
+        e = TemplateDoesNotExist(
+            templateDirDoesNotExist(template_instance_discovery_rule_path)
+        )
+        logger.error(get_exception_message(e))
+        raise e
+    return template_instance_discovery_rule_path
+
+
+def get_language_by_file_ending(filename: str) -> str:
+    """Returns the language, by simply looking at the suffix of the file.
+
+    Args:
+        filename (str): name of a file
+
+    Raises:
+        NotImplementedError: if the suffix is not yet supported.
+
+    Returns:
+        str: language
+    """
+    suffix_to_language = {
+        ".py": "python",
+        ".php": "php",
+        ".js": "javascript",
+        ".java": "java",
+        ".sc": "scala",
+        ".bash": "bash",
+    }
+    suffix = Path(filename).suffix
+    if suffix in suffix_to_language:
+        return suffix_to_language[suffix]
+    raise NotImplementedError(
+        f"The ending of the given filename ({filename}) is not yet supported"
+    )
+
+
+def list_directories(path_to_parent_directory: str):
+    return list(
+        filter(
+            lambda x: path.isdir(x),
+            [path.join(path_to_parent_directory, f) for f in listdir(path_to_parent_directory)],
+        )
+    )
+
+
+def list_instances_jsons(path_to_pattern: str | Path):
+    return [
+        path.join(instance, f"{path.basename(instance)}.json")
+        for instance in filter(
+            lambda x: path.isdir(x) and path.basename(x)[0].isdigit(),
+            list_directories(path_to_pattern),
+        )
+    ]
+
+
+def read_json(path_to_json: str) -> dict:
+    result = {}
+    try:
+        with open(path_to_json, "r") as json_file:
+            result = json.load(json_file)
+    except json.JSONDecodeError as err:
+        raise Exception(f"JSON is corrupt, please check {path_to_json}") from err
+    if not result:
+        logger.error(f"JSON file {path_to_json} is empty")
+    return result
+
+
+def read_file(path_to_file: str) -> str:
+    try:
+        with open(path_to_file, "r") as file:
+            ret = file.read()
+    except Exception:
+        e = FileDoesNotExist(
+            f"The file {path_to_file} you wanted to read does not exist or is corrupt. Cannot read the file."
+        )
+        logger.error(get_exception_message(e))
+        raise e
+    return ret
+
+
+def read_csv_to_dict(path_to_file: str) -> dict:
+    """Reads a csv file into a dictionary. The csv file must contain the columns 'pattern_id', 'instance_id', 'language', 'successful'.
+    The dict will have the form:
+    {<language>: {<pattern_id>: {<instance_id>: <successful>}}}
+
+    Args:
+        path_to_file (str): path to csv file (with discovery rule results)
+
+    Returns:
+        dict: defaultdict of dicts
+    """
+    res = []
+    with open(path_to_file, "r") as csvfile:
+        r = csv.reader(csvfile, delimiter=",")
+        headings = next(r)
+        wanted_columns = ["pattern_id", "instance_id", "language", "successful"]
+        assert all(
+            w in headings for w in wanted_columns
+        ), f"Could not find wanted column names in csv {path_to_file}"
+        wanted_idx = [headings.index(w) for w in wanted_columns]
+        res = [[line[i] for i in wanted_idx] for line in r]
+    ret = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
+    for line in res:
+        ret[line[2]][line[0]][line[1]] = line[3]
+    return ret
+
+
+def repair_dict_keys(
+    tested_dict: dict, ground_truth_dict: dict, not_mandatory_keys: list = []
+) -> None:
+    """Modifies `tested_dict` and inserts all keys from `ground_truth_dict` that are not in `tested_dict`, unless they are in `not_mandatory_keys`.
+
+    Args:
+        tested_dict (dict): dict that has potentially missing keys.
+        ground_truth_dict (dict): dict that has all necessary keys.
+        not_mandatory_keys (list, optional): list of keys in `ground_truth_dict` that are not mandatory. Defaults to [].
+    """
+    tested_keys = set(tested_dict.keys())
+    ground_truth_keys = set(ground_truth_dict.keys())
+
+    common_keys = set.intersection(tested_keys, ground_truth_keys)
+    for k in common_keys:
+        if isinstance(tested_dict[k], dict) and isinstance(ground_truth_dict[k], dict):
+            repair_dict_keys(tested_dict[k], ground_truth_dict[k], not_mandatory_keys)
+        if isinstance(tested_dict[k], dict) != isinstance(ground_truth_dict[k], dict):
+            logger.warning(
+                f'One of the values for "{k}" is a dict, the other one is not'
+            )
+
+    missing_keys = ground_truth_keys - tested_keys
+    unexpected_keys = tested_keys - ground_truth_keys
+    for key in missing_keys:
+        if key in not_mandatory_keys:
+            continue
+        tested_dict[key] = ""
+        logger.info(f'Added "{key}"')
+    if unexpected_keys:
+        logger.warning(f'Keys "{list(unexpected_keys)}" are unexpected')
+
+
+def repair_keys_of_json(
+    path_to_json_tested: str,
+    path_to_json_ground_truth: str,
+    not_mandatory_keys: list = [],
+) -> None:
+    """Checks if all keys from `path_to_json_ground_truth` are in `path_to_json_tested`; if not, adds them."""
+    tested_json_dict = read_json(path_to_json_tested)
+    template_json_dict = read_json(path_to_json_ground_truth)
+    repair_dict_keys(tested_json_dict, template_json_dict, not_mandatory_keys)
+    write_json(path_to_json_tested, tested_json_dict)
+
+
+def translate_bool(to_translate: bool) -> str:
+    return "yes" if to_translate else "no"
+
+
+def write_json(path_to_json: str, result_dict: dict) -> None:
+    with open(path_to_json, "w") as json_file:
+        json.dump(result_dict, json_file, indent=4)
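
The key-repair helpers in `pattern_repair/utils.py` are easiest to understand with a small standalone illustration. The sketch below re-implements their core behaviour under simplified assumptions: `flatten_keys` and `fill_missing_keys` are hypothetical stand-ins for `get_dict_keys` and `repair_dict_keys`, and the sample template/instance dicts are invented, not taken from the pattern library.

```python
# Standalone sketch (not part of the diff above) of the JSON-key repair logic.

def flatten_keys(d: dict) -> list:
    """Mirror of get_dict_keys: nested keys become 'level1:level2' strings."""
    keys = []
    for k, v in d.items():
        if isinstance(v, dict):
            keys += [f"{k}:{sub}" for sub in flatten_keys(v)]
        else:
            keys.append(k)
    return keys


def fill_missing_keys(tested: dict, template: dict, not_mandatory: tuple = ()) -> None:
    """Mirror of repair_dict_keys: add template keys missing from `tested`,
    recursing into shared sub-dicts and skipping non-mandatory keys."""
    for k, v in template.items():
        if k in tested:
            if isinstance(tested[k], dict) and isinstance(v, dict):
                fill_missing_keys(tested[k], v, not_mandatory)
        elif k not in not_mandatory:
            tested[k] = ""  # missing mandatory keys get an empty default


# invented sample data for illustration only
template = {"name": "", "version": "", "description": "", "discovery": {"rule": "", "method": ""}}
instance = {"name": "1 Example Pattern", "discovery": {"rule": "./rule.sc"}}

print(flatten_keys(template))
# -> ['name', 'version', 'description', 'discovery:rule', 'discovery:method']

fill_missing_keys(instance, template, not_mandatory=("description",))
print(instance)
# -> {'name': '1 Example Pattern', 'discovery': {'rule': './rule.sc', 'method': ''}, 'version': ''}
```

In the module itself, `repair_keys_of_json` applies this key-filling step to JSON files on disk, reading both the tested file and the template with `read_json` and writing the repaired dict back with `write_json`.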