|
1 | 1 | import datetime |
2 | 2 | import io |
3 | 3 | from http import HTTPStatus |
4 | | -import json |
5 | | -import os |
6 | 4 | from unittest.mock import mock_open |
7 | 5 | from unittest.mock import patch |
8 | 6 |
|
@@ -281,29 +279,104 @@ def test_download_to_path_succeeds(self): |
281 | 279 | mock_file.assert_called_once() |
282 | 280 | mock_file().write.assert_called_with(b'file_content') |
283 | 281 |
|
284 | | - def test_code_reuse_by_block(self): |
285 | | - TEST_HASH = '73c677dd3b264e7eb80e26e78ac9df1dba30915b5ce3b1bc1c83db52b9c6b30e' |
286 | | - |
287 | | - def load_response_json(file_name: str) -> dict: |
288 | | - path_to_file = os.path.join(os.path.dirname(__file__), '..', 'resources', file_name) |
289 | | - with open(path_to_file, 'rb') as file: |
290 | | - return json.load(file) |
291 | | - |
| 282 | + def test_get_code_blocks_returns_operation_when_wait_is_true(self): |
| 283 | + # Arrange |
| 284 | + test_hash = '73c677dd3b264e7eb80e26e78ac9df1dba30915b5ce3b1bc1c83db52b9c6b30e' |
| 285 | + result_url = f'/analyses/51ea282b-0542-4578-a44a-e60fdfb0d3ec/code-reuse-by-code-block' |
| 286 | + result = { |
| 287 | + 'blocks': { |
| 288 | + '101220': { |
| 289 | + 'code_reuse': ['Common'], |
| 290 | + 'software_type': 'common' |
| 291 | + }, |
| 292 | + '101244': { |
| 293 | + 'code_reuse': ['WannaCry'], |
| 294 | + 'software_type': 'malware' |
| 295 | + } |
| 296 | + } |
| 297 | + } |
| 298 | + |
292 | 299 | with responses.RequestsMock() as mock: |
293 | 300 | mock.post( |
294 | | - url=consts.ANALYZE_URL + |
295 | | - f'/api/v2-0/files/{TEST_HASH}/code-reuse-by-code-block', |
296 | | - status=HTTPStatus.OK, |
297 | | - json=load_response_json('code_reuse_block_response.json')) |
| 301 | + url=f'{self.full_url}/files/{test_hash}/code-reuse-by-code-block', |
| 302 | + status=HTTPStatus.OK, |
| 303 | + json={'result_url': result_url}) |
298 | 304 | mock.get( |
299 | | - url=consts.ANALYZE_URL + |
300 | | - '/api/v2-0/analyses/51ea282b-0542-4578-a44a-e60fdfb0d3ec/code-reuse-by-code-block', |
301 | | - status=HTTPStatus.OK, |
302 | | - json=load_response_json('code_reuse_block_report.json')) |
| 305 | + url=f'{self.full_url}{result_url}', |
| 306 | + status=HTTPStatus.OK, |
| 307 | + json={'status': 'succeeded', 'result': result}) |
| 308 | + |
| 309 | + file_object = File(sha256=test_hash) |
| 310 | + |
| 311 | + # Act |
| 312 | + operation = file_object.get_code_blocks(wait=True) |
| 313 | + |
| 314 | + # Assert |
| 315 | + self.assertEqual(operation.status, consts.AnalysisStatusCode.FINISHED) |
| 316 | + self.assertIsNotNone(operation.result) |
| 317 | + self.assertIn('blocks', operation.result) |
| 318 | + self.assertEqual(len(operation.result['blocks']), 2) |
| 319 | + self.assertEqual(operation.result['blocks']['101220']['software_type'], 'common') |
| 320 | + self.assertEqual(operation.result['blocks']['101244']['software_type'], 'malware') |
| 321 | + |
| 322 | + def test_get_code_blocks_for_file_path_raises_value_error(self): |
| 323 | + # Arrange |
| 324 | + file_obj = File(file_path='/path/to/file') |
| 325 | + |
| 326 | + # Act + Assert |
| 327 | + with self.assertRaises(ValueError): |
| 328 | + file_obj.get_code_blocks() |
| 329 | + |
| 330 | + def test_get_code_blocks_without_wait_returns_operation_in_progress(self): |
| 331 | + # Arrange |
| 332 | + test_hash = 'a' * 64 |
| 333 | + result_url = f'/analyses/test-id/code-reuse-by-code-block' |
| 334 | + |
| 335 | + with responses.RequestsMock() as mock: |
| 336 | + mock.post( |
| 337 | + url=f'{self.full_url}/files/{test_hash}/code-reuse-by-code-block', |
| 338 | + status=HTTPStatus.OK, |
| 339 | + json={'result_url': result_url}) |
303 | 340 |
|
304 | | - file_object = File(sha256=TEST_HASH) |
305 | | - report = file_object.get_code_blocks() |
| 341 | + file_object = File(sha256=test_hash) |
306 | 342 |
|
307 | | - self.assertEqual(len(report), 2527) |
308 | | - self.assertEqual(len([x for x in report if x.is_common]), 1371) |
309 | | - self.assertEqual(len([x for x in report if x.software_type == 'malware']), 171) |
| 343 | + # Act |
| 344 | + operation = file_object.get_code_blocks() |
| 345 | + |
| 346 | + # Assert |
| 347 | + self.assertEqual(operation.status, consts.AnalysisStatusCode.IN_PROGRESS) |
| 348 | + self.assertIsNone(operation.result) |
| 349 | + |
| 350 | + def test_get_code_blocks_raises_hash_does_not_exist_error_when_file_not_found(self): |
| 351 | + # Arrange |
| 352 | + test_hash = 'a' * 64 |
| 353 | + |
| 354 | + with responses.RequestsMock() as mock: |
| 355 | + mock.post( |
| 356 | + url=f'{self.full_url}/files/{test_hash}/code-reuse-by-code-block', |
| 357 | + status=HTTPStatus.NOT_FOUND, |
| 358 | + json={'error': 'File not found'}) |
| 359 | + |
| 360 | + file_object = File(sha256=test_hash) |
| 361 | + |
| 362 | + # Act + Assert |
| 363 | + with self.assertRaises(errors.HashDoesNotExistError): |
| 364 | + file_object.get_code_blocks() |
| 365 | + |
| 366 | + def test_get_code_blocks_raises_value_error_when_file_is_not_code_item(self): |
| 367 | + # Arrange |
| 368 | + test_hash = 'a' * 64 |
| 369 | + |
| 370 | + with responses.RequestsMock() as mock: |
| 371 | + mock.post( |
| 372 | + url=f'{self.full_url}/files/{test_hash}/code-reuse-by-code-block', |
| 373 | + status=HTTPStatus.CONFLICT, |
| 374 | + json={'error': 'Not a code item'}) |
| 375 | + |
| 376 | + file_object = File(sha256=test_hash) |
| 377 | + |
| 378 | + # Act + Assert |
| 379 | + with self.assertRaises(ValueError) as context: |
| 380 | + file_object.get_code_blocks() |
| 381 | + |
| 382 | + self.assertEqual(str(context.exception), 'sha256 is not a code item') |
0 commit comments