From 93ba7376098d9a3b6d039475e15778b0ffd024de Mon Sep 17 00:00:00 2001 From: Julien Rische Date: Fri, 3 Oct 2025 17:39:36 +0200 Subject: [PATCH 1/3] Fix DoS vulnerability based on unbounded TCP buffering In Application.__handle_recv(), the next part of the TCP exchange is received and queued to the io.BytesIO stream. Then, the content of the stream was systematically exported to a buffer. However, this buffer is only used if the data transfer is finished, causing a waste of processing resources if the message is received in multiple parts. On top of these unnecessary operations, this function does not handle length limits properly: it accepts to receive chunks of data with both an individual and total length larger than the maximum theoretical length of a Kerberos message, and will continue to wait for data as long as the input stream's length is not exactly the same as the one provided in the header of the response (even if the stream is already longer than the expected length). If the kdcproxy service is not protected against DNS discovery abuse, the attacker could take advantage of these problems to operate a denial-of-service attack (CVE-2025-59089). After this commit, kdcproxy will interrupt the receiving of a message after it exceeds the maximum length of a Kerberos message or the length indicated in the message header. Also it will only export the content of the input stream to a buffer once the receiving process has ended. Signed-off-by: Julien Rische --- kdcproxy/__init__.py | 51 +++++++++++++++++++------------- tests.py | 70 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 21 deletions(-) diff --git a/kdcproxy/__init__.py b/kdcproxy/__init__.py index ce96a0c..d7fb61e 100644 --- a/kdcproxy/__init__.py +++ b/kdcproxy/__init__.py @@ -149,6 +149,7 @@ def __await_reply(self, pr, rsocks, wsocks, timeout): if self.sock_type(sock) == socket.SOCK_STREAM: # Remove broken TCP socket from readers rsocks.remove(sock) + read_buffers.pop(sock) else: if reply is not None: return reply @@ -174,7 +175,7 @@ def __handle_recv(self, sock, read_buffers): if self.sock_type(sock) == socket.SOCK_DGRAM: # For UDP sockets, recv() returns an entire datagram # package. KDC sends one datagram as reply. - reply = sock.recv(1048576) + reply = sock.recv(self.MAX_LENGTH) # If we proxy over UDP, we will be missing the 4-byte # length prefix. So add it. reply = struct.pack("!I", len(reply)) + reply @@ -186,30 +187,38 @@ def __handle_recv(self, sock, read_buffers): if buf is None: read_buffers[sock] = buf = io.BytesIO() - part = sock.recv(1048576) - if not part: - # EOF received. Return any incomplete data we have on the theory - # that a decode error is more apparent than silent failure. The - # client will fail faster, at least. - read_buffers.pop(sock) - reply = buf.getvalue() - return reply + part = sock.recv(self.MAX_LENGTH) + if part: + # Data received, accumulate it in a buffer. + buf.write(part) - # Data received, accumulate it in a buffer. - buf.write(part) + reply = buf.getbuffer() + if len(reply) < 4: + # We don't have the length yet. + return None - reply = buf.getvalue() - if len(reply) < 4: - # We don't have the length yet. - return None + # Got enough data to check if we have the full package. + (length, ) = struct.unpack("!I", reply[0:4]) + length += 4 # add prefix length - # Got enough data to check if we have the full package. - (length, ) = struct.unpack("!I", reply[0:4]) - if length + 4 == len(reply): - read_buffers.pop(sock) - return reply + if length > self.MAX_LENGTH: + raise ValueError('Message length exceeds the maximum length ' + 'for a Kerberos message (%i > %i)' + % (length, self.MAX_LENGTH)) - return None + if len(reply) > length: + raise ValueError('Message length exceeds its expected length ' + '(%i > %i)' % (len(reply), length)) + + if len(reply) < length: + return None + + # Else (if part is None), EOF was received. Return any incomplete data + # we have on the theory that a decode error is more apparent than + # silent failure. The client will fail faster, at least. + + read_buffers.pop(sock) + return buf.getvalue() def __filter_addr(self, addr): if addr[0] not in (socket.AF_INET, socket.AF_INET6): diff --git a/tests.py b/tests.py index cd82781..2a1ad6e 100644 --- a/tests.py +++ b/tests.py @@ -20,6 +20,8 @@ # THE SOFTWARE. import os +import socket +import struct import unittest from base64 import b64decode try: @@ -122,6 +124,74 @@ def test_no_server(self): kpasswd=True) self.assertEqual(response.status_code, 503) + @mock.patch("socket.getaddrinfo", return_value=addrinfo) + @mock.patch("socket.socket") + def test_tcp_message_length_exceeds_max(self, m_socket, m_getaddrinfo): + # Test that TCP messages with length > MAX_LENGTH raise ValueError + # Create a message claiming to be larger than MAX_LENGTH + max_len = self.app.MAX_LENGTH + # Length prefix claiming message is larger than allowed + oversized_length = max_len + 1 + malicious_msg = struct.pack("!I", oversized_length) + + # Mock socket to return the malicious length prefix + mock_sock = m_socket.return_value + mock_sock.recv.return_value = malicious_msg + mock_sock.getsockopt.return_value = socket.SOCK_STREAM + + # Manually call the receive method to test it + read_buffers = {} + with self.assertRaises(ValueError) as cm: + self.app._Application__handle_recv(mock_sock, read_buffers) + + self.assertIn("exceeds the maximum length", str(cm.exception)) + self.assertIn(str(max_len), str(cm.exception)) + + @mock.patch("socket.getaddrinfo", return_value=addrinfo) + @mock.patch("socket.socket") + def test_tcp_message_data_exceeds_expected_length( + self, m_socket, m_getaddrinfo + ): + # Test that receiving more data than expected raises ValueError + # Create a message with length = 100 but send more data + expected_length = 100 + length_prefix = struct.pack("!I", expected_length) + # Send more data than the length prefix indicates + extra_data = b"X" * (expected_length + 10) + malicious_msg = length_prefix + extra_data + + mock_sock = m_socket.return_value + mock_sock.recv.return_value = malicious_msg + mock_sock.getsockopt.return_value = socket.SOCK_STREAM + + read_buffers = {} + with self.assertRaises(ValueError) as cm: + self.app._Application__handle_recv(mock_sock, read_buffers) + + self.assertIn("exceeds its expected length", str(cm.exception)) + + @mock.patch("socket.getaddrinfo", return_value=addrinfo) + @mock.patch("socket.socket") + def test_tcp_eof_returns_buffered_data(self, m_socket, m_getaddrinfo): + # Test that EOF returns any buffered data + initial_data = b"\x00\x00\x00\x10" # Length = 16 + mock_sock = m_socket.return_value + mock_sock.getsockopt.return_value = socket.SOCK_STREAM + + # First recv returns some data, second returns empty (EOF) + mock_sock.recv.side_effect = [initial_data, b""] + + read_buffers = {} + # First call buffers the data + result = self.app._Application__handle_recv(mock_sock, read_buffers) + self.assertIsNone(result) # Not complete yet + + # Second call gets EOF and returns buffered data + result = self.app._Application__handle_recv(mock_sock, read_buffers) + self.assertEqual(result, initial_data) + # Buffer should be cleaned up + self.assertNotIn(mock_sock, read_buffers) + def decode(data): data = data.replace(b'\\n', b'') From 0254c168919de84867d31a87698287900d560e9f Mon Sep 17 00:00:00 2001 From: Julien Rische Date: Fri, 3 Oct 2025 17:40:25 +0200 Subject: [PATCH 2/3] Use DNS discovery for declared realms only Allowing the use of DNS discovery for any requested realm (i.e. querying SRV records from the DNS zone matching the realm name) created a server-side request forgery vulnerability (CVE-2025-59088). An attacker could take advantage of a DNS zone they control to have kdcproxy direct their request to any IP addresses (including loopback and internal network) and port, allowing network and firewall rules probing, and data exfiltration. This commit mitigates this risk by making the global "use_dns" parameter apply only to realms declared in the kdcproxy configuration, and other configurations if their modules are enabled. To accommodate cases where realm hierarchies (like AD forests) are proxied, support for wildcards is added for realm section names. This can be used to have any "sub-realm" considered known, and therefore allowed to use DNS to discover their KDCs. The new "dns_realm_discovery" parameter can be enabled (if "use_dns" is not globally disabled) to allow use of DNS discovery for unknown realms too, restoring the previous unsafe behavior. For any KDC address obtained by DNS discovery, a warning is logged if the port is not a standard Kerberos port. This warning can be silenced using the "silence_port_warn" configuration parameter. Signed-off-by: Julien Rische --- README | 77 ++-- kdcproxy/config/__init__.py | 238 ++++++++-- kdcproxy/config/mit.py | 23 +- tests.py | 843 +++++++++++++++++++++++++++++++++++- 4 files changed, 1093 insertions(+), 88 deletions(-) diff --git a/README b/README index 9458ef7..bf999db 100644 --- a/README +++ b/README @@ -45,25 +45,43 @@ may still need it). This permits the use of longer timeouts and prevents possible lockouts when the KDC packets contain OTP token codes (which should preferably be sent to only one server). -Automatic Configuration ------------------------ -By default, no configuration is necessary. In this case, kdcproxy will use -REALM DNS SRV record lookups to determine remote KDC locations. - -Master Configuration File +Main Configuration File ------------------------- -If you wish to have more detailed configuration, the first place you can -configure kdcproxy is the master configuration file. This file exists at the -location specified in the environment variable KDCPROXY_CONFIG. If this -variable is unspecified, the default locations are +The location of kdcproxy's main configuration file is specified by the +`KDCPROXY_CONFIG` environment variable. If not set, the default locations are `/usr/local/etc/kdcproxy.conf` or `/etc/kdcproxy.conf`. This configuration file takes precedence over all other configuration modules. This file is an -ini-style configuration with a special section **[global]**. Two parameters -are available in this section: **configs** and **use_dns**. - -The **use_dns** allows you to enable or disable use of DNS SRV record lookups. - -The **configs** parameter allows you to load other configuration modules for +ini-style configuration with a special **[global]** section, wildcard realm +sections, and exact realm sections. + +Exact realm sections are named after the realms that kdcproxy is expected to +receive requests for. Wildcard realm sections differ from exact realm sections +by being prefixed by a '\*' character. Such sections will match with realms +having either all or their final labels in common with the section. As an +example, **[\*EXAMPLE.COM]** will match with `EXAMPLE.COM`, `SUB.EXAMPLE.COM`, +and `SUB.SUB.EXAMPLE.COM`, but not `MYEXAMPLE.COM`. + +The following parameters can be set on any of these sections, with exact realm +parameters having higher precedence, followed by wildcard realm parameters, and +then global parameters: + +**use_dns** (boolean): Allows querying DNS SRV records (aka. DNS discovery) to +find KDCs associated with the requested realm in case they are not explicitly +set in the configuration (main one, or configuration module-provided). By +default (or if explicitly enabled globally), this mechanism is **activated only +for realms explicitly declared** in the main (an empty section named after the +realm, or a matching wildcard realm section is enough) or module-provided +configuration. To allow use of DNS discovery for any requested realm, see the +**dns_realm_discovery** parameter. + +**silence_port_warn** (boolean): When DNS SRV records are used to discover KDC +addresses, kdcproxy will write a warning in the logs in case a non-standard +port is found in the DNS response. Setting this parameter to `true` will +silence such warnings. + +The following parameters are specific to the **[global]** section: + +**configs** (string): Allows you to load other configuration modules for finding configuration in other places. The configuration modules specified in here will have priority in the order listed. For instance, if you wished to read configuration from MIT libkrb5, you would set the following: @@ -71,11 +89,19 @@ read configuration from MIT libkrb5, you would set the following: [global] configs = mit -Aside from the **[global]** section, you may also specify manual configuration -for realms. In this case, each section is the name of the realm and the -parameters are **kerberos** or **kpasswd**. These specify the locations of the -remote servers for krb5 AS requests and kpasswd requests, respectively. For -example: +**dns_realm_discovery** (boolean): When **use_dns** is not disabled globally, +kdcproxy is allowed to query SRV records to find KDCs of the realms declared in +its configuration only. This protects kdcproxy from attacks based on +server-side request forgery (CVE-2025-59088). Allowing DNS discovery for +unknown realms too is possible by also setting **dns_realm_discovery** to true, +yet heavily discouraged: + + [global] + dns_realm_discovery = true + +Exact realm sections have 2 specific parameters: **kerberos** and **kpasswd**. +These specify the locations of the remote servers for Kerberos ticket requests, +and kpasswd requests, respectively. For example: [EXAMPLE.COM] kerberos = kerberos+tcp://kdc.example.com:88 @@ -95,11 +121,10 @@ forwarding requests. The port number is optional. Possible schemes are: MIT libkrb5 ----------- -If you load the **mit** config module in the master configuration file, -kdcproxy will also read the config using libkrb5 (usually /etc/krb5.conf). If -this module is used, kdcproxy will respect the DNS settings from the -**[libdefaults]** section and the realm configuration from the **[realms]** -section. +If you load the **mit** config module in the main configuration file, kdcproxy +will also read the config using libkrb5 (usually /etc/krb5.conf). If this +module is used, kdcproxy will respect the realm configuration from the +**[realms]** section. For more information, see the documentation for MIT's krb5.conf. diff --git a/kdcproxy/config/__init__.py b/kdcproxy/config/__init__.py index 8e17c5b..034fcf3 100644 --- a/kdcproxy/config/__init__.py +++ b/kdcproxy/config/__init__.py @@ -20,7 +20,6 @@ # THE SOFTWARE. import importlib -import itertools import logging import os @@ -35,18 +34,32 @@ logging.basicConfig() logger = logging.getLogger('kdcproxy') +SRV_KRB = 'kerberos' +SRV_KPWD = 'kpasswd' +SRV_KPWD_ADM = 'kerberos-adm' + class IResolver(object): def lookup(self, realm, kpasswd=False): + # type: (str, bool) -> Iterable[str] "Returns an iterable of remote server URIs." raise NotImplementedError() class IConfig(IResolver): - def use_dns(self): - "Returns whether or not DNS should be used. Returns None if not set." + def realm_configured(self, realm): + # type: (str) -> bool + """Check if a realm is declared in the configuration.""" + raise NotImplementedError() + + def param(self, realm, param): + # type: (str, str) -> bool + """Get a configuration parameter value for a realm. + + None can be passed as realm to query global parameters only. + """ raise NotImplementedError() @@ -54,19 +67,43 @@ class KDCProxyConfig(IConfig): GLOBAL = "global" default_filenames = ["/usr/local/etc/kdcproxy.conf", "/etc/kdcproxy.conf"] + GLOBAL_PARAMS = { + 'dns_realm_discovery': False, + } + GENERAL_PARAMS = { + 'use_dns': True, + 'silence_port_warn': False, + } + RESOLV_PARAMS = [SRV_KRB, SRV_KPWD] + + @staticmethod + def __get_cfg_param(cp, section, param, typ): + """Retrieve a typed parameter from a configuration section.""" + try: + if typ is bool: + return cp.getboolean(section, param) + elif typ is str: + return cp.get(section, param) + else: + raise ValueError( + 'Configuration parameters cannot have "%s" type' % + typ.__name__) + except configparser.Error: + return None + def __init__(self, filenames=None): - self.__cp = configparser.ConfigParser() + cp = configparser.ConfigParser() if filenames is None: filenames = os.environ.get("KDCPROXY_CONFIG", None) if filenames is None: filenames = self.default_filenames try: - self.__cp.read(filenames) + cp.read(filenames) except configparser.Error: logger.error("Unable to read config file(s): %s", filenames) try: - mod = self.__cp.get(self.GLOBAL, "configs") + mod = cp.get(self.GLOBAL, "configs") try: importlib.import_module("kdcproxy.config." + mod) except ImportError as e: @@ -74,23 +111,98 @@ def __init__(self, filenames=None): except configparser.Error: pass + self.__config = dict() + + for section in cp.sections(): + self.__config.setdefault(section, {}) + for param in self.GENERAL_PARAMS.keys(): + value = self.__get_cfg_param(cp, section, param, bool) + if value is not None: + self.__config[section][param] = value + if section == self.GLOBAL: + for param in self.GLOBAL_PARAMS.keys(): + value = self.__get_cfg_param(cp, section, param, bool) + if value is not None: + self.__config[section][param] = value + elif not section.startswith('*'): + for service in self.RESOLV_PARAMS: + servers = self.__get_cfg_param(cp, section, service, str) + if servers: + self.__config[section][service] = ( + tuple(servers.split()) + ) + + def __global_forbidden(self, realm): + """Raise ValueError if realm name is 'global'.""" + if realm == self.GLOBAL: + raise ValueError('"%s" is not allowed as realm name' % realm) + def lookup(self, realm, kpasswd=False): - service = "kpasswd" if kpasswd else "kerberos" - try: - servers = self.__cp.get(realm, service) - return map(lambda s: s.strip(), servers.strip().split(" ")) - except configparser.Error: + self.__global_forbidden(realm) + service = SRV_KPWD if kpasswd else SRV_KRB + if realm in self.__config and service in self.__config[realm]: + return self.__config[realm][service] + else: return () - def use_dns(self): - try: - return self.__cp.getboolean(self.GLOBAL, "use_dns") - except configparser.Error: - return None + def realm_configured(self, realm): + """Check if a realm is declared in the configuration. + + Matches exact realm sections or wildcard realm sections. + """ + self.__global_forbidden(realm) + + if realm in self.__config: + return True + + realm_labels = realm.split('.') + for i in range(len(realm_labels)): + rule = '*' + '.'.join(realm_labels[i:]) + if rule in self.__config: + return True + + return False + + def param(self, realm, param): + """Get a configuration parameter value for a realm. + + None can be passed as realm to query global parameters only. + Precedence: exact realm, wildcard realm, global, default. + """ + self.__global_forbidden(realm) + + if realm is not None: + if param in self.__config.get(realm, {}): + # Parameter found in realm section + return self.__config[realm][param] + + realm_labels = realm.split('.') + for i in range(len(realm_labels)): + rule = '*' + '.'.join(realm_labels[i:]) + if param in self.__config.get(rule, {}): + # Parameter found in realm matching rule + return self.__config[rule][param] + + if param in self.__config.get(self.GLOBAL, {}): + # Fallback to global section + return self.__config[self.GLOBAL][param] + + if param in self.GENERAL_PARAMS: + # Fallback to default value if general parameter not set + return self.GENERAL_PARAMS[param] + + if param in self.GLOBAL_PARAMS: + # Fallback to default value if global parameter not set + return self.GLOBAL_PARAMS[param] + + raise ValueError('Configuration parameter "%s" does not exist' % param) class DNSResolver(IResolver): + def __init__(self, log_warning=None): + self.__log_warning = log_warning + def __dns(self, service, protocol, realm): query = '_%s._%s.%s' % (service, protocol, realm) @@ -109,48 +221,38 @@ def __dns(self, service, protocol, realm): yield (host, entry.port) def lookup(self, realm, kpasswd=False): - service = "kpasswd" if kpasswd else "kerberos" + service = SRV_KPWD if kpasswd else SRV_KRB for protocol in ("tcp", "udp"): - servers = tuple(self.__dns(service, protocol, realm)) + sv = service + servers = tuple(self.__dns(sv, protocol, realm)) if not servers and kpasswd: - servers = self.__dns("kerberos-adm", protocol, realm) + sv = SRV_KPWD_ADM + servers = self.__dns(sv, protocol, realm) for host, port in servers: + if self.__log_warning: + self.__log_warning(sv, protocol, realm, kpasswd, host, + port) yield "%s://%s:%d" % (service, host, port) class MetaResolver(IResolver): - SCHEMES = ("kerberos", "kerberos+tcp", "kerberos+udp", - "kpasswd", "kpasswd+tcp", "kpasswd+udp", - "http", "https",) - def __init__(self): - self.__resolvers = [] - for i in itertools.count(0): - allsub = IConfig.__subclasses__() - if not i < len(allsub): - break + STANDARD_PORTS = {SRV_KRB: 88, SRV_KPWD: 464} + def __init__(self): + self.__config = KDCProxyConfig() + self.__dns_resolver = DNSResolver(self.__log_warning) + self.__extra_configs = [] + for cfgcls in IConfig.__subclasses__(): + if cfgcls is KDCProxyConfig: + continue try: - self.__resolvers.append(allsub[i]()) + self.__extra_configs.append(cfgcls()) except Exception as e: - fmt = (allsub[i], repr(e)) - logging.log(logging.WARNING, - "Error instantiating %s due to %s" % fmt) - assert self.__resolvers - - # See if we should use DNS - dns = None - for cfg in self.__resolvers: - tmp = cfg.use_dns() - if tmp is not None: - dns = tmp - break - - # If DNS is enabled, append the DNSResolver at the end - if dns in (None, True): - self.__resolvers.append(DNSResolver()) + logging.warning("Error instantiating %s due to %s", cfgcls, + repr(e)) def __unique(self, items): "Removes duplicate items from an iterable while maintaining order." @@ -161,10 +263,52 @@ def __unique(self, items): unique.remove(item) yield item + def __silenced_port_warn(self, realm): + """Check if port warnings are silenced for a realm.""" + return self.__config.param(realm, 'silence_port_warn') + + def __log_warning(self, service, protocol, realm, kpasswd, host, port): + """Log a warning if a KDC uses a non-standard port.""" + if not self.__silenced_port_warn(realm): + expected_port = self.STANDARD_PORTS[SRV_KPWD if kpasswd + else SRV_KRB] + if port != expected_port: + logger.warning( + 'DNS SRV record _%s._%s.%s. points to KDC %s with ' + 'non-standard port %i (%i expected)', + service, protocol, realm, host, port, expected_port) + + def __realm_configured(self, realm): + """Check if realm is declared in any configuration source.""" + if self.__config.realm_configured(realm): + return True + for c in self.__extra_configs: + if c.realm_configured(realm): + return True + return False + + def __dns_discovery_allowed(self, realm): + """Check if DNS discovery is allowed for a realm.""" + return ( + self.__realm_configured(realm) + or self.__config.param(None, 'dns_realm_discovery') + ) and self.__config.param(realm, 'use_dns') + def lookup(self, realm, kpasswd=False): - for r in self.__resolvers: - servers = tuple(self.__unique(r.lookup(realm, kpasswd))) + servers = tuple(self.__unique(self.__config.lookup(realm, kpasswd))) + if servers: + return servers + + for c in self.__extra_configs: + servers = tuple(self.__unique(c.lookup(realm, kpasswd))) if servers: return servers + # The scope of realms we are allowed to use DNS discovery for depends + # on the configuration + if self.__dns_discovery_allowed(realm): + servers = tuple(self.__unique( + self.__dns_resolver.lookup(realm, kpasswd))) + return servers + return () diff --git a/kdcproxy/config/mit.py b/kdcproxy/config/mit.py index 1af4167..cd80f6b 100644 --- a/kdcproxy/config/mit.py +++ b/kdcproxy/config/mit.py @@ -232,19 +232,9 @@ class MITConfig(IConfig): def __init__(self, *args, **kwargs): self.__config = {} with KRB5Profile() as prof: - # Load DNS setting - self.__config["dns"] = prof.get_bool("libdefaults", - "dns_fallback", - default=True) - if "dns_lookup_kdc" in dict(prof.section("libdefaults")): - self.__config["dns"] = prof.get_bool("libdefaults", - "dns_lookup_kdc", - default=True) - # Load all configured realms - self.__config["realms"] = {} for realm, values in prof.section("realms"): - rconf = self.__config["realms"].setdefault(realm, {}) + rconf = self.__config.setdefault(realm, {}) for server, hostport in values: if server not in self.CONFIG_KEYS: continue @@ -261,7 +251,7 @@ def __init__(self, *args, **kwargs): rconf.setdefault(server, []).append(parsed.geturl()) def lookup(self, realm, kpasswd=False): - rconf = self.__config.get("realms", {}).get(realm, {}) + rconf = self.__config.get(realm, {}) if kpasswd: servers = list(rconf.get('kpasswd_server', [])) @@ -271,8 +261,13 @@ def lookup(self, realm, kpasswd=False): return tuple(servers) - def use_dns(self, default=True): - return self.__config["dns"] + def realm_configured(self, realm): + """Check if a realm is declared in the MIT krb5 configuration.""" + return realm in self.__config + + def param(self, realm, param): + """Always None. MIT krb5 config only provides server addresses.""" + return None if __name__ == "__main__": diff --git a/tests.py b/tests.py index 2a1ad6e..af64f82 100644 --- a/tests.py +++ b/tests.py @@ -19,9 +19,11 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. +import contextlib import os import socket import struct +import tempfile import unittest from base64 import b64decode try: @@ -298,11 +300,24 @@ def test_kpasswdreq(self): class KDCProxyConfigTests(unittest.TestCase): + @contextlib.contextmanager + def temp_config_file(self, content): + with tempfile.NamedTemporaryFile( + mode="w", delete=False, suffix=".conf" + ) as f: + f.write(content) + config_file = f.name + + try: + yield config_file + finally: + os.remove(config_file) + def test_mit_config(self): with mock.patch.dict('os.environ', {'KRB5_CONFIG': KRB5_CONFIG}): cfg = mit.MITConfig() - self.assertIs(cfg.use_dns(), False) + self.assertIs(cfg.param('KDCPROXY.TEST', 'use_dns'), None) self.assertEqual( cfg.lookup('KDCPROXY.TEST'), ( @@ -382,6 +397,832 @@ def test_dns_config(self, m_query): m_query.assert_any_call('_kpasswd._udp.KDCPROXY.TEST', RDTYPE_SRV) m_query.assert_any_call('_kerberos-adm._udp.KDCPROXY.TEST', RDTYPE_SRV) + def test_kdcproxy_config_realm_configured(self): + with self.temp_config_file( + """[REALM1.TEST] + kerberos = kerberos://kdc1.realm1.test:88 + [REALM2.TEST] + kpasswd = kpasswd://kpwd.realm2.test:464\n""" + ) as config_file: + cfg = config.KDCProxyConfig(filenames=[config_file]) + + # Test configured realms + self.assertTrue(cfg.realm_configured("REALM1.TEST")) + self.assertTrue(cfg.realm_configured("REALM2.TEST")) + + # Test unconfigured realm + self.assertFalse(cfg.realm_configured("UNKNOWN.TEST")) + + # Test that 'global' cannot be used as realm name + with self.assertRaises(ValueError): + cfg.realm_configured("global") + + def test_kdcproxy_config_param(self): + with self.temp_config_file( + """[global] + silence_port_warn = true + [REALM1.TEST] + use_dns = false + kerberos = kerberos://kdc1.realm1.test:88 + [REALM2.TEST] + kerberos = kerberos://kdc2.realm2.test:88""" + ) as config_file: + cfg = config.KDCProxyConfig(filenames=[config_file]) + + # Test realm-specific parameter overrides global + self.assertFalse(cfg.param("REALM1.TEST", "use_dns")) + + # Test fallback to global parameter + self.assertTrue(cfg.param("REALM1.TEST", "silence_port_warn")) + self.assertTrue(cfg.param("REALM2.TEST", "use_dns")) + self.assertTrue(cfg.param("REALM2.TEST", "silence_port_warn")) + + # Test invalid parameter + with self.assertRaises(ValueError): + cfg.param("REALM1.TEST", "invalid_param") + + # Test that 'global' cannot be used as realm name + with self.assertRaises(ValueError): + cfg.param("global", "use_dns") + + def test_kdcproxy_config_lookup(self): + with self.temp_config_file( + "[REALM.TEST]\n" + "kerberos = kerberos://kdc1.test:88 " + "kerberos://kdc2.test:88\n" + "kpasswd = kpasswd://kpwd.test:464" + ) as config_file: + cfg = config.KDCProxyConfig(filenames=[config_file]) + + # Test kerberos lookup + self.assertEqual( + cfg.lookup("REALM.TEST"), + ("kerberos://kdc1.test:88", "kerberos://kdc2.test:88"), + ) + + # Test kpasswd lookup + self.assertEqual( + cfg.lookup("REALM.TEST", kpasswd=True), + ("kpasswd://kpwd.test:464",), + ) + + # Test unconfigured realm + self.assertEqual(cfg.lookup("UNKNOWN.TEST"), ()) + + # Test that 'global' cannot be used as realm name + with self.assertRaises(ValueError): + cfg.lookup("global") + + @mock.patch("dns.resolver.query") + def test_dns_blocked_for_undeclared_realms(self, m_query): + with mock.patch.object(config.KDCProxyConfig, "default_filenames", []): + resolver = config.MetaResolver() + + # DNS should NOT be used for unconfigured realm + result = resolver.lookup("UNCONFIGURED.TEST") + self.assertEqual(result, ()) + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_use_dns_false_disables_dns_discovery(self, m_query): + # Test exact realm section + with self.temp_config_file( + """[global] + use_dns = false + [REALM.TEST] + ; Exact realm declared but no servers specified""" + ) as config_file: + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # DNS should NOT be used when use_dns is false for exact realm + result = resolver.lookup("REALM.TEST") + self.assertEqual(result, ()) + m_query.assert_not_called() + + # Test wildcard realm section + m_query.reset_mock() + with self.temp_config_file( + """[global] + use_dns = false + [*EXAMPLE.COM] + ; Wildcard realm declared but no servers specified""" + ) as config_file: + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # DNS should NOT be used when use_dns is false for wildcard + # realm + result = resolver.lookup("SUB.EXAMPLE.COM") + self.assertEqual(result, ()) + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_use_dns_true_enables_dns_for_declared_realms(self, m_query): + # Test exact realm section + with self.temp_config_file( + """[global] + use_dns = true + [REALM.TEST] + ; Exact realm declared but no servers specified""" + ) as config_file: + tcp_srv = [self.mksrv("0 0 88 kdc.realm.test.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # DNS SHOULD be used when exact realm is declared and use_dns + # is true + result = resolver.lookup("REALM.TEST") + self.assertEqual(result, ("kerberos://kdc.realm.test:88",)) + self.assertEqual(m_query.call_count, 2) + + # Test wildcard realm section + m_query.reset_mock() + with self.temp_config_file( + """[global] + use_dns = true + [*EXAMPLE.COM] + ; Wildcard realm declared but no servers specified""" + ) as config_file: + tcp_srv = [self.mksrv("0 0 88 kdc.sub.example.com.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # DNS SHOULD be used when wildcard realm matches and use_dns + # is true + result = resolver.lookup("SUB.EXAMPLE.COM") + self.assertEqual( + result, ("kerberos://kdc.sub.example.com:88",) + ) + self.assertEqual(m_query.call_count, 2) + + @mock.patch("logging.Logger.warning") + @mock.patch("dns.resolver.query") + def test_dns_discovery_warns_on_nonstandard_port( + self, m_query, m_log_warning + ): + # Test exact realm section + with self.temp_config_file( + """[REALM.TEST]""" + ) as config_file: + # DNS returns KDC on non-standard port + tcp_srv = [self.mksrv("0 0 1088 kdc.realm.test.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("REALM.TEST") + + # Should return the server + self.assertEqual(result, ("kerberos://kdc.realm.test:1088",)) + + # Should log warning about non-standard port for exact realm + m_log_warning.assert_called_once() + args = m_log_warning.call_args[0] + self.assertIn("non-standard port", args[0]) + self.assertEqual(args[5], 1088) # port + self.assertEqual(args[6], 88) # expected port + + # Test wildcard realm section + m_query.reset_mock() + m_log_warning.reset_mock() + with self.temp_config_file( + """[*EXAMPLE.COM]""" + ) as config_file: + # DNS returns KDC on non-standard port + tcp_srv = [self.mksrv("0 0 1088 kdc.sub.example.com.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("SUB.EXAMPLE.COM") + + # Should return the server + self.assertEqual( + result, ("kerberos://kdc.sub.example.com:1088",) + ) + + # Should log warning about non-standard port for wildcard realm + m_log_warning.assert_called_once() + args = m_log_warning.call_args[0] + self.assertIn("non-standard port", args[0]) + self.assertEqual(args[5], 1088) # port + self.assertEqual(args[6], 88) # expected port + + @mock.patch("logging.Logger.warning") + @mock.patch("dns.resolver.query") + def test_silence_port_warn_suppresses_nonstandard_port_warnings( + self, m_query, m_log_warning + ): + # Test exact realm section + with self.temp_config_file( + """[REALM.TEST] + silence_port_warn = true""" + ) as config_file: + # DNS returns KDC on non-standard port + tcp_srv = [self.mksrv("0 0 1088 kdc.realm.test.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("REALM.TEST") + + # Should return the server + self.assertEqual(result, ("kerberos://kdc.realm.test:1088",)) + + # Should NOT log warning when silenced for exact realm + m_log_warning.assert_not_called() + + # Test wildcard realm section + m_query.reset_mock() + m_log_warning.reset_mock() + with self.temp_config_file( + """[*EXAMPLE.COM] + silence_port_warn = true""" + ) as config_file: + # DNS returns KDC on non-standard port + tcp_srv = [self.mksrv("0 0 1088 kdc.sub.example.com.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("SUB.EXAMPLE.COM") + + # Should return the server + self.assertEqual( + result, ("kerberos://kdc.sub.example.com:1088",) + ) + + # Should NOT log warning when silenced for wildcard realm + m_log_warning.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_configured_servers_preferred_over_dns_discovery(self, m_query): + # Create a config with servers configured + with self.temp_config_file( + """[REALM.TEST] + kerberos = kerberos://configured-kdc.test:88""" + ) as config_file: + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("REALM.TEST") + + # Should return configured server, not DNS + self.assertEqual( + result, ("kerberos://configured-kdc.test:88",) + ) + + # DNS should not be queried when servers are configured + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_mit_realm_prefers_configured_servers_over_dns(self, m_query): + # Test that realm in MIT config uses configured servers even when + # use_dns = true + with self.temp_config_file( + """[global] + use_dns = true + configs = mit""" + ) as config_file: + with mock.patch.dict( + "os.environ", {"KRB5_CONFIG": KRB5_CONFIG} + ), mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("KDCPROXY.TEST") + + # Should return MIT-configured servers (from tests.krb5.conf) + self.assertEqual( + result, + ( + "kerberos://k1.kdcproxy.test.:88", + "kerberos://k2.kdcproxy.test.:1088", + ), + ) + + # DNS should NOT be queried when servers are in MIT config + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_mit_realm_uses_configured_servers_when_use_dns_false( + self, m_query + ): + # Test that realm in MIT config uses configured servers when + # use_dns = false + with self.temp_config_file( + """[global] + use_dns = false + configs = mit""" + ) as config_file: + with mock.patch.dict( + "os.environ", {"KRB5_CONFIG": KRB5_CONFIG} + ), mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("KDCPROXY.TEST") + + # Should return MIT-configured servers + self.assertEqual( + result, + ( + "kerberos://k1.kdcproxy.test.:88", + "kerberos://k2.kdcproxy.test.:1088", + ), + ) + + # DNS should NOT be queried + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_mit_kpasswd_prefers_configured_servers_over_dns(self, m_query): + # Test that kpasswd servers from MIT config are used even when + # use_dns = true + with self.temp_config_file( + """[global] + use_dns = true + configs = mit""" + ) as config_file: + with mock.patch.dict( + "os.environ", {"KRB5_CONFIG": KRB5_CONFIG} + ), mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("KDCPROXY.TEST", kpasswd=True) + + # Should return MIT-configured kpasswd servers + self.assertEqual( + result, + ( + "kpasswd://adm.kdcproxy.test.:1749", + "kpasswd://adm.kdcproxy.test.", + ), + ) + + # DNS should NOT be queried + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_kdcproxy_declared_realm_uses_dns_when_no_servers(self, m_query): + # Test that a realm in kdcproxy.conf (but not MIT) will use DNS when no + # servers are configured + with self.temp_config_file( + """[global] + configs = mit + [REALM.TEST] + ; Realm section exists but no servers configured""" + ) as config_file: + tcp_srv = [self.mksrv("0 0 88 kdc.realm.test.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.dict( + "os.environ", {"KRB5_CONFIG": KRB5_CONFIG} + ), mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("REALM.TEST") + + # Should use DNS since realm is in config but has no servers + self.assertEqual(result, ("kerberos://kdc.realm.test:88",)) + self.assertEqual(m_query.call_count, 2) + + @mock.patch("dns.resolver.query") + def test_realm_specific_use_dns_overrides_global(self, m_query): + # Test that realm-specific use_dns overrides global setting for a realm + # that's in MIT config + with self.temp_config_file( + """[global] + use_dns = true + configs = mit + [KDCPROXY.TEST] + use_dns = false""" + ) as config_file: + with mock.patch.dict( + "os.environ", {"KRB5_CONFIG": KRB5_CONFIG} + ), mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # First check: should return MIT servers + result = resolver.lookup("KDCPROXY.TEST") + self.assertEqual( + result, + ( + "kerberos://k1.kdcproxy.test.:88", + "kerberos://k2.kdcproxy.test.:1088", + ), + ) + + # DNS should not be queried due to realm override + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_kdcproxy_servers_override_mit_servers(self, m_query): + # Test that servers configured in kdcproxy.conf take precedence over + # MIT config servers + with self.temp_config_file( + """[global] + configs = mit + [KDCPROXY.TEST] + kerberos = kerberos://override.test:88""" + ) as config_file: + with mock.patch.dict( + "os.environ", {"KRB5_CONFIG": KRB5_CONFIG} + ), mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("KDCPROXY.TEST") + + # Should return kdcproxy.conf servers, not MIT servers + self.assertEqual(result, ("kerberos://override.test:88",)) + + # DNS should not be queried + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_undeclared_realm_blocks_dns_despite_use_dns_true(self, m_query): + # Test that a realm NOT in MIT and NOT in kdcproxy.conf will NOT use + # DNS even with use_dns = true (security restriction) + with self.temp_config_file( + """[global] + use_dns = true + configs = mit""" + ) as config_file: + with mock.patch.dict( + "os.environ", {"KRB5_CONFIG": KRB5_CONFIG} + ), mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("UNCONFIGURED.REALM") + + # Should return empty - no DNS lookup + self.assertEqual(result, ()) + + # DNS should NOT be queried for unconfigured realm + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_mit_declared_realm_without_servers_uses_dns(self, m_query): + # Test that a realm in MIT config but WITHOUT KDC servers configured + # will use DNS + + # Create a krb5.conf with a realm section but no kdc entries + with tempfile.NamedTemporaryFile( + mode="w", delete=False, suffix=".conf" + ) as krb5_file: + krb5_file.write( + """[libdefaults] + default_realm = EMPTY.REALM + + [realms] + EMPTY.REALM = { + default_domain = empty.realm + }""" + ) + krb5_conf = krb5_file.name + + # Create kdcproxy.conf + with self.temp_config_file( + """[global] + configs = mit""" + ) as config_file: + tcp_srv = [self.mksrv("0 0 88 kdc.empty.realm.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.dict( + "os.environ", {"KRB5_CONFIG": krb5_conf} + ), mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + result = resolver.lookup("EMPTY.REALM") + + # Should use DNS because: + # 1. Realm is in MIT config (realm_configured returns True) + # 2. No servers configured in MIT config + # 3. use_dns enabled globally by default + self.assertEqual(result, ("kerberos://kdc.empty.realm:88",)) + + # DNS SHOULD be queried + self.assertEqual(m_query.call_count, 2) + m_query.assert_any_call( + "_kerberos._tcp.EMPTY.REALM", RDTYPE_SRV + ) + m_query.assert_any_call( + "_kerberos._udp.EMPTY.REALM", RDTYPE_SRV + ) + os.remove(krb5_conf) + + def test_kdcproxy_config_realm_wildcard_matching(self): + # Test realm matching with wildcard patterns + with self.temp_config_file( + """[global] + use_dns = false + [SPECIFIC.SUB.EXAMPLE.COM] + kerberos = kerberos://specific.example.com:88 + [*SUB.EXAMPLE.COM] + use_dns = true + [*EXAMPLE.COM] + silence_port_warn = true""" + ) as config_file: + cfg = config.KDCProxyConfig(filenames=[config_file]) + + # Test exact match + self.assertTrue(cfg.realm_configured("SPECIFIC.SUB.EXAMPLE.COM")) + self.assertEqual( + cfg.lookup("SPECIFIC.SUB.EXAMPLE.COM"), + ("kerberos://specific.example.com:88",), + ) + + # Test wildcard matching for *SUB.EXAMPLE.COM + self.assertTrue(cfg.realm_configured("OTHER.SUB.EXAMPLE.COM")) + # Wildcard sections don't support kerberos/kpasswd params + self.assertEqual(cfg.lookup("OTHER.SUB.EXAMPLE.COM"), ()) + + # Test wildcard matching for *EXAMPLE.COM + self.assertTrue(cfg.realm_configured("FOO.EXAMPLE.COM")) + self.assertEqual(cfg.lookup("FOO.EXAMPLE.COM"), ()) + + # Test wildcard matches exact realm name (EXAMPLE.COM matches + # *EXAMPLE.COM) + self.assertTrue(cfg.realm_configured("EXAMPLE.COM")) + self.assertTrue(cfg.param("EXAMPLE.COM", "silence_port_warn")) + + # Test multi-level subdomain matches wildcard + self.assertTrue(cfg.realm_configured("A.B.C.EXAMPLE.COM")) + + # Test non-matching realm (MYEXAMPLE.COM should NOT match + # *EXAMPLE.COM) + self.assertFalse(cfg.realm_configured("MYEXAMPLE.COM")) + self.assertEqual(cfg.lookup("MYEXAMPLE.COM"), ()) + + # Test other non-matching realm + self.assertFalse(cfg.realm_configured("OTHER.DOMAIN")) + self.assertEqual(cfg.lookup("OTHER.DOMAIN"), ()) + + def test_kdcproxy_config_param_wildcard_matching(self): + # Test parameter lookup with wildcard patterns + with self.temp_config_file( + """[global] + use_dns = false + silence_port_warn = false + [*EXAMPLE.COM] + use_dns = true + silence_port_warn = true + [SPECIFIC.EXAMPLE.COM] + silence_port_warn = false""" + ) as config_file: + cfg = config.KDCProxyConfig(filenames=[config_file]) + + # Test exact match takes precedence for parameters + self.assertTrue(cfg.param("SPECIFIC.EXAMPLE.COM", "use_dns")) + self.assertFalse( + cfg.param("SPECIFIC.EXAMPLE.COM", "silence_port_warn") + ) + + # Test wildcard parameter matching + self.assertTrue(cfg.param("OTHER.EXAMPLE.COM", "use_dns")) + self.assertTrue( + cfg.param("OTHER.EXAMPLE.COM", "silence_port_warn") + ) + + # Test fallback to global when no wildcard match + self.assertFalse(cfg.param("OTHER.DOMAIN", "use_dns")) + self.assertFalse(cfg.param("OTHER.DOMAIN", "silence_port_warn")) + + def test_wildcard_specificity_determines_priority(self): + # Test that more specific wildcards take precedence + with self.temp_config_file( + """[global] + use_dns = false + [*EXAMPLE.COM] + silence_port_warn = true + [*SUB.EXAMPLE.COM] + use_dns = true""" + ) as config_file: + cfg = config.KDCProxyConfig(filenames=[config_file]) + + # More specific wildcard (*SUB.EXAMPLE.COM) should match first + self.assertTrue(cfg.param("FOO.SUB.EXAMPLE.COM", "use_dns")) + # Should also get parameter from broader wildcard + self.assertTrue( + cfg.param("FOO.SUB.EXAMPLE.COM", "silence_port_warn") + ) + + # Broader wildcard should match other subdomains + self.assertTrue( + cfg.param("FOO.OTHER.EXAMPLE.COM", "silence_port_warn") + ) + # Should fallback to global for use_dns + self.assertFalse(cfg.param("FOO.OTHER.EXAMPLE.COM", "use_dns")) + + @mock.patch("dns.resolver.query") + def test_kdcproxy_config_exact_realm_priority_over_wildcard(self, m_query): + # Test that exact realm sections take precedence over wildcard sections + with self.temp_config_file( + """[global] + use_dns = false + silence_port_warn = false + [*EXAMPLE.COM] + use_dns = true + silence_port_warn = true + [SPECIFIC.EXAMPLE.COM] + kerberos = kerberos://specific-kdc.example.com:88 + use_dns = false""" + ) as config_file: + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # Exact realm section should take priority + self.assertTrue( + resolver._MetaResolver__config.realm_configured( + "SPECIFIC.EXAMPLE.COM" + ) + ) + + # Should get kerberos from exact realm section + result = resolver.lookup("SPECIFIC.EXAMPLE.COM") + self.assertEqual( + result, + ("kerberos://specific-kdc.example.com:88",), + ) + + # DNS should NOT be called because: + # 1. Exact realm has configured servers + # 2. Exact realm has use_dns=false (takes priority over + # wildcard) + m_query.assert_not_called() + + # Verify exact realm's use_dns=false takes priority + self.assertFalse( + resolver._MetaResolver__config.param( + "SPECIFIC.EXAMPLE.COM", "use_dns" + ) + ) + + # Should get silence_port_warn from wildcard since not in exact + # section + self.assertTrue( + resolver._MetaResolver__config.param( + "SPECIFIC.EXAMPLE.COM", "silence_port_warn" + ) + ) + + def test_dns_realm_discovery_param_defaults_false(self): + # Test the dns_realm_discovery global parameter + with self.temp_config_file( + """[global] + dns_realm_discovery = true""" + ) as config_file: + cfg = config.KDCProxyConfig(filenames=[config_file]) + + # Test that dns_realm_discovery can be read + self.assertTrue(cfg.param(None, "dns_realm_discovery")) + + # Test default value when not specified + cfg2 = config.KDCProxyConfig(filenames=[]) + self.assertFalse(cfg2.param(None, "dns_realm_discovery")) + + @mock.patch("dns.resolver.query") + def test_dns_realm_discovery_true_allows_undeclared_realms(self, m_query): + # Test that dns_realm_discovery allows DNS for unconfigured realms + with self.temp_config_file( + """[global] + dns_realm_discovery = true""" + ) as config_file: + tcp_srv = [self.mksrv("0 0 88 kdc.unconfigured.test.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # DNS SHOULD be used for unconfigured realm when + # dns_realm_discovery = true + result = resolver.lookup("UNCONFIGURED.TEST") + self.assertEqual( + result, ("kerberos://kdc.unconfigured.test:88",) + ) + self.assertEqual(m_query.call_count, 2) + + @mock.patch("dns.resolver.query") + def test_dns_realm_discovery_false_blocks_undeclared_realms(self, m_query): + # Test that dns_realm_discovery=false restricts DNS to configured + # realms + with self.temp_config_file( + """[global] + dns_realm_discovery = false""" + ) as config_file: + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # DNS should NOT be used for unconfigured realm when + # dns_realm_discovery = false + result = resolver.lookup("UNCONFIGURED.TEST") + self.assertEqual(result, ()) + m_query.assert_not_called() + + @mock.patch("dns.resolver.query") + def test_wildcard_realm_uses_dns_despite_dns_realm_discovery_false( + self, m_query + ): + # Test that wildcard-matched realms can use DNS discovery + with self.temp_config_file( + """[global] + dns_realm_discovery = false + [*EXAMPLE.COM]""" + ) as config_file: + tcp_srv = [self.mksrv("0 0 88 kdc.sub.example.com.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # DNS SHOULD be used for wildcard-matched realm even when + # dns_realm_discovery = false + result = resolver.lookup("SUB.EXAMPLE.COM") + self.assertEqual( + result, ("kerberos://kdc.sub.example.com:88",) + ) + self.assertEqual(m_query.call_count, 2) + + @mock.patch("dns.resolver.query") + def test_use_dns_defaults_to_true(self, m_query): + # Test that use_dns defaults to true when not set + with self.temp_config_file( + """[REALM.TEST] + ; Realm declared but use_dns not specified""" + ) as config_file: + tcp_srv = [self.mksrv("0 0 88 kdc.realm.test.")] + udp_srv = [] + m_query.side_effect = [tcp_srv, udp_srv] + + with mock.patch.object( + config.KDCProxyConfig, "default_filenames", [config_file] + ): + resolver = config.MetaResolver() + + # DNS SHOULD be used when use_dns is not set (defaults to true) + result = resolver.lookup("REALM.TEST") + self.assertEqual(result, ("kerberos://kdc.realm.test:88",)) + self.assertEqual(m_query.call_count, 2) + + @mock.patch("dns.resolver.query") + def test_dns_realm_discovery_defaults_to_false(self, m_query): + # Test that dns_realm_discovery defaults to false for security + with mock.patch.object(config.KDCProxyConfig, "default_filenames", []): + resolver = config.MetaResolver() + + # DNS should NOT be used for unconfigured realm by default + result = resolver.lookup("UNCONFIGURED.TEST") + self.assertEqual(result, ()) + m_query.assert_not_called() + if __name__ == "__main__": unittest.main() From e91bdca8fd17127979bdcc16f4b07c93c8f84f2a Mon Sep 17 00:00:00 2001 From: Julien Rische Date: Mon, 6 Oct 2025 12:55:30 +0200 Subject: [PATCH 3/3] Update setup.py and tox.ini Signed-off-by: Julien Rische --- setup.py | 1 - tox.ini | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 8829c67..8c2f849 100644 --- a/setup.py +++ b/setup.py @@ -64,7 +64,6 @@ def read(fname): "Development Status :: 5 - Production/Stable", "Environment :: Web Environment", "Intended Audience :: System Administrators", - "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.8", diff --git a/tox.ini b/tox.ini index 9a640f1..1eb9fe2 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] minversion = 2.3.1 -envlist = py38,py39,py310,py311,pep8,py3pep8,doc +envlist = py38,py39,py310,py311,py312,py314,pep8,py3pep8,doc skip_missing_interpreters = true [testenv] @@ -29,6 +29,7 @@ deps = docutils markdown rst2html + setuptools basepython = python3 commands = doc8 --allow-long-titles README @@ -45,4 +46,4 @@ show-source = true max-line-length = 79 application-import-names = kdcproxy # N815 is camelCase names; N813 is for changing case on import; N818 Exception name -ignore = N815, N813, N818 +extend-ignore = N815, N813, N818