diff --git a/.github/workflows/python-test.yml b/.github/workflows/python-test.yml new file mode 100644 index 00000000..0191c36d --- /dev/null +++ b/.github/workflows/python-test.yml @@ -0,0 +1,23 @@ +name: Python tests + +on: + push: + paths: + - "**.py" + pull_request: + paths: + - "**.py" + +jobs: + agentless_api_call_test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.13" + - name: Run Agentless API Call unit tests + run: | + cd aws_quickstart + python -B -S -m unittest datadog_agentless_api_call_test.py -v diff --git a/aws_quickstart/datadog_agentless_api_call.py b/aws_quickstart/datadog_agentless_api_call.py index 3f7d8bb1..40356628 100644 --- a/aws_quickstart/datadog_agentless_api_call.py +++ b/aws_quickstart/datadog_agentless_api_call.py @@ -30,7 +30,7 @@ def call_datadog_agentless_api(event, method): worker_dspm_policy_arn = event["ResourceProperties"].get("WorkerDSPMPolicyArn") # Make the url Request - url = "https://api." + dd_site + "/api/v2/agentless_scanning/accounts/aws" + url = f"https://api.{dd_site}/api/v2/agentless_scanning/accounts/aws" headers = { "DD-API-KEY": api_key, "DD-APPLICATION-KEY": app_key, @@ -39,16 +39,19 @@ def call_datadog_agentless_api(event, method): } if method == "DELETE": - url = url + "/" + account_id - request = Request(url, headers=headers) - request.get_method = lambda: method + url = f"{url}/{account_id}" + request = Request(url, headers=headers, method="DELETE") try: return urllib.request.urlopen(request) except HTTPError as e: - if e.code != 404: - raise e - else: + if e.status < 500: + # For most client errors, the best option is to continue with the + # stack deletion, since users have no way to fix the request, and + # at least this way they can clean up the scanner resources. return e + else: + raise + elif method == "POST": values = { "meta": { @@ -79,13 +82,11 @@ def call_datadog_agentless_api(event, method): } data = json.dumps(values) data = data.encode("utf-8") # data should be bytes - url_account_id = url + "/" + account_id - if is_agentless_scanning_enabled(url_account_id, headers): - request = Request(url_account_id, data=data, headers=headers) - request.get_method = lambda: "PATCH" + url_account = f"{url}/{account_id}" + if is_agentless_scanning_enabled(url_account, headers): + request = Request(url_account, data=data, headers=headers, method="PATCH") else: - request = Request(url, data=data, headers=headers) - request.get_method = lambda: "POST" + request = Request(url, data=data, headers=headers, method="POST") request.add_header("Content-Type", "application/vnd.api+json; charset=utf-8") request.add_header("Content-Length", len(data)) response = urllib.request.urlopen(request) @@ -95,17 +96,17 @@ def call_datadog_agentless_api(event, method): return None -def is_agentless_scanning_enabled(url_account_id, headers): +def is_agentless_scanning_enabled(url_account, headers): """Check if agentless scanning is already enabled for the account""" + request = Request(url_account, headers=headers, method="GET") try: - request = Request(url_account_id, headers=headers) - request.get_method = lambda: "GET" - response = urllib.request.urlopen(request) - return response.getcode() == 200 + urllib.request.urlopen(request) except HTTPError as e: - if e.code != 404: - raise e - return False + if e.status == 404: + return False + else: + raise + return True def handler(event, context): @@ -114,24 +115,14 @@ def handler(event, context): if event["RequestType"] == "Create": LOGGER.info("Received Create request.") response = call_datadog_agentless_api(event, "POST") - if response.getcode() == 201 or response.getcode() == 204: - send_response( - event, - context, - "SUCCESS", - { - "Message": "Datadog AWS Agentless Scanning Integration created successfully.", - }, - ) - else: - LOGGER.error("Failed - unexpected status code: %d", response.getcode()) - send_response( - event, - context, - "FAILED", - {"Message": "Http response: {}".format(response.msg)}, - ) - + send_response( + event, + context, + "SUCCESS", + { + "Message": f"Datadog Agentless Scanning activated (status: {response.status}).", + }, + ) elif event["RequestType"] == "Update": LOGGER.info("Received Update request.") send_response( @@ -143,25 +134,14 @@ def handler(event, context): elif event["RequestType"] == "Delete": LOGGER.info("Received Delete request.") response = call_datadog_agentless_api(event, "DELETE") - - if response.getcode() == 200: - send_response( - event, - context, - "SUCCESS", - { - "Message": "Datadog AWS Agentless Scanning Integration deleted successfully.", - }, - ) - else: - LOGGER.error("Failed - unexpected status code: %d", response.getcode()) - send_response( - event, - context, - "FAILED", - {"Message": "Http response: {}".format(response.msg)}, - ) - + send_response( + event, + context, + "SUCCESS", + { + "Message": f"Datadog Agentless Scanning deactivated (status: {response.status}).", + }, + ) else: LOGGER.error( "Failed - received unexpected request: %s", event["RequestType"] @@ -178,7 +158,7 @@ def handler(event, context): event, context, "FAILED", - {"Message": "Exception during processing: {}".format(e)}, + {"Message": f"Exception during processing: {e}"}, ) @@ -204,12 +184,11 @@ def send_response(event, context, response_status, response_data): LOGGER.info("ResponseBody: %s", response_body) opener = build_opener(HTTPHandler) - request = Request(event["ResponseURL"], data=formatted_response) + request = Request(event["ResponseURL"], data=formatted_response, method="PUT") request.add_header("Content-Type", "application/json; charset=utf-8") request.add_header("Content-Length", len(formatted_response)) - request.get_method = lambda: "PUT" response = opener.open(request) - LOGGER.info("Status code: %s", response.getcode()) + LOGGER.info("Status code: %s", response.status) LOGGER.info("Status message: %s", response.msg) diff --git a/aws_quickstart/datadog_agentless_api_call_test.py b/aws_quickstart/datadog_agentless_api_call_test.py new file mode 100644 index 00000000..1187b898 --- /dev/null +++ b/aws_quickstart/datadog_agentless_api_call_test.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 + +import json +import unittest +from unittest.mock import patch, Mock +from urllib.error import HTTPError + +# Import the functions to test +from datadog_agentless_api_call import ( + call_datadog_agentless_api, + is_agentless_scanning_enabled, +) + + +class TestCallDatadogAgentlessAPI(unittest.TestCase): + """Test cases for call_datadog_agentless_api function""" + + def setUp(self): + """Set up test fixtures""" + self.base_event = { + "ResourceProperties": { + "TemplateVersion": "1.0.0", + "APIKey": "0123456789abcdef0123456789abcdef", + "APPKey": "0123456789abcdef0123456789abcdef12345678", + "DatadogSite": "datadoghq.com", + "AccountId": "123456789012", + "Hosts": "true", + "Containers": "false", + "Lambdas": "true", + "SensitiveData": "false", + }, + "StackId": "arn:aws:cloudformation:us-east-1:358251252154:stack/DatadogAgentlessIntegration/22b23bca-de8b-451c-99e4-c69b9ad20ec7", + } + site = self.base_event["ResourceProperties"]["DatadogSite"] + self.url = f"https://api.{site}/api/v2/agentless_scanning/accounts/aws" + + def create_mock_response(self, status_code, headers={}, data=b""): + """Helper method to create a mock HTTP response""" + response = Mock() + response.status = status_code + response.headers = headers + response.read.return_value = data + return response + + def create_mock_http_error(self, status_code, headers={}, data=b""): + """Helper method to create a mock HTTPError""" + response = self.create_mock_response(status_code, headers, data) + return HTTPError(self.url, status_code, "Test Error", headers, response) + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + @patch("datadog_agentless_api_call.is_agentless_scanning_enabled") + def test_post_success_200(self, mock_is_enabled, mock_urlopen): + """Test successful POST request with 200 status code""" + mock_is_enabled.return_value = False + mock_response = self.create_mock_response(200) + mock_urlopen.return_value = mock_response + + result = call_datadog_agentless_api(self.base_event, "POST") + + self.assertEqual(result.status, 200) + + # Verify that the request was made with the POST method + call_args = mock_urlopen.call_args[0][0] + self.assertEqual(call_args.get_method(), "POST") + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + @patch("datadog_agentless_api_call.is_agentless_scanning_enabled") + def test_post_success_201(self, mock_is_enabled, mock_urlopen): + """Test successful POST request with 201 status code""" + mock_is_enabled.return_value = False + mock_response = self.create_mock_response(201) + mock_urlopen.return_value = mock_response + + result = call_datadog_agentless_api(self.base_event, "POST") + + self.assertEqual(result.status, 201) + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + @patch("datadog_agentless_api_call.is_agentless_scanning_enabled") + def test_post_success_204(self, mock_is_enabled, mock_urlopen): + """Test successful POST request with 204 status code""" + mock_is_enabled.return_value = False + mock_response = self.create_mock_response(204) + mock_urlopen.return_value = mock_response + + result = call_datadog_agentless_api(self.base_event, "POST") + + self.assertEqual(result.status, 204) + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + @patch("datadog_agentless_api_call.is_agentless_scanning_enabled") + def test_post_error_400(self, mock_is_enabled, mock_urlopen): + """Test POST request with 400 error""" + mock_is_enabled.return_value = False + mock_error = self.create_mock_http_error(400) + mock_urlopen.side_effect = mock_error + + with self.assertRaises(HTTPError): + call_datadog_agentless_api(self.base_event, "POST") + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + @patch("datadog_agentless_api_call.is_agentless_scanning_enabled") + def test_post_error_404(self, mock_is_enabled, mock_urlopen): + """Test POST request with 404 error""" + mock_is_enabled.return_value = False + mock_error = self.create_mock_http_error(404) + mock_urlopen.side_effect = mock_error + + with self.assertRaises(HTTPError): + call_datadog_agentless_api(self.base_event, "POST") + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + @patch("datadog_agentless_api_call.is_agentless_scanning_enabled") + def test_post_error_500(self, mock_is_enabled, mock_urlopen): + """Test POST request with 500 error""" + mock_is_enabled.return_value = False + mock_error = self.create_mock_http_error(500) + mock_urlopen.side_effect = mock_error + + with self.assertRaises(HTTPError): + call_datadog_agentless_api(self.base_event, "POST") + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + @patch("datadog_agentless_api_call.is_agentless_scanning_enabled") + def test_post_patch_when_enabled(self, mock_is_enabled, mock_urlopen): + """Test POST request uses PATCH when agentless scanning is already enabled""" + mock_is_enabled.return_value = True + mock_response = self.create_mock_response(200) + mock_urlopen.return_value = mock_response + + result = call_datadog_agentless_api(self.base_event, "POST") + + self.assertEqual(result.status, 200) + + # Verify that the request was made with the PATCH method + call_args = mock_urlopen.call_args[0][0] + self.assertEqual(call_args.get_method(), "PATCH") + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + def test_delete_success_200(self, mock_urlopen): + """Test successful DELETE request with 200 status code""" + mock_response = self.create_mock_response(200) + mock_urlopen.return_value = mock_response + + result = call_datadog_agentless_api(self.base_event, "DELETE") + + self.assertEqual(result.status, 200) + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + def test_delete_success_204(self, mock_urlopen): + """Test successful DELETE request with 204 status code""" + mock_response = self.create_mock_response(204) + mock_urlopen.return_value = mock_response + + result = call_datadog_agentless_api(self.base_event, "DELETE") + + self.assertEqual(result.status, 204) + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + def test_delete_error_404_returns_error(self, mock_urlopen): + """Test DELETE request with 404 error returns the error instead of raising""" + mock_error = self.create_mock_http_error(404) + mock_urlopen.side_effect = mock_error + + result = call_datadog_agentless_api(self.base_event, "DELETE") + + self.assertEqual(result.status, 404) + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + def test_delete_error_500_raises_exception(self, mock_urlopen): + """Test DELETE request with 500 error raises exception""" + mock_error = self.create_mock_http_error(500) + mock_urlopen.side_effect = mock_error + + with self.assertRaises(HTTPError): + call_datadog_agentless_api(self.base_event, "DELETE") + + def test_unsupported_method_returns_none(self): + """Test that unsupported HTTP methods return None""" + result = call_datadog_agentless_api(self.base_event, "PUT") + self.assertIsNone(result) + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + @patch("datadog_agentless_api_call.is_agentless_scanning_enabled") + def test_post_request_payload_structure(self, mock_is_enabled, mock_urlopen): + """Test that POST request payload has correct structure""" + mock_is_enabled.return_value = False + mock_response = self.create_mock_response(200) + mock_urlopen.return_value = mock_response + + call_datadog_agentless_api(self.base_event, "POST") + + # Get the request that was made + call_args = mock_urlopen.call_args[0][0] + request_data = call_args.data.decode("utf-8") + payload = json.loads(request_data) + + # Verify payload structure + self.assertIn("data", payload) + self.assertIn("type", payload["data"]) + self.assertEqual(payload["data"]["type"], "aws_scan_options") + + +class TestIsAgentlessScanningEnabled(unittest.TestCase): + """Test cases for is_agentless_scanning_enabled function""" + + def setUp(self): + """Set up test fixtures""" + self.account_id = "123456789012" + self.url = f"https://api.datadoghq.com/api/v2/agentless_scanning/accounts/aws/{self.account_id}" + self.headers = { + "DD-API-KEY": "0123456789abcdef0123456789abcdef", + "DD-APPLICATION-KEY": "0123456789abcdef0123456789abcdef12345678", + } + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + def test_enabled_returns_true(self, mock_urlopen): + """Test that function returns True when agentless scanning is enabled""" + mock_response = Mock() + mock_response.status = 200 + mock_urlopen.return_value = mock_response + + result = is_agentless_scanning_enabled(self.url, self.headers) + + self.assertTrue(result) + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + def test_disabled_returns_false(self, mock_urlopen): + """Test that function returns False when agentless scanning is disabled (404)""" + mock_error = HTTPError(self.url, 404, "Not Found", {}, Mock()) + mock_urlopen.side_effect = mock_error + + result = is_agentless_scanning_enabled(self.url, self.headers) + + self.assertFalse(result) + + @patch("datadog_agentless_api_call.urllib.request.urlopen") + def test_other_error_raises_exception(self, mock_urlopen): + """Test that function raises exception for non-404 errors""" + mock_error = HTTPError(self.url, 500, "Internal Server Error", {}, Mock()) + mock_urlopen.side_effect = mock_error + + with self.assertRaises(HTTPError): + is_agentless_scanning_enabled(self.url, self.headers) + + +if __name__ == "__main__": + unittest.main() diff --git a/aws_quickstart/datadog_agentless_delegate_role.yaml b/aws_quickstart/datadog_agentless_delegate_role.yaml index b7881df0..d7c0c76a 100644 --- a/aws_quickstart/datadog_agentless_delegate_role.yaml +++ b/aws_quickstart/datadog_agentless_delegate_role.yaml @@ -341,7 +341,7 @@ Resources: LoggingConfig: ApplicationLogLevel: "INFO" LogFormat: "JSON" - Runtime: "python3.11" + Runtime: "python3.13" Timeout: 30 Code: ZipFile: | diff --git a/aws_quickstart/datadog_agentless_scanning.yaml b/aws_quickstart/datadog_agentless_scanning.yaml index b7337c53..c167d2d5 100644 --- a/aws_quickstart/datadog_agentless_scanning.yaml +++ b/aws_quickstart/datadog_agentless_scanning.yaml @@ -1075,7 +1075,7 @@ Resources: LoggingConfig: ApplicationLogLevel: "INFO" LogFormat: "JSON" - Runtime: "python3.11" + Runtime: "python3.13" Timeout: 30 Code: ZipFile: |