From 3a6dafa859dd66ef24c535f07470ecf07f2c24de Mon Sep 17 00:00:00 2001 From: zurro Date: Tue, 16 Dec 2025 20:54:35 +0000 Subject: [PATCH 1/8] Added Cobalt Strike Connector --- .../cli/cli/cobaltstrike_connector/cache.py | 37 ++ .../cobaltstrike_client.py | 331 ++++++++++++++++++ .../cobaltstrike_connector.py | 92 +++++ .../download_monitor.py | 95 +++++ .../download_processor.py | 226 ++++++++++++ projects/cli/cli/config.py | 9 +- projects/cli/cli/main.py | 24 ++ projects/cli/settings_cobaltstrike.yaml | 19 + 8 files changed, 832 insertions(+), 1 deletion(-) create mode 100644 projects/cli/cli/cobaltstrike_connector/cache.py create mode 100644 projects/cli/cli/cobaltstrike_connector/cobaltstrike_client.py create mode 100644 projects/cli/cli/cobaltstrike_connector/cobaltstrike_connector.py create mode 100644 projects/cli/cli/cobaltstrike_connector/download_monitor.py create mode 100644 projects/cli/cli/cobaltstrike_connector/download_processor.py create mode 100644 projects/cli/settings_cobaltstrike.yaml diff --git a/projects/cli/cli/cobaltstrike_connector/cache.py b/projects/cli/cli/cobaltstrike_connector/cache.py new file mode 100644 index 00000000..cfe765cd --- /dev/null +++ b/projects/cli/cli/cobaltstrike_connector/cache.py @@ -0,0 +1,37 @@ +import logging + +from cli.cobaltstrike_connector.cobaltstrike_client import Beacon, CobaltStrikeClient + + +class ImplantCache: + def __init__(self, client: CobaltStrikeClient): + self.client = client + self.cache: dict[str, Beacon] = {} + self.logger = logging.getLogger(__name__) + + async def initialize(self): + """Initialize the cache with current beacons""" + try: + beacons = await self.client.get_beacons() + for beacon in beacons: + self.cache[beacon.bid] = beacon + self.logger.info(f"Initialized cache with {len(beacons)} beacons") + except Exception as e: + self.logger.error(f"Failed to initialize beacon cache: {e}") + raise + + async def get_beacon(self, bid: str) -> Beacon | None: + """Get beacon from cache, fetching from API if not found""" + if bid in self.cache: + return self.cache[bid] + + try: + # Refresh entire cache as there's no endpoint for single beacon + beacons = await self.client.get_beacons() + for beacon in beacons: + self.cache[beacon.bid] = beacon + + return self.cache.get(bid) + except Exception as e: + self.logger.error(f"Failed to fetch beacon {bid}: {e}") + return None diff --git a/projects/cli/cli/cobaltstrike_connector/cobaltstrike_client.py b/projects/cli/cli/cobaltstrike_connector/cobaltstrike_client.py new file mode 100644 index 00000000..6401e0f7 --- /dev/null +++ b/projects/cli/cli/cobaltstrike_connector/cobaltstrike_client.py @@ -0,0 +1,331 @@ +import functools +import logging +from collections.abc import Callable +from dataclasses import dataclass +from datetime import datetime +from typing import Any, ParamSpec, TypeVar +from urllib.parse import urljoin, urlparse + +import aiohttp + +# Type variables for the decorator +T = TypeVar("T") +P = ParamSpec("P") + + +@dataclass +class Beacon: + bid: str + pbid: str + pid: int + process: str + user: str + impersonated: str | None + is_admin: bool + computer: str + host: str + internal: str + external: str + os: str + version: str + build: int + charset: str + system_arch: str + beacon_arch: str + session: str + listener: str + pivot_hint: str | None + port: int + note: str | None + color: str + alive: bool + link_state: str + last_checkin_time: datetime + last_checkin_ms: int + last_checkin_formatted: str + sleep: int + jitter: int + supports_sleep: bool + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "Beacon": + sleep_data = data.get("sleep", {}) + return cls( + bid=data["bid"], + pbid=data["pbid"], + pid=data["pid"], + process=data["process"], + user=data["user"], + impersonated=data.get("impersonated"), + is_admin=data["isAdmin"], + computer=data["computer"], + host=data["host"], + internal=data["internal"], + external=data["external"], + os=data["os"], + version=data["version"], + build=data["build"], + charset=data["charset"], + system_arch=data["systemArch"], + beacon_arch=data["beaconArch"], + session=data["session"], + listener=data["listener"], + pivot_hint=data.get("pivotHint"), + port=data["port"], + note=data.get("note"), + color=data["color"], + alive=data["alive"], + link_state=data["linkState"], + last_checkin_time=datetime.fromisoformat(data["lastCheckinTime"].replace("Z", "+00:00")), + last_checkin_ms=data["lastCheckinMs"], + last_checkin_formatted=data["lastCheckinFormatted"], + sleep=sleep_data.get("sleep", 0), + jitter=sleep_data.get("jitter", 0), + supports_sleep=data["supportsSleep"], + ) + + +@dataclass +class Download: + id: str + bid: str + name: str + path: str + size: int + timestamp: datetime + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "Download": + return cls( + id=data["id"], + bid=data["bid"], + name=data["name"], + path=data["path"], + size=data["size"], + timestamp=datetime.fromtimestamp(data["timestamp"] / 1000), # Convert milliseconds to seconds + ) + + +class CobaltStrikeClient: + def __init__(self, base_url: str, verify_ssl: bool = False, validate_https: bool = True): + """ + Initialize the Cobalt Strike C2 Client client. + + Args: + base_url: The base URL of the API server (default: https://localhost:11000) + verify_ssl: Whether to verify SSL certificates (default: False) + validate_https: Whether to validate HTTPS usage (default: True) + + Raises: + ValueError: If the base_url contains a path, query parameters, or fragment + ValueError: If validate_https is True and the URL scheme is not HTTPS + """ + self.logger = logging.getLogger(__name__) + + # Parse and validate the base URL + parsed = urlparse(base_url) + if any([parsed.path not in ["", "/"], parsed.params, parsed.query, parsed.fragment]): + raise ValueError("base_url must not contain path, parameters, query string, or fragment") + + # Validate HTTPS usage if required + # if validate_https and parsed.scheme != "https": + # raise ValueError("HTTPS is required when validate_https is True") + + # Ensure the base_url doesn't end with a slash + self.base_url = base_url.rstrip("/") + self.verify_ssl = verify_ssl + self._session = None + self._access_token = None + self._stored_credentials = None # Store credentials for reauthentication + + async def __aenter__(self): + """Context manager entry - creates the aiohttp session.""" + # Set timeout for all requests (30 seconds total, 10 seconds for connection) + timeout = aiohttp.ClientTimeout(total=30, connect=10) + + # Create SSL context based on verify_ssl setting + import ssl + ssl_context = ssl.create_default_context() + if not self.verify_ssl: + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + self._session = aiohttp.ClientSession( + cookie_jar=aiohttp.CookieJar(), + connector=aiohttp.TCPConnector(ssl=ssl_context if self.verify_ssl else False), + timeout=timeout + ) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Context manager exit - closes the aiohttp session.""" + if self._session: + await self._session.close() + self._session = None + + async def _reauthenticate(self) -> bool: + """ + Attempt to reauthenticate using stored credentials. + + Returns: + bool: True if reauthentication was successful + """ + if not self._stored_credentials: + self.logger.error("No stored credentials available for reauthentication") + return False + + self.logger.info("Attempting reauthentication") + return await self.authenticate(self._stored_credentials["username"], self._stored_credentials["password"]) + + def requires_auth(func: Callable[P, T]) -> Callable[P, T]: + """ + Decorator that handles token expiration and reauthentication. + """ + + @functools.wraps(func) + async def wrapper(self: "CobaltStrikeClient", *args: P.args, **kwargs: P.kwargs) -> T: + try: + return await func(self, *args, **kwargs) + except aiohttp.ClientResponseError as e: + if e.status == 401: # Unauthorized - token might be expired + self.logger.info("Token appears to be expired, attempting reauthentication") + if await self._reauthenticate(): + self.logger.info("Reauthentication successful, retrying original request") + return await func(self, *args, **kwargs) + else: + self.logger.error("Reauthentication failed") + raise + raise + + return wrapper + + async def authenticate(self, username: str, password: str) -> bool: + """ + Authenticate with the API using username and password. + + Args: + username: The username to authenticate with + password: The password for authentication + + Returns: + bool: True if authentication was successful + """ + + if not self._session: + raise RuntimeError("Client session not established") + + auth_url = urljoin(self.base_url, "/api/auth/login") + data = {"username": username, "password": password, "duration_ms": 86400000} + + try: + self.logger.info(f"Attempting authentication for user: {username}") + async with self._session.post(auth_url, json=data) as response: + if response.status == 200: + auth_response = await response.json() + token = auth_response.get("access_token") + + if token: + self._access_token = token + + # Set the Authorization header for future requests + self._session.headers.update({"Authorization": f"Bearer {token}"}) + + self._stored_credentials = {"username": username, "password": password} + self.logger.info("Authentication successful") + return True + else: + self.logger.warning("No access token in response") + return False + + self.logger.warning(f"Authentication failed with status code: {response.status}") + return False + except Exception as e: + self.logger.error(f"Authentication error: {str(e)}") + raise + + @requires_auth + async def get_beacons(self) -> list[Beacon]: + """Get list of all beacons.""" + + if not self._session: + raise RuntimeError("Client session not established") + + try: + self.logger.info("Fetching beacons list") + async with self._session.get(urljoin(self.base_url, "/api/v1/beacons")) as response: + if response.status == 200: + data = await response.json() + beacons = [Beacon.from_dict(beacon) for beacon in data] + self.logger.info(f"Successfully retrieved {len(beacons)} beacons") + return beacons + self.logger.error(f"Failed to get beacons, status code: {response.status}") + response.raise_for_status() + except Exception as e: + self.logger.error(f"Error getting beacons: {str(e)}") + raise + + @requires_auth + async def get_downloads(self) -> list[Download]: + """Get list of all downloads.""" + + if not self._session: + raise RuntimeError("Client session not established") + + try: + self.logger.debug("Fetching downloads list") + async with self._session.get(urljoin(self.base_url, "/api/v1/data/downloads")) as response: + if response.status == 200: + data = await response.json() + downloads = [Download.from_dict(download) for download in data] + self.logger.debug(f"Successfully retrieved {len(downloads)} downloads") + return downloads + + self.logger.error(f"Failed to get downloads, status code: {response.status}") + response.raise_for_status() + except Exception as e: + self.logger.error(f"Error getting downloads: {str(e)}") + raise + + @requires_auth + async def download_file(self, download_uid: str, output_path: str) -> bool: + """ + Download a file using its UID and save it to the specified path. + + Args: + download_uid: The UID of the download + output_path: The local path where the file should be saved + + Returns: + bool: True if download was successful + + Raises: + aiohttp.ClientResponseError: If the server returns an error response + IOError: If there's an error writing the file + """ + if not self._session: + raise RuntimeError("Client session not established") + + try: + self.logger.debug(f"Downloading file with UID: {download_uid}") + download_url = urljoin(self.base_url, f"/api/v1/data/downloads/{download_uid}") + + async with self._session.get(download_url) as response: + if response.status == 200: + with open(output_path, "wb") as f: + # Stream the download to avoid loading entire file into memory + async for chunk in response.content.iter_chunked(8192): + f.write(chunk) + + self.logger.debug(f"Successfully downloaded file to: {output_path}") + return True + + self.logger.error(f"Failed to download file, status code: {response.status}") + response.raise_for_status() + return False + + except OSError as e: + self.logger.error(f"Error writing file to {output_path}: {str(e)}") + raise + except Exception as e: + self.logger.error(f"Error downloading file: {str(e)}") + raise diff --git a/projects/cli/cli/cobaltstrike_connector/cobaltstrike_connector.py b/projects/cli/cli/cobaltstrike_connector/cobaltstrike_connector.py new file mode 100644 index 00000000..e2fe3675 --- /dev/null +++ b/projects/cli/cli/cobaltstrike_connector/cobaltstrike_connector.py @@ -0,0 +1,92 @@ +# main.py +import asyncio +import logging + +import urllib3 +from cli.config import Config, CobaltStrikeConfig +from cli.log import setup_logging +from cli.nemesis_client import NemesisClient +from cli.cobaltstrike_connector.cache import ImplantCache +from cli.cobaltstrike_connector.download_monitor import CobaltStrikeDownloadMonitor +from cli.cobaltstrike_connector.download_processor import CobaltStrikeDownloadProcessor +from cli.cobaltstrike_connector.cobaltstrike_client import CobaltStrikeClient + +setup_logging() + + +async def run_cobaltstrike_connector(config: Config, logger: logging.Logger): + """Main connector logic""" + + if not config.validate_https_certs: + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + try: + tasks = [] + + # Add CobaltStrike tasks + if config.cobaltstrike: + for cobaltstrike_config in config.cobaltstrike: + task = setup_cobaltstrike_monitor(cobaltstrike_config, config, logger) + tasks.append(task) + + # Add Mythic tasks + # if config.mythic: + # tasks.extend(setup_mythic_manager(mythic_config, config, logger) for mythic_config in config.mythic) + + # Add other C2 platform tasks here following the same pattern... + + if tasks: + # Run all managers concurrently + await asyncio.gather(*tasks, return_exceptions=True) + else: + logger.warning("No C2 platforms configured") + + except Exception as e: + logger.error(f"Error running connector: {e}") + raise + finally: + logger.info("Shutting down") + + +async def setup_cobaltstrike_monitor(cobaltstrike_config: CobaltStrikeConfig, config: Config, logger: logging.Logger): + """Set up and run a single Cobalt Strike manager instance""" + base_url = str(cobaltstrike_config.url) + username = cobaltstrike_config.credential.username + password = cobaltstrike_config.credential.password + logger.info(f"Connecting to Cobalt Strike at {base_url}") + async with CobaltStrikeClient( + base_url=base_url, + verify_ssl=config.validate_https_certs, + ) as cobaltstrike_client: + # Authenticate + if not await cobaltstrike_client.authenticate(username, password): + raise Exception(f"Authentication failed for {base_url}") + + # Initialize components + implant_cache = ImplantCache(cobaltstrike_client) + await implant_cache.initialize() + + nemesis_client = NemesisClient(config.nemesis) + + # Initialize processor with the client + nemesis_file_processor = CobaltStrikeDownloadProcessor( + config.cache_db_path, + nemesis_client, + cobaltstrike_config.project, + cobalt_strike=cobaltstrike_client, + ) + + monitor = CobaltStrikeDownloadMonitor( + cobaltstrike_client, + implant_cache, + nemesis_file_processor, + cobaltstrike_config.poll_interval_sec, + ) + + # Start the manager + logger.info(f"Starting Cobalt Strike download monitor for {base_url}") + try: + await monitor.start() + except Exception as e: + logger.error(f"Monitor failed: {e}") + raise \ No newline at end of file diff --git a/projects/cli/cli/cobaltstrike_connector/download_monitor.py b/projects/cli/cli/cobaltstrike_connector/download_monitor.py new file mode 100644 index 00000000..179b5c11 --- /dev/null +++ b/projects/cli/cli/cobaltstrike_connector/download_monitor.py @@ -0,0 +1,95 @@ +import asyncio +import logging +from asyncio import Queue + +from cli.cobaltstrike_connector.cache import ImplantCache +from cli.cobaltstrike_connector.download_processor import CobaltStrikeDownloadProcessor +from cli.cobaltstrike_connector.cobaltstrike_client import CobaltStrikeClient + +logger = logging.getLogger(__name__) + + +class CobaltStrikeDownloadMonitor: + """Monitors and processes Cobalt Strike C2 downloads. + + This class continuously polls for new downloads from a Cobalt Strike C2 server, + verifies if they've been previously processed, and submits unprocessed downloads + to a processing queue. It maintains a background worker that processes queued + downloads using the Nemesis file processor. + + Attributes: + cobalt_strike_client (CobaltStrikeClient): Client for interacting with Cobalt Strike C2 server + beacon_cache (ImplantCache): Cache for looking up beacon information + nemesis_file_processor (NemesisFileProcessor): Processor for handling downloaded files + polling_interval (int): Seconds to wait between polling for new downloads + queue (Queue): Async queue for managing download processing + is_running (bool): Flag indicating if the processor is currently running + + Example: + processor = CobaltStrikeDownloadProcessor(client, cache, file_processor) + await processor.start() # Begins monitoring and processing downloads + await processor.stop() # Gracefully stops processing + """ + + def __init__( + self, + cobalt_strike_client: CobaltStrikeClient, + beacon_cache: ImplantCache, + nemesis_file_processor: CobaltStrikeDownloadProcessor, + polling_interval: int = 60, + ): + self.cobalt_strike_client = cobalt_strike_client + self.beacon_cache = beacon_cache + self.nemesis_file_processor = nemesis_file_processor + self.polling_interval = polling_interval + self.queue: Queue = Queue() + self.is_running = False + + async def process_queue(self): + """Process downloads from the queue""" + while self.is_running: + try: + download, beacon = await self.queue.get() + await self.nemesis_file_processor.process_cobaltstrike_download(download, beacon) + self.queue.task_done() + except Exception as e: + logger.error(f"Error processing download from queue: {e}") + + async def get_downloads(self): + """Check for new downloads and add them to queue""" + while self.is_running: + try: + downloads = await self.cobalt_strike_client.get_downloads() + logger.debug(f"Found {len(downloads)} CobaltStrike downloads") + + for download in downloads: + if self.nemesis_file_processor.is_processed(download): + logger.debug(f"Download {download.id} has already been processed") + continue + + beacon = await self.beacon_cache.get_beacon(download.bid) + if beacon: + await self.queue.put((download, beacon)) + logger.info(f"Queued download for processing. ID: {download.id}. Path: {download.path}") + else: + logger.warning(f"Could not find beacon {download.bid} for download {download.id}") + + except Exception as e: + logger.error(f"Error checking downloads: {e}") + + await asyncio.sleep(self.polling_interval) + + async def start(self): + """Start the download manager""" + self.is_running = True + await asyncio.gather( + self.process_queue(), + self.get_downloads(), + ) + + async def stop(self): + """Stop the download manager""" + self.is_running = False + # Wait for queue to be processed + await self.queue.join() + self.nemesis_file_processor.close() diff --git a/projects/cli/cli/cobaltstrike_connector/download_processor.py b/projects/cli/cli/cobaltstrike_connector/download_processor.py new file mode 100644 index 00000000..0ba56c1c --- /dev/null +++ b/projects/cli/cli/cobaltstrike_connector/download_processor.py @@ -0,0 +1,226 @@ +import json +import logging +import os +import tempfile +from dataclasses import dataclass +from datetime import UTC, datetime +from pathlib import Path +from typing import Union + +import plyvel +from cli.nemesis_client import NemesisClient +from cli.cobaltstrike_connector.cobaltstrike_client import Download, Beacon, CobaltStrikeClient +from common.models2.api import FileMetadata, FileWithMetadataResponse + +logger = logging.getLogger(__name__) + + +@dataclass +class DownloadedFileInfo: + file_path: Path | None + delete_after: bool + success: bool + temp_file: Path | None = None + + def __post_init__(self): + # Track temp file separately for cleanup + if self.delete_after and self.file_path: + self.temp_file = self.file_path + + +class CobaltStrikeDownloadProcessor: + """Processes and tracks downloads from Cobalt Strike, uploading them to Nemesis. + + This class handles the lifecycle of downloads from Cobalt Strike: + - Retrieves files directly from the Cobalt Strike REST API + - Tracks processed downloads to prevent duplicate processing + - Uploads files to Nemesis + + """ + + def __init__( + self, + db_path: Path, + nemesis: NemesisClient, + project: str, + cobalt_strike: CobaltStrikeClient | None = None, + ): + """Initialize with LevelDB path and optional components + + Args: + db_path: Path to LevelDB database + nemesis: Configured NemesisClient instance + project: Name of the project + cobalt_strike: Configured CobaltStrikeClient instance + """ + + # Check that cobalt_strike client is configured + if not cobalt_strike: + raise ValueError("cobalt_strike is required") + + logger.info("Using Cobalt Strike API to obtain downloads") + + if not nemesis: + raise ValueError("NemesisClient is required") + + self.project = project + self.client = nemesis + self.cobalt_strike = cobalt_strike + self.db = plyvel.DB(str(db_path), create_if_missing=True) + + def is_processed(self, download: Download) -> bool: + """Check if download has been processed""" + key = f"download:{download.id}".encode() + return self.db.get(key) is not None + + def mark_processed( + self, download: Download, success: bool = True, response: FileWithMetadataResponse | None = None + ): + """Mark download as processed""" + key = f"download:{download.id}".encode() + value = json.dumps( + { + "timestamp": datetime.now().isoformat(), + "download_info": f"{download.id} - {download.bid}", + "success": success, + "file_path": str(download.id), + "object_id": str(response.object_id) if response else None, + "submission_id": str(response.submission_id) if response else None, + } + ).encode() + self.db.put(key, value) + + async def upload_file( + self, + file_path: Union[Path, str], + metadata: FileMetadata, + delete_after: bool = False, + ) -> tuple[bool, str | None, FileWithMetadataResponse | None]: + """Upload file with metadata to the API endpoint""" + if not self.client: + raise ValueError("NemesisClient is required") + + try: + if not os.access(file_path, os.R_OK): + raise PermissionError(f"No read permission for {file_path}") + + response = self.client.post_file(str(file_path), metadata) + + if isinstance(response, FileWithMetadataResponse): + return True, None, response + elif response is None: + return False, "Upload failed: No response from server", None + else: + return False, f"Upload failed: {response.detail}", None + + except PermissionError: + return False, f"Permission denied: {file_path}", None + except FileNotFoundError: + return False, f"File not found: {file_path}", None + except Exception as e: + return False, f"Unexpected error with {file_path}: {str(e)}", None + finally: + if delete_after: + try: + os.unlink(file_path) + except Exception as e: + logger.error(f"Failed to delete temporary file {file_path} in upload_file finally block: {e}") + + async def process_cobaltstrike_download(self, download: Download, beacon: Beacon) -> None: + """Process a download by either retrieving from local directory or downloading from C2. + + Args: + download: Download object containing download information + beacon: Beacon object containing host information + """ + logger.debug( + f"Processing Cobalt Strike download {download.id} from beacon {beacon.computer}. Path: {download.path}" + ) + + # Get the file from the downloads directory or API + downloaded_file_info = await self._get_downloaded_file(download) + if not downloaded_file_info.success: + self.mark_processed(download, success=False) + return + + # Upload the file to Nemesis + try: + # Use the beacon computer as the source identifier + source = f"host://{beacon.computer}" if beacon.computer else None + + metadata = FileMetadata( + agent_id="Cobalt Strike", + source=source, + project=self.project, + timestamp=datetime.now(UTC), + expiration=datetime.now(UTC).replace(year=datetime.now().year + 1), + path=str(download.path), + ) + success, error, response = await self.upload_file( + str(downloaded_file_info.file_path), + metadata, + delete_after=downloaded_file_info.delete_after, + ) + + if success and response: + logger.info( + f"Successfully uploaded {downloaded_file_info.file_path} - " + f"Object ID: {response.object_id}, Submission ID: {response.submission_id}" + ) + self.mark_processed(download, success=True, response=response) + else: + logger.error(f"Failed to upload {downloaded_file_info.file_path}: {error}") + self.mark_processed(download, success=False) + + except Exception as e: + logger.error(f"Error in process_download: {e}") + self.mark_processed(download, success=False) + finally: + self._cleanup_temp_file(downloaded_file_info.temp_file) + + async def _get_downloaded_file(self, download: Download) -> DownloadedFileInfo: + """Get the download file from C2 API. + + Returns: + FileInfo containing file path, whether to delete after, and success status + """ + # Use the API to download the file + return await self._download_from_c2(download) + + async def _download_from_c2(self, download: Download) -> DownloadedFileInfo: + """Download file from C2 server to temporary location. + + Returns: + FileInfo containing temporary file path and success status + """ + if not self.cobalt_strike: + raise ValueError("No CobaltStrikeClient configured for remote download") + + temp_fd, temp_path = tempfile.mkstemp(prefix=f"nemesis_{download.id}_") + os.close(temp_fd) + temp_file = Path(temp_path) + + logger.info(f"Downloading file from C2 to {temp_file}") + success = await self.cobalt_strike.download_file(download.id, str(temp_file)) + + if not success: + logger.error( + f"Failed to download file from C2. Download ID: {download.id}. Download Path: {download.path}" + ) + return DownloadedFileInfo(temp_file, delete_after=True, success=False) + + return DownloadedFileInfo(temp_file, delete_after=True, success=True) + + def _cleanup_temp_file(self, temp_file: Path | None) -> None: + """Clean up temporary file if it exists and hasn't been cleaned up already. + Silently ignores if the file doesn't exist, as it may have been cleaned up + by the upload_file method.""" + if temp_file: + try: + temp_file.unlink(missing_ok=True) + except Exception as e: + logger.warn(f"Note: Temporary file {temp_file} cleanup error: {e}") + + def close(self): + """Close the LevelDB connection""" + self.db.close() diff --git a/projects/cli/cli/config.py b/projects/cli/cli/config.py index e3152020..5afb38ab 100644 --- a/projects/cli/cli/config.py +++ b/projects/cli/cli/config.py @@ -91,6 +91,12 @@ def validate_path(cls, v): return None return Path(v) +class CobaltStrikeConfig(BaseConfig): + url: StrictHttpUrl + credential: PasswordCredential + project: str = Field(description="Project name for Nemesis file uploads") + poll_interval_sec: Annotated[int, Field(gt=0, description="Polling interval of the Cobalt Strike API in seconds")] = 3 + class Config(BaseConfig): cache_db_path: Path = Field(default_factory=lambda: Path("/tmp/connectors"), description="LevelDB cache path") @@ -100,8 +106,9 @@ class Config(BaseConfig): nemesis: NemesisConfig mythic: list[MythicConfig] | None = Field(default_factory=list) outflank: list[OutflankConfig] | None = Field(default_factory=list) + cobaltstrike: list[CobaltStrikeConfig] | None = Field(default_factory=list) - @field_validator("mythic", "outflank", mode="before") + @field_validator("mythic", "outflank", "cobaltstrike", mode="before") @classmethod def ensure_list(cls, v): if v is None: diff --git a/projects/cli/cli/main.py b/projects/cli/cli/main.py index b53ce646..98878fb9 100644 --- a/projects/cli/cli/main.py +++ b/projects/cli/cli/main.py @@ -8,6 +8,7 @@ from cli.monitor import monitor_main from cli.mythic_connector.mythic_connector import start from cli.stage1_connector.stage1_connector import run_outflank_connector +from cli.cobaltstrike_connector.cobaltstrike_connector import run_cobaltstrike_connector from cli.submit import submit_main @@ -17,6 +18,8 @@ def get_default_config_file(tool_name: str) -> str: return "settings_outflank.yaml" elif tool_name == "mythic": return "settings_mythic.yaml" + elif tool_name == "cobaltstrike": + return "settings_cobaltstrike.yaml" else: raise ValueError(f"Unknown tool name: {tool_name}") @@ -101,6 +104,27 @@ def connect_mythic(config: str, debug: bool, showconfig: bool) -> None: click.echo(f"Unexpected error: {e}") raise click.Abort() from e +@cli.command() +@connector_options("cobaltstrike") +def connect_cobaltstrike(config: str, debug: bool, showconfig: bool): + """Ingest Cobalt Strike data into Nemesis""" + + if showconfig: + with open(config) as f: + click.echo(f.read()) + sys.exit(0) + + logger = setup_logging(debug) + try: + # get the absolute path of the config file + abs_path = click.format_filename(config) + + logger.info("Starting Cobalt Strike connector using the config file: %s", abs_path) + cfg = load_config(config) + asyncio.run(run_cobaltstrike_connector(cfg, logger)) + except Exception as e: + logger.exception("Unhandled exception in connector", e) + sys.exit(1) def get_os_user_and_host_string() -> str: """Get the current OS user and hostname for metadata""" diff --git a/projects/cli/settings_cobaltstrike.yaml b/projects/cli/settings_cobaltstrike.yaml new file mode 100644 index 00000000..cc71f8d1 --- /dev/null +++ b/projects/cli/settings_cobaltstrike.yaml @@ -0,0 +1,19 @@ +cache_db_path: "/tmp/nemesis_connectors" +conn_timeout_sec: 5 +validate_https_certs: false + +nemesis: + url: "https://nemesis.example.com" + credential: + username: "connector_bot" + password: "pass" + expiration_days: 100 + max_file_size: 1000000000 + +cobaltstrike: + - url: "https://cobaltstrike.example.com" + credential: + username: "nemesis_bot" + password: "cobaltstrike_password" + project: "my-assessment" + poll_interval_sec: 3 From 72df4a429e898dcebb45b8fb86c746d671ae5a8f Mon Sep 17 00:00:00 2001 From: zurro Date: Tue, 16 Dec 2025 20:56:32 +0000 Subject: [PATCH 2/8] Updated cli readme --- projects/cli/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/projects/cli/README.md b/projects/cli/README.md index e08d93cd..637b8cdf 100644 --- a/projects/cli/README.md +++ b/projects/cli/README.md @@ -46,6 +46,13 @@ Ingest data from Outflank Stage1 C2 into Nemesis. - Uses `settings_outflank.yaml` configuration file - `--showconfig`: Display example configuration +### connect-cobaltstrike +Ingest data from Cobalt Strike into Nemesis. + +**Configuration:** +- Uses `settings_cobaltstrike.yaml` configuration file +- `--showconfig`: Display example configuration + ## Additional Tools - **stress_test**: Load testing tool for API performance evaluation From 2156fd772274ceae88cbe492bd04590bd47fbdc9 Mon Sep 17 00:00:00 2001 From: "Pablo A. Zurro" Date: Wed, 17 Dec 2025 00:19:53 +0100 Subject: [PATCH 3/8] Add Cobalt Strike to C2 connectors list in README Updated README to include Cobalt Strike in C2 connectors. --- projects/cli/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/cli/README.md b/projects/cli/README.md index 637b8cdf..4edde145 100644 --- a/projects/cli/README.md +++ b/projects/cli/README.md @@ -4,13 +4,13 @@ A command-line interface for the Nemesis platform that provides file submission, ## Purpose -This CLI tool serves as the primary interface for uploading files to Nemesis, monitoring directories for new files, and synchronizing data from C2 frameworks like Mythic and Outflank. +This CLI tool serves as the primary interface for uploading files to Nemesis, monitoring directories for new files, and synchronizing data from C2 frameworks like Mythic, Cobalt Strike and Outflank. ## Features - **File submission**: Upload single files or entire directories to Nemesis - **Directory monitoring**: Real-time monitoring of folders for new files -- **C2 connectors**: Synchronize data from Mythic and Outflank C2 frameworks +- **C2 connectors**: Synchronize data from Mythic, Cobalt Strike and Outflank C2 frameworks - **Stress testing**: Load testing capabilities for the Nemesis API - **Module testing**: Execute file enrichment modules standalone for development From 70c961e11e6f6e9d985b19cbc595a82f9f1d662d Mon Sep 17 00:00:00 2001 From: "Pablo A. Zurro" Date: Wed, 17 Dec 2025 00:22:50 +0100 Subject: [PATCH 4/8] Update usage guide to include Cobalt Strike --- docs/usage_guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/usage_guide.md b/docs/usage_guide.md index 58ad1aaa..f4a25ec4 100644 --- a/docs/usage_guide.md +++ b/docs/usage_guide.md @@ -24,7 +24,7 @@ For a general overview of the Nemesis project structure, see the [overview](over Once Nemesis is running, data first needs to be ingested into the platform. Ingestion into Nemesis can occur in multiple ways, including: -* [Auto-ingesting data from C2 platforms](#nemesis-c2-connector-setup), including Mythic and Outflank C2. +* [Auto-ingesting data from C2 platforms](#nemesis-c2-connector-setup), including Mythic, Cobalt Strike and Outflank C2. * [Manually uploading files on the "File Upload" page in the Nemesis Dashboard UI.](#manual-file-upload) * [Using the CLI tool](./cli.md) to: * [submit individual files or entire folders/subfolders](./cli.md#file-submission) @@ -33,7 +33,7 @@ Once Nemesis is running, data first needs to be ingested into the platform. Inge ### Nemesis C2 Connector Setup -Nemesis includes connectors for [Mythic](https://github.com/its-a-feature/Mythic) and Outflank C2 (formerly Stage1). The connectors hook into the C2 platforms and transfer data automatically into Nemesis. The connectors are located in the [CLI](https://github.com/SpecterOps/Nemesis/tree/main/projects/cli/cli/) project. +Nemesis includes connectors for [Mythic](https://github.com/its-a-feature/Mythic), Cobalt Strike and Outflank C2 (formerly Stage1). The connectors hook into the C2 platforms and transfer data automatically into Nemesis. The connectors are located in the [CLI](https://github.com/SpecterOps/Nemesis/tree/main/projects/cli/cli/) project. See the [CLI](./cli.md) documentation for more details on configuration. @@ -201,4 +201,4 @@ Navigating to the "Help" menu reachable in the bottom left of the Nemesis interf ![ReDoc API Documentation](images/api-redoc.png) -Additionally, the API documentation is dynamically rebuilt by GitHub actions when the relevant files are modified and published to [API](./api.md) for up-to-date offline access. \ No newline at end of file +Additionally, the API documentation is dynamically rebuilt by GitHub actions when the relevant files are modified and published to [API](./api.md) for up-to-date offline access. From 8a091a4b6945850820eebef3cbb1cfedb90cd6bb Mon Sep 17 00:00:00 2001 From: "Pablo A. Zurro" Date: Wed, 17 Dec 2025 00:27:08 +0100 Subject: [PATCH 5/8] Document Cobalt Strike Connector setup and usage Added Cobalt Strike Connector section with configuration and usage instructions. --- docs/cli.md | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/docs/cli.md b/docs/cli.md index 006c708c..41222d3f 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -275,6 +275,57 @@ docker run --rm -ti \ - **File Downloads**: Agent-collected files - **Screenshots**: Visual captures from agents +## Cobalt Strike Connector + +Ingest data from Cobalt Strike into Nemesis. + +### Configuration + +Create a configuration file (e.g., `settings_cobaltstrike.yaml`): + +```yaml +cache_db_path: "/tmp/nemesis_connectors" +conn_timeout_sec: 5 +validate_https_certs: true + +nemesis: + url: "https://nemesis.example.com" + credential: + username: "connector_bot" + password: "connector_password" + expiration_days: 100 + max_file_size: 1000000000 + +cobaltstrike: + - url: "https://cobaltstrike.example.com" + credential: + username: "nemesis_bot" + password: "cobaltstrike_password" + + project: "project name" + poll_interval_sec: 3 +``` + +### Usage + +```bash +# Run with mounted config file +docker run \ + --rm -ti \ + -v /path/to/settings_cobaltstrike.yaml:/config/settings_cobaltstrike.yaml \ + ghcr.io/specterops/nemesis/cli \ + connect-cobaltstrike -c /config/settings_cobaltstrike.yaml + +# Show example configuration +docker run --rm ghcr.io/specterops/nemesis/cli connect-cobaltstrike --showconfig + +# Enable debug logging +docker run --rm \ + -v /path/to/settings_cobaltstrike.yaml:/config/settings_cobaltstrike.yaml \ + ghcr.io/specterops/nemesis/cli \ + connect-cobaltstrike -c /config/settings_cobaltstrike.yaml --debug +``` + ## Outflank Connector Ingest data from Outflank Stage1 C2 into Nemesis. From 436f437656aa93480f3e0503add9b2cf58f526a7 Mon Sep 17 00:00:00 2001 From: zurro Date: Wed, 17 Dec 2025 09:29:16 +0000 Subject: [PATCH 6/8] Updated documentation on REST API startup and modified Path to include the file name --- docs/cli.md | 6 ++++++ .../cli/cli/cobaltstrike_connector/download_processor.py | 2 +- projects/cli/settings_cobaltstrike.yaml | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/cli.md b/docs/cli.md index 41222d3f..6c3a437b 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -279,6 +279,12 @@ docker run --rm -ti \ Ingest data from Cobalt Strike into Nemesis. +### Requirements + +- The Cobalt Strike API Server should be running. For information on starting the REST API server, see [Starting the REST API Server](https://hstechdocs.helpsystems.com/manuals/cobaltstrike/current/userguide/content/topics/welcome_starting-rest-server.htm). +- Cobalt Strike should be installed and configured. +- Cobalt Strike should be properly licensed + ### Configuration Create a configuration file (e.g., `settings_cobaltstrike.yaml`): diff --git a/projects/cli/cli/cobaltstrike_connector/download_processor.py b/projects/cli/cli/cobaltstrike_connector/download_processor.py index 0ba56c1c..57853cf6 100644 --- a/projects/cli/cli/cobaltstrike_connector/download_processor.py +++ b/projects/cli/cli/cobaltstrike_connector/download_processor.py @@ -154,7 +154,7 @@ async def process_cobaltstrike_download(self, download: Download, beacon: Beacon project=self.project, timestamp=datetime.now(UTC), expiration=datetime.now(UTC).replace(year=datetime.now().year + 1), - path=str(download.path), + path=f"{download.path}/{download.name}", ) success, error, response = await self.upload_file( str(downloaded_file_info.file_path), diff --git a/projects/cli/settings_cobaltstrike.yaml b/projects/cli/settings_cobaltstrike.yaml index cc71f8d1..2d009377 100644 --- a/projects/cli/settings_cobaltstrike.yaml +++ b/projects/cli/settings_cobaltstrike.yaml @@ -11,7 +11,7 @@ nemesis: max_file_size: 1000000000 cobaltstrike: - - url: "https://cobaltstrike.example.com" + - url: "https://cobaltstrike.example.com:50443" credential: username: "nemesis_bot" password: "cobaltstrike_password" From affedebfe6b726757e2cc108520a99b4a580f3ff Mon Sep 17 00:00:00 2001 From: "Pablo A. Zurro" Date: Wed, 17 Dec 2025 10:39:05 +0100 Subject: [PATCH 7/8] Update Cobalt Strike URL to include port 50443 --- docs/cli.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cli.md b/docs/cli.md index 6c3a437b..0d116366 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -303,7 +303,7 @@ nemesis: max_file_size: 1000000000 cobaltstrike: - - url: "https://cobaltstrike.example.com" + - url: "https://cobaltstrike.example.com:50443" credential: username: "nemesis_bot" password: "cobaltstrike_password" From 9dabecee25c822d190c6405bc9569e6aada7f5e0 Mon Sep 17 00:00:00 2001 From: zurro Date: Thu, 18 Dec 2025 22:05:48 +0000 Subject: [PATCH 8/8] adding code 403 to force reauthentication to mathc with the CS response --- projects/cli/cli/cobaltstrike_connector/cobaltstrike_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/cli/cli/cobaltstrike_connector/cobaltstrike_client.py b/projects/cli/cli/cobaltstrike_connector/cobaltstrike_client.py index 6401e0f7..56c22a38 100644 --- a/projects/cli/cli/cobaltstrike_connector/cobaltstrike_client.py +++ b/projects/cli/cli/cobaltstrike_connector/cobaltstrike_client.py @@ -187,7 +187,7 @@ async def wrapper(self: "CobaltStrikeClient", *args: P.args, **kwargs: P.kwargs) try: return await func(self, *args, **kwargs) except aiohttp.ClientResponseError as e: - if e.status == 401: # Unauthorized - token might be expired + if e.status in (401,403): # Unauthorized - token might be expired self.logger.info("Token appears to be expired, attempting reauthentication") if await self._reauthenticate(): self.logger.info("Reauthentication successful, retrying original request")