forked from PyCQA/pylint-pytest
-
Notifications
You must be signed in to change notification settings - Fork 4
Open
Description
Bug description
When parsing the following a.py:
"""Data source abstractions for BOFHound parsing pipeline."""
import sys
from abc import ABC, abstractmethod
import logging
from typing import Iterator, List, AsyncIterator
from typing_extensions import override
import glob
import os
from pathlib import Path
from mythic import mythic
from syncer import sync
import base64
from bofhound.logger import logger
class DataSource(ABC):
    """Interface for anything that can supply data streams to the parser.

    Concrete subclasses implement :meth:`get_data_streams`; this base class
    cannot be instantiated directly because that method is abstract.
    """

    @abstractmethod
    def get_data_streams(self) -> Iterator['DataStream']:
        """Return an iterator over the data streams that should be processed."""
class DataStream(ABC):
    """Interface for one individual stream of text lines to be parsed.

    Subclasses provide an ``identifier`` property and a ``lines`` method.
    ``str(stream)`` yields the identifier.
    """

    @property
    @abstractmethod
    def identifier(self) -> str:
        """Name that uniquely identifies this stream (e.g., filename, callback ID)."""

    @abstractmethod
    def lines(self) -> Iterator[str]:
        """Return an iterator over the lines contained in this stream."""

    def __str__(self) -> str:
        # The string form of a stream is simply its identifier.
        return self.identifier
class FileDataSource(DataSource):
    """Data source that reads from local files.

    ``input_path`` may name a single file or a directory. For a directory,
    every file below it (searched recursively) whose name matches
    ``filename_pattern`` becomes one data stream.
    """

    def __init__(self, input_path: str, filename_pattern: str = "*.log"):
        """Store the path to read from and the glob pattern for file names.

        Args:
            input_path: Path to a file or a directory.
            filename_pattern: Glob pattern applied to file names when
                ``input_path`` is a directory.
        """
        self.input_path = input_path
        self.filename_pattern = filename_pattern

    def get_data_streams(self) -> Iterator['FileDataStream']:
        """Get file-based data streams.

        Yields:
            One ``FileDataStream`` for ``input_path`` itself when it is a
            file; otherwise one per matching file under the directory,
            ordered by ascending modification time.

        Raises:
            ValueError: If ``input_path`` is neither a file nor a directory.
                Because this is a generator, the error surfaces when the
                result is first iterated, not when the method is called.
        """
        if os.path.isfile(self.input_path):
            yield FileDataStream(self.input_path)
        elif os.path.isdir(self.input_path):
            # Escape the directory portion so glob metacharacters that occur
            # in the path itself ("[", "]", "*", "?") are matched literally;
            # only filename_pattern is meant to act as a pattern.
            pattern = f"{glob.escape(self.input_path)}/**/{self.filename_pattern}"
            # A directory whose name matches the pattern is not a log file;
            # keep regular files only so every stream can actually be opened.
            files = [
                path for path in glob.glob(pattern, recursive=True)
                if os.path.isfile(path)
            ]
            files.sort(key=os.path.getmtime)
            for file_path in files:
                yield FileDataStream(file_path)
        else:
            raise ValueError(f"Input path does not exist: {self.input_path}")
class FileDataStream(DataStream):
    """Data stream backed by a single file on the local filesystem."""

    def __init__(self, file_path: str):
        """Remember the path of the file to read; the file is not opened here."""
        self.file_path = file_path

    @property
    def identifier(self) -> str:
        """The file path, used as this stream's identifier."""
        return self.file_path

    def lines(self) -> Iterator[str]:
        """Yield each line of the file with trailing newline characters removed.

        The file is opened as UTF-8 text when iteration starts and is closed
        when the generator finishes or is closed.
        """
        with open(self.file_path, 'r', encoding='utf-8') as handle:
            yield from (raw.rstrip('\n\r') for raw in handle)
class MythicCallback:
    """
    Lightweight holder for the fields of a Mythic callback record.

    Exists so that print statements in the main logic can display a
    callback; see ``__repr__``.
    """

    def __init__(self, callback, mythic_instance=None):
        """Copy the needed fields out of a callback mapping.

        Args:
            callback: Mapping with the keys ``id``, ``display_id``,
                ``domain``, ``user``, ``host`` and ``agent_callback_id``.
                A missing key raises ``KeyError``.
            mythic_instance: Optional Mythic connection object kept
                alongside the callback data.
        """
        self.callback_id = callback["id"]
        self.display_id = callback["display_id"]
        self.domain = callback["domain"]
        self.user = callback["user"]
        self.host = callback["host"]
        self.uuid = callback["agent_callback_id"]
        self._mythic_instance = mythic_instance

    def __repr__(self):
        ident, agent_uuid = self.callback_id, self.uuid
        return f"Mythic callback {ident} [{agent_uuid}]"
class MythicDataSource(DataSource):
    """Data source that fetches data from Mythic server."""

    def __init__(self, mythic_server: str, mythic_token: str, mythic_port: int = 7443):
        """Store connection settings; no connection is made here.

        Args:
            mythic_server: Address passed to ``mythic.login`` as ``server_ip``.
            mythic_token: API token passed to ``mythic.login`` as ``apitoken``.
            mythic_port: Port passed to ``mythic.login`` as ``server_port``.
                Defaults to 7443, the value that was previously hard-coded.
        """
        self.mythic_server = mythic_server
        self.mythic_token = mythic_token
        self.mythic_port = mythic_port
        # Populated by _connect() the first time callbacks are requested.
        self._mythic_instance = None

    def _connect(self):
        """Log into Mythic and cache the resulting instance.

        On any login error the error is logged and the process exits via
        ``sys.exit(-1)``.
        """
        logger.debug("Logging into Mythic...")
        try:
            self._mythic_instance = sync(mythic.login(
                apitoken=self.mythic_token,
                server_ip=self.mythic_server,
                server_port=self.mythic_port,
                timeout=-1,  # NOTE(review): presumably "no timeout" - confirm against the mythic package
                logging_level=logging.CRITICAL,
            ))
        except Exception as e:
            logger.error("Error logging into Mythic")
            logger.error(e)
            sys.exit(-1)
        logger.debug("Logged into Mythic successfully")

    def _get_callbacks(self) -> Iterator[MythicCallback]:
        """Yield a ``MythicCallback`` for every callback returned by Mythic.

        Connects first if no Mythic instance is cached yet. Any error while
        retrieving or wrapping callbacks is logged and the process exits via
        ``sys.exit(-1)``.
        """
        logger.debug("Retrieving callbacks from Mythic...")
        # Only the fields that MythicCallback reads are requested.
        return_attributes = [
            "id",
            "display_id",
            "domain",
            "user",
            "host",
            "agent_callback_id"
        ]
        try:
            if self._mythic_instance is None:
                self._connect()
            raw_callbacks = sync(mythic.get_all_callbacks(
                self._mythic_instance,
                custom_return_attributes=",".join(return_attributes)
            ))
            for callback in raw_callbacks:
                yield MythicCallback(callback, self._mythic_instance)
        except Exception as e:
            logger.error("Error retrieving callbacks from Mythic")
            logger.error(e)
            sys.exit(-1)

    @override
    def get_data_streams(self) -> Iterator['MythicDataStream']:
        """
        Get Mythic callback data streams.

        For Mythic, instead of processing individual log "files",
        we process the taskings given to individual callbacks.
        """
        for callback in self._get_callbacks():
            yield MythicDataStream(callback)
class MythicDataStream(DataStream):
    """Data stream that reads from a Mythic callback's task outputs."""

    def __init__(self, callback: MythicCallback):
        """Keep the callback whose task outputs will be read."""
        self.callback = callback

    @property
    def identifier(self) -> str:
        """Identifier of the form ``mythic_callback_<callback_id>``."""
        return f"mythic_callback_{self.callback.callback_id}"

    def lines(self) -> Iterator[str]:
        """Get lines from Mythic callback task outputs.

        For every task of the callback, each output's ``response_text`` is
        base64-decoded, interpreted as UTF-8 and split into lines. Lines that
        are empty or whitespace-only are skipped. An output that cannot be
        decoded is skipped (with a debug log entry) rather than aborting the
        stream.
        """
        for task in self._get_tasks():
            for output in self._get_task_output(task["display_id"]):
                # Guard only the decode step, so unrelated errors (including
                # ones raised by the consumer of this generator) are not
                # silently swallowed. ValueError covers both binascii.Error
                # and UnicodeDecodeError.
                try:
                    decoded_data = base64.b64decode(output["response_text"]).decode("utf-8")
                except (KeyError, TypeError, ValueError) as exc:
                    logger.debug(f"Skipping malformed task output for {self.identifier}: {exc}")
                    continue
                for line in decoded_data.splitlines():
                    if line.strip():  # Skip empty lines
                        yield line

    def _get_tasks(self):
        """Get tasks for the callback, looked up by its display ID."""
        return sync(mythic.get_all_tasks(
            self.callback._mythic_instance,
            callback_display_id=self.callback.display_id
        ))

    def _get_task_output(self, task_id):
        """Get output for a specific task."""
        return sync(mythic.get_all_task_output_by_id(
            self.callback._mythic_instance,
            task_id
        ))
Command used
pylint a.py
Pylint output
pylint crashed with a ``AstroidError`` and with the following stacktrace:
Traceback (most recent call last):
File "/Users/coffeegist/.vscode/extensions/ms-python.pylint-2025.3.12271016/bundled/libs/pylint/lint/pylinter.py", line 788, in _lint_file
check_astroid_module(module)
File "/Users/coffeegist/.vscode/extensions/ms-python.pylint-2025.3.12271016/bundled/libs/pylint/lint/pylinter.py", line 1020, in check_astroid_module
retval = self._check_astroid_module(
File "/Users/coffeegist/.vscode/extensions/ms-python.pylint-2025.3.12271016/bundled/libs/pylint/lint/pylinter.py", line 1072, in _check_astroid_module
walker.walk(node)
File "/Users/coffeegist/.vscode/extensions/ms-python.pylint-2025.3.12271016/bundled/libs/pylint/utils/ast_walker.py", line 87, in walk
callback(astroid)
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pylint_pytest/checkers/fixture.py", line 129, in visit_module
ret = pytest.main(
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/_pytest/config/__init__.py", line 150, in main
arguments directly from the process command line (:data:`sys.argv`).
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/_pytest/config/__init__.py", line 331, in _prepareconfig
elif not isinstance(args, list):
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pluggy/_hooks.py", line 512, in __call__
return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pluggy/_manager.py", line 120, in _hookexec
return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pluggy/_callers.py", line 167, in _multicall
raise exception
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pluggy/_callers.py", line 139, in _multicall
teardown.throw(exception)
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pluggy/_callers.py", line 43, in run_old_style_hookwrapper
teardown.send(result)
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/_pytest/helpconfig.py", line 104, in pytest_cmdline_parse
@pytest.hookimpl(wrapper=True)
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pluggy/_result.py", line 103, in get_result
raise exc.with_traceback(tb)
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pluggy/_callers.py", line 38, in run_old_style_hookwrapper
res = yield
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pluggy/_callers.py", line 121, in _multicall
res = hook_impl.function(*args)
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/_pytest/config/__init__.py", line 1075, in pytest_cmdline_parse
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/_pytest/config/__init__.py", line 1425, in parse
finally:
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/_pytest/config/__init__.py", line 1305, in _preparse
for name in _iter_rewritable_modules(package_files):
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pluggy/_manager.py", line 416, in load_setuptools_entrypoints
plugin = ep.load()
File "/Users/coffeegist/.pyenv/versions/3.10.18/lib/python3.10/importlib/metadata/__init__.py", line 171, in load
module = import_module(match.group('module'))
File "/Users/coffeegist/.pyenv/versions/3.10.18/lib/python3.10/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
File "<frozen importlib._bootstrap>", line 992, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/_pytest/assertion/rewrite.py", line 186, in exec_module
it.
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pytest_asyncio/__init__.py", line 7, in <module>
from .plugin import fixture, is_async_test
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/_pytest/assertion/rewrite.py", line 186, in exec_module
it.
File "/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pytest_asyncio/plugin.py", line 38, in <module>
from pytest import (
ImportError: cannot import name 'FixtureDef' from 'pytest' (/Users/coffeegist/Library/Caches/pypoetry/virtualenvs/bofhound-h-QmirpJ-py3.10/lib/python3.10/site-packages/pytest/__init__.py)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/coffeegist/.vscode/extensions/ms-python.pylint-2025.3.12271016/bundled/libs/pylint/lint/pylinter.py", line 752, in _lint_files
self._lint_file(fileitem, module, check_astroid_module)
File "/Users/coffeegist/.vscode/extensions/ms-python.pylint-2025.3.12271016/bundled/libs/pylint/lint/pylinter.py", line 790, in _lint_file
raise astroid.AstroidError from e
astroid.exceptions.AstroidError
Expected behavior
No crash.
Pylint version
pylint 3.3.4
astroid 3.3.8
Python 3.10.18 (main, Sep 29 2025, 14:13:44) [Clang 17.0.0 (clang-1700.0.13.5)]
OS / Environment
darwin (Darwin)
Metadata
Metadata
Assignees
Labels
No labels