From ba195437b3d87e450f8838092d5a2ff25690cd0e Mon Sep 17 00:00:00 2001 From: azoxlpf <213314124+azoxlpf@users.noreply.github.com> Date: Thu, 30 Apr 2026 10:55:19 +0200 Subject: [PATCH] refactor change-password --- nxc/modules/change-password.py | 321 ++++++++++++++++++++++----------- 1 file changed, 213 insertions(+), 108 deletions(-) diff --git a/nxc/modules/change-password.py b/nxc/modules/change-password.py index 20158e00a6..6b60e34ff6 100644 --- a/nxc/modules/change-password.py +++ b/nxc/modules/change-password.py @@ -1,38 +1,51 @@ +import contextlib import sys + from impacket.dcerpc.v5 import samr, epm, transport -from impacket.dcerpc.v5.rpcrt import DCERPCException +from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE +from impacket.ldap.ldap import LDAPSessionError, MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE +from nxc.parsers.ldap_results import parse_result_attributes from nxc.helpers.misc import CATEGORY class NXCModule: """ - Module for changing or resetting user passwords - Module by Fagan Afandiyev, termanix and NeffIsBack + Module for changing or resetting user passwords. + Module by Fagan Afandiyev, termanix and NeffIsBack. + Refactored by azoxlpf to support SMB (SAMR) and LDAP backends, including Kerberos auth. """ name = "change-password" - description = "Change or reset user passwords via various protocols" - supported_protocols = ["smb"] + description = "Change or reset user passwords via SAMR (SMB) or LDAP" + supported_protocols = ["smb", "ldap"] category = CATEGORY.PRIVILEGE_ESCALATION + MUST_CHANGE_STATUSES = ( + "STATUS_PASSWORD_MUST_CHANGE", + "STATUS_PASSWORD_EXPIRED", + "STATUS_NOLOGON_WORKSTATION_TRUST_ACCOUNT", + ) + def options(self, context, module_options): """ Required (one of): NEWPASS The new password of the user. - NEWNTHASH The new NT hash of the user. + NEWNTHASH The new NT hash of the user (SMB protocol only). Optional: USER The user account if the target is not the current user. Examples -------- - If STATUS_PASSWORD_MUST_CHANGE, STATUS_PASSWORD_EXPIRED or STATUS_NOLOGON_WORKSTATION_TRUST_ACCOUNT (Change password for current user) - netexec smb -u username -p oldpass -M change-password -o NEWNTHASH='nthash' - netexec smb -u username -H oldnthash -M change-password -o NEWPASS='newpass' + Self password/hash change (e.g. STATUS_PASSWORD_MUST_CHANGE / STATUS_PASSWORD_EXPIRED): + nxc smb -u username -p oldpass -M change-password -o NEWPASS='newpass' + nxc smb -u username -H oldnthash -M change-password -o NEWNTHASH='newnthash' + nxc ldap -u username -p oldpass -M change-password -o NEWPASS='newpass' - If want to change other user's password (with forcechangepassword priv or admin rights) - netexec smb -u username -p password -M change-password -o USER='target_user' NEWPASS='target_user_newpass' - netexec smb -u username -p password -M change-password -o USER='target_user' NEWNTHASH='target_user_newnthash' + Reset another user's password (ForceChangePassword right or admin): + nxc smb -u admin -p password -M change-password -o USER='target' NEWPASS='newpass' + nxc smb -u admin -p password -M change-password -o USER='target' NEWNTHASH='newnthash' + nxc ldap -u admin -p password -M change-password -o USER='target' NEWPASS='newpass' """ self.newpass = module_options.get("NEWPASS") self.newhash = module_options.get("NEWNTHASH") @@ -42,117 +55,209 @@ def options(self, context, module_options): context.log.fail("Either NEWPASS or NEWNTHASH is required!") sys.exit(1) - def authenticate(self, context, connection, protocol, anonymous=False): - # Authenticate to the target using DCE/RPC with either user credentials or a null session. Establishes a connection and binds to the SAMR service. - try: - # Map to the SAMR endpoint on the target - string_binding = epm.hept_map(connection.host, samr.MSRPC_UUID_SAMR, protocol=protocol) - rpctransport = transport.DCERPCTransportFactory(string_binding) - rpctransport.setRemoteHost(connection.host) - - if anonymous: - rpctransport.set_credentials("", "", "", "", "", "") - rpctransport.set_kerberos(False, None) - context.log.info("Connecting with null session credentials.") - else: - rpctransport.set_credentials( - connection.username, - connection.password, - connection.domain, - connection.lmhash, - connection.nthash, - aesKey=connection.aesKey, - ) - context.log.info(f"Connecting as {connection.domain}\\{connection.username}") - - # Connect to the DCE/RPC endpoint and bind to the SAMR service - dce = rpctransport.get_dce_rpc() - dce.connect() - context.log.info("[+] Successfully connected to DCE/RPC") - dce.bind(samr.MSRPC_UUID_SAMR) - context.log.info("[+] Successfully bound to SAMR") - return dce - except DCERPCException as e: - context.log.fail(f"DCE/RPC Exception: {e!s}") - raise - def on_login(self, context, connection): self.context = context - target_username = self.target_user or connection.username - target_domain = connection.domain - - # Grab all creds from the connection to use for authentication - self.oldpass = connection.password - self.oldhash = connection.nthash + self.connection = connection + self.target_username = self.target_user or connection.username + self.target_domain = connection.domain + self.is_self_change = self.target_username.lower() == connection.username.lower() - new_lmhash, new_nthash = "", "" - - # Parse new hash values if provided if self.newhash: try: - new_lmhash, new_nthash = self.newhash.split(":") + _, self.new_nthash = self.newhash.split(":") except ValueError: - new_nthash = self.newhash + self.new_nthash = self.newhash + else: + self.new_nthash = "" + + if connection.args.protocol == "smb": + self.do_samr() + elif connection.args.protocol == "ldap": + self.do_ldap() + def db_remove_credential(self): try: - self.dce = self.authenticate(context, connection, protocol="ncacn_np", anonymous=False) + db = self.context.db + domain = self.connection.domain + rows = db.get_user(domain, self.computer_name) if hasattr(db, "get_user") else db.get_credentials(filter_term=self.computer_name) + db.remove_credentials([row[0] for row in rows]) except Exception as e: - # Handle specific errors like password expiration or must be change - if "STATUS_PASSWORD_MUST_CHANGE" in str(e) or "STATUS_PASSWORD_EXPIRED" in str(e) or "STATUS_NOLOGON_WORKSTATION_TRUST_ACCOUNT" in str(e): - context.log.warning("Password must be changed. Trying with null session.") - self.dce = self.authenticate(context, connection, protocol="ncacn_ip_tcp", anonymous=True) - elif "STATUS_LOGON_FAILURE" in str(e): - context.log.fail("Authentication failure: wrong credentials.") - return False - else: - raise + self.context.log.debug(f"Could not remove credentials from DB: {e}") + def db_add_credential(self): + if self.new_nthash: + self.context.db.add_credential("hash", self.target_domain, self.target_username, self.new_nthash) + else: + self.context.db.add_credential("plaintext", self.target_domain, self.target_username, self.newpass) + + def do_samr(self): + dce = self.samr_connect() + if dce is None: + return try: - # Perform the SMB SAMR password change - self._smb_samr_change(context, connection, target_username, target_domain, self.oldhash, self.newpass, new_nthash) + self.samr_execute(dce) + finally: + with contextlib.suppress(Exception): + dce.disconnect() + + def samr_connect(self): + try: + return self.samr_bind() + except Exception as e: + err = str(e) - # Remove user if exists to avoid outdated credentials when we update plaintext password, but hash exists (or vice versa) - user = self.context.db.get_user(target_domain, target_username) - user_ids = [row[0] for row in user] - self.context.db.remove_credentials(user_ids) + if "STATUS_LOGON_FAILURE" in err: + self.context.log.fail("Authentication failure: wrong credentials.") + return None - # Store the new credentials in the database - if new_nthash: - self.context.db.add_credential("hash", target_domain, target_username, new_nthash) + must_change = not self.connection.kerberos and any(s in err for s in self.MUST_CHANGE_STATUSES) + if not must_change: + self.context.log.fail(f"Failed to connect to SAMR: {e}") + return None + + self.context.log.warning("Password must be changed. Falling back to null session over ncacn_ip_tcp.") + try: + return self.samr_bind(dce_protocol="ncacn_ip_tcp", anonymous=True) + except Exception as e2: + self.context.log.fail(f"Failed to bind to SAMR with null session: {e2}") + return None + + def samr_bind(self, dce_protocol="ncacn_np", anonymous=False): + target = self.connection.host if not self.connection.kerberos else f"{self.connection.hostname}.{self.connection.domain}" + string_binding = epm.hept_map(target, samr.MSRPC_UUID_SAMR, protocol=dce_protocol) + rpc_transport = transport.DCERPCTransportFactory(string_binding) + rpc_transport.setRemoteHost(target) + + if anonymous: + rpc_transport.set_credentials("", "", "", "", "", "") + rpc_transport.set_kerberos(False, None) + self.context.log.info("Connecting with null session credentials.") + else: + rpc_transport.set_credentials( + self.connection.username, + self.connection.password, + self.connection.domain, + self.connection.lmhash, + self.connection.nthash, + aesKey=self.connection.aesKey, + ) + rpc_transport.set_kerberos(self.connection.kerberos, kdcHost=self.connection.kdcHost) + self.context.log.info(f"Connecting as {self.connection.domain}\\{self.connection.username} (kerberos={self.connection.kerberos})") + + dce = rpc_transport.get_dce_rpc() + if not anonymous and self.connection.kerberos: + dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE) + dce.connect() + dce.bind(samr.MSRPC_UUID_SAMR) + return dce + + def samr_execute(self, dce): + if self.is_self_change and not self.connection.password and not self.connection.nthash: + self.context.log.fail("Self-change over SAMR requires the current password (-p) or NT hash (-H).") + return + + try: + if not self.is_self_change: + user_handle = self.samr_open_user(dce, self.target_username) + samr.hSamrSetNTInternal1(dce, user_handle, self.newpass, self.new_nthash) + self.context.log.success(f"Successfully reset password for '{self.target_username}'") + elif self.newpass: + samr.hSamrUnicodeChangePasswordUser2(dce, "\x00", self.target_username, self.connection.password, self.newpass, "", self.connection.nthash) + self.context.log.success(f"Successfully changed password for '{self.target_username}'") else: - self.context.db.add_credential("plaintext", target_domain, target_username, self.newpass) + user_handle = self.samr_open_user(dce, self.target_username) + samr.hSamrChangePasswordUser(dce, user_handle, self.connection.password, "", self.connection.nthash, "aad3b435b51404eeaad3b435b51404ee", self.new_nthash) + self.context.log.success(f"Successfully changed password hash for '{self.target_username}'") + self.context.log.highlight("Note: password must be changed at next logon.") + + self.db_remove_credential() + self.db_add_credential() except Exception as e: - if "STATUS_ACCESS_DENIED" in str(e): - self.context.log.fail(f"STATUS_ACCESS_DENIED while changing password for user: {target_username}") - elif "STATUS_NONE_MAPPED" in str(e): - self.context.log.fail(f"User '{target_username}' not found or not resolvable") + err = str(e) + if "STATUS_ACCESS_DENIED" in err: + action = "change" if self.is_self_change else "reset" + self.context.log.fail(f"{self.connection.username} does not have the right to {action} password for '{self.target_username}'") + elif "STATUS_NONE_MAPPED" in err: + self.context.log.fail(f"User '{self.target_username}' not found or not resolvable") + elif "STATUS_PASSWORD_RESTRICTION" in err: + self.context.log.fail(f"Password does not meet the domain policy for '{self.target_username}'") + elif "STATUS_WRONG_PASSWORD" in err: + self.context.log.fail("Wrong current password or NT hash provided for self-change") else: - context.log.fail(f"SMB-SAMR password change failed: {e}") - finally: - self.dce.disconnect() - - def _smb_samr_change(self, context, connection, target_username, target_domain, oldHash, newPassword, newHash): - # Reset the password for a different user - if target_username != connection.username: - user_handle = self._hSamrOpenUser(connection, target_username) - samr.hSamrSetNTInternal1(self.dce, user_handle, newPassword, newHash) - context.log.success(f"Successfully changed password for {target_username}") + self.context.log.fail(f"SMB-SAMR password change failed: {e}") + + def samr_open_user(self, dce, username): + server_handle = samr.hSamrConnect(dce, self.connection.host + "\x00")["ServerHandle"] + domain_sid = samr.hSamrLookupDomainInSamServer(dce, server_handle, self.connection.domain)["DomainId"] + domain_handle = samr.hSamrOpenDomain(dce, server_handle, domainId=domain_sid)["DomainHandle"] + user_rid = samr.hSamrLookupNamesInDomain(dce, domain_handle, (username,))["RelativeIds"]["Element"][0] + return samr.hSamrOpenUser(dce, domain_handle, userId=user_rid)["UserHandle"] + + def do_ldap(self): + if self.newhash: + self.context.log.fail("Cannot set NEWNTHASH over LDAP. Use the smb protocol instead.") + return + if self.is_self_change and not self.connection.password: + self.context.log.fail("Cannot perform self password change over LDAP without plaintext (-p). Use the smb protocol instead.") + return + + user_dn = self.ldap_find_user_dn(self.target_username) + if user_dn is None: + return + + if self.is_self_change: + self.ldap_self_change(user_dn) else: - # Change password for the current user - if newPassword: - # Change the password with new password - samr.hSamrUnicodeChangePasswordUser2(self.dce, "\x00", target_username, self.oldpass, newPassword, "", oldHash) + self.ldap_admin_reset(user_dn) + + def ldap_find_user_dn(self, username): + try: + resp = self.connection.search( + searchFilter=f"(sAMAccountName={username})", + attributes=["distinguishedName"], + ) + parsed = parse_result_attributes(resp) + if not parsed: + self.context.log.fail(f"User '{username}' not found in LDAP") + return None + return parsed[0]["distinguishedName"] + except Exception as e: + self.context.log.fail(f"LDAP search failed for '{username}': {e}") + return None + + def ldap_self_change(self, dn): + old_encoded = f'"{self.connection.password}"'.encode("utf-16-le") + new_encoded = f'"{self.newpass}"'.encode("utf-16-le") + try: + self.connection.ldap_connection.modify(dn, {"unicodePwd": [(MODIFY_DELETE, [old_encoded]), (MODIFY_ADD, [new_encoded]),]},) + self.context.log.success(f"Successfully changed password for '{self.target_username}'") + self.db_remove_credential() + self.db_add_credential() + except LDAPSessionError as e: + self.handle_ldap_error(e, action="change") + + def ldap_admin_reset(self, dn): + new_encoded = f'"{self.newpass}"'.encode("utf-16-le") + try: + self.connection.ldap_connection.modify(dn, {"unicodePwd": [(MODIFY_REPLACE, [new_encoded])]}) + self.context.log.success(f"Successfully reset password for '{self.target_username}'") + self.db_remove_credential() + self.db_add_credential() + except LDAPSessionError as e: + self.handle_ldap_error(e, action="reset") + + def handle_ldap_error(self, exc, action): + err = str(exc) + if "noSuchObject" in err: + self.context.log.fail(f"User '{self.target_username}' was not found") + elif "insufficientAccessRights" in err: + self.context.log.fail(f"Insufficient rights to {action} password for '{self.target_username}'") + elif "unwillingToPerform" in err: + if action == "change": + self.context.log.fail(f"Server unwilling to perform: wrong current password or policy violation for '{self.target_username}'") else: - # Change the password with new hash - user_handle = self._hSamrOpenUser(connection, target_username) - samr.hSamrChangePasswordUser(self.dce, user_handle, self.oldpass, "", oldHash, "aad3b435b51404eeaad3b435b51404ee", newHash) - context.log.highlight("Note: Target user must change password at next logon.") - context.log.success(f"Successfully changed password for {target_username}") - - def _hSamrOpenUser(self, connection, username): - """Connect to the target server and retrieve the user handle""" - server_handle = samr.hSamrConnect(self.dce, connection.host + "\x00")["ServerHandle"] - domain_sid = samr.hSamrLookupDomainInSamServer(self.dce, server_handle, connection.domain)["DomainId"] - domain_handle = samr.hSamrOpenDomain(self.dce, server_handle, domainId=domain_sid)["DomainHandle"] - user_rid = samr.hSamrLookupNamesInDomain(self.dce, domain_handle, (username,))["RelativeIds"]["Element"][0] - return samr.hSamrOpenUser(self.dce, domain_handle, userId=user_rid)["UserHandle"] + self.context.log.fail(f"Server unwilling to perform password reset for '{self.target_username}'") + elif "constraintViolation" in err: + self.context.log.fail(f"Constraint violation for '{self.target_username}': new password does not meet the domain policy") + else: + self.context.log.fail(f"Failed to {action} password for '{self.target_username}': {exc}")