From 847f33dacdaa4b676a8f30d1548f129d5efc52b7 Mon Sep 17 00:00:00 2001
From: davidt99
Date: Sun, 7 Sep 2025 18:01:23 +0300
Subject: [PATCH] feat(file): add file class

---
 CHANGES                     |   4 +
 examples/index_by_file.py   |  21 +--
 examples/index_by_sha256.py |  20 +--
 intezer_sdk/file.py         | 139 ++++++++++++++++++
 tests/unit/test_file.py     | 280 ++++++++++++++++++++++++++++++++++++
 5 files changed, 445 insertions(+), 19 deletions(-)
 create mode 100644 intezer_sdk/file.py
 create mode 100644 tests/unit/test_file.py

diff --git a/CHANGES b/CHANGES
index 079e5d4..e7f6a96 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,7 @@
+1.24.0 (Unreleased)
+-------
+- Introduce File class
+
 1.23.0
 -------
 - Add notify to Alert class that returns notified channels
diff --git a/examples/index_by_file.py b/examples/index_by_file.py
index f1ab1ea..7cdb104 100644
--- a/examples/index_by_file.py
+++ b/examples/index_by_file.py
@@ -1,26 +1,27 @@
 import sys
 from pprint import pprint
+from typing import Optional
 
 from intezer_sdk import api
 from intezer_sdk import consts
-from intezer_sdk.index import Index
+from intezer_sdk.file import File
 
 
-def index_by_file_with_wait(file_path, index_as, family_name=None):  # type: (str, IndexType, str) -> None
+def index_by_file_with_wait(file_path: str, index_as: str, family_name: Optional[str] = None) -> None:
     api.set_global_api('')
 
-    index = Index(file_path=file_path, index_as=consts.IndexType.from_str(index_as), family_name=family_name)
-    index.send(wait=True)
-    pprint('Index operation:{}, Index ID:{}'.format(index.status.value, index.index_id))
+    file_obj = File(file_path=file_path)
+    file_obj.index(consts.IndexType.from_str(index_as), family_name=family_name, wait=True)
+    pprint(f'Index operation: {file_obj.index_status.value}, Index ID: {file_obj.index_id}')
 
 
-def index_by_file_without_wait(file_path, index_as, family_name=None):  # type: (str, IndexType, str) -> None
+def index_by_file_without_wait(file_path: str, index_as: str, family_name: Optional[str] = None) -> None:
     api.set_global_api('')
 
-    index = Index(file_path=file_path, index_as=consts.IndexType.from_str(index_as), family_name=family_name)
-    index.send()
-    index.wait_for_completion()
-    pprint('Index operation:{}, Index ID:{}'.format(index.status.value, index.index_id))
+    file_obj = File(file_path=file_path)
+    file_obj.index(consts.IndexType.from_str(index_as), family_name=family_name)
+    file_obj.wait_for_index_completion()
+    pprint(f'Index operation: {file_obj.index_status.value}, Index ID: {file_obj.index_id}')
 
 
 if __name__ == '__main__':
diff --git a/examples/index_by_sha256.py b/examples/index_by_sha256.py
index 846caf4..eeca014 100644
--- a/examples/index_by_sha256.py
+++ b/examples/index_by_sha256.py
@@ -1,25 +1,27 @@
 import sys
 from pprint import pprint
+from typing import Optional
 
 from intezer_sdk import api
 from intezer_sdk import consts
-from intezer_sdk.index import Index
+from intezer_sdk.file import File
 
 
-def index_by_sha256_with_wait(sha256, index_as, family_name=None):  # type: (str, str, str) -> None
+def index_by_sha256_with_wait(sha256: str, index_as: str, family_name: Optional[str] = None) -> None:
     api.set_global_api('')
 
-    index = Index(sha256=sha256, index_as=consts.IndexType.from_str(index_as), family_name=family_name)
-    index.send(wait=True)
-    pprint('Index operation:{}, Index ID:{}'.format(index.status.value, index.index_id))
+    file_obj = File(sha256=sha256)
+    file_obj.index(consts.IndexType.from_str(index_as), family_name=family_name, wait=True)
+    pprint(f'Index operation: {file_obj.index_status.value}, Index ID: {file_obj.index_id}')
 
 
-def analysis_by_hash_without_wait(sha256, index_as, family_name=None):  # type: (str, str, str) -> None
+def index_by_sha256_without_wait(sha256: str, index_as: str, family_name: Optional[str] = None) -> None:
     api.set_global_api('')
 
-    index = Index(sha256=sha256, index_as=consts.IndexType.from_str(index_as), family_name=family_name)
-    index.send(wait=True)
-    pprint('Index operation:{}, Index ID:{}'.format(index.status, index.index_id))
+    file_obj = File(sha256=sha256)
+    file_obj.index(consts.IndexType.from_str(index_as), family_name=family_name)
+    file_obj.wait_for_index_completion()
+    pprint(f'Index operation: {file_obj.index_status.value}, Index ID: {file_obj.index_id}')
 
 
 if __name__ == '__main__':
diff --git a/intezer_sdk/file.py b/intezer_sdk/file.py
new file mode 100644
index 0000000..2282c5a
--- /dev/null
+++ b/intezer_sdk/file.py
@@ -0,0 +1,139 @@
+from typing import IO
+from typing import Optional
+from typing import Union
+
+from intezer_sdk import consts
+from intezer_sdk._api import IntezerApi
+from intezer_sdk.api import IntezerApiClient
+from intezer_sdk.api import get_global_api
+from intezer_sdk.index import Index
+
+
+class File:
+    """
+    File is a class for file-related operations including indexing and downloading.
+    It is created from exactly one of a local file path or a sha256 hash.
+    """
+
+    def __init__(self,
+                 file_path: Optional[str] = None,
+                 sha256: Optional[str] = None,
+                 api: Optional[IntezerApiClient] = None):
+        """
+        Create a file handle; exactly one of ``file_path`` or ``sha256`` must be given, otherwise ValueError is raised.
+
+        :param file_path: The path to the file.
+        :param sha256: The sha256 hash of the file.
+        :param api: The API connection to Intezer; the global API is used when omitted.
+        """
+        if (sha256 is not None) == (file_path is not None):
+            raise ValueError('Choose between sha256 or file_path')
+
+        self._file_path = file_path
+        self._sha256 = sha256
+        self._api = IntezerApi(api or get_global_api())
+        self._index: Optional[Index] = None
+
+    @property
+    def sha256(self) -> Optional[str]:
+        """The SHA256 hash of the file, or None when the file was created from a path."""
+        return self._sha256
+
+    @property
+    def file_path(self) -> Optional[str]:
+        """The file path, or None when the file was created from a sha256."""
+        return self._file_path
+
+    def index(self,
+              index_as: consts.IndexType,
+              family_name: Optional[str] = None,
+              wait: Union[bool, int] = False) -> None:
+        """
+        Index the file; every call creates and sends a new index request.
+
+        :param index_as: The type of the index (trusted or malicious).
+        :param family_name: The family name to index as (mandatory if index_as is malicious).
+        :param wait: Whether to wait for the indexing to complete.
+        """
+        if self._sha256:
+            self._index = Index(sha256=self._sha256,
+                                index_as=index_as,
+                                family_name=family_name,
+                                api=self._api.api)
+        else:
+            self._index = Index(file_path=self._file_path,
+                                index_as=index_as,
+                                family_name=family_name,
+                                api=self._api.api)
+
+        self._index.send(wait=wait)
+
+    def unset_indexing(self, wait: Union[bool, int] = False) -> None:
+        """
+        Unset the indexing request. Raises ValueError unless the file was created from a sha256.
+
+        :param wait: Whether to wait for the operation to complete.
+        """
+        if not self._sha256:
+            raise ValueError('Unset indexing is only supported for sha256-based files')
+
+        if not self._index:
+            self._index = Index(sha256=self._sha256,
+                                index_as=consts.IndexType.TRUSTED,
+                                api=self._api.api)
+
+        self._index.unset_indexing(wait=wait)
+
+    def wait_for_index_completion(self, interval: Optional[int] = None, sleep_before_first_check: bool = False) -> None:
+        """
+        Block until the index is completed. Raises ValueError if no index operation was started.
+
+        :param interval: The interval to wait between checks.
+        :param sleep_before_first_check: Whether to sleep before the first status check.
+        """
+        if not self._index:
+            raise ValueError('No index operation in progress')
+
+        self._index.wait_for_completion(interval, sleep_before_first_check)
+
+    def check_index_status(self):
+        """
+        Check the index status. Raises ValueError if no index operation was started.
+
+        :return: The index status code.
+        """
+        if not self._index:
+            raise ValueError('No index operation in progress')
+
+        return self._index.check_status()
+
+    @property
+    def index_status(self):
+        """The current index status, or None if no index operation was started."""
+        if not self._index:
+            return None
+        return self._index.status
+
+    @property
+    def index_id(self):
+        """The index ID, or None if no index operation was started."""
+        if not self._index:
+            return None
+        return self._index.index_id
+
+    def download(self,
+                 path: Optional[str] = None,
+                 output_stream: Optional[IO] = None,
+                 password_protection: Optional[str] = None) -> None:
+        """
+        Download the file. Raises ValueError unless the file was created from a sha256.
+
+        ``path`` or ``output_stream`` must be provided.
+        :param path: A path to where to save the file, it can be either a directory or non-existing file path.
+        :param output_stream: A file-like object to write the file's content to.
+        :param password_protection: Set password protection to download file as zip with password.
+        """
+        if not self._sha256:
+            raise ValueError('Download is only supported for sha256-based files')
+
+        self._api.download_file_by_sha256(self._sha256, path, output_stream, password_protection)
diff --git a/tests/unit/test_file.py b/tests/unit/test_file.py
new file mode 100644
index 0000000..2280c07
--- /dev/null
+++ b/tests/unit/test_file.py
@@ -0,0 +1,280 @@
+import io
+import time
+from http import HTTPStatus
+from unittest.mock import mock_open
+from unittest.mock import patch
+
+import responses
+
+from intezer_sdk import consts
+from intezer_sdk import errors
+from intezer_sdk.file import File
+from tests.unit.base_test import BaseTest
+
+
+class FileSpec(BaseTest):
+    def test_file_initialization_with_both_sha256_and_file_path_raises_value_error(self):
+        # Act + Assert
+        with self.assertRaises(ValueError):
+            File(sha256='a', file_path='/path/to/file')
+
+    def test_file_initialization_with_neither_sha256_nor_file_path_raises_value_error(self):
+        # Act + Assert
+        with self.assertRaises(ValueError):
+            File()
+
+    def test_file_initialization_with_sha256_sets_properties_correctly(self):
+        # Arrange + Act
+        file_obj = File(sha256='test_sha256')
+
+        # Assert
+        self.assertEqual(file_obj.sha256, 'test_sha256')
+        self.assertIsNone(file_obj.file_path)
+
+    def test_file_initialization_with_file_path_sets_properties_correctly(self):
+        # Arrange + Act
+        file_obj = File(file_path='/path/to/file')
+
+        # Assert
+        self.assertEqual(file_obj.file_path, '/path/to/file')
+        self.assertIsNone(file_obj.sha256)
+
+    def test_index_malicious_without_family_name_raises_value_error(self):
+        # Arrange
+        file_obj = File(sha256='a')
+
+        # Act + Assert
+        with self.assertRaises(ValueError):
+            file_obj.index(consts.IndexType.MALICIOUS)
+
+    def test_trusted_index_by_sha256_status_changes_to_created(self):
+        # Arrange
+        with responses.RequestsMock() as mock:
+            mock.post(
+                url=f'{self.full_url}/files/a/index',
+                status=HTTPStatus.CREATED,
+                json={'result_url': '/files/index/testindex'})
+            file_obj = File(sha256='a')
+
+            # Act
+            file_obj.index(consts.IndexType.TRUSTED)
+
+        # Assert
+        self.assertEqual(file_obj.index_status, consts.IndexStatusCode.CREATED)
+        self.assertEqual(file_obj.index_id, 'testindex')
+
+    def test_malicious_index_by_sha256_status_changes_to_created(self):
+        # Arrange
+        with responses.RequestsMock() as mock:
+            mock.post(
+                url=f'{self.full_url}/files/a/index',
+                status=HTTPStatus.CREATED,
+                json={'result_url': '/files/index/testindex'})
+            file_obj = File(sha256='a')
+
+            # Act
+            file_obj.index(consts.IndexType.MALICIOUS, family_name='WannaCry')
+
+        # Assert
+        self.assertEqual(file_obj.index_status, consts.IndexStatusCode.CREATED)
+
+    def test_index_by_file_path_status_changes_to_created(self):
+        # Arrange
+        with responses.RequestsMock() as mock:
+            mock.post(
+                url=f'{self.full_url}/files/index',
+                status=HTTPStatus.CREATED,
+                json={'result_url': '/files/index/testindex'})
+            file_obj = File(file_path='a')
+
+            with patch(self.patch_prop, mock_open(read_data='data')):
+                # Act
+                file_obj.index(consts.IndexType.TRUSTED)
+
+        # Assert
+        self.assertEqual(file_obj.index_status, consts.IndexStatusCode.CREATED)
+
+    def test_reindexing_creates_new_index_object(self):
+        # Arrange
+        with responses.RequestsMock() as mock:
+            mock.post(
+                url=f'{self.full_url}/files/a/index',
+                status=HTTPStatus.CREATED,
+                json={'result_url': '/files/index/testindex1'})
+            mock.post(
+                url=f'{self.full_url}/files/a/index',
+                status=HTTPStatus.CREATED,
+                json={'result_url': '/files/index/testindex2'})
+            file_obj = File(sha256='a')
+
+            # Act
+            file_obj.index(consts.IndexType.TRUSTED)
+            first_index_id = file_obj.index_id
+
+            # Reindex with different type
+            file_obj.index(consts.IndexType.MALICIOUS, family_name='TestFamily')
+            second_index_id = file_obj.index_id
+
+        # Assert
+        self.assertEqual(first_index_id, 'testindex1')
+        self.assertEqual(second_index_id, 'testindex2')
+        self.assertEqual(file_obj.index_status, consts.IndexStatusCode.CREATED)
+
+    def test_failed_index_raises_index_failed_error(self):
+        # Arrange
+        with responses.RequestsMock() as mock:
+            mock.post(
+                url=f'{self.full_url}/files/a/index',
+                status=HTTPStatus.CREATED,
+                json={'result_url': '/files/index/testindex'})
+            mock.get(
+                url=f'{self.full_url}/files/index/testindex',
+                status=HTTPStatus.OK,
+                json={'result_url': '/files/index/testindex',
+                      'status': 'failed'})
+            file_obj = File(sha256='a')
+
+            # Act + Assert
+            with self.assertRaises(errors.IndexFailedError):
+                file_obj.index(consts.IndexType.TRUSTED, wait=True)
+
+    def test_index_by_sha256_succeeds_status_changes_to_finished(self):
+        # Arrange
+        with responses.RequestsMock() as mock:
+            mock.post(
+                url=f'{self.full_url}/files/a/index',
+                status=HTTPStatus.CREATED,
+                json={'result_url': '/files/index/testindex'})
+            mock.get(
+                url=f'{self.full_url}/files/index/testindex',
+                status=HTTPStatus.ACCEPTED)
+            mock.get(
+                url=f'{self.full_url}/files/index/testindex',
+                status=HTTPStatus.OK,
+                json={'result_url': '/files/index/testindex',
+                      'status': 'succeeded'})
+            file_obj = File(sha256='a')
+
+            # Act
+            file_obj.index(consts.IndexType.TRUSTED, wait=True)
+
+        # Assert
+        self.assertEqual(file_obj.index_status, consts.IndexStatusCode.FINISHED)
+
+    def test_index_waits_specific_time_until_completion(self):
+        # Arrange
+        with responses.RequestsMock() as mock:
+            mock.post(
+                url=f'{self.full_url}/files/a/index',
+                status=HTTPStatus.CREATED,
+                json={'result_url': '/files/index/testindex'})
+            mock.get(
+                url=f'{self.full_url}/files/index/testindex',
+                status=HTTPStatus.OK,
+                json={'result_url': '/files/index/testindex',
+                      'status': 'succeeded'})
+            file_obj = File(sha256='a')
+            wait = 1
+
+            # Act
+            start = time.monotonic()
+            file_obj.index(consts.IndexType.TRUSTED, wait=wait)
+            duration = time.monotonic() - start
+
+        # Assert
+        self.assertEqual(file_obj.index_status, consts.IndexStatusCode.FINISHED)
+        self.assertGreater(duration, wait)
+
+    def test_check_index_status_before_index_sent_raises_error(self):
+        # Arrange
+        file_obj = File(sha256='a')
+
+        # Act + Assert
+        with self.assertRaises(ValueError):
+            file_obj.check_index_status()
+
+    def test_unset_indexing_for_sha256_file_succeeds(self):
+        # Arrange
+        sha256 = 'a'
+        file_obj = File(sha256=sha256)
+        with responses.RequestsMock() as mock:
+            mock.delete(
+                url=f'{self.full_url}/files/{sha256}/index',
+                status=HTTPStatus.OK)
+
+            # Act
+            file_obj.unset_indexing()
+
+    def test_unset_indexing_for_file_path_raises_value_error(self):
+        # Arrange
+        file_obj = File(file_path='/path/to/file')
+
+        # Act + Assert
+        with self.assertRaises(ValueError):
+            file_obj.unset_indexing()
+
+    def test_download_for_sha256_file_succeeds(self):
+        # Arrange
+        sha256 = 'a'
+        file_obj = File(sha256=sha256)
+        output_stream = io.BytesIO()
+
+        with responses.RequestsMock() as mock:
+            mock.get(
+                url=f'{self.full_url}/files/{sha256}/download',
+                status=HTTPStatus.OK,
+                body=b'file_content')
+
+            # Act
+            file_obj.download(output_stream=output_stream)
+
+        # Assert
+        output_stream.seek(0)
+        self.assertEqual(output_stream.read(), b'file_content')
+
+    def test_download_for_file_path_raises_value_error(self):
+        # Arrange
+        file_obj = File(file_path='/path/to/file')
+
+        # Act + Assert
+        with self.assertRaises(ValueError):
+            file_obj.download(output_stream=io.BytesIO())
+
+    def test_download_with_password_protection_to_path_succeeds(self):
+        # Arrange
+        sha256 = 'a'
+        file_obj = File(sha256=sha256)
+
+        with responses.RequestsMock() as mock:
+            mock.get(
+                url=f'{self.full_url}/files/{sha256}/download',
+                status=HTTPStatus.OK,
+                body=b'zip_file_content')
+
+            with patch(self.patch_prop, mock_open()) as mock_file:
+                # Act
+                file_obj.download(path='/tmp', password_protection='password123')
+
+        # Assert
+        mock_file.assert_called_once()
+        mock_file().write.assert_called_with(b'zip_file_content')
+
+    def test_download_to_path_succeeds(self):
+        # Arrange
+        sha256 = 'a'
+        file_obj = File(sha256=sha256)
+
+        with responses.RequestsMock() as mock:
+            mock.get(
+                url=f'{self.full_url}/files/{sha256}/download',
+                status=HTTPStatus.OK,
+                body=b'file_content',
+                headers={'content-disposition': 'attachment; filename=test.exe'})
+
+            with patch(self.patch_prop, mock_open()) as mock_file:
+                # Act
+                file_obj.download(path='/tmp')
+
+        # Assert
+        mock_file.assert_called_once()
+        mock_file().write.assert_called_with(b'file_content')