diff --git a/docs/How-to-run-CLI-Usage.md b/docs/How-to-run-CLI-Usage.md
index e76f2eb..b7edb47 100644
--- a/docs/How-to-run-CLI-Usage.md
+++ b/docs/How-to-run-CLI-Usage.md
@@ -22,6 +22,8 @@ The following main commands are currently implemented:
- [`discovery`](./How-to-run-discover-measured-patterns.md): discover measured patterns within a project source code
- [`manual-discovery`](./How-to-run-manual-discovery.md): execute discovery rules (normally associated to patterns) within a project source code
- reporting: create reports about SAST measurement and/or pattern discovery (**TODO**)
+- [`checkdiscoveryrules`](./How-to-run-checkdiscoveryrules.md): check/test the discovery rules of the pattern instances by running them on the pattern instances themselves
+- [`patternrepair`](./How-to-run-patternrepair.md): help keep your pattern catalogue consistent and tidy
The following are under-investigation:
diff --git a/docs/How-to-run-Measure-SAST-tools-over-patterns.md b/docs/How-to-run-Measure-SAST-tools-over-patterns.md
index 57bf5dd..43c66bf 100644
--- a/docs/How-to-run-Measure-SAST-tools-over-patterns.md
+++ b/docs/How-to-run-Measure-SAST-tools-over-patterns.md
@@ -52,6 +52,10 @@ Instead of specifying certain pattern ids, you can use `-a`.
]
```
+The value in `result` is a boolean: `true` means the tool's output matches the expected result, while `false` indicates a mismatch.
+For example, if the pattern is expected not to contain a vulnerability (`"expectation": false`) and the tool reports no vulnerability, the value of `result` will be `true`.
+If the pattern is expected to contain a vulnerability (`"expectation": true`) and the tool does not detect it, the `result` field will be `false`.
+
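This match can be sketched as a tiny predicate (an illustration only; `measurement_result` is a hypothetical helper, not part of the framework):

```python
def measurement_result(expectation: bool, tool_flags_vulnerability: bool) -> bool:
    # `result` is True exactly when the tool's verdict matches the expectation
    return expectation == tool_flags_vulnerability

# pattern expected to be non-vulnerable, tool reports no vulnerability
print(measurement_result(False, False))  # True
# pattern expected to be vulnerable, tool misses it
print(measurement_result(True, False))  # False
```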
## Example
Here a simple example that will measure patterns 1, 2, 4 and 7 from the PHP catalog with 3 workers:
diff --git a/docs/How-to-run-checkdiscoveryrules.md b/docs/How-to-run-checkdiscoveryrules.md
new file mode 100644
index 0000000..48c2e2e
--- /dev/null
+++ b/docs/How-to-run-checkdiscoveryrules.md
@@ -0,0 +1,43 @@
+# How to run: Checkdiscoveryrules
+
+## Overview
+
+This command runs the discovery rule of each selected pattern instance on the pattern instance itself.
+
+## Command line
+
+To check discovery rules on your patterns, run:
+
+```bash
+tpframework checkdiscoveryrules --help
+usage: tpframework [OPTIONS] COMMAND checkdiscoveryrules [-h] (--print | --export EXPORTFILE) -l LANGUAGE (-p PATTERN_ID [PATTERN_ID ...] | --pattern-range RANGE_START-RANGE_END | -a)
+ [--tp-lib TP_LIB_DIR] [-s NUMBER] [--output-dir OUTPUT_DIR]
+
+options:
+ -h, --help show this help message and exit
+ --print Print measurements on stdout.
+ --export EXPORTFILE Export measurements to the specified csv file.
+ -l LANGUAGE, --language LANGUAGE
+ Programming language targeted
+ -p PATTERN_ID [PATTERN_ID ...], --patterns PATTERN_ID [PATTERN_ID ...]
+ Specify pattern(s) ID(s) to test for discovery
+ --pattern-range RANGE_START-RANGE_END
+ Specify pattern ID range separated by`-` (ex. 10-50)
+ -a, --all-patterns Test discovery for all available patterns
+ --tp-lib TP_LIB_DIR Absolute path to alternative pattern library, default resolves to `./testability_patterns`
+ -s NUMBER, --timeout NUMBER
+ Timeout for CPG generation
+ --output-dir OUTPUT_DIR
+ Absolute path to the folder where outcomes (e.g., log file, export file if any) will be stored, default resolves to `./out`
+```
+
+## Example
+
+Here is a simple example that runs `checkdiscoveryrules` on the first PHP pattern and prints the results to stdout:
+`tpframework checkdiscoveryrules -p 1 -l php --print`
+
+Note: the minimum requirements for this command are a pattern, a language, and either `--print` or `--export`.
+
+## Required fields in instance `json` metadata
+
+The explanation for the instance `json` metadata can be found [here](https://github.com/testable-eu/sast-testability-patterns/blob/master/docs/testability-patterns-structure.md)
\ No newline at end of file
diff --git a/docs/How-to-run-patternrepair.md b/docs/How-to-run-patternrepair.md
new file mode 100644
index 0000000..418b6cc
--- /dev/null
+++ b/docs/How-to-run-patternrepair.md
@@ -0,0 +1,62 @@
+# How to run: Patternrepair
+
+## Overview
+
+This command helps you review your testability pattern catalogue and keep it consistent and tidy.
+It can also help you repair broken patterns.
+
+## Command line
+
+To start repairing/reviewing your patterns, run:
+
+```bash
+tpframework patternrepair --help
+usage: tpframework [OPTIONS] COMMAND patternrepair [-h] -l LANGUAGE (-p PATTERN_ID [PATTERN_ID ...] | --pattern-range RANGE_START-RANGE_END | -a) [--tp-lib TP_LIB_DIR]
+ [--output-dir OUTPUT_DIR] [--masking-file MASKING_FILE] [--measurement-results MEASUREMENT_DIR]
+ [--checkdiscoveryrules-results CHECKDISCOVERYRULES_FILE] [--skip-readme]
+
+options:
+ -h, --help show this help message and exit
+ -l LANGUAGE, --language LANGUAGE
+ Programming language targeted
+ -p PATTERN_ID [PATTERN_ID ...], --patterns PATTERN_ID [PATTERN_ID ...]
+ Specify pattern(s) ID(s) to test for discovery
+ --pattern-range RANGE_START-RANGE_END
+ Specify pattern ID range separated by`-` (ex. 10-50)
+ -a, --all-patterns Test discovery for all available patterns
+ --tp-lib TP_LIB_DIR Absolute path to alternative pattern library, default resolves to `./testability_patterns`
+ --output-dir OUTPUT_DIR
+ Absolute path to the folder where outcomes (e.g., log file, export file if any) will be stored, default resolves to `./out`
+ --masking-file MASKING_FILE
+ Absolute path to a json file, that contains a mapping, if the name for some measurement tools should be kept secret, default is None
+ --measurement-results MEASUREMENT_DIR
+ Absolute path to the folder where measurement results are stored, default resolves to `./measurements`
+ --checkdiscoveryrules-results CHECKDISCOVERYRULES_FILE
+ Absolute path to the csv file, where the results of the `checkdiscoveryrules` command are stored, default resolves to `./checkdiscoveryrules.csv`
+ --skip-readme If set, the README generation is skipped.
+```
+
+Note: at the moment `patternrepair` only supports PHP. To add support for another language, write a language-specific repair class that inherits from `InstanceRepair`.
+
+The `patternrepair` command enforces the pattern structure described [here](https://github.com/testable-eu/sast-testability-patterns/blob/master/docs/testability-patterns-structure.md).
+To do so, it is separated into different steps:
+
+- `PatternRepair`: This will check the pattern JSON file and correct the references to the instance JSON files.
+- `InstanceRepair`: This will check and correct the instance JSON file for each instance. At the moment, only PHP patterns are supported.
+  - It generates opcode for every PHP file.
+  - It checks for the comments `// source` and `// sink` in the file in order to fill in the source and sink lines in the corresponding instance JSON file.
+- `READMEGenerator`: This creates a README file for a pattern based on the JSON files. If you want to skip the generation of the README file, use the `--skip-readme` flag. As the README includes results of `measure` and `checkdiscoveryrules`, valid file paths for these must be provided when generating a README file.
+
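The source/sink detection performed by `InstanceRepair` can be sketched as follows (a simplified illustration of the marker scan, not the framework's actual code):

```python
def find_source_and_sink(file_lines: list[str]) -> tuple:
    # returns the 1-based line numbers of the `// source` and `// sink`
    # markers, or None for a marker that is absent
    source = sink = None
    for idx, line in enumerate(file_lines, start=1):
        if "// source" in line:
            source = idx
        if "// sink" in line:
            sink = idx
    return source, sink

lines = ["<?php", "$a = $_GET['p1']; // source", "echo $a; // sink"]
print(find_source_and_sink(lines))  # (2, 3)
```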
+## Example
+
+Note: the minimum requirements for this command are a pattern and a language.
+
+### Example 1
+
+Here is a simple example that runs `patternrepair` on the first PHP pattern without generating a new README file for that pattern:
+`tpframework patternrepair -p 1 -l php --skip-readme`
+
+### Example 2
+
+Here is an example that repairs all PHP patterns and generates a new README for each pattern:
+`tpframework patternrepair -a -l php --measurement-results ./your_measurement_results --checkdiscoveryrules-results ./your_results.csv`
diff --git a/qualitytests/patternrepair/__init__.py b/qualitytests/patternrepair/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/qualitytests/patternrepair/test_markdown.py b/qualitytests/patternrepair/test_markdown.py
new file mode 100644
index 0000000..616e699
--- /dev/null
+++ b/qualitytests/patternrepair/test_markdown.py
@@ -0,0 +1,80 @@
+import pytest
+
+from qualitytests.qualitytests_utils import join_resources_path
+
+from pattern_repair.README_markdown_elements import *
+from pattern_repair.README_generator import READMEGenerator
+
+
+class TestMarkdownPatternRepair:
+
+    def test_markdown_collapsible(self):
+        coll = MarkdownCollapsible([MarkdownString("Hello")], MarkdownString("More"))
+        assert '\n<details>\n<summary>More</summary>\n\nHello\n\n</details>\n' == coll.to_markdown()
+
+ def test_markdown_string(self):
+ s = MarkdownString("Test")
+ assert "\nTest\n" == s.to_markdown()
+
+ def test_markdown_link(self):
+ link = MarkdownLink("Test", MarkdownHeading("Heading 1", 3))
+ assert "[Test](#heading-1)" == link.to_markdown()
+
+ def test_markdown_table(self):
+ test_content = {"0::column1": ["value1", "value1.1"], "column2": ["value2"]}
+ tab = MarkdownTable(test_content)
+ expected_tab = "\n| column1 | column2 |\n"
+ expected_tab += "|-----------|-----------|\n"
+ expected_tab += "| value1 | value2 |\n"
+ expected_tab += "| value1.1 | |\n"
+ assert expected_tab == tab.to_markdown()
+
+ def test_markdown_document(self):
+ coll = MarkdownCollapsible([MarkdownString("Hello")], MarkdownString("More"))
+ doc = MarkdownDocument([coll])
+        assert '\n<details>\n<summary>More</summary>\n\nHello\n\n</details>\n' == doc.to_markdown()
+
+ def test_README_generation_one_instance(self):
+ path_to_test_pattern = join_resources_path("sample_patlib/PHP/2_global_variables")
+ path_to_tplib = join_resources_path("sample_patlib")
+ instance_jsons = [path_to_test_pattern / "1_instance_2_global_variables" / "1_instance_2_global_variables.json"]
+ md_doc = READMEGenerator(path_to_test_pattern, 'php', path_to_tplib, instance_jsons)._generate_README_elements()
+
+ assert 14 == len(md_doc.content)
+ assert isinstance(md_doc.content[0], MarkdownComment)
+ assert isinstance(md_doc.content[1], MarkdownHeading) # Global Variables
+ assert isinstance(md_doc.content[2], MarkdownString) # Tags: ...
+ assert isinstance(md_doc.content[3], MarkdownString) # Version: ...
+ assert isinstance(md_doc.content[4], MarkdownHeading) # Description
+ assert isinstance(md_doc.content[5], MarkdownString) #
+ assert isinstance(md_doc.content[6], MarkdownHeading) # Overview
+ assert isinstance(md_doc.content[7], MarkdownTable) #
+ assert isinstance(md_doc.content[8], MarkdownHeading) # Instance 1
+ assert isinstance(md_doc.content[9], MarkdownHeading) # Code
+ assert isinstance(md_doc.content[10], MarkdownCode) #
+ assert isinstance(md_doc.content[11], MarkdownHeading) # Instance Properties
+ assert isinstance(md_doc.content[12], MarkdownTable) #
+ assert isinstance(md_doc.content[13], MarkdownCollapsible) # More
+
+ assert 2 == len(md_doc.content[13].content)
+ assert isinstance(md_doc.content[13].content[0], MarkdownCollapsible) # Compile
+ assert 1 == len(md_doc.content[13].content[0].content)
+ assert isinstance(md_doc.content[13].content[0].content[0], MarkdownCode) #
+
+ assert isinstance(md_doc.content[13].content[1], MarkdownCollapsible) # Discovery
+ assert 3 == len(md_doc.content[13].content[1].content)
+ assert isinstance(md_doc.content[13].content[1].content[0], MarkdownString) #
+ assert isinstance(md_doc.content[13].content[1].content[1], MarkdownCode) #
+ assert isinstance(md_doc.content[13].content[1].content[2], MarkdownTable) #
\ No newline at end of file
diff --git a/qualitytests/patternrepair/test_pattern_repair.py b/qualitytests/patternrepair/test_pattern_repair.py
new file mode 100644
index 0000000..d64e1f8
--- /dev/null
+++ b/qualitytests/patternrepair/test_pattern_repair.py
@@ -0,0 +1,61 @@
+import pytest
+import os
+import shutil
+from pathlib import Path
+
+from pattern_repair.pattern_repair import PatternRepair
+from pattern_repair.PHP.instance_repair_php import InstanceRepairPHP
+
+from qualitytests.qualitytests_utils import join_resources_path
+
+@pytest.fixture(autouse=True)
+def run_around_tests():
+ # Code that will run before the test
+ path_to_test_pattern = join_resources_path("sample_patlib/PHP/5_pattern_to_repair")
+ path_to_save = join_resources_path("sample_patlib/PHP/5_pattern_to_repair_copy")
+ # copy the directory, to save it
+ shutil.copytree(path_to_test_pattern, path_to_save)
+
+ # A test function will be run at this point
+ yield
+
+ # Code that will run after the test
+ # restore the saved pattern
+ shutil.rmtree(path_to_test_pattern)
+ os.rename(path_to_save, path_to_test_pattern)
+ assert os.path.exists(path_to_test_pattern)
+
+class TestPatternRepair:
+ def test_repair_test_pattern_assert_files_exist(self):
+ path_to_test_pattern = join_resources_path("sample_patlib/PHP/5_pattern_to_repair")
+ instance_path = path_to_test_pattern / "1_instance_5_pattern_to_repair"
+ assert os.path.exists(instance_path)
+
+ PatternRepair(path_to_test_pattern, "PHP", join_resources_path("sample_patlib")).repair(True)
+
+ expected_pattern_json = path_to_test_pattern / "5_pattern_to_repair.json"
+ assert expected_pattern_json.is_file()
+ expected_instance_json = instance_path / "1_instance_5_pattern_to_repair.json"
+ assert expected_instance_json.is_file()
+ expected_instance_php = instance_path / "1_instance_5_pattern_to_repair.php"
+ assert expected_instance_php.is_file()
+ expected_instance_bash = instance_path / "1_instance_5_pattern_to_repair.bash"
+ assert expected_instance_bash.is_file()
+ expected_instance_sc = instance_path / "1_instance_5_pattern_to_repair.sc"
+ assert expected_instance_sc.is_file()
+ expected_docs_dir = path_to_test_pattern / "docs"
+ assert expected_docs_dir.is_dir()
+ expected_description = expected_docs_dir / "description.md"
+ assert expected_description.is_file()
+ expected_README_file = path_to_test_pattern / "README.md"
+ assert expected_README_file.is_file()
+
+ def test_finding_source_and_sink_line(self):
+ path_to_test_pattern = join_resources_path("sample_patlib/PHP/5_pattern_to_repair")
+        instance_repair = InstanceRepairPHP("PHP", path_to_test_pattern, "", join_resources_path("sample_patlib"))
+
+ path_to_php_file = path_to_test_pattern / "1_instance_5_pattern_to_repair" / "test.php"
+
+ source, sink = instance_repair._get_source_and_sink_for_file(path_to_php_file)
+ assert 2 == source
+ assert 3 == sink
\ No newline at end of file
diff --git a/qualitytests/patternrepair/test_pattern_repair_utils.py b/qualitytests/patternrepair/test_pattern_repair_utils.py
new file mode 100644
index 0000000..0a9de3d
--- /dev/null
+++ b/qualitytests/patternrepair/test_pattern_repair_utils.py
@@ -0,0 +1,75 @@
+import pytest
+from unittest.mock import patch
+
+from qualitytests.qualitytests_utils import join_resources_path
+
+from core.exceptions import PatternDoesNotExists
+from pattern_repair.utils import (
+ assert_pattern_valid, compare_dicts,
+ get_dict_keys, get_instance_name,
+ get_files_with_ending, get_language_by_file_ending,
+ list_instances_jsons, repair_keys_of_json
+ )
+
+class TestPatternRepairUtils:
+ def test_assert_pattern_valid(self):
+ path_to_non_existing_pattern = join_resources_path("100_non_existing")
+ with pytest.raises(PatternDoesNotExists) as e_info:
+ assert_pattern_valid(path_to_non_existing_pattern)
+ assert "Specified Pattern `100_non_existing` does not exists." in str(e_info.value)
+
+ def test_compare_dicts(self):
+ o_dict = {"key1": 1, "key2": 3, "key3": 2}
+ n_dict = {"key1": 1, "key3": 3, "key4": 42}
+ assert {'key3': 2} == compare_dicts(o_dict, n_dict)
+
+ def test_get_dict_keys(self):
+ d = {
+ "key1": {
+ "key1.1": 0,
+ "key1.2": {"key1.2.1": 0}
+ },
+ "key2": 42
+ }
+ assert set(["key1:key1.1", "key1:key1.2:key1.2.1", "key2"]) == set(get_dict_keys(d))
+
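The flattening this test pins down can be sketched like so (an illustrative re-implementation, not the actual `get_dict_keys`):

```python
def get_flat_keys(d: dict, prefix: str = "") -> list:
    # flattens nested dict keys into `parent:child` strings
    keys = []
    for key, value in d.items():
        full_key = f"{prefix}:{key}" if prefix else key
        if isinstance(value, dict):
            keys += get_flat_keys(value, full_key)
        else:
            keys.append(full_key)
    return keys

d = {"key1": {"key1.1": 0, "key1.2": {"key1.2.1": 0}}, "key2": 42}
print(sorted(get_flat_keys(d)))  # ['key1:key1.1', 'key1:key1.2:key1.2.1', 'key2']
```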
+ def test_get_instance_name(self):
+ path_to_pattern = join_resources_path("sample_patlib/PHP/5_pattern_to_repair")
+ path_to_instance = path_to_pattern / "1_instance_5_pattern_to_repair"
+
+        assert "1 Instance" == get_instance_name(path_to_instance)
+
+ def test_get_files_with_ending(self):
+ path_to_pattern = join_resources_path("sample_patlib/PHP/3_global_array")
+ assert [] == get_files_with_ending(path_to_pattern, ".php")
+ expected_instance_1_php_file = str(path_to_pattern / "1_instance_3_global_array" / "1_instance_3_global_array.php")
+ expected_instance_2_php_file = str(path_to_pattern / "2_instance_3_global_array" / "2_instance_3_global_array.php")
+ assert set([expected_instance_1_php_file, expected_instance_2_php_file]) == set(get_files_with_ending(path_to_pattern, ".php", True))
+
+ def test_get_language_by_file_ending(self):
+ assert "python" == get_language_by_file_ending("test.py")
+ assert "php" == get_language_by_file_ending("test.php")
+ assert "javascript" == get_language_by_file_ending("test.js")
+ assert "java" == get_language_by_file_ending("test.java")
+ assert "scala" == get_language_by_file_ending("test.sc")
+ assert "bash" == get_language_by_file_ending("test.bash")
+
+ with pytest.raises(NotImplementedError) as e_info:
+ get_language_by_file_ending("")
+ assert "The ending of the given filename is not yet supported" in str(e_info.value)
+
+ def test_list_instance_jsons(self):
+ path_to_pattern = join_resources_path("sample_patlib/PHP/3_global_array")
+ expected_instance_1_json_file = str(path_to_pattern / "1_instance_3_global_array" / "1_instance_3_global_array.json")
+ expected_instance_2_json_file = str(path_to_pattern / "2_instance_3_global_array" / "2_instance_3_global_array.json")
+ assert set([expected_instance_1_json_file, expected_instance_2_json_file]) == set(list_instances_jsons(path_to_pattern))
+
+ def test_repair_keys_of_json(self):
+ json_dict_tested = {"a": 42, "b": {"b.0": 1}}
+ json_dict_ground_truth = {"a": 42, "b": {"b.0": 1, "b.1": 1}, "c": 42, "d": 36}
+ with patch("pattern_repair.utils.read_json") as read_json_mock, \
+ patch("pattern_repair.utils.write_json") as write_json_mock:
+ read_json_mock.side_effect = [json_dict_tested, json_dict_ground_truth]
+
+ repair_keys_of_json("", "", ["d"])
+ write_json_mock.assert_called_once_with("", {"a": 42, "b": {"b.0": 1, "b.1": ""}, "c": ""})
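The repair semantics this test pins down can be sketched as follows (a simplified re-implementation for illustration, not the actual `repair_keys_of_json`):

```python
def repair_keys(tested: dict, ground_truth: dict, exempt: list) -> dict:
    # rebuild `tested` against `ground_truth`: keys missing from `tested` are
    # filled with "", keys absent from `ground_truth` are dropped, and keys
    # listed in `exempt` are skipped entirely
    repaired = {}
    for key, gt_value in ground_truth.items():
        if key in exempt:
            continue
        if isinstance(gt_value, dict):
            repaired[key] = repair_keys(tested.get(key, {}), gt_value, exempt)
        else:
            repaired[key] = tested.get(key, "")
    return repaired

tested = {"a": 42, "b": {"b.0": 1}}
ground_truth = {"a": 42, "b": {"b.0": 1, "b.1": 1}, "c": 42, "d": 36}
print(repair_keys(tested, ground_truth, ["d"]))  # {'a': 42, 'b': {'b.0': 1, 'b.1': ''}, 'c': ''}
```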
diff --git a/qualitytests/resources/sample_patlib/PHP/2_global_variables/2_global_variables.json b/qualitytests/resources/sample_patlib/PHP/2_global_variables/2_global_variables.json
index c905791..da72bf5 100644
--- a/qualitytests/resources/sample_patlib/PHP/2_global_variables/2_global_variables.json
+++ b/qualitytests/resources/sample_patlib/PHP/2_global_variables/2_global_variables.json
@@ -5,5 +5,6 @@
"tags": ["sast", "php", "php_v7.4.9"],
"instances": [
"./1_instance_2_global_variables/1_instance_2_global_variables.json"
- ]
+ ],
+ "version": "v0"
}
\ No newline at end of file
diff --git a/qualitytests/resources/sample_patlib/PHP/3_global_array/3_global_array.json b/qualitytests/resources/sample_patlib/PHP/3_global_array/3_global_array.json
index c71f100..518489e 100644
--- a/qualitytests/resources/sample_patlib/PHP/3_global_array/3_global_array.json
+++ b/qualitytests/resources/sample_patlib/PHP/3_global_array/3_global_array.json
@@ -1,9 +1,11 @@
{
+ "description": "",
"name": "Global Array",
"family": "code_pattern_php",
"tags": ["sast", "php", "php_v7.4.9"],
"instances": [
"./1_instance_3_global_array/1_instance_3_global_array.json",
"./2_instance_3_global_array/2_instance_3_global_array.json"
- ]
+ ],
+ "version": "v0"
}
\ No newline at end of file
diff --git a/qualitytests/resources/sample_patlib/PHP/5_pattern_to_repair/1_instance_5_pattern_to_repair/test.php b/qualitytests/resources/sample_patlib/PHP/5_pattern_to_repair/1_instance_5_pattern_to_repair/test.php
new file mode 100644
index 0000000..e5b689b
--- /dev/null
+++ b/qualitytests/resources/sample_patlib/PHP/5_pattern_to_repair/1_instance_5_pattern_to_repair/test.php
@@ -0,0 +1,3 @@
+<?php
+$a = $_GET["p1"]; // source
+echo $a; // sink
@@ -542,6 +631,19 @@ def parse_output_dir(output_dir: str):
exit(1)
+def parse_dir_or_file(path_to_file_or_dir: str,
+ default_path: str = config.RESULT_DIR,
+ name: str = "Output directory") -> Path:
+ if not path_to_file_or_dir:
+ path_to_file_or_dir: str = str(default_path)
+ try:
+ path_to_file_or_dir_as_path: Path = Path(path_to_file_or_dir).resolve()
+ return path_to_file_or_dir_as_path
+ except Exception as e:
+ print(f"{name} is wrong: {path_to_file_or_dir}")
+ exit(1)
+
+
def parse_tool_list(tools: list[str]):
if not tools:
return config.SAST_TOOLS_ENABLED
diff --git a/tp_framework/core/errors.py b/tp_framework/core/errors.py
index 8d0e99b..a3984a1 100644
--- a/tp_framework/core/errors.py
+++ b/tp_framework/core/errors.py
@@ -84,4 +84,17 @@ def discoveryRuleParsingResultError():
def unexpectedException(e):
- return f"Unexpected exception triggered: {e}."
\ No newline at end of file
+ return f"Unexpected exception triggered: {e}."
+
+# Pattern Repair
+
+def measurementResultsDirDoesNotExist():
+ return "The directory with the measurements does not exist."
+
+
+def fileDoesNotExist():
+    return "The file you provided does not exist or has the wrong file type."
+
+
+def templateDirDoesNotExist(not_existing_dir_or_file):
+    return f"Your tplib does not have {not_existing_dir_or_file}."
\ No newline at end of file
diff --git a/tp_framework/core/exceptions.py b/tp_framework/core/exceptions.py
index 281221e..ff7cc79 100644
--- a/tp_framework/core/exceptions.py
+++ b/tp_framework/core/exceptions.py
@@ -121,4 +121,23 @@ def __init__(self, stderr=None):
self.message = stderr
else:
self.message = errors.discoveryRuleParsingResultError()
+ super().__init__(self.message)
+
+# Pattern Repair
+
+class MeasurementResultsDoNotExist(Exception):
+ def __init__(self, message=errors.measurementResultsDirDoesNotExist()):
+ self.message = message
+ super().__init__(self.message)
+
+
+class FileDoesNotExist(Exception):
+ def __init__(self, message=errors.fileDoesNotExist()):
+ self.message = message
+ super().__init__(self.message)
+
+
+class TemplateDoesNotExist(Exception):
+ def __init__(self, message=errors.templateDirDoesNotExist('template')) -> None:
+ self.message = message
super().__init__(self.message)
\ No newline at end of file
diff --git a/tp_framework/core/utils.py b/tp_framework/core/utils.py
index 5725b3f..9f12c5b 100644
--- a/tp_framework/core/utils.py
+++ b/tp_framework/core/utils.py
@@ -17,7 +17,8 @@
import config
from core import pattern, instance
from core.exceptions import PatternDoesNotExists, LanguageTPLibDoesNotExist, TPLibDoesNotExist, InvalidSastTools, \
- DiscoveryMethodNotSupported, TargetDirDoesNotExist, InvalidSastTool, PatternFolderNotFound, InstanceDoesNotExists
+ DiscoveryMethodNotSupported, TargetDirDoesNotExist, InvalidSastTool, PatternFolderNotFound, InstanceDoesNotExists, \
+ MeasurementResultsDoNotExist, FileDoesNotExist
from core import errors
@@ -46,6 +47,7 @@ def list_tpi_paths_by_tp_id(language: str, pattern_id: int, tp_lib_dir: Path) ->
def get_tpi_id_from_jsonpath(jp: Path) -> int:
return get_id_from_name(jp.parent.name)
+
def get_pattern_dir_from_id(pattern_id: int, language: str, tp_lib_dir: Path) -> Path:
tp_lib_dir_lang_dir: Path = tp_lib_dir / language
if tp_lib_dir_lang_dir.is_dir():
@@ -196,6 +198,24 @@ def get_discovery_rules(discovery_rule_list: list[str], discovery_rule_ext: str)
return list(discovery_rules_to_run)
+################################################################################
+# Pattern Repair
+#
+
+def check_measurement_results_exist(measurement_dir: Path):
+ if not measurement_dir.is_dir():
+ e = MeasurementResultsDoNotExist()
+ logger.error(get_exception_message(e))
+ raise e
+
+
+def check_file_exist(file_path: Path, file_suffix: str = ".csv"):
+    if not file_path.is_file() or file_path.suffix != file_suffix:
+ e = FileDoesNotExist(file_path)
+ logger.error(get_exception_message(e))
+ raise e
+
+
################################################################################
# Others
#
@@ -291,7 +311,6 @@ def add_loggers(output_dir_path: Path, filename: str=None, console=True):
loggermgr.add_console_logger()
-
def get_operation_build_name_and_dir(op: str, src_dir: Path | None, language: str, output_dir: Path):
now = datetime.now()
if not src_dir:
@@ -351,4 +370,4 @@ def get_file_hash(fpath, bigfile=False):
else:
while chunk := f.read(8192):
hash.update(chunk)
- return hash.hexdigest()
\ No newline at end of file
+ return hash.hexdigest()
diff --git a/tp_framework/pattern_repair/PHP/__init__.py b/tp_framework/pattern_repair/PHP/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tp_framework/pattern_repair/PHP/generate_opcode.py b/tp_framework/pattern_repair/PHP/generate_opcode.py
new file mode 100644
index 0000000..54e9078
--- /dev/null
+++ b/tp_framework/pattern_repair/PHP/generate_opcode.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python3
+
+"""
+This script can be used to generate opcode for PHP patterns
+"""
+import os
+import logging
+import time
+
+from pattern_repair.utils import get_files_with_ending, read_json, write_json
+from core import loggermgr
+
+logger = logging.getLogger(loggermgr.logger_name(__name__))
+
+
+class PHPOpcodeGenerator:
+ """This class encapsulates the opcode generation for PHP files"""
+
+ def __init__(
+ self, pattern_instance_path: str, path_to_testability_patterns: str
+ ) -> None:
+ self.pattern_instance_path = pattern_instance_path
+ self.path_to_testability_patterns = path_to_testability_patterns
+
+    def _adjust_json_file(self, bash_file_name):
+        """Adapts the instance JSON file so that `compile:binary` points to the new opcode file.
+
+        Args:
+            bash_file_name (str): path to the generated opcode (bash) file.
+        """
+ json_files_paths = get_files_with_ending(self.pattern_instance_path, ".json")
+ if not len(json_files_paths) == 1:
+ logger.error(
+ f"Expected one JSON file for {self.pattern_instance_path} got {len(json_files_paths)}"
+ )
+ exit(1)
+ result_dict = read_json(json_files_paths[0])
+ result_dict["compile"][
+ "binary"
+ ] = f".{os.sep}{os.path.relpath(bash_file_name, self.pattern_instance_path)}"
+ write_json(json_files_paths[0], result_dict)
+
+    def _mask_line(self, input_line: str, php_file: str) -> str:
+        """Masks an opcode line that contains the path to the PHP file.
+        If `php_file` cannot be found in `input_line`, `input_line` is returned unchanged.
+
+        Args:
+            input_line (str): any line from the opcode (bash) file.
+            php_file (str): path of the PHP file.
+
+        Returns:
+            str: masked line; everything before the testability pattern lib is replaced with `/.../`.
+        """
+        if php_file not in input_line:
+ return input_line
+ line_prefix = input_line.split(os.sep)[0]
+ line_suffix = input_line[input_line.rfind(".php") + 4 :]
+ actual_filepath = input_line.replace(line_prefix, "").replace(line_suffix, "")
+ new_path = f"{os.sep}...{os.sep}{os.path.relpath(actual_filepath, self.path_to_testability_patterns)}"
+ return line_prefix + new_path + line_suffix
+
+ def _make_optcode_from_php_file(self, php_file_path: str) -> str:
+ """Generates opcode for a php file.
+
+ Args:
+ php_file_path (str): Path to PHP file
+
+ Returns:
+ str: File path to the corresponding file containing the opcode.
+ """
+ # define necessary paths
+ php_file_path = os.path.abspath(php_file_path)
+        bash_file_path = f"{os.path.splitext(php_file_path)[0]}.bash"
+
+        # opcache will only compile and cache files older than the script execution start (https://www.php.net/manual/en/function.opcache-compile-file.php),
+        # therefore we have to backdate the PHP file's modification time
+ one_minute_ago = time.time() - 60
+ os.utime(php_file_path, (one_minute_ago, one_minute_ago))
+
+ # Generate the bash file
+ os.system(
+ f"php -d zend_extension=opcache -d opcache.enable_cli=1 -d opcache.opt_debug_level=0x10000 --syntax-check {php_file_path} 2> {bash_file_path} 1>/dev/null"
+ )
+
+ # Sanitize the opcode: on some systems, there is an error included in the bash file
+ with open(bash_file_path, "r") as file:
+ result = file.readlines()
+ for idx, line in enumerate(result):
+ if line.startswith("$_main"):
+ result = result[max(idx - 1, 0) :]
+ break
+ # mask the path to file
+ final_lines = [self._mask_line(line, php_file_path) for line in result]
+ with open(bash_file_path, "w") as file:
+ file.writelines(final_lines)
+ return bash_file_path
+
+    def generate_opcode_for_pattern_instance(self) -> list:
+        """Generates the opcode for a pattern instance and adjusts the JSON file accordingly.
+
+        Returns:
+            list: file paths to the generated opcode files.
+        """
+ php_files_paths = get_files_with_ending(
+ self.pattern_instance_path, ".php", recursive=True
+ )
+        if not php_files_paths:
+            logger.warning(
+                f"Expected at least one PHP file for {self.pattern_instance_path}, found none"
+            )
+            return []
+ bash_files = []
+ for php_file_path in php_files_paths:
+ bash_files += [self._make_optcode_from_php_file(php_file_path)]
+ if len(php_files_paths) == 1:
+ self._adjust_json_file(bash_files[-1])
+ return bash_files
diff --git a/tp_framework/pattern_repair/PHP/instance_repair_php.py b/tp_framework/pattern_repair/PHP/instance_repair_php.py
new file mode 100644
index 0000000..30fe34a
--- /dev/null
+++ b/tp_framework/pattern_repair/PHP/instance_repair_php.py
@@ -0,0 +1,157 @@
+import os
+import logging
+
+from pattern_repair.instance_repair import InstanceRepair
+from pattern_repair.PHP.generate_opcode import PHPOpcodeGenerator
+from pattern_repair.utils import read_json, write_json, get_files_with_ending
+
+from core import loggermgr
+
+logger = logging.getLogger(loggermgr.logger_name(__name__))
+
+
+class InstanceRepairPHP(InstanceRepair):
+ def _get_source_and_sink_for_file(self, path_to_file: str) -> tuple:
+ """Looks for '// source' and '// sink' in a file and returns the line numbers of these lines (index starting at 1)
+
+ Args:
+ path_to_file (str): path to the file source and sink should be found in.
+
+ Returns:
+            tuple: (source_line, sink_line); if a marker does not exist, None is returned for it.
+ """
+ with open(path_to_file, "r") as fp:
+ file_lines = fp.readlines()
+ sink = None
+ source = None
+ for idx, line in enumerate(file_lines):
+ if "// sink" in line:
+ sink = idx + 1
+ if "// source" in line:
+ source = idx + 1
+ return (source, sink)
+
+ def _repair_json_field_with_path(
+ self, instance_dict: dict, file_ending: str, keyword1: str, keyword2: str
+ ) -> dict:
+        """Checks whether the path in the JSON identified by `keyword1` and `keyword2` points to a valid file.
+
+ Args:
+ instance_dict (dict): Dict of instance
+ file_ending (str): fileending of the wanted file
+ keyword1 (str): Keyword for first level in `instance_dict`
+ keyword2 (str): Keyword in second level in `instance_dict`
+
+ Returns:
+ dict: Dict of instance
+ """
+ self._find_and_rename_file(file_ending)
+ expected_path = f".{os.sep}{self.instance_name}.{file_ending}"
+ abs_expected_path = os.path.abspath(
+ os.path.join(self.instance_path, expected_path)
+ )
+ if os.path.isfile(abs_expected_path):
+ instance_dict[keyword1][keyword2] = expected_path
+ else:
+ # check if the path inserted in the field is actually valid
+ if os.path.isfile(
+ os.path.join(self.instance_path, instance_dict[keyword1][keyword2] if instance_dict[keyword1][keyword2] else '')
+ ):
+ return instance_dict
+ logger.warning(
+ f"Could not verify {file_ending} filepath for instance {self.instance_name}"
+ )
+ return instance_dict
+
+ def _repair_json_expectation(self, instance_dict: dict) -> dict:
+ """Corrects 'expectation:source_file', 'expectation:sink_file', 'expectation:source_line', 'expectation:sink_line'
+
+ Args:
+ instance_dict (dict): Dict of instance
+
+ Returns:
+ dict: Dict of instance
+ """
+ # get paths from the JSON file
+ path_to_source_file = instance_dict["expectation"]["source_file"]
+ abs_path_to_source_file = os.path.join(self.instance_path, path_to_source_file)
+ path_to_sink_file = instance_dict["expectation"]["sink_file"]
+ abs_path_to_sink_file = os.path.join(self.instance_path, path_to_sink_file)
+ path_to_php_file = instance_dict["code"]["path"]
+ abs_path_to_php_file = os.path.join(self.instance_path, path_to_php_file)
+
+ if not path_to_php_file or not os.path.isfile(abs_path_to_php_file):
+ logger.warning(f'Could not verify "expectation" for {self.instance_name}')
+ return instance_dict
+
+ if not os.path.isfile(abs_path_to_sink_file):
+ abs_path_to_sink_file = abs_path_to_php_file
+ path_to_sink_file = path_to_php_file
+ logger.info(f"Changing sink file path to {path_to_php_file}")
+ if not os.path.isfile(abs_path_to_source_file):
+ abs_path_to_source_file = abs_path_to_php_file
+ path_to_source_file = path_to_php_file
+ logger.info(f"Changing source file path to {path_to_php_file}")
+ source0, sink0 = self._get_source_and_sink_for_file(abs_path_to_sink_file)
+ source1, sink1 = self._get_source_and_sink_for_file(abs_path_to_source_file)
+
+ # set values in instance dict
+ instance_dict["expectation"]["source_file"] = path_to_source_file
+ instance_dict["expectation"]["source_line"] = source0 if source0 else source1
+ instance_dict["expectation"]["sink_file"] = path_to_sink_file
+ instance_dict["expectation"]["sink_line"] = sink0 if sink0 else sink1
+ if not (source0 or source1):
+ logger.warning(f"Could not verify source line for {self.instance_name}")
+ if not (sink0 or sink1):
+ logger.warning(f"Could not verify sink line for {self.instance_name}")
+ return instance_dict
+
+ def _repair_opcode(self):
+ """Generates opcode and checks that the generated bash files are not empty."""
+ # remove old bash files first, before generating new ones
+ all_bash_files = get_files_with_ending(self.instance_path, ".bash", recursive=True)
+ for bash_file in all_bash_files:
+ os.remove(bash_file)
+ bash_file_paths = PHPOpcodeGenerator(
+ self.instance_path, self.path_to_testability_patterns
+ ).generate_opcode_for_pattern_instance()
+ for bash_file_path in bash_file_paths:
+ if not bash_file_path or not os.stat(bash_file_path).st_size:
+ logger.warning(f"Bash file {bash_file_path} is missing or empty")
+
+ def _repair_instance_json(self) -> None:
+ """Repairs the JSON file of the instance."""
+ # make sure file exists and has all the right fields
+ super().repair_instance_json()
+ instance_dict = read_json(self.instance_json_file)
+ # make sure bash filepath is correct
+ instance_dict = self._repair_json_field_with_path(
+ instance_dict, "bash", "compile", "binary"
+ )
+ # make sure PHP filepath is correct
+ instance_dict = self._repair_json_field_with_path(
+ instance_dict, "php", "code", "path"
+ )
+ # make sure discovery filepath is correct
+ instance_dict = self._repair_json_field_with_path(
+ instance_dict, "sc", "discovery", "rule"
+ )
+ # make sure expectation is correct
+ instance_dict = self._repair_json_expectation(instance_dict)
+ write_json(self.instance_json_file, instance_dict)
+
+ def _repair_num_files(self) -> None:
+ """Checks that the number of PHP files matches the number of bash files."""
+ all_bash_files = get_files_with_ending(self.instance_path, ".bash")
+ all_php_files = get_files_with_ending(self.instance_path, ".php")
+ if len(all_bash_files) != len(all_php_files):
+ logger.warning(
+ f"Expected same number of .bash and .php files, but got {len(all_php_files)} PHP files and {len(all_bash_files)} BASH files"
+ )
+
+ def repair(self):
+ super().repair_instance_json()
+ super().repair()
+ self._find_and_rename_file("php")
+ self._repair_opcode()
+ self._repair_instance_json()
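The relative-path convention enforced by `_repair_json_field_with_path` above (`.` plus the platform separator, then the instance name and file ending) can be sketched as a standalone helper. The function name is illustrative, not part of the framework:

```python
import os

def expected_relative_path(instance_name: str, file_ending: str) -> str:
    # JSON fields store file paths relative to the instance directory,
    # prefixed with "." and the platform-specific separator.
    return f".{os.sep}{instance_name}.{file_ending}"
```

On a POSIX system this yields paths like `./1_instance_1_pattern.php`.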
diff --git a/tp_framework/pattern_repair/README_generator.py b/tp_framework/pattern_repair/README_generator.py
new file mode 100644
index 0000000..14ffa2e
--- /dev/null
+++ b/tp_framework/pattern_repair/README_generator.py
@@ -0,0 +1,155 @@
+import logging
+
+from os import path
+from pathlib import Path
+
+from pattern_repair.README_markdown_elements import *
+from pattern_repair.utils import (
+ read_json,
+ read_csv_to_dict,
+ read_file,
+ translate_bool,
+ get_instance_name,
+)
+from pattern_repair.README_instance_generator import InstanceREADMEGenerator
+
+from core.utils import check_lang_tp_lib_path, get_id_from_name
+from core import loggermgr
+
+logger = logging.getLogger(loggermgr.logger_name(__name__))
+
+
+class READMEGenerator:
+ def __init__(
+ self,
+ path_to_pattern: str,
+ language: str,
+ tp_lib_path: str,
+ instance_jsons: list[str],
+ discovery_rule_results: str = "",
+ measurement_results: str = "",
+ masking_file: str = "",
+ ) -> None:
+ check_lang_tp_lib_path(Path(path.join(tp_lib_path, language.upper())))
+
+ self.pattern_path = path_to_pattern
+ self.pattern_dict = read_json(
+ path.join(path_to_pattern, f"{path.basename(path_to_pattern)}.json")
+ )
+ self.language = language.upper()
+ self.log_prefix = "Generating README: "
+ self.discovery_rule_results = None
+ self.measurement_results = measurement_results
+ self.masking_file = masking_file
+ self.instance_jsons = instance_jsons
+
+ if not path.isfile(discovery_rule_results):
+ logger.warning(
+ f"{self.log_prefix}Cannot locate discovery rule results in {discovery_rule_results}"
+ )
+ else:
+ self.discovery_rule_results = read_csv_to_dict(discovery_rule_results)
+
+ self.readme_structure = [
+ self._comment,
+ self._heading,
+ self._tags,
+ self._pattern_description,
+ self._pattern_metadata,
+ self._instances,
+ ]
+
+ def _comment(self) -> list:
+ """Generates a Comment for the top of the README file."""
+ return [
+ MarkdownComment(
+ "This file is automatically generated. If you wish to make any changes, please use the JSON files and regenerate this file using the tpframework."
+ )
+ ]
+
+ def _heading(self) -> list:
+ """Generates the heading for the README file."""
+ return [MarkdownHeading(self.pattern_dict["name"], 1)]
+
+ def _pattern_description(self) -> list:
+ """Generates the description for the pattern."""
+ desc = self.pattern_dict["description"]
+ if path.isfile(path.join(self.pattern_path, desc)):
+ desc = read_file(path.join(self.pattern_path, desc))
+ return [MarkdownHeading("Description", 2), MarkdownString(desc)]
+
+ def _tags(self) -> list:
+ """Generates pattern tags."""
+ return [
+ MarkdownString(f'Tags: {", ".join(self.pattern_dict["tags"])}'),
+ MarkdownString(f'Version: {self.pattern_dict["version"]}'),
+ ]
+
+ def _pattern_metadata(self) -> list:
+ """Generates a table of pattern metadata: the instances, whether a discovery rule exists, the discovery method, and whether the discovery rule succeeds on each instance."""
+ discovery_rule_exists = []
+ instance_names = []
+ discovery_rule_successful = []
+ discovery_method = []
+ for instance_path_json in self.instance_jsons:
+ instance_dict = read_json(instance_path_json)
+ instance_path = path.dirname(instance_path_json)
+
+ instance_name = get_instance_name(path.basename(instance_path))
+ instance_names += [
+ MarkdownLink(instance_name, MarkdownHeading(instance_name, 2))
+ ]
+
+ discovery_file = path.join(
+ instance_path, instance_dict["discovery"]["rule"]
+ )
+ discovery_rule_exists += [translate_bool(path.isfile(discovery_file))]
+
+ pattern_id, instance_id = get_id_from_name(
+ path.basename(self.pattern_path)
+ ), get_id_from_name(path.basename(instance_path))
+ if self.discovery_rule_results:
+ discovery_rule_successful += [
+ self.discovery_rule_results[self.language][str(pattern_id)][str(instance_id)]
+ ]
+ if not discovery_rule_successful[-1]:
+ logger.warning(f'{self.log_prefix}Could not find discovery rule result for {instance_name}. Assuming "error"')
+ discovery_rule_successful[-1] = "error"
+
+ discovery_method += [instance_dict["discovery"]["method"]]
+
+ metadata_dict = {
+ "0::Instances": instance_names,
+ "1::has discovery rule": discovery_rule_exists,
+ "2::discovery method": discovery_method,
+ "3::rule successful": discovery_rule_successful,
+ }
+ if not self.discovery_rule_results:
+ metadata_dict.pop("3::rule successful")
+
+ return [MarkdownHeading("Overview", 2), MarkdownTable(metadata_dict)]
+
+ def _instances(self) -> list:
+ """Generates the README elements for all instances."""
+ return InstanceREADMEGenerator(
+ self.pattern_path,
+ self.language,
+ self.measurement_results,
+ self.instance_jsons,
+ masking_file=self.masking_file,
+ ).generate_md()
+
+ def _generate_README_elements(self) -> MarkdownDocument:
+ md_elements = []
+ for f in self.readme_structure:
+ md_elements += f()
+ return MarkdownDocument(md_elements)
+
+ def generate_README(self) -> str:
+ """Entrypoint for generating a README file for that pattern.
+
+ Returns:
+ str: The generated README file following `self.readme_structure`
+ """
+ return self._generate_README_elements().to_markdown()
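`_generate_README_elements` follows a simple pipeline pattern: `readme_structure` is a list of zero-argument callables, each returning a list of Markdown elements, and the document is their concatenation in declaration order. A minimal, framework-free sketch of that pattern:

```python
def build_sections(structure):
    # Concatenate the output of each section callback, preserving order.
    elements = []
    for section in structure:
        elements += section()
    return elements
```

Adding or reordering README sections then only requires editing the structure list, not the assembly logic.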
diff --git a/tp_framework/pattern_repair/README_instance_generator.py b/tp_framework/pattern_repair/README_instance_generator.py
new file mode 100644
index 0000000..3e3dca8
--- /dev/null
+++ b/tp_framework/pattern_repair/README_instance_generator.py
@@ -0,0 +1,383 @@
+import logging
+import re
+from os import path
+from datetime import datetime
+
+from pattern_repair.utils import (
+ read_json,
+ get_dict_keys,
+ translate_bool,
+ get_language_by_file_ending,
+ get_instance_name,
+ get_files_with_ending,
+ read_file,
+)
+from pattern_repair.README_markdown_elements import *
+
+from core import loggermgr
+
+logger = logging.getLogger(loggermgr.logger_name(__name__))
+
+
+class InstanceREADMEGenerator:
+ def __init__(
+ self,
+ path_to_pattern: str,
+ language: str,
+ path_to_pattern_measurements: str,
+ instance_jsons: list[str],
+ level: int = 2,
+ masking_file: str = "mask.json",
+ ) -> None:
+ self.language = language.upper()
+ self.log_prefix = "Generating README: "
+ self.pattern_path = path_to_pattern
+ self.level = level
+ self.pattern_measurements = (
+ path_to_pattern_measurements if path_to_pattern_measurements else ""
+ )
+
+ self.instances_jsons = instance_jsons
+ self.has_multiple_instances = len(self.instances_jsons) > 1
+ self.instance_dicts = [read_json(i_path) for i_path in self.instances_jsons]
+
+ self.current_instance = None
+ self.current_instance_dict = None
+ self.current_instance_dict_keys = None
+
+ self.instance_structure = [
+ self._instance_name,
+ self._instance_description,
+ self._instance_code,
+ self._instance_properties,
+ self._instance_more,
+ ]
+ self.instance_more_structure = [
+ self._compile,
+ self._discovery,
+ self._measurement,
+ self._remediation,
+ ]
+
+ self.mask = {}
+ if masking_file and path.isfile(masking_file):
+ self.mask = read_json(masking_file)
+ elif masking_file:
+ logger.info(f"Could not find the provided masking file: {masking_file}")
+
+ def _instance_name(self) -> list:
+ """Generates the Markdown heading for the current instance."""
+ return [MarkdownHeading(get_instance_name(self.current_instance), self.level)]
+
+ def _instance_description(self) -> list:
+ """Generates the description for the current instance."""
+ desc = (
+ self.current_instance_dict["description"]
+ if "description" in self.current_instance_dict_keys
+ else ""
+ )
+ content = self._get_file_content_if_exists(desc, debug_name="description")
+ return [MarkdownString(content)] if content else []
+
+ def _instance_code(self) -> list:
+ """Generates the Instance code for the current instance."""
+ heading = MarkdownHeading("Code", self.level + 1)
+ code = (
+ self.current_instance_dict["code"]["path"]
+ if "code:path" in self.current_instance_dict_keys
+ else ""
+ )
+ source = (
+ self.current_instance_dict["expectation"]["source_file"]
+ if "expectation:source_file" in self.current_instance_dict_keys
+ else ""
+ )
+ sink = (
+ self.current_instance_dict["expectation"]["sink_file"]
+ if "expectation:sink_file" in self.current_instance_dict_keys
+ else ""
+ )
+ if source == sink:
+ content = self._get_file_content_if_exists(code, debug_name="code")
+ return [heading, MarkdownCode(content, self.language)] if content else []
+ source_content = self._get_file_content_if_exists(
+ source, debug_name="source_file"
+ )
+ sink_content = self._get_file_content_if_exists(sink, debug_name="sink_file")
+ return [
+ heading,
+ MarkdownHeading("Source File", self.level + 2),
+ MarkdownCode(source_content, self.language),
+ MarkdownHeading("Sink File", self.level + 2),
+ MarkdownCode(sink_content, self.language),
+ ]
+
+ def _instance_properties(self) -> list:
+ """Generates the table of instance properties."""
+ properties_dict = {
+ "category": [self.current_instance_dict["properties"]["category"]],
+ "feature_vs_internal_api": [
+ self.current_instance_dict["properties"]["feature_vs_internal_api"]
+ ],
+ "input_sanitizer": [
+ translate_bool(
+ self.current_instance_dict["properties"]["input_sanitizer"]
+ )
+ ],
+ "source_and_sink": [
+ translate_bool(
+ self.current_instance_dict["properties"]["source_and_sink"]
+ )
+ ],
+ "negative_test_case": [
+ translate_bool(
+ self.current_instance_dict["properties"]["negative_test_case"]
+ )
+ ],
+ }
+ return [
+ MarkdownHeading("Instance Properties", self.level + 1),
+ MarkdownTable(properties_dict),
+ ]
+
+ def _instance_more(self) -> list:
+ """Generates the 'more' section for an instance."""
+ ret = []
+ for f in self.instance_more_structure:
+ ret += f()
+ return [MarkdownCollapsible(ret, MarkdownString("More"))]
+
+ def _compile(self) -> list:
+ """Generates the compile section for an instance."""
+ compile_file = (
+ self.current_instance_dict["compile"]["binary"]
+ if "compile:binary" in self.current_instance_dict_keys
+ else ""
+ )
+ content = self._get_file_content_if_exists(compile_file, "compile")
+ binary = MarkdownCode(content, get_language_by_file_ending(compile_file))
+ return (
+ [MarkdownCollapsible([binary], MarkdownHeading("Compile", self.level + 1))]
+ if content
+ else []
+ )
+
+ def _discovery(self) -> list:
+ """Generates the 'discovery' section for an instance."""
+ desc = (
+ self.current_instance_dict["discovery"]["notes"]
+ if "discovery:notes" in self.current_instance_dict_keys
+ else ""
+ )
+ desc = MarkdownString(self._get_file_content_if_exists(desc, "discovery notes"))
+ rule_path = (
+ self.current_instance_dict["discovery"]["rule"]
+ if "discovery:rule" in self.current_instance_dict_keys
+ else ""
+ )
+ rule = self._get_file_content_if_exists(rule_path, "discovery rule")
+ # get only necessary content
+ rule = re.sub(r"@main def main\(name .*{$", "", rule, flags=re.M)
+ rule = re.sub(r"importCpg.*$", "", rule, flags=re.M)
+ rule = re.sub(r"println\(.*\)$", "", rule, flags=re.M)
+ rule = re.sub(r"delete;.*$", "", rule, flags=re.M)
+ rule = re.sub(r".*}.*$", "", rule)
+ rule = "\n".join([l.strip() for l in rule.split("\n")])
+ rule = (
+ MarkdownCode(rule, get_language_by_file_ending(rule_path))
+ if rule_path
+ else MarkdownString("No discovery rule yet.")
+ )
+ discovery_table = {
+ "discovery method": [self.current_instance_dict["discovery"]["method"]],
+ "expected accuracy": [
+ self.current_instance_dict["discovery"]["rule_accuracy"]
+ ],
+ }
+ discovery_table = MarkdownTable(discovery_table)
+ return [
+ MarkdownCollapsible(
+ [desc, rule, discovery_table],
+ MarkdownHeading("Discovery", self.level + 1),
+ )
+ ]
+
+ def _measurement(self) -> list:
+ """Generates the 'measurement' section for an instance."""
+ if not path.isdir(self.pattern_measurements):
+ logger.warning(
+ f"{self.log_prefix}Could not generate measurement table, because {self.pattern_measurements} does not exist"
+ )
+ return []
+ instance_measurements = path.join(
+ self.pattern_measurements, path.basename(self.current_instance)
+ )
+ measurement_table = {}
+ has_measurement = False
+ dates = []
+ ground_truth = self.current_instance_dict["expectation"]["expectation"]
+ for json_file in get_files_with_ending(instance_measurements, ".json"):
+ current_json = read_json(json_file)
+ for c_dict in current_json:
+ has_measurement = True
+ tool = f'1::{self.mask[c_dict["tool"].lower()] if c_dict["tool"].lower() in self.mask.keys() else c_dict["tool"]}'
+ date = datetime.strptime(c_dict["date"], "%Y-%m-%d %H:%M:%S").strftime(
+ "%d %b %Y"
+ )
+ dates += [date]
+ sast_tool_result = translate_bool(not (c_dict["result"] ^ ground_truth))
+ try:
+ measurement_table[tool] += [(sast_tool_result, date)]
+ measurement_table[tool] = sorted(
+ measurement_table[tool],
+ key=lambda tup: datetime.strptime(tup[1], "%d %b %Y"),
+ )
+ except KeyError:
+ measurement_table[tool] = [(sast_tool_result, date)]
+ if not has_measurement:
+ return []
+ measurement_table, sorted_dates = self._format_measurements(
+ measurement_table, dates
+ )
+ measurement_table["0::Tool"] = sorted_dates
+ measurement_table["2::Ground Truth"] = [translate_bool(ground_truth)] * len(
+ sorted_dates
+ )
+ return [
+ MarkdownCollapsible(
+ [MarkdownTable(measurement_table)],
+ MarkdownHeading("Measurement", self.level + 1),
+ is_open=True,
+ )
+ ]
+
+ def _remediation(self) -> list:
+ """Generates the 'remediation' section for an instance."""
+ note = (
+ self.current_instance_dict["remediation"]["notes"]
+ if "remediation:notes" in self.current_instance_dict_keys
+ else ""
+ )
+ note = MarkdownString(
+ self._get_file_content_if_exists(note, "remediation note")
+ )
+ transformation = (
+ self.current_instance_dict["remediation"]["transformation"]
+ if "remediation:transformation" in self.current_instance_dict_keys
+ else ""
+ )
+ transformation = MarkdownString(
+ self._get_file_content_if_exists(transformation, "transformation")
+ )
+ modeling_rule = (
+ self.current_instance_dict["remediation"]["modeling_rule"]
+ if "remediation:modeling_rule" in self.current_instance_dict_keys
+ else ""
+ )
+ modeling_rule = MarkdownString(
+ self._get_file_content_if_exists(modeling_rule, "modeling rule")
+ )
+ if any([note, transformation, modeling_rule]):
+ note = [
+ note
+ if note
+ else MarkdownString(
+ "Can you think of a transformation that makes this tarpit less challenging for SAST tools?"
+ )
+ ]
+ transformation = (
+ [MarkdownHeading("Transformation", self.level + 2), transformation]
+ if transformation
+ else []
+ )
+ modeling_rule = (
+ [MarkdownHeading("Modeling Rule", self.level + 2), modeling_rule]
+ if modeling_rule
+ else []
+ )
+ return [
+ MarkdownCollapsible(
+ note + transformation + modeling_rule,
+ MarkdownHeading("Remediation", self.level + 1),
+ )
+ ]
+ return []
+
+ def _get_file_content_if_exists(
+ self, path_to_file: str, debug_name: str = ""
+ ) -> str:
+ """If the `path_to_file` is a valid filepath within the current instance, this will return the content of that file.
+ Provide a `debug_name` if you want a unique logging warning.
+
+ Args:
+ path_to_file (str): path to a file within the current instance.
+ debug_name (str, optional): name used in the warning output. Defaults to ''.
+
+ Returns:
+ str: content of the file or empty string.
+ """
+ content = path_to_file if path_to_file else ""
+ if path.isfile(path.join(self.current_instance, content)):
+ content = read_file(path.join(self.current_instance, content))
+ if not content:
+ logger.warning(
+ f"{self.log_prefix}Could not find {debug_name} for instance {path.basename(self.current_instance)}"
+ )
+ return ""
+ return content
+
+ def _format_measurements(self, tool_measurement_dict: dict, dates: list) -> tuple:
+ """Formats the measurements into the desired table format:
+
+ | | Tool1 | Tool2 |
+ |--------+--------+--------|
+ | Date1 | yes | no |
+
+ Args:
+ tool_measurement_dict (dict): dict containing measurement results and date as a list of tuple for each tool.
+ dates (list): a list of measurement dates.
+
+ Returns:
+ tuple(dict, list): dict of all tools and their measurement results (one column) and a list of sorted measurement dates (first column)
+ """
+ dates_sorted = sorted(list(set(dates)))
+ formatted_measurement_table = {}
+ for tool, measurements in tool_measurement_dict.items():
+ formatted_measurements = []
+ current_measurement = measurements.pop(0)
+ for date in dates_sorted:
+ if current_measurement[1] == date:
+ formatted_measurements += [current_measurement[0]]
+ if len(measurements):
+ current_measurement = measurements.pop(0)
+ else:
+ break
+ else:
+ formatted_measurements += [""]
+ formatted_measurement_table[tool] = formatted_measurements
+ return formatted_measurement_table, dates_sorted
+
+ def generate_md(self) -> list:
+ """Entrypoint for generating Markdown for an instance.
+
+ Returns:
+ list: a list of Markdown elements following the structure in `self.instance_structure`
+ """
+ ret = []
+ for idx, _ in enumerate(self.instances_jsons):
+ self.current_instance = path.dirname(self.instances_jsons[idx])
+ self.current_instance_dict = self.instance_dicts[idx]
+ self.current_instance_dict_keys = get_dict_keys(self.current_instance_dict)
+
+ instance_md_elements = []
+ for f in self.instance_structure:
+ instance_md_elements += f()
+ if self.has_multiple_instances:
+ ret += [
+ MarkdownCollapsible(
+ instance_md_elements[1:], instance_md_elements[0], idx == 0
+ )
+ ]
+ else:
+ ret = instance_md_elements
+ return ret
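The date alignment performed by `_format_measurements` can be shown in isolation. This sketch (a hypothetical helper using ISO date strings, so lexicographic order equals chronological order) builds one column per tool over the sorted, de-duplicated date axis, leaving empty cells where a tool has no measurement:

```python
def format_measurements(tool_measurement_dict: dict, dates: list) -> tuple:
    # Rows follow the sorted, de-duplicated measurement dates.
    dates_sorted = sorted(set(dates))
    table = {}
    for tool, measurements in tool_measurement_dict.items():
        # measurements is a list of (result, date) tuples.
        by_date = {date: result for result, date in measurements}
        # Empty cell when a tool has no measurement on a given date.
        table[tool] = [by_date.get(date, "") for date in dates_sorted]
    return table, dates_sorted
```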
diff --git a/tp_framework/pattern_repair/README_markdown_elements.py b/tp_framework/pattern_repair/README_markdown_elements.py
new file mode 100644
index 0000000..260cc7b
--- /dev/null
+++ b/tp_framework/pattern_repair/README_markdown_elements.py
@@ -0,0 +1,186 @@
+from tabulate import tabulate
+
+
+class MarkdownElement:
+ """Super class for all MarkdownElements used within generating README files for a testability pattern."""
+
+ def __init__(self, content: str):
+ self.content = content.strip()
+
+ def linkable(self) -> str:
+ """Makes it possible for a markdown Element to be used within a link.
+
+ Returns:
+ str: a string representation, that can be used in a markdown link.
+ """
+ raise NotImplementedError
+
+ def to_markdown(self):
+ raise NotImplementedError
+
+ def strip(self):
+ return self.to_markdown().strip()
+
+ def __bool__(self):
+ return bool(self.content)
+
+
+class MarkdownCode(MarkdownElement):
+ """A markdown code block.
+ Syntax:
+
+ ```
+ self.content
+ ```
+
+ """
+
+ def __init__(self, content, code_type):
+ super().__init__(content)
+ self.code_type = code_type
+
+ def to_markdown(self) -> str:
+ return f"\n```{self.code_type.lower()}\n{self.content}\n```\n"
+
+
+class MarkdownComment(MarkdownElement):
+ """A markdown comment
+ Syntax:
+
+ [//]: # (self.content)
+
+ """
+
+ def to_markdown(self):
+ self.content = self.content.replace("\\r\\n", " ")
+ return f"\n[//]: # ({self.content})\n"
+
+
+class MarkdownHeading(MarkdownElement):
+ """A markdown heading, `self.level` indicates the number of '#'
+ Syntax example:
+
+ # self.content
+
+ """
+
+ def __init__(self, content, level: int):
+ super().__init__(content)
+ self.level = int(level)
+ assert self.level >= 1
+
+ def to_markdown(self) -> str:
+ return f'\n{"#" * self.level} {self.content}\n'
+
+ def linkable(self) -> str:
+ return f'#{self.content.replace(" " , "-").lower()}'
+
+
+class MarkdownCollapsible(MarkdownElement):
+ """A markdown collapsible element.
+ Syntax example:
+
+ <details>
+ <summary>heading</summary>
+
+ content
+
+ </details>
+
+ """
+
+ def __init__(self, content: list, heading: MarkdownElement, is_open: bool = False):
+ self.content = content
+ self.is_open = is_open
+ self.heading = heading
+
+ def to_markdown(self) -> str:
+ final = f'<details{" open" if self.is_open else ""}>\n'
+ heading = (
+ self.heading.to_markdown().strip()
+ if not isinstance(self.heading, MarkdownHeading)
+ else self.heading.to_markdown()
+ )
+ final += f"<summary>\n\n{heading}</summary>\n\n"
+ for element in self.content:
+ final += element.to_markdown()
+ final += "\n</details>\n"
+ return final
+
+
+class MarkdownString(MarkdownElement):
+ """Representation of a string; it is surrounded by newlines."""
+
+ def to_markdown(self) -> str:
+ return f"\n{self.content}\n"
+
+
+class MarkdownLink(MarkdownElement):
+ """A markdown link.
+ Syntax:
+
+ [self.content](self.link)
+
+ """
+
+ def __init__(self, content: str | MarkdownElement, link: MarkdownElement):
+ super().__init__(content)
+ assert isinstance(
+ link, MarkdownElement
+ ), "The link of a MarkdownLink must be a MarkdownElement."
+ self.link = link.linkable()
+
+ def to_markdown(self):
+ return f"[{self.content.strip()}]({self.link.strip()})"
+
+
+class MarkdownTable(MarkdownElement):
+ """A markdown table
+ Syntax:
+
+ | | |
+ |---|---|
+ | | |
+
+ The content must be provided as a dict, where the value for each key is a list.
+ The key will be the header and the list contains values for that column.
+ Columns are sorted alphabetically. To control the order yourself, prefix the keys with a sort key like '0::'; the prefix is used only for sorting and stripped from the header.
+ """
+
+ def __init__(self, content: dict):
+ assert isinstance(
+ content, dict
+ ), "content for Markdown table must be provided as dict"
+ assert all(
+ [isinstance(v, list) for v in content.values()]
+ ), "content for Markdowntable must have lists as values"
+ self.headings = sorted(content.keys(), key=lambda x: x.lower())
+ num_rows = max([len(v) for v in content.values()])
+ self.lines = [
+ [None for _ in range(len(self.headings))] for _ in range(num_rows)
+ ]
+ for column_idx, key in enumerate(self.headings):
+ for row_index, v in enumerate(content[key]):
+ self.lines[row_index][column_idx] = v.strip() if v else ""
+
+ def to_markdown(self):
+ return f'\n{tabulate(self.lines, [h.split("::")[-1] if "::" in h else h for h in self.headings], "github")}\n'
+
+
+class MarkdownDocument(MarkdownElement):
+ """A central point, where all markdown elements are collected into one single markdown document."""
+
+ def __init__(self, content: list) -> None:
+ self.content = content
+
+ def to_markdown(self) -> str:
+ # local import: only needed here for the final whitespace cleanup
+ import re
+
+ final = ""
+ for element in self.content:
+ assert isinstance(element, MarkdownElement)
+ final += element.to_markdown()
+ final = re.sub("\n\n\n*", "\n\n", final)
+ return (
+ f"{final.strip()}\n" # GitHub markdown likes a newline at the end of files
+ )
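The `linkable`/`MarkdownLink` pair above relies on GitHub's heading-anchor convention (lowercase, spaces become hyphens, prefixed with `#`). The rule in isolation, as two plain functions with illustrative names:

```python
def heading_anchor(heading_text: str) -> str:
    # GitHub-style anchor: lowercase the heading, replace spaces with hyphens.
    return "#" + heading_text.replace(" ", "-").lower()

def markdown_link(label: str, heading_text: str) -> str:
    return f"[{label}]({heading_anchor(heading_text)})"
```

Note that GitHub additionally strips most punctuation from anchors, which neither the class above nor this sketch handles.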
diff --git a/tp_framework/pattern_repair/__init__.py b/tp_framework/pattern_repair/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tp_framework/pattern_repair/instance_repair.py b/tp_framework/pattern_repair/instance_repair.py
new file mode 100644
index 0000000..98ccb8f
--- /dev/null
+++ b/tp_framework/pattern_repair/instance_repair.py
@@ -0,0 +1,220 @@
+import logging
+import os
+import re
+import shutil
+
+from pattern_repair.utils import (
+ assert_pattern_valid,
+ repair_keys_of_json,
+ read_json,
+ write_json,
+ list_instances_jsons,
+ INSTANCE_JSON_NOT_MANDATORY_KEYS,
+ get_template_instance_json_path,
+ get_template_instance_discovery_rule_path,
+ get_files_with_ending
+)
+
+from core.utils import get_id_from_name
+
+from core import loggermgr
+
+logger = logging.getLogger(loggermgr.logger_name(__name__))
+
+
+class InstanceRepair:
+ """Super class for all language specific `InstanceRepair`."""
+
+ def __init__(
+ self,
+ language: str,
+ path_to_pattern: str,
+ instance_json_path: str,
+ path_to_tp_lib: str,
+ ) -> None:
+ assert_pattern_valid(path_to_pattern)
+
+ self.language = language
+ self.pattern_path = path_to_pattern
+ self.pattern_name = os.path.basename(self.pattern_path)
+ self.pattern_id = get_id_from_name(self.pattern_name)
+ self.instance_path = os.path.dirname(instance_json_path)
+ self.instance_name = os.path.basename(self.instance_path)
+ self.instance_json_file = instance_json_path
+ self.path_to_testability_patterns = path_to_tp_lib
+
+ def _adjust_variable_number_in_discovery_rule(
+ self, path_to_discovery_file: str
+ ) -> None:
+ """Adjusts the scala discovery file.
+
+ Args:
+ path_to_discovery_file (str): path to discovery file
+ """
+ pattern_number = int(os.path.basename(self.pattern_path).split("_")[0])
+ with open(path_to_discovery_file, "r") as fp:
+ result = fp.readlines()
+
+ # assume that scala rule files end with
+ # println(x<number>)
+ # delete;
+ try:
+ println_line = result[
+ result.index(list(filter(lambda line: "delete;" in line, result))[0])
+ - 1
+ ]
+ except IndexError:
+ logger.warning(
+ f'Could not find "delete;" in {os.path.relpath(path_to_discovery_file, self.instance_path)}'
+ )
+ return
+ try:
+ real_number = re.search(r"println\(x(\d+)\)", println_line).group(1)
+ except AttributeError:
+ logger.warning(
+ f"Could not find the pattern number in {os.path.relpath(path_to_discovery_file, self.instance_path)}"
+ )
+ return
+ # determine the name for the rule in scala file
+ # if there is more than one instance, it should be _i
+ # if this rule is for multiple patterns, it should be _iall
+ rule_name = (
+ f'{self.pattern_name.lower()}_i{self.instance_name.split("_")[0]}'
+ if len(list_instances_jsons(self.pattern_path)) > 1
+ and os.path.abspath(os.path.dirname(path_to_discovery_file))
+ != os.path.abspath(self.pattern_path)
+ else f"{self.pattern_name}_iall"
+ )
+ # make sure the variable number and the rule name are correct
+ new_rule = []
+ for line in result:
+ new_line = line.replace(f"x{real_number}", f"x{pattern_number}")
+ new_rule += [
+ re.sub(
+ rf"({self.pattern_name}_i(\d+|all)|ID_pattern_name_i1)",
+ rule_name,
+ new_line,
+ )
+ ]
+
+ diff = [line for line in new_rule if line not in result]
+ if diff:
+ logger.info(
+ f"Changed lines in Scala rule for instance {self.instance_name}:\n{[line.strip() for line in diff]}"
+ )
+ with open(path_to_discovery_file, "w") as fp:
+ fp.writelines(new_rule)
+
+ def _check_rule_accuracy(self):
+ """Checks that there is a rule accuracy given if there is a rule given"""
+ instance_dict = read_json(self.instance_json_file)
+ if (
+ instance_dict["discovery"]["rule"]
+ and not instance_dict["discovery"]["rule_accuracy"]
+ ):
+ logger.warning(
+ f"There is a rule, but no rule accuracy given for {self.instance_name}"
+ )
+
+ def _find_and_rename_file(self, file_ending: str):
+ """Checks if there is already an existing file with the expected name '<instance_name>.<file_ending>'.
+ If not, it collects all files with that file ending in the instance directory. If there is exactly one, and it is located directly in the instance path,
+ it is renamed to the expected filename.
+
+ Args:
+ file_ending (str): ending of the files (without the `.`, e.g. `txt`)
+ """
+ expected_abs_filepath = os.path.join(self.instance_path, f"{self.instance_name}.{file_ending}")
+ if os.path.isfile(expected_abs_filepath):
+ return
+ # list all files with that fileending in the instance
+ files_with_this_ending = get_files_with_ending(self.instance_path, f".{file_ending}", recursive=True)
+ if len(files_with_this_ending) == 1 and os.path.exists(os.path.join(self.instance_path, os.path.basename(files_with_this_ending[0]))):
+ # There is only one file with the file ending in the instance_path directory
+ os.rename(files_with_this_ending[0], expected_abs_filepath)
+ if files_with_this_ending[0] != expected_abs_filepath:
+ logger.info(f"Renamed file from {files_with_this_ending[0]} to {expected_abs_filepath}")
+
+ def _repair_description(self) -> None:
+ """Checks if 'description' is given in an instance dict, removes the key, when it is empty."""
+ instance_dict = read_json(self.instance_json_file)
+ if "description" not in instance_dict.keys():
+ logger.warning(
+ f"Instance description for {self.instance_name} does not exist."
+ )
+ return
+ if not instance_dict["description"]:
+ instance_dict.pop("description")
+ logger.warning(
+ f"Instance description for {self.instance_name} is empty, deleting it."
+ )
+ write_json(self.instance_json_file, instance_dict)
+
+ def _repair_discovery_rule(self) -> None:
+ """Repairs the discovery rule of a pattern instance"""
+ self._find_and_rename_file("sc")
+ instance_dict = read_json(self.instance_json_file)
+ path_to_discovery_rule = os.path.join(
+ self.instance_path, f"{self.instance_name}.sc"
+ )
+ expected_file = (
+ f".{os.sep}{os.path.relpath(path_to_discovery_rule, self.instance_path)}"
+ )
+ real = (
+ instance_dict["discovery"]["rule"]
+ if instance_dict["discovery"]["rule"]
+ else ""
+ )
+ real_path = os.path.join(self.instance_path, real)
+ # check whether a path to a discovery rule is already given, and whether this path is valid
+ if os.path.isfile(real_path):
+ # the file path is valid, just check the structure of the file
+ self._repair_discovery_rule_structure(real_path)
+ return
+ # given value is not a real file, so check if there is nevertheless a discovery rule with the expected name
+ if not os.path.isfile(path_to_discovery_rule):
+ logger.info(
+ f"Could not find discovery rule for {self.instance_name}, added sc file"
+ )
+ logger.warning(f"Please adjust discovery rule of {self.instance_name}")
+ shutil.copy(
+ get_template_instance_discovery_rule_path(
+ self.path_to_testability_patterns
+ ),
+ path_to_discovery_rule,
+ )
+ # adapt scala file
+ self._repair_discovery_rule_structure(path_to_discovery_rule)
+ # adapt JSON file
+ instance_dict["discovery"]["rule"] = expected_file
+ write_json(self.instance_json_file, instance_dict)
+
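The `expected_file` value in `_repair_discovery_rule` is built with an `os.path.relpath` idiom that recurs throughout this code. A small self-contained sketch of that normalisation (the instance path below is purely illustrative):

```python
import os

# Hypothetical instance directory, mirroring the tp_lib layout.
instance_path = os.path.join(
    "tp_lib", "PHP", "1_code_injection", "1_instance_1_code_injection"
)
rule_file = os.path.join(instance_path, "1_instance_1_code_injection.sc")

# The JSON field stores the rule path relative to the instance
# directory, prefixed with "./" (or ".\" on Windows).
expected = f".{os.sep}{os.path.relpath(rule_file, instance_path)}"
```

On POSIX systems `expected` comes out as `./1_instance_1_code_injection.sc`, which is exactly what the repair writes into `instance_dict["discovery"]["rule"]`.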
+ def _repair_discovery_rule_structure(self, path_to_discovery_file: str) -> None:
+ self._adjust_variable_number_in_discovery_rule(path_to_discovery_file)
+ self._check_rule_accuracy()
+
+ def repair_instance_json(self) -> None:
+ """Repairs the instance JSON of the pattern.
+ Meaning, it makes sure that the JSON file is there,
+ has all necessary keys and the description points to a markdown file containing the description."""
+ if not os.path.isfile(self.instance_json_file):
+ logger.info(
+ f"Could not find instance JSON for {self.instance_name}, copying template"
+ )
+ shutil.copy(
+ get_template_instance_json_path(self.path_to_testability_patterns),
+ self.instance_json_file,
+ )
+ repair_keys_of_json(
+ self.instance_json_file,
+ get_template_instance_json_path(self.path_to_testability_patterns),
+ INSTANCE_JSON_NOT_MANDATORY_KEYS,
+ )
+ self._repair_description()
+
+ def repair(self) -> str:
+ self._repair_discovery_rule()
diff --git a/tp_framework/pattern_repair/pattern_repair.py b/tp_framework/pattern_repair/pattern_repair.py
new file mode 100644
index 0000000..941aede
--- /dev/null
+++ b/tp_framework/pattern_repair/pattern_repair.py
@@ -0,0 +1,180 @@
+import os
+import shutil
+import logging
+
+from copy import deepcopy
+from pathlib import Path
+
+from pattern_repair.utils import (
+ assert_pattern_valid,
+ repair_keys_of_json,
+ get_template_pattern_json_path,
+ read_json,
+ write_json,
+ compare_dicts,
+ get_files_with_ending,
+ list_instances_jsons,
+)
+from pattern_repair.README_generator import READMEGenerator
+# This import is needed: without it the instance repair class for a given language could not be looked up via globals()
+from pattern_repair.PHP.instance_repair_php import InstanceRepairPHP
+
+from core.utils import check_lang_tp_lib_path, get_id_from_name
+from core import loggermgr
+
+logger = logging.getLogger(loggermgr.logger_name(__name__))
+
+
+class PatternRepair:
+ def __init__(
+ self,
+ path_to_pattern: Path,
+ language: str,
+ tp_lib_path: Path,
+ discovery_rule_results: str = "",
+ masking_file: str = "",
+ all_measurement_results: str = "",
+ ) -> None:
+ check_lang_tp_lib_path(Path(os.path.join(tp_lib_path, language.upper())))
+ assert_pattern_valid(path_to_pattern)
+
+ # user defined constants
+ self.pattern_path = path_to_pattern
+ self.pattern_name = os.path.basename(self.pattern_path)
+ self.pattern_id = get_id_from_name(self.pattern_name)
+ self.language = language
+ self.pattern_json_file = None
+ self.discovery_rule_results = discovery_rule_results
+ self.masking_file = masking_file
+ self.all_measurement_results = all_measurement_results
+ self.tp_lib_path = tp_lib_path
+
+ # get repair for specific language
+ try:
+ self.instance_repair_class = globals()[f"InstanceRepair{language.upper()}"]
+ except KeyError:
+ logger.error(
+ f"InstanceRepair{language.upper()} could not be found, maybe it is not imported?"
+ )
+ exit(1)
+
+ def _find_instances_json(self) -> list:
+ """Gets all pattern instance jsons as relative paths
+
+ Returns:
+ list: list of relative paths to JSON files.
+ """
+ pattern_instances = list_instances_jsons(self.pattern_path)
+ if not pattern_instances:
+ return []
+ # get the relative path for instances
+ pattern_instances_rel_path = [
+ f".{os.sep}{os.path.relpath(str(pattern_instance_path), self.pattern_path)}"
+ for pattern_instance_path in pattern_instances
+ ]
+ return pattern_instances_rel_path
+
+ def _repair_documentation(self) -> None:
+ """Makes sure, the pattern description is in a `./docs/description.md` and the field in the JSON file points to that markdown file."""
+ # make sure ./docs/description.md exists
+ docs_directory = os.path.join(self.pattern_path, "docs")
+ description_file_path = os.path.join(docs_directory, "description.md")
+ os.makedirs(docs_directory, exist_ok=True)
+ open(description_file_path, "a").close()
+
+ # check out the "description" field in the pattern JSON file
+ json_dict = read_json(self.pattern_json_file)
+ description_in_json = json_dict["description"]
+ rel_path_to_description = (
+ f".{os.sep}{os.path.relpath(description_file_path, self.pattern_path)}"
+ )
+ if rel_path_to_description == description_in_json:
+ # the description_in_json is already the right path
+ if not os.stat(description_file_path).st_size:
+ logger.info(f"Description for {self.pattern_name} is missing")
+ return
+
+ # set the description field point to ./docs/description.md
+ json_dict["description"] = rel_path_to_description
+ original_description = []
+ with open(description_file_path, "r") as fp:
+ original_description = fp.readlines()
+ original_description += [description_in_json]
+ with open(description_file_path, "w") as fp:
+ fp.write("\n".join(original_description))
+ write_json(self.pattern_json_file, json_dict)
+ if description_in_json:
+ logger.info(f"Changed {description_file_path} in pattern JSON")
+ else:
+ logger.info(f"Description for Pattern {self.pattern_name} is missing")
+
+ def _repair_instances(self) -> None:
+ """Repairs instances of that pattern, using the instance repair class as well."""
+ all_instances = list_instances_jsons(self.pattern_path)
+ if not all_instances:
+ logger.error(f"Pattern {self.pattern_name} has no instances")
+ exit(1)
+ for instance_json in all_instances:
+ self.instance_repair_class(
+ self.language, self.pattern_path, instance_json, self.tp_lib_path
+ ).repair()
+
+ def _repair_pattern_json(self) -> None:
+ """Repairs the JSON file of the pattern"""
+ # check if pattern json file exists, if not copy the template
+ pattern_json = os.path.join(self.pattern_path, f"{self.pattern_name}.json")
+ self.pattern_json_file = pattern_json
+ if not os.path.isfile(pattern_json):
+ logger.info("Could not find Pattern JSON, copying the template")
+ shutil.copy(get_template_pattern_json_path(self.tp_lib_path), pattern_json)
+ repair_keys_of_json(
+ self.pattern_json_file, get_template_pattern_json_path(self.tp_lib_path)
+ )
+
+ # get the content of the pattern json
+ pattern_dict = read_json(pattern_json)
+
+ # adapt the fields (name, family, tags, instances) of the pattern_dict for the fields
+ new_pattern_dict = deepcopy(pattern_dict)
+ new_pattern_dict["name"] = " ".join(self.pattern_name.split("_")[1:]).title()
+ new_pattern_dict["family"] = f"code_pattern_{self.language.lower()}"
+ if "LANG" in new_pattern_dict["tags"]:
+ new_pattern_dict["tags"] = ["sast", self.language.lower()]
+ new_pattern_dict["instances"] = self._find_instances_json()
+ new_pattern_dict["version"] = (
+ new_pattern_dict["version"] if new_pattern_dict["version"] else "v0.draft"
+ )
+
+ # compare with original dict and if something has changed write the new dict to file
+ dict_diff = compare_dicts(pattern_dict, new_pattern_dict)
+ if dict_diff:
+ write_json(pattern_json, new_pattern_dict)
+ self._repair_documentation()
+
+ def _repair_pattern_README(self) -> None:
+ """Repairs the README file of the pattern"""
+ all_md_files = get_files_with_ending(self.pattern_path, ".md")
+ if len(all_md_files) == 1:
+ os.rename(all_md_files[0], os.path.join(self.pattern_path, "README.md"))
+ pattern_measurement = os.path.join(
+ self.all_measurement_results, self.pattern_name
+ )
+ instance_jsons = list_instances_jsons(self.pattern_path)
+ new_readme = READMEGenerator(
+ self.pattern_path,
+ self.language,
+ self.tp_lib_path,
+ instance_jsons,
+ self.discovery_rule_results,
+ pattern_measurement,
+ self.masking_file,
+ ).generate_README()
+ with open(os.path.join(self.pattern_path, "README.md"), "w") as file:
+ file.write(new_readme)
+
+ def repair(self, should_include_readme: bool = True):
+ self._repair_pattern_json()
+ self._repair_instances()
+ if should_include_readme:
+ self._repair_pattern_README()
diff --git a/tp_framework/pattern_repair/pattern_repair_interface.py b/tp_framework/pattern_repair/pattern_repair_interface.py
new file mode 100644
index 0000000..1660432
--- /dev/null
+++ b/tp_framework/pattern_repair/pattern_repair_interface.py
@@ -0,0 +1,48 @@
+from pathlib import Path
+
+from core import utils
+from core.pattern import get_pattern_path_by_pattern_id
+from pattern_repair.pattern_repair import PatternRepair
+
+
+def repair_patterns(
+ language: str,
+ pattern_ids: list[int],
+ include_README: bool,
+ checkdiscoveryrule_results: Path,
+ measurement_results: Path,
+ masking_file: Path,
+ tp_lib_path: Path,
+ output_dir: Path,
+) -> None:
+ """Interface, that starts a pattern repair
+
+ Args:
+ language (str): language of the targetted patterns
+ pattern_ids (list[int]): list of pattern ids
+ checkdiscoveryrule_results (Path): results of `checkdiscoveryrules` run with tp-framework, for all patterns to repair
+ measurement_results (Path): results of `measure` run with tp-framework, for all patterns to repair
+ masking_file (Path): file that can be used to Mask the name of tools, if they should be kept secret
+ tp_lib_path (Path): Path to tesability pattern library
+ output_dir (Path): Output dir for any written data
+ """
+ print("Pattern Repair started...")
+ should_include_readme = not include_README
+ utils.check_tp_lib(tp_lib_path)
+ if should_include_readme:
+ utils.check_file_exist(checkdiscoveryrule_results)
+ if masking_file:
+ utils.check_file_exist(masking_file, ".json")
+ utils.check_measurement_results_exist(measurement_results)
+ output_dir.mkdir(exist_ok=True, parents=True)
+ utils.add_loggers(output_dir)
+
+ for pattern_id in pattern_ids:
+ pattern_path = get_pattern_path_by_pattern_id(language, pattern_id, tp_lib_path)
+ PatternRepair(
+ pattern_path,
+ language,
+ tp_lib_path,
+ checkdiscoveryrule_results,
+ masking_file,
+ measurement_results,
+ ).repair(should_include_readme)
diff --git a/tp_framework/pattern_repair/utils.py b/tp_framework/pattern_repair/utils.py
new file mode 100644
index 0000000..dd29130
--- /dev/null
+++ b/tp_framework/pattern_repair/utils.py
@@ -0,0 +1,298 @@
+import csv
+import json
+import logging
+
+from collections import defaultdict
+from os import path, listdir, walk
+from pathlib import Path
+
+from core.errors import templateDirDoesNotExist
+from core.exceptions import TemplateDoesNotExist, PatternDoesNotExists, FileDoesNotExist
+from core.utils import get_exception_message
+from core import loggermgr
+
+logger = logging.getLogger(loggermgr.logger_name(__name__))
+
+INSTANCE_JSON_NOT_MANDATORY_KEYS = ["description", "reporting"]
+
+
+def assert_pattern_valid(path_to_pattern: Path) -> None:
+ """Asserts that a pattern is a valid directory
+
+ Args:
+ path_to_pattern (Path): absolute path to a pattern
+
+ Raises:
+ PatternDoesNotExists: when the pattern directory does not exist.
+ """
+ if not Path(path_to_pattern).is_dir():
+ e = PatternDoesNotExists(path.basename(path_to_pattern))
+ logger.error(get_exception_message(e))
+ raise e
+
+
+def compare_dicts(old_dict, new_dict) -> dict:
+ return {
+ k: old_dict[k] for k in old_dict if k in new_dict and old_dict[k] != new_dict[k]
+ }
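`compare_dicts` maps every key present in both dicts whose value changed to its old value. A self-contained check of that contract (the helper is re-stated verbatim so the snippet runs standalone; the sample values are made up):

```python
def compare_dicts(old_dict, new_dict) -> dict:
    # keys shared by both dicts whose values differ, mapped to the OLD value
    return {
        k: old_dict[k] for k in old_dict if k in new_dict and old_dict[k] != new_dict[k]
    }

old = {"name": "Code Injection", "version": "", "tags": ["sast"]}
new = {"name": "Code Injection", "version": "v0.draft", "tags": ["sast", "php"]}
diff = compare_dicts(old, new)
# diff holds the previous values of "version" and "tags"; "name" is unchanged
```

An empty result means nothing changed, which is why `_repair_pattern_json` only rewrites the JSON file when this diff is non-empty.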
+
+
+def get_dict_keys(d: dict) -> list:
+ """Returns a list of keys in a multidimensional dict.
+ The key names are separated by `:`, e.g. `level1_key:level2_key`
+
+ Args:
+ d (dict): a multidimensional dict
+
+ Returns:
+ list: all keys from all dict level
+ """
+ all_keys = []
+ current_keys = d.keys()
+ for k in current_keys:
+ if isinstance(d[k], dict):
+ sub_keys = get_dict_keys(d[k])
+ all_keys += [f"{k}:{sk}" for sk in sub_keys]
+ else:
+ all_keys += [k]
+ return all_keys
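The flattening done by `get_dict_keys` can be exercised like this (the helper is re-stated so the snippet is self-contained; the sample dict is illustrative):

```python
def get_dict_keys(d: dict) -> list:
    # flatten nested dict keys into ":"-joined paths, e.g. "discovery:rule"
    all_keys = []
    for k in d.keys():
        if isinstance(d[k], dict):
            all_keys += [f"{k}:{sk}" for sk in get_dict_keys(d[k])]
        else:
            all_keys += [k]
    return all_keys

keys = get_dict_keys({"name": "x", "discovery": {"rule": "./r.sc", "method": "joern"}})
# → ["name", "discovery:rule", "discovery:method"]
```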
+
+
+def get_instance_name(path_to_instance) -> str:
+ return " ".join(path.basename(path_to_instance).split("_")[:2]).title()
+
+
+def get_files_with_ending(
+ path_to_dir: str, file_ending: str, recursive: bool = False
+ ) -> list:
+ """Returns all files with a certain ending. Be sure to include the `.` when passing the `file_ending` argument, i.e. `file_ending='.txt'`.
+
+ Args:
+ path_to_dir (str): directory from which the files should be listed.
+ file_ending (str): the file ending to filter for.
+ recursive (bool, optional): whether to traverse the directory recursively. Defaults to False.
+
+ Returns:
+ list: all filepaths, to files in the directory, having the `file_ending`.
+ """
+ if recursive:
+ matches = []
+ for root, _, filenames in walk(path_to_dir):
+ for filename in filter(lambda f: f.endswith(file_ending), filenames):
+ matches.append(path.join(root, filename))
+ return matches
+ return sorted(
+ path.join(path_to_dir, f)
+ for f in listdir(path_to_dir)
+ if Path(f).suffix == file_ending
+ )
+
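A non-recursive usage sketch of `get_files_with_ending`, with the helper re-stated in condensed form so the snippet runs standalone against a temporary directory:

```python
import os
import tempfile
from pathlib import Path

def get_files_with_ending(path_to_dir, file_ending, recursive=False):
    # simplified, non-recursive variant: direct children with the given suffix
    return sorted(
        os.path.join(path_to_dir, f)
        for f in os.listdir(path_to_dir)
        if Path(f).suffix == file_ending
    )

with tempfile.TemporaryDirectory() as tmp:
    for fname in ("a.sc", "b.sc", "notes.md"):
        open(os.path.join(tmp, fname), "w").close()
    sc_files = get_files_with_ending(tmp, ".sc")
    sc_names = [os.path.basename(f) for f in sc_files]
# sc_names → ["a.sc", "b.sc"]; "notes.md" is filtered out
```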
+
+def get_template_dir_path(tp_lib_path) -> str:
+ template_path = path.join(tp_lib_path, "pattern_template", "ID_pattern_name")
+ if not path.isdir(template_path):
+ e = TemplateDoesNotExist(templateDirDoesNotExist(template_path))
+ logger.error(get_exception_message(e))
+ raise e
+ return template_path
+
+
+def get_template_pattern_json_path(tp_lib_path) -> str:
+ template_pattern_json_path = path.join(
+ get_template_dir_path(tp_lib_path), "ID_pattern_name.json"
+ )
+ if not path.isfile(template_pattern_json_path):
+ e = TemplateDoesNotExist(templateDirDoesNotExist(template_pattern_json_path))
+ logger.error(get_exception_message(e))
+ raise e
+ return template_pattern_json_path
+
+
+def get_template_instance_path(tp_lib_path) -> str:
+ template_instance_path = path.join(
+ get_template_dir_path(tp_lib_path), "IID_instance_ID_pattern_name"
+ )
+ if not path.isdir(template_instance_path):
+ e = TemplateDoesNotExist(templateDirDoesNotExist(template_instance_path))
+ logger.error(get_exception_message(e))
+ raise e
+ return template_instance_path
+
+
+def get_template_instance_json_path(tp_lib_path) -> str:
+ template_instance_json_path = path.join(
+ get_template_instance_path(tp_lib_path), "IID_instance_ID_pattern_name.json"
+ )
+ if not path.isfile(template_instance_json_path):
+ e = TemplateDoesNotExist(templateDirDoesNotExist(template_instance_json_path))
+ logger.error(get_exception_message(e))
+ raise e
+ return template_instance_json_path
+
+
+def get_template_instance_discovery_rule_path(tp_lib_path) -> str:
+ template_instance_discovery_rule_path = path.join(
+ get_template_instance_path(tp_lib_path), "pattern_discovery_rule.sc"
+ )
+ if not path.isfile(template_instance_discovery_rule_path):
+ e = TemplateDoesNotExist(
+ templateDirDoesNotExist(template_instance_discovery_rule_path)
+ )
+ logger.error(get_exception_message(e))
+ raise e
+ return template_instance_discovery_rule_path
+
+
+def get_language_by_file_ending(filename: str) -> str:
+ """Returns the language, by simply looking at the suffix of the file
+
+ Args:
+ filename (str): name of a file
+
+ Raises:
+ NotImplementedError: if the suffix is not yet supported, the function raises a NotImplementedError.
+
+ Returns:
+ str: language
+ """
+ if Path(filename).suffix == ".py":
+ return "python"
+ if Path(filename).suffix == ".php":
+ return "php"
+ if Path(filename).suffix == ".js":
+ return "javascript"
+ if Path(filename).suffix == ".java":
+ return "java"
+ if Path(filename).suffix == ".sc":
+ return "scala"
+ if Path(filename).suffix == ".bash":
+ return "bash"
+ raise NotImplementedError(
+ f"The ending of the given filename {filename} is not yet supported"
+ )
+
+
+def list_directories(path_to_parent_directory: str):
+ return list(filter(lambda x: path.isdir(x), [path.join(path_to_parent_directory, f) for f in listdir(path_to_parent_directory)]))
+
+
+def list_instances_jsons(path_to_pattern: str | Path):
+ return [
+ path.join(instance, f"{path.basename(instance)}.json")
+ for instance in filter(
+ lambda x: path.isdir(x) and path.basename(x)[0].isdigit(),
+ list_directories(path_to_pattern),
+ )
+ ]
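`list_instances_jsons` expects each instance to live in a direct child directory whose name starts with a digit and to hold a JSON named after that directory. A condensed, self-contained re-statement run against a throwaway layout:

```python
import os
import tempfile

def list_instances_jsons(path_to_pattern):
    # instance dirs are direct children whose names start with a digit;
    # each is expected to hold "<dirname>.json"
    dirs = [
        os.path.join(path_to_pattern, f)
        for f in os.listdir(path_to_pattern)
        if os.path.isdir(os.path.join(path_to_pattern, f)) and f[0].isdigit()
    ]
    return [os.path.join(d, f"{os.path.basename(d)}.json") for d in dirs]

with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "1_instance_1_code_injection"))
    os.makedirs(os.path.join(tmp, "docs"))  # ignored: no leading digit
    result = [os.path.relpath(p, tmp) for p in list_instances_jsons(tmp)]
```

Note the returned paths are constructed, not checked for existence; that is what lets `repair_instance_json` detect and recreate a missing instance JSON.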
+
+
+def read_json(path_to_json: str) -> dict:
+ result = {}
+ try:
+ with open(path_to_json, "r") as json_file:
+ result = json.load(json_file)
+ except json.JSONDecodeError as err:
+ raise Exception(f"JSON is corrupt, please check {path_to_json}") from err
+ if not result:
+ logger.error(f"JSON file is empty")
+ return result
+
+
+def read_file(path_to_file: str) -> str:
+ try:
+ with open(path_to_file, "r") as file:
+ ret = file.read()
+ except Exception:
+ e = FileDoesNotExist(
+ f"The file {path_to_file} you wanted to read does not exist or is corrupt. Cannot read the file."
+ )
+ logger.error(get_exception_message(e))
+ raise e
+ return ret
+
+
+def read_csv_to_dict(path_to_file: str) -> dict:
+ """Reads a csv file into a dictionary, the csv file must contain the columns 'pattern_id', 'instance_id', 'language', 'successful'
+ The dict will have the form:
+ {: {: {: }}}
+
+ Args:
+ path_to_file (str): path to csv file (with discovery rule results)
+
+ Returns:
+ dict: defaultdict of dicts
+ """
+ res = []
+ with open(path_to_file, "r") as csvfile:
+ r = csv.reader(csvfile, delimiter=",")
+ headings = next(r)
+ wanted_columns = ["pattern_id", "instance_id", "language", "successful"]
+ wanted_idx = [headings.index(w) for w in wanted_columns if w in headings]
+ assert len(wanted_idx) == len(
+ wanted_columns
+ ), f"Could not find wanted column names in csv {path_to_file}"
+ res = [[line[i] for i in wanted_idx] for line in r]
+ ret = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
+ for line in res:
+ ret[line[2]][line[0]][line[1]] = line[3]
+ return ret
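The nested shape produced by `read_csv_to_dict` can be seen with a tiny throwaway csv (the helper is re-stated in condensed form, with one fewer `defaultdict` level, so the snippet is self-contained):

```python
import csv
import os
import tempfile
from collections import defaultdict

def read_csv_to_dict(path_to_file):
    # condensed sketch: {language: {pattern_id: {instance_id: successful}}}
    with open(path_to_file, "r") as csvfile:
        r = csv.reader(csvfile, delimiter=",")
        headings = next(r)
        wanted = ["pattern_id", "instance_id", "language", "successful"]
        idx = [headings.index(w) for w in wanted]
        rows = [[line[i] for i in idx] for line in r]
    ret = defaultdict(lambda: defaultdict(dict))
    for pattern_id, instance_id, language, successful in rows:
        ret[language][pattern_id][instance_id] = successful
    return ret

with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as fp:
    fp.write("pattern_id,instance_id,language,successful\n1,1,PHP,yes\n1,2,PHP,no\n")
    csv_path = fp.name
results = read_csv_to_dict(csv_path)
os.remove(csv_path)
# results["PHP"]["1"] → {"1": "yes", "2": "no"}
```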
+
+
+def repair_dict_keys(
+ tested_dict: dict, ground_truth_dict: dict, not_mandatory_keys: list = []
+) -> None:
+ """Modifies `tested_dict` and inserts all keys from `ground_truth_dict`, that are not in `tested_dict`, except they are in `not_mandatory_keys`.
+
+ Args:
+ tested_dict (dict): Dict that has potentially missing keys.
+ ground_truth_dict (dict): Dict that has all necessary keys
+ not_mandatory_keys (list, optional): list of keys in `ground_truth_dict` that are not mandatory. Defaults to [].
+ """
+ tested_keys = set(tested_dict.keys())
+ ground_truth_keys = set(ground_truth_dict.keys())
+
+ common_keys = set.intersection(tested_keys, ground_truth_keys)
+ for k in common_keys:
+ if isinstance(tested_dict[k], dict) and isinstance(ground_truth_dict[k], dict):
+ repair_dict_keys(tested_dict[k], ground_truth_dict[k], not_mandatory_keys)
+ if isinstance(tested_dict[k], dict) != isinstance(ground_truth_dict[k], dict):
+ logger.warning(
+ f'One of the values for "{k}" is a dict, the other one is not'
+ )
+
+ missing_keys = ground_truth_keys - tested_keys
+ unexpected_keys = tested_keys - ground_truth_keys
+ for key in missing_keys:
+ if key in not_mandatory_keys:
+ continue
+ tested_dict[key] = ""
+ logger.info(f'Added "{key}"')
+ if unexpected_keys:
+ logger.warning(f'Keys "{list(unexpected_keys)}" are unexpected')
+
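The in-place repair behaviour can be demonstrated end to end (the helper is re-stated in a simplified form without the logging, and the template/instance dicts are illustrative):

```python
def repair_dict_keys(tested_dict, ground_truth_dict, not_mandatory_keys=()):
    # recurse into nested dicts shared by both sides...
    for k in tested_dict.keys() & ground_truth_dict.keys():
        if isinstance(tested_dict[k], dict) and isinstance(ground_truth_dict[k], dict):
            repair_dict_keys(tested_dict[k], ground_truth_dict[k], not_mandatory_keys)
    # ...then add any mandatory key the tested dict is missing, defaulting to ""
    for key in ground_truth_dict.keys() - tested_dict.keys():
        if key not in not_mandatory_keys:
            tested_dict[key] = ""

template = {"name": "", "version": "", "discovery": {"rule": "", "method": ""}, "description": ""}
instance = {"name": "Code Injection", "discovery": {"rule": "./r.sc"}}
repair_dict_keys(instance, template, not_mandatory_keys=["description"])
# instance gains empty "version" and "discovery:method"; "description" stays absent
```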
+
+def repair_keys_of_json(
+ path_to_json_tested: str,
+ path_to_json_ground_truth: str,
+ not_mandatory_keys: list = [],
+) -> None:
+ # checks if all keys from path_to_json_ground_truth are in path_to_json_tested, if not it adds them
+ tested_json_dict = read_json(path_to_json_tested)
+ template_json_dicts = read_json(path_to_json_ground_truth)
+ repair_dict_keys(tested_json_dict, template_json_dicts, not_mandatory_keys)
+ write_json(path_to_json_tested, tested_json_dict)
+
+
+def translate_bool(to_translate: bool) -> str:
+ return "yes" if to_translate else "no"
+
+
+def write_json(path_to_json: str, result_dict: dict) -> None:
+ with open(path_to_json, "w") as json_file:
+ json.dump(result_dict, json_file, indent=4)