From 4995d486a60ebc72359d3cdea06a80c115009cf9 Mon Sep 17 00:00:00 2001 From: Prof_Calculus Date: Wed, 6 Nov 2024 10:34:43 +0530 Subject: [PATCH] Support ipv6 address in hostname for kmip client --- kmip/services/server/server.py | 19 ++++++- .../tests/unit/services/server/test_server.py | 56 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/kmip/services/server/server.py b/kmip/services/server/server.py index 534ab61d..01317575 100644 --- a/kmip/services/server/server.py +++ b/kmip/services/server/server.py @@ -28,6 +28,7 @@ from kmip.core import exceptions from kmip.core import policy as operation_policy +from kmip.core.exceptions import InvalidField from kmip.services import auth from kmip.services.server import config from kmip.services.server import engine @@ -228,6 +229,19 @@ def _setup_configuration( if database_path: self.config.set_setting('database_path', database_path) + def __get_address_family(self, hostname): + """ + Determine whether the given hostname is an IPv4 or IPv6 address. + Return socket.AF_INET for IPv4 and socket.AF_INET6 for IPv6. + """ + try: + return socket.getaddrinfo(hostname, None)[0][0] + except Exception: + self._logger.exception( + "Failed to get address family of hostname {}.".format(hostname) + ) + raise InvalidField("Invalid hostname: {}".format(hostname)) + def start(self): """ Prepare the server to start serving connections. @@ -268,7 +282,10 @@ def interrupt_handler(trigger, frame): # Create a TCP stream socket and configure it for immediate reuse. socket.setdefaulttimeout(10) - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket = socket.socket( + self.__get_address_family(self.config.settings.get('hostname')), + socket.SOCK_STREAM + ) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._logger.debug( diff --git a/kmip/tests/unit/services/server/test_server.py b/kmip/tests/unit/services/server/test_server.py index a9e9f194..4f9b3e5c 100644 --- a/kmip/tests/unit/services/server/test_server.py +++ b/kmip/tests/unit/services/server/test_server.py @@ -633,3 +633,59 @@ def test_as_context_manager(self, logging_mock, engine_mock): self.assertTrue(s.start.called) self.assertTrue(s.stop.called) + + @mock.patch('multiprocessing.Manager') + @mock.patch('kmip.services.server.monitor.PolicyDirectoryMonitor') + @mock.patch('kmip.services.server.engine.KmipEngine') + @mock.patch('kmip.services.server.server.KmipServer._setup_logging') + def test_ipv6_support(self, manager_mock, monitor_mock, engine_mock, logging_mock): + """ + Test that the right calls and log messages are triggered when stopping + the server results in an error while shutting down the policy monitor. + """ + a_mock = mock.MagicMock() + b_mock = mock.MagicMock() + ip_and_expected_address_family = [ + ('::1', socket.AF_INET6), + ('127.0.0.1', socket.AF_INET), + ] + + for (ip, expected_address_family) in ip_and_expected_address_family: + s = server.KmipServer( + hostname=ip, + port=5696, + auth_suite='Basic', + config_path=None, + policy_path=None, + tls_cipher_suites='TLS_RSA_WITH_AES_128_CBC_SHA' + ) + s._logger = mock.MagicMock() + + with mock.patch('socket.socket') as socket_mock: + with mock.patch('ssl.wrap_socket') as ssl_mock: + ssl_mock.return_value = b_mock + socket_mock.return_value = a_mock + s.start() + socket_mock.assert_called_once_with( + expected_address_family, + socket.SOCK_STREAM + ) + + s = server.KmipServer( + hostname='invalid_ip', + port=5696, + auth_suite='Basic', + config_path=None, + policy_path=None, + tls_cipher_suites='TLS_RSA_WITH_AES_128_CBC_SHA' + ) + + with mock.patch('socket.socket') as socket_mock: + with mock.patch('ssl.wrap_socket') as ssl_mock: + ssl_mock.return_value = b_mock + socket_mock.return_value = a_mock + self.assertRaisesRegex( + exceptions.InvalidField, + "Invalid hostname: invalid_ip", + s.start + )