diff --git a/nxc/helpers/ldap.py b/nxc/helpers/ldap.py new file mode 100644 index 0000000000..e2f99cc5a7 --- /dev/null +++ b/nxc/helpers/ldap.py @@ -0,0 +1,79 @@ + +from datetime import datetime, timezone +from impacket.ldap import ldap as ldap_impacket +from impacket.ldap import ldapasn1 as ldapasn1_impacket + + +def parse_ldap_timestamp(ts): + """Convert LDAP generalized time (e.g., '20240603160917.0Z') + to a human-readable string + """ + if not ts: + return "" + try: + cleaned = ts.replace(".0Z", "").replace("Z", "") + dt = datetime.strptime(cleaned, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc) + return dt.strftime("%Y-%m-%d %H:%M:%S UTC") + except (ValueError, TypeError): + return ts + + +def query_ldap_gpos(host, domain, username, password, lmhash, nthash, aes_key, kdc_host, kerberos, logger): + """Query LDAP for GPO metadata + Returns dict of {GUID: {displayName, ...}} or None on failure + + This function is standalone so it can be reused by other modules + (e.g., a future list_gpos LDAP module) + """ + base_dn = ",".join(f"DC={part}" for part in domain.split(".")) + search_filter = "(objectClass=groupPolicyContainer)" + attributes = ["cn", "displayName", "gPCFileSysPath", "versionNumber", "whenCreated", "whenChanged"] + + try: + ldap_url = f"ldap://{host}" + logger.debug(f"Connecting to LDAP at {ldap_url} for GPO metadata") + ldap_connection = ldap_impacket.LDAPConnection(url=ldap_url, baseDN=base_dn, dstIp=host) + + if kerberos: + ldap_connection.kerberosLogin(username, password, domain, lmhash, nthash, aes_key, kdcHost=kdc_host) + else: + ldap_connection.login(username, password, domain, lmhash, nthash) + + search_base = f"CN=Policies,CN=System,{base_dn}" + logger.debug(f"Searching LDAP: base={search_base}, filter={search_filter}") + resp = ldap_connection.search( + searchFilter=search_filter, + attributes=attributes, + searchBase=search_base, + sizeLimit=0, + ) + + gpo_map = {} + for entry in resp: + if not isinstance(entry, ldapasn1_impacket.SearchResultEntry): + continue + + attrs = {} + for attribute in entry["attributes"]: + attr_name = str(attribute["type"]) + vals = [str(val) for val in attribute["vals"].components] + attrs[attr_name] = vals[0] if len(vals) == 1 else vals + + guid = attrs.get("cn", "").upper() + if guid: + if not guid.startswith("{"): + guid = f"{{{guid}}}" + gpo_map[guid] = { + "displayName": attrs.get("displayName", "Unknown"), + "gPCFileSysPath": attrs.get("gPCFileSysPath", ""), + "versionNumber": attrs.get("versionNumber", "0"), + "whenCreated": attrs.get("whenCreated", ""), + "whenChanged": attrs.get("whenChanged", ""), + } + + logger.debug(f"LDAP returned {len(gpo_map)} GPOs") + return gpo_map + + except Exception as e: + logger.debug(f"LDAP GPO query failed: {e}") + return None diff --git a/nxc/helpers/path.py b/nxc/helpers/path.py new file mode 100644 index 0000000000..a4dd96029e --- /dev/null +++ b/nxc/helpers/path.py @@ -0,0 +1,13 @@ +from pathlib import PurePosixPath + + +def sanitize_filename(name: str) -> str: + """Strip path traversal components from an SMB filename. + + Follows the pattern from spider_plus.py — filters '..' and '.' from + PurePosixPath.parts to prevent directory traversal attacks from + malicious SMB servers. + """ + parts = PurePosixPath(name.replace("\\", "/")).parts + clean = [p for p in parts if p not in ("..", ".", "/")] + return str(PurePosixPath(*clean)) if clean else "" diff --git a/nxc/helpers/smb.py b/nxc/helpers/smb.py new file mode 100644 index 0000000000..3cdb1b8aa2 --- /dev/null +++ b/nxc/helpers/smb.py @@ -0,0 +1,102 @@ +from impacket.dcerpc.v5 import transport, srvs +from impacket.dcerpc.v5.dtypes import OWNER_SECURITY_INFORMATION, GROUP_SECURITY_INFORMATION, DACL_SECURITY_INFORMATION +from impacket.ldap import ldaptypes + +from nxc.protocols.ldap.constants import ACCESS_MASK_TO_TEXT_LOOKUP + + +def get_share_security_descriptor(connection, share_name, path, sec_info_flags=None): + r"""Get the security descriptor for a file/folder on an SMB share via SRVS RPC. + + Args: + connection: An NXC SMB connection object with host, username, password, etc. + share_name: The SMB share name (e.g. "SYSVOL"). + path: The path within the share (e.g. "domain.local\\Policies\\{GUID}"). + sec_info_flags: Security information flags to request. Defaults to + OWNER | GROUP | DACL. + + Returns: + An SR_SECURITY_DESCRIPTOR parsed from the raw response. + """ + if sec_info_flags is None: + sec_info_flags = OWNER_SECURITY_INFORMATION | GROUP_SECURITY_INFORMATION | DACL_SECURITY_INFORMATION + + rpc_transport = transport.SMBTransport( + connection.host, + connection.host, + filename=r"\srvsvc", + smb_connection=connection.conn, + ) + + rpc_transport.set_credentials( + connection.username, + connection.password, + connection.domain, + connection.lmhash, + connection.nthash, + connection.aesKey, + ) + rpc_transport.set_kerberos(connection.kerberos, connection.kdcHost) + + dce = rpc_transport.get_dce_rpc() + dce.connect() + dce.bind(srvs.MSRPC_UUID_SRVS) + + # SRVS RPC requires null-terminated strings + rpc_share = f"{share_name}\x00" + rpc_path = f"\\{path.replace('/', chr(92))}\x00" + + raw = srvs.hNetrpGetFileSecurity(dce, rpc_share, rpc_path, sec_info_flags) + dce.disconnect() + + return ldaptypes.SR_SECURITY_DESCRIPTOR(raw) + + +def parse_dacl_aces(sd, sid_resolver=None): + """Parse ACEs from a security descriptor's DACL into structured data. + + Args: + sd: An SR_SECURITY_DESCRIPTOR (from impacket ldaptypes). + sid_resolver: Optional callable that takes a SID string and returns + a human-readable name. If None, SID names are left as empty strings. + + Returns: + A dict with keys: + owner: {"sid": str, "name": str} + group: {"sid": str, "name": str} + aces: list of {"ace_type": str, "sid": str, "sid_name": str, "permissions": list[str], "raw_mask": int} + If there is no DACL, the ``aces`` list will be empty. + """ + owner_sid = sd["OwnerSid"].formatCanonical() if sd["OwnerSid"] else "" + group_sid = sd["GroupSid"].formatCanonical() if sd["GroupSid"] else "" + + result = { + "owner": {"sid": owner_sid, "name": sid_resolver(owner_sid) if sid_resolver and owner_sid else ""}, + "group": {"sid": group_sid, "name": sid_resolver(group_sid) if sid_resolver and group_sid else ""}, + "aces": [], + } + + if not sd["Dacl"]: + return result + + for i in range(sd["Dacl"]["AceCount"]): + ace = sd["Dacl"]["Data"][i] + ace_type = ace["TypeName"] + access_mask = int(ace["Ace"]["Mask"]["Mask"]) + sid = ace["Ace"]["Sid"].formatCanonical() + + permissions = [mask_name for mask_value, mask_name in ACCESS_MASK_TO_TEXT_LOOKUP.items() if access_mask & mask_value] + + sid_name = "" + if sid_resolver: + sid_name = sid_resolver(sid) + + result["aces"].append({ + "ace_type": ace_type, + "sid": sid, + "sid_name": sid_name, + "permissions": permissions, + "raw_mask": access_mask, + }) + + return result diff --git a/nxc/modules/gpos.py b/nxc/modules/gpos.py new file mode 100644 index 0000000000..328776492e --- /dev/null +++ b/nxc/modules/gpos.py @@ -0,0 +1,276 @@ +import json +import ntpath +import re +import shutil +from datetime import datetime, timezone +from pathlib import Path +from impacket.dcerpc.v5.lsat import DCERPCSessionError +from nxc.config import process_secret +from nxc.helpers.misc import CATEGORY +from nxc.helpers.ldap import parse_ldap_timestamp, query_ldap_gpos +from nxc.helpers.smb import get_share_security_descriptor, parse_dacl_aces +from nxc.paths import NXC_PATH +from nxc.protocols.smb.samrfunc import LSAQuery + +GUID_PATTERN = re.compile(r"^\{[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}\}$") + + +class NXCModule: + """Module by @Marshall-Hallenbeck + Do things with Group Policy Objects (GPOs) in Active Directory + """ + + name = "gpos" + description = "Do things with Group Policy Objects (GPOs) in Active Directory" + supported_protocols = ["smb"] + category = CATEGORY.ENUMERATION + + def __init__(self): + self.context = None + self.module_options = None + self.gpo_name = None + self.fuzzy_search = False + self.all_props = False + self.download = True + self.download_dest = "Retrieved_GPOs" + self.list_permissions = False + + def options(self, context, module_options): + """ + NAME Name of the GPO (default retrieve all GPOs) + FUZZY Fuzzy search for name of GPOs (using wildcards) + ALL_PROPS Retrieve all properties of the GPO (default is name, guid, and sysfile path) + DOWNLOAD Download the GPOs to the local machine (default is True) + DEST Destination folder for downloaded GPOs (default is "Retrieved_GPOs") + LIST_PERMISSIONS List permissions for the GPOs (default is False) + """ + self.gpo_name = module_options.get("NAME") + self.fuzzy_search = module_options.get("FUZZY", "False").lower() == "true" + self.all_props = module_options.get("ALL_PROPS", "False").lower() == "true" + self.download = module_options.get("DOWNLOAD", "True").lower() == "true" + self.download_dest = module_options.get("DEST", "Retrieved_GPOs") + self.list_permissions = module_options.get("LIST_PERMISSIONS", "False").lower() == "true" + context.log.debug(f"Module options: {self.gpo_name=}, {self.fuzzy_search=}, {self.all_props=}, {self.download=}, {self.download_dest=}, {self.list_permissions=}") + + def on_login(self, context, connection): + gpo_ldap_map = query_ldap_gpos( + connection.host, + connection.domain, + connection.username, + connection.password, + connection.lmhash, + connection.nthash, + connection.aesKey, + connection.kdcHost, + connection.kerberos, + context.log, + ) + + if gpo_ldap_map is not None: + context.log.display(f"Queried {len(gpo_ldap_map)} GPOs via LDAP, enumerating SYSVOL via SMB") + else: + context.log.display("LDAP query failed, falling back to SMB-only (names may show as 'Unknown')") + + policies = connection.conn.listPath("SYSVOL", f"{connection.domain}\\Policies\\*") + context.log.debug(f"Found policies path: {connection.domain}\\Policies") + + gpos_found = [] + + for item in policies: + item_name = item.get_longname() + context.log.debug(f"Item: {item}, Item name: {item_name}") + if item_name not in [".", ".."] and item.is_directory() and GUID_PATTERN.match(item_name): + gpo_guid = item_name + gpo_path = ntpath.join(f"{connection.domain}", "Policies", f"{gpo_guid}") + + ldap_data = gpo_ldap_map.get(gpo_guid.upper()) if gpo_ldap_map else None + display_name = ldap_data["displayName"] if ldap_data else self.get_gpo_display_name_from_sysvol(context, connection, gpo_path) + + if self.gpo_name: + match = self.gpo_name.lower() in display_name.lower() if self.fuzzy_search else self.gpo_name.lower() == display_name.lower() + if not match: + continue + + gpos_found.append((gpo_guid, gpo_path, display_name, ldap_data)) + + context.log.success(f"GPO Found: '{display_name}'") + context.log.highlight(f"Display Name: {display_name}") + context.log.highlight(f"GUID: {gpo_guid}") + context.log.highlight(f"GPO Path: \\\\SYSVOL\\{gpo_path}") + + if self.all_props and ldap_data: + context.log.highlight(f"Version: {ldap_data['versionNumber']}") + context.log.highlight(f"Created: {parse_ldap_timestamp(ldap_data['whenCreated'])}") + context.log.highlight(f"Modified: {parse_ldap_timestamp(ldap_data['whenChanged'])}") + + if self.list_permissions: + self.get_folder_security_info(context, connection, gpo_guid, gpo_path) + + if self.download: + context.log.display(f"Downloading GPO {gpo_guid} from SYSVOL share") + gpo_dest = Path(self.download_dest) / gpo_guid + self.download_gpo(context, connection, gpo_path, gpo_dest, gpo_guid) + + context.log.success(f"GPOs Found: {len(gpos_found)}") + + if gpos_found: + self.save_gpo_loot(context, connection, gpos_found) + + def save_gpo_loot(self, context, connection, gpos_found): + """Save GPO information to the NXC loot directory in text and JSON format.""" + loot_dir = Path(NXC_PATH) / "modules" / "gpos" / connection.domain / connection.host + loot_dir.mkdir(parents=True, exist_ok=True) + + all_gpos_json = [] + + for gpo_guid, gpo_path, display_name, ldap_data in gpos_found: + gpo_dir = loot_dir / gpo_guid + gpo_dir.mkdir(parents=True, exist_ok=True) + + gpo_info = { + "guid": gpo_guid, + "displayName": display_name, + "sysvolPath": f"\\\\SYSVOL\\{gpo_path}", + } + if ldap_data: + gpo_info["versionNumber"] = ldap_data["versionNumber"] + gpo_info["whenCreated"] = parse_ldap_timestamp(ldap_data["whenCreated"]) + gpo_info["whenChanged"] = parse_ldap_timestamp(ldap_data["whenChanged"]) + gpo_info["gPCFileSysPath"] = ldap_data["gPCFileSysPath"] + + all_gpos_json.append(gpo_info) + + json_path = gpo_dir / "info.json" + with open(json_path, "w") as f: + json.dump(gpo_info, f, indent=4) + + txt_path = gpo_dir / "info.txt" + with open(txt_path, "w") as f: + f.write(f"Display Name: {display_name}\n") + f.write(f"GUID: {gpo_guid}\n") + f.write(f"SYSVOL Path: \\\\SYSVOL\\{gpo_path}\n") + if ldap_data: + f.write(f"Version: {ldap_data['versionNumber']}\n") + f.write(f"Created: {parse_ldap_timestamp(ldap_data['whenCreated'])}\n") + f.write(f"Modified: {parse_ldap_timestamp(ldap_data['whenChanged'])}\n") + f.write(f"gPCFileSysPath: {ldap_data['gPCFileSysPath']}\n") + + all_json_path = loot_dir / "gpos.json" + with open(all_json_path, "w") as f: + json.dump(all_gpos_json, f, indent=4) + + all_txt_path = loot_dir / "gpos.txt" + with open(all_txt_path, "w") as f: + f.write(f"GPOs enumerated from {connection.domain} ({connection.host})\n") + f.write(f"Date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n") + f.write(f"Total: {len(all_gpos_json)}\n") + f.write("=" * 60 + "\n\n") + for gpo in all_gpos_json: + f.write(f"Display Name: {gpo['displayName']}\n") + f.write(f"GUID: {gpo['guid']}\n") + f.write(f"SYSVOL Path: {gpo['sysvolPath']}\n") + if "versionNumber" in gpo: + f.write(f"Version: {gpo['versionNumber']}\n") + f.write(f"Created: {gpo['whenCreated']}\n") + f.write(f"Modified: {gpo['whenChanged']}\n") + f.write("-" * 60 + "\n\n") + + context.log.display(f"GPO info saved to {loot_dir}") + + def get_gpo_display_name_from_sysvol(self, context, connection, gpo_path): + """Try to get the display name of a GPO by reading GPT.ini from SYSVOL via SMB""" + try: + ini_path = f"{gpo_path}\\GPT.ini" + context.log.debug(f"GPT.ini path: {ini_path}") + ini_content = bytearray() + + def callback(data): + ini_content.extend(data) + + connection.conn.getFile("SYSVOL", ini_path, callback) + ini_text = ini_content.decode("utf-8", errors="replace") + + for line in ini_text.splitlines(): + if line.lower().startswith("displayname="): + return line.strip().split("=", 1)[1] + + return "Unknown" + except Exception as e: + context.log.debug(f"Could not read GPO display name: {e}") + return ntpath.basename(gpo_path) + + def download_gpo(self, context, smb_conn, sysvol_path, dest_folder, guid): + """Download a GPO from the SYSVOL share using the appropriate method""" + context.log.debug(f"Downloading GPO {guid} from {sysvol_path} to {dest_folder}") + + original_share = smb_conn.args.share + + try: + smb_conn.args.share = "SYSVOL" + smb_conn.download_folder(sysvol_path, dest_folder, recursive=True) + context.log.success(f"GPO {guid} downloaded to {dest_folder}") + except Exception as e: + context.log.fail(f"Error downloading GPO {guid}: {e}") + if Path(dest_folder).exists(): + try: + shutil.rmtree(dest_folder) + except Exception as cleanup_error: + context.log.debug(f"Error cleaning up directory: {cleanup_error}") + + context.log.highlight("To manually download this GPO, use the following command:") + context.log.highlight(f"nxc smb {smb_conn.host} -u '{smb_conn.username}' -p '{process_secret(smb_conn.password)}' --share SYSVOL --get-folder '{sysvol_path}' '{dest_folder}' --recursive") + finally: + smb_conn.args.share = original_share + + def get_folder_security_info(self, context, connection, gpo_guid, gpo_path): + """Get security information for a GPO folder and analyze permissions.""" + context.log.debug(f"Getting security info for GPO path: {gpo_path}") + + sd = get_share_security_descriptor(connection, "SYSVOL", gpo_path) + context.log.success("Retrieved security information") + + try: + lsa_query = LSAQuery( + connection.username, + connection.password, + connection.domain, + connection.port, + connection.host, + connection.host, + connection.kdcHost, + connection.aesKey, + connection.kerberos, + logger=context.log, + ) + except Exception as lsa_error: + context.log.debug(f"Error creating LSAQuery: {lsa_error}") + return + + def resolve_sid(sid): + try: + return lsa_query.lookup_sids([sid])[0] + except DCERPCSessionError as e: + context.log.debug(f"Error looking up SID {sid}: {e}") + return "Unknown (potentially deleted user)" + + parsed = parse_dacl_aces(sd, sid_resolver=resolve_sid) + if not parsed: + return + + context.log.highlight(f"Security Descriptor for GPO {gpo_guid}:") + + if parsed["owner"]["sid"]: + context.log.highlight(f" Owner SID: {parsed['owner']['sid']} - {parsed['owner']['name']}") + if parsed["group"]["sid"]: + context.log.highlight(f" Group SID: {parsed['group']['sid']} - {parsed['group']['name']}") + + if not parsed["aces"]: + context.log.highlight(" No DACL found - no access restrictions") + return + + context.log.highlight(f" DACL: {len(parsed['aces'])} ACEs (Access Control Entries)") + for i, ace in enumerate(parsed["aces"]): + perm_str = ", ".join(ace["permissions"]) if ace["permissions"] else f"0x{ace['raw_mask']:08x}" + context.log.highlight(f" ACE {i + 1}: {ace['ace_type']}") + context.log.highlight(f" SID: {ace['sid']} - {ace['sid_name']}") + context.log.highlight(f" Permissions: {perm_str}") diff --git a/nxc/protocols/ldap/constants.py b/nxc/protocols/ldap/constants.py new file mode 100644 index 0000000000..2c54555717 --- /dev/null +++ b/nxc/protocols/ldap/constants.py @@ -0,0 +1,124 @@ +from impacket.ldap.ldaptypes import ACCESS_MASK + +# https://learn.microsoft.com/en-us/windows/win32/secauthz/well-known-sids +# https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-dtyp/81d92bba-d22b-4a8c-908a-554ab29148ab?source=recommendations +WELL_KNOWN_SIDS = { + "S-1-0": "Null Authority", + "S-1-0-0": "Nobody", + "S-1-1": "World Authority", + "S-1-1-0": "Everyone", + "S-1-2": "Local Authority", + "S-1-2-0": "Local", + "S-1-2-1": "Console Logon", + "S-1-3": "Creator Authority", + "S-1-3-0": "Creator Owner", + "S-1-3-1": "Creator Group", + "S-1-3-2": "Creator Owner Server", + "S-1-3-3": "Creator Group Server", + "S-1-3-4": "Owner Rights", + "S-1-4": "Non-unique Authority", + "S-1-5": "NT Authority", + "S-1-5-1": "Dialup", + "S-1-5-2": "Network", + "S-1-5-3": "Batch", + "S-1-5-4": "Interactive", + "S-1-5-6": "Service", + "S-1-5-7": "Anonymous", + "S-1-5-8": "Proxy", + "S-1-5-9": "Enterprise Domain Controllers", + "S-1-5-10": "Principal Self", + "S-1-5-11": "Authenticated Users", + "S-1-5-12": "Restricted Code", + "S-1-5-13": "Terminal Server Users", + "S-1-5-14": "Remote Interactive Logon", + "S-1-5-15": "This Organization", + "S-1-5-17": "This Organization", + "S-1-5-18": "Local System", + "S-1-5-19": "NT Authority", + "S-1-5-20": "NT Authority", + "S-1-5-21-0-0-0-496": "Compounded Authentication", # https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-dtyp/11e1608c-6169-4fbc-9c33-373fc9b224f4#Appendix_A_13 + "S-1-5-21-0-0-0-497": "Claims Valid", # https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-dtyp/11e1608c-6169-4fbc-9c33-373fc9b224f4#Appendix_A_14 + "S-1-5-32-544": "Administrators", + "S-1-5-32-545": "Users", + "S-1-5-32-546": "Guests", + "S-1-5-32-547": "Power Users", + "S-1-5-32-548": "Account Operators", + "S-1-5-32-549": "Server Operators", + "S-1-5-32-550": "Print Operators", + "S-1-5-32-551": "Backup Operators", + "S-1-5-32-552": "Replicators", + "S-1-5-32-553": "RAS and IAS Servers", + "S-1-5-32-554": r"BUILTIN\Pre-Windows 2000 Compatible Access", + "S-1-5-32-555": r"BUILTIN\Remote Desktop Users", + "S-1-5-32-557": r"BUILTIN\Incoming Forest Trust Builders", + "S-1-5-32-556": r"BUILTIN\Network Configuration Operators", + "S-1-5-32-558": r"BUILTIN\Performance Monitor Users", + "S-1-5-32-559": r"BUILTIN\Performance Log Users", + "S-1-5-32-560": r"BUILTIN\Windows Authorization Access Group", + "S-1-5-32-561": r"BUILTIN\Terminal Server License Servers", + "S-1-5-32-562": r"BUILTIN\Distributed COM Users", + "S-1-5-32-568": r"BUILTIN\DCOM Users", + "S-1-5-32-569": r"BUILTIN\Cryptographic Operators", + "S-1-5-32-571": r"BUILTIN\Cacheable Principals", + "S-1-5-32-572": r"BUILTIN\Non-Cacheable Principals", + "S-1-5-32-573": r"BUILTIN\Event Log Readers", + "S-1-5-32-574": r"BUILTIN\Certificate Service DCOM Access", + "S-1-5-32-575": r"BUILTIN\RDS Remote Access Servers", + "S-1-5-32-576": r"BUILTIN\RDS Endpoint Servers", + "S-1-5-32-577": r"BUILTIN\RDS Management Servers", + "S-1-5-32-578": r"BUILTIN\Hyper-V Administrators", + "S-1-5-32-579": r"BUILTIN\Access Control Assistance Operators", + "S-1-5-32-580": r"BUILTIN\Remote Management Users", + "S-1-5-32-581": r"BUILTIN\Default Account", + "S-1-5-32-582": r"BUILTIN\Storage Replica Administrators", + "S-1-5-32-583": r"BUILTIN\Device Owners", + "S-1-5-32-584": r"BUILTIN\User Mode Hardware Operators", + "S-1-5-32-585": r"BUILTIN\OpenSSH Users", + "S-1-5-33": "Write Restricted Code", + "S-1-5-64-10": "NTLM Authentication", + "S-1-5-64-14": "SChannel Authentication", + "S-1-5-64-21": "Digest Authority", + "S-1-5-65-1": "This Organization Certificate", # see https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-pac/39f588d6-21e3-4e09-a9f2-d8f7b9b998bf + "S-1-5-80": "NT Service", + "S-1-5-80-0": "All Services", + "S-1-5-83-0": r"NT VIRTUAL MACHINE\Virtual Machines", + "S-1-5-84-0-0-0-0-0": "User Mode Drivers", + "S-1-5-1000": "Other Organization", + "S-1-16-0": "Untrusted Mandatory Level", + "S-1-16-4096": "Low Mandatory Level", + "S-1-16-8192": "Medium Mandatory Level", + "S-1-16-8448": "Medium Plus Mandatory Level", + "S-1-16-12288": "High Mandatory Level", + "S-1-16-16384": "System Mandatory Level", + "S-1-16-20480": "Protected Process Mandatory Level", + "S-1-16-28672": "Secure Process Mandatory Level", + "S-1-18-1": "Authentication Authority Asserted Identity", + "S-1-18-2": "Service Asserted Identity", + "S-1-18-3": "Fresh public key identity", + "S-1-18-5": "MFA Key Property", + "S-1-18-6": "Attested Key Property", +} + +ACCESS_MASK_TO_TEXT_LOOKUP = { + ACCESS_MASK.GENERIC_READ: "GENERIC_READ", + ACCESS_MASK.GENERIC_WRITE: "GENERIC_WRITE", + ACCESS_MASK.GENERIC_EXECUTE: "GENERIC_EXECUTE", + ACCESS_MASK.GENERIC_ALL: "GENERIC_ALL", + ACCESS_MASK.MAXIMUM_ALLOWED: "MAXIMUM_ALLOWED", + ACCESS_MASK.ACCESS_SYSTEM_SECURITY: "ACCESS_SYSTEM_SECURITY", + ACCESS_MASK.SYNCHRONIZE: "SYNCHRONIZE", + ACCESS_MASK.WRITE_OWNER: "WRITE_OWNER", + ACCESS_MASK.WRITE_DACL: "WRITE_DACL", + ACCESS_MASK.READ_CONTROL: "READ_CONTROL", + ACCESS_MASK.DELETE: "DELETE", + # file system rights + 0x00000001: "FILE_READ_DATA", + 0x00000002: "FILE_WRITE_DATA", + 0x00000004: "FILE_APPEND_DATA", + 0x00000008: "FILE_READ_EA", + 0x00000010: "FILE_WRITE_EA", + 0x00000020: "FILE_EXECUTE", + 0x00000040: "FILE_DELETE_CHILD", + 0x00000080: "FILE_READ_ATTRIBUTES", + 0x00000100: "FILE_WRITE_ATTRIBUTES" +} diff --git a/nxc/protocols/smb.py b/nxc/protocols/smb.py index 3270356d52..6c74627597 100755 --- a/nxc/protocols/smb.py +++ b/nxc/protocols/smb.py @@ -4,6 +4,9 @@ import re import struct import ipaddress +from pathlib import Path + +from nxc.helpers.path import sanitize_filename from Cryptodome.Hash import MD4 from textwrap import dedent @@ -35,7 +38,7 @@ from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5.dcomrt import DCOMConnection from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login -from impacket.smb3structs import FILE_SHARE_WRITE, FILE_SHARE_DELETE, SMB2_0_IOCTL_IS_FSCTL +from impacket.smb3structs import FILE_READ_DATA, FILE_WRITE_DATA, FILE_SHARE_WRITE, FILE_SHARE_DELETE, SMB2_0_IOCTL_IS_FSCTL from impacket.dcerpc.v5 import tsts as TSTS from nxc.config import process_secret, host_info_colors, check_guest_account @@ -1937,24 +1940,108 @@ def put_file(self): for src, dest in self.args.put_file: self.put_file_single(src, dest) - def get_file_single(self, remote_path, download_path): + def download_file(self, share_name, remote_path, dest_file, access_mode=FILE_READ_DATA): + try: + self.logger.debug(f"Getting file from {share_name}:{remote_path} with access mode {access_mode}") + self.conn.getFile(share_name, remote_path, dest_file, shareAccessMode=access_mode) + return True + except SessionError as e: + if "STATUS_SHARING_VIOLATION" in str(e): + self.logger.debug(f"Sharing violation on {remote_path}: {e}") + else: + self.logger.debug(f"SessionError when attempting to download file {remote_path}: {e}") + return False + except Exception as e: + self.logger.debug(f"Other error when attempting to download file {remote_path}: {e}") + return False + + def get_file_single(self, remote_path, download_path, silent=False): share_name = self.args.share - self.logger.display(f'Copying "{remote_path}" to "{download_path}"') + if not silent: + self.logger.display(f"Copying '{remote_path}' to '{download_path}'") if self.args.append_host: download_path = f"{self.hostname}-{remote_path}" with open(download_path, "wb+") as file: - try: - self.conn.getFile(share_name, remote_path, file.write) - self.logger.success(f'File "{remote_path}" was downloaded to "{download_path}"') - except Exception as e: - self.logger.fail(f'Error writing file "{remote_path}" from share "{share_name}": {e}') - if os.path.getsize(download_path) == 0: - os.remove(download_path) + if self.download_file(share_name, remote_path, file.write): + if not silent: + self.logger.success(f"File '{remote_path}' was downloaded to '{download_path}'") + else: + self.logger.debug("Opening with READ alone failed, trying to open file with READ/WRITE access") + if self.download_file(share_name, remote_path, file.write, FILE_READ_DATA | FILE_WRITE_DATA): + if not silent: + self.logger.success(f"File '{remote_path}' was downloaded to '{download_path}'") + else: + if not silent: + self.logger.fail(f"Error downloading file '{remote_path}' from share '{share_name}'") def get_file(self): for src, dest in self.args.get_file: self.get_file_single(src, dest) + def download_folder(self, folder, dest, recursive=False, silent=False, base_dir=None, ignore_empty=False): + self.logger.debug(f"Downloading folder with args: {folder}, {dest}, Recursive: {recursive}, Silent: {silent}, Base dir: {base_dir}, Ignore empty: {ignore_empty}") + normalized_folder = ntpath.normpath(folder) + base_folder = os.path.basename(normalized_folder) + self.logger.debug(f"Base folder: {base_folder}") + + try: + items = self.conn.listPath(self.args.share, ntpath.join(folder, "*")) + except SessionError as e: + self.logger.error(f"Error listing folder '{folder}': {e}") + return + self.logger.debug(f"{len(items)} items in folder: {items}") + + filtered_items = [item for item in items if item.get_longname() not in [".", ".."]] + + # create local directory structure regardless of content; download empty folders by default + # change the Windows path to Linux and then join it with the base directory to get our actual save path + relative_path = os.path.join(*folder.replace(base_dir or folder, "").lstrip("\\").split("\\")) + local_folder_path = os.path.join(dest, relative_path) + + if not filtered_items and ignore_empty: + self.logger.debug(f"Skipping empty folder '{folder}'") + return + + # create the directory for this folder + os.makedirs(local_folder_path, exist_ok=True) + if not filtered_items and not silent: + self.logger.display(f"Created empty directory '{local_folder_path}'") + + for item in filtered_items: + item_name = sanitize_filename(item.get_longname()) + if not item_name: + self.logger.fail(f"Path traversal detected in '{item.get_longname()}', skipping") + continue + dir_path = ntpath.normpath(ntpath.join(normalized_folder, item_name)) + self.logger.debug(f"Parsing item: {item_name}, {dir_path}") + + if item.is_directory() and recursive: + self.logger.debug(f"Found new directory to parse: {dir_path}") + self.download_folder(dir_path, dest, recursive, silent, base_dir or folder, ignore_empty) + elif not item.is_directory(): + remote_file_path = ntpath.join(folder, item_name) + local_file_path = os.path.join(local_folder_path, item_name) + # Defense-in-depth: verify path stays under destination + resolved = Path(local_file_path).resolve() + if not str(resolved).startswith(str(Path(dest).resolve()) + os.sep): + self.logger.fail(f"Path traversal detected in '{item_name}', skipping") + continue + self.logger.debug(f"{dest=} {remote_file_path=} {relative_path=} {local_folder_path=} {local_file_path=}") + + try: + self.get_file_single(remote_file_path, local_file_path, silent) + except FileNotFoundError: + self.logger.fail(f"Error downloading file '{remote_file_path}' due to file not found (probably a race condition between listing and downloading)") + + def get_folder(self): + recursive = self.args.recursive + ignore_empty = getattr(self.args, "ignore_empty_folders", False) + self.logger.debug(f"Recursive option set to {recursive}") + self.logger.debug(f"Ignore empty folders option set to {ignore_empty}") + for folder, dest in self.args.get_folder: + self.download_folder(folder, dest, recursive, False, None, ignore_empty) + self.logger.success(f"Folder '{folder}' was downloaded to '{dest}'") + def enable_remoteops(self, regsecret=False): try: if regsecret: diff --git a/nxc/protocols/smb/proto_args.py b/nxc/protocols/smb/proto_args.py index 6765559c24..5c52d1b79c 100644 --- a/nxc/protocols/smb/proto_args.py +++ b/nxc/protocols/smb/proto_args.py @@ -90,6 +90,9 @@ def proto_args(parser, parents): files_group = smb_parser.add_argument_group("File Operations") files_group.add_argument("--put-file", action="append", nargs=2, metavar="FILE", help="Put a local file into remote target, ex: whoami.txt \\\\Windows\\\\Temp\\\\whoami.txt") files_group.add_argument("--get-file", action="append", nargs=2, metavar="FILE", help="Get a remote file, ex: \\\\Windows\\\\Temp\\\\whoami.txt whoami.txt") + files_group.add_argument("--get-folder", action="append", nargs=2, metavar="DIR", help="Get a remote directory, ex: \\\\Windows\\\\Temp\\\\testing testing") + files_group.add_argument("--recursive", default=False, action="store_true", help="Recursively get a folder") + files_group.add_argument("--ignore-empty-folders", default=False, action="store_true", help="Ignore empty folders when downloading") files_group.add_argument("--append-host", action="store_true", help="append the host to the get-file filename") cmd_exec_group = smb_parser.add_argument_group("Command Execution") diff --git a/nxc/protocols/smb/samrfunc.py b/nxc/protocols/smb/samrfunc.py index 5a4cc0f1e0..4cfe358d9f 100644 --- a/nxc/protocols/smb/samrfunc.py +++ b/nxc/protocols/smb/samrfunc.py @@ -7,6 +7,7 @@ from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE from impacket.nmb import NetBIOSError from impacket.smbconnection import SessionError +from nxc.protocols.ldap.constants import WELL_KNOWN_SIDS class SamrFunc: @@ -232,7 +233,9 @@ def get_policy_handle(self): def lookup_sids(self, sids): """Use a list comprehension to generate the names list. - - It calls the hLsarLookupSids() method directly in the list comprehension and extracts the "Name" value from each element in the "Names" list. + If the user is requesting a single SID and it is a well known SID, return the well known SID name to avoid a lookup. + Otherwise, it calls the hLsarLookupSids() method directly in the list comprehension and extracts the "Name" value from each element in the "Names" list. """ + if len(sids) == 1 and sids[0] in WELL_KNOWN_SIDS: + return [WELL_KNOWN_SIDS[sids[0]]] return [translated_names["Name"] for translated_names in lsat.hLsarLookupSids(self.dce, self.policy_handle, sids, lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta)["TranslatedNames"]["Names"]] diff --git a/tests/e2e_commands.txt b/tests/e2e_commands.txt index f9318889b8..1c84cd1505 100644 --- a/tests/e2e_commands.txt +++ b/tests/e2e_commands.txt @@ -43,6 +43,7 @@ netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -x ipconfig netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS --put-file TEST_NORMAL_FILE \\Windows\\Temp\\test_file.txt netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS --put-file TEST_NORMAL_FILE \\Windows\\Temp\\test_file.txt --put-file TEST_NORMAL_FILE \\Windows\\Temp\\test_file2.txt netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS --get-file \\Windows\\Temp\\test_file.txt /tmp/test_file.txt +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS --get-folder \\Windows\\Temp\\ /tmp/test_folder/ netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS --no-admin-check ##### SMB PowerShell netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -X ipconfig @@ -172,6 +173,13 @@ netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M winscp netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M zerologon #netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M change-password -o NEWPASS=Password123 #netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M change-password -o NEWNTHASH=58A478135A93AC3BF058A5EA0E8FDB71 +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M gpos -o DOWNLOAD=False +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M gpos -o NAME="Default Domain Policy" DOWNLOAD=False +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M gpos -o NAME="Default Domain Policy" ALL_PROPS=True DOWNLOAD=False +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M gpos -o NAME="Default" DOWNLOAD=False +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M gpos -o NAME="Default" FUZZY=True DOWNLOAD=False +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M gpos -o NAME="Default" FUZZY=True ALL_PROPS=True DOWNLOAD=False +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M gpos -o ALL_PROPS=True DOWNLOAD=False netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M lockscreendoors # test for multiple modules at once netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M spooler -M petitpotam -M zerologon -M nopac -M enum_av -M enum_dns -M gpp_autologin -M gpp_password -M lsassy -M impersonate -M install_elevated -M ioxidresolver -M ms17-010 -M ntlmv1 -M runasppl -M uac -M webdav -M wifi -M coerce_plus