Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ jobs:
- name: Install pre-commit
run: pipx install pre-commit

- name: Install tox
run: pipx install tox

- name: Run linters
run: pre-commit run -a

Expand Down
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,12 @@ repos:
language: system
files: ^server/frontend/.*\.(js|mjs|cjs|vue)$
stages: [ pre-commit ]
- id: mypy
name: mypy
entry: tox -e mypy --
language: system
require_serial: true
files: ^(Collector|FTB|Reporter)/
exclude: (^|/)tests/
types: [python]
pass_filenames: false
83 changes: 46 additions & 37 deletions Collector/Collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import os
import shutil
import sys
from collections.abc import Iterator, Mapping
from tempfile import mkstemp
from typing import Any
from zipfile import ZipFile

from FTB.ProgramConfiguration import ProgramConfiguration
Expand All @@ -38,16 +40,16 @@
signature_checks,
)

__all__ = []
__all__: list[str] = []
__version__ = 0.1
__date__ = "2014-10-01"
__updated__ = "2025-04-08"
__updated__ = "2026-04-23"


class Collector(Reporter):
@remote_checks
@signature_checks
def refresh(self):
def refresh(self) -> None:
"""
Refresh signatures by contacting the server, downloading new signatures
and invalidating old ones.
Expand All @@ -68,12 +70,13 @@ def refresh(self):
os.remove(zipFileName)

@signature_checks
def refreshFromZip(self, zipFileName):
def refreshFromZip(self, zipFileName: str) -> None:
"""
Refresh signatures from a local zip file, adding new signatures
and invalidating old ones. (This is a non-standard use case;
you probably want to use refresh() instead.)
"""
assert self.sigCacheDir is not None
with ZipFile(zipFileName, "r") as zipFile:
if zipFile.testzip():
raise InvalidDataError(f"Bad CRC for downloaded zipfile {zipFileName}")
Expand All @@ -94,12 +97,12 @@ def refreshFromZip(self, zipFileName):
@remote_checks
def submit(
self,
crashInfo,
testCase=None,
testCaseQuality=0,
testCaseSize=None,
metaData=None,
):
crashInfo: CrashInfo,
testCase: str | None = None,
testCaseQuality: int = 0,
testCaseSize: int | None = None,
metaData: Mapping[str, Any] | None = None,
) -> Any:
"""
Submit the given crash information and an optional testcase/metadata
to the server for processing and storage.
Expand Down Expand Up @@ -131,7 +134,7 @@ def submit(

# Serialize our crash information, testcase and metadata into a dictionary to
# POST
data = {}
data: dict[str, Any] = {}

data["rawStdout"] = os.linesep.join(crashInfo.rawStdout)
data["rawStderr"] = os.linesep.join(crashInfo.rawStderr)
Expand All @@ -154,6 +157,7 @@ def submit(
if testcase_ext:
data["testcase_ext"] = testcase_ext

assert crashInfo.configuration is not None
data["platform"] = crashInfo.configuration.platform
data["product"] = crashInfo.configuration.product
data["os"] = crashInfo.configuration.os
Expand All @@ -165,7 +169,7 @@ def submit(
data["tool"] = self.tool

if crashInfo.configuration.metadata or metaData:
aggrMetaData = {}
aggrMetaData: dict[str, Any] = {}

if crashInfo.configuration.metadata:
aggrMetaData.update(crashInfo.configuration.metadata)
Expand All @@ -184,7 +188,7 @@ def submit(
return self.post(url, data).json()

@signature_checks
def search(self, crashInfo):
def search(self, crashInfo: CrashInfo) -> tuple[str | None, dict[str, Any] | None]:
"""
Searches within the local signature cache directory for a signature matching the
given crash.
Expand All @@ -196,7 +200,7 @@ def search(self, crashInfo):
@return: Tuple containing filename of the signature and metadata matching, or
None if no match.
"""

assert self.sigCacheDir is not None
cachedSigFiles = os.listdir(self.sigCacheDir)

for sigFile in cachedSigFiles:
Expand All @@ -210,7 +214,7 @@ def search(self, crashInfo):
crashSig = CrashSignature(sigData)
if crashSig.matches(crashInfo):
metadataFile = sigFile.replace(".signature", ".metadata")
metadata = None
metadata: dict[str, Any] | None = None
if os.path.exists(metadataFile):
with open(metadataFile) as m:
metadata = json.loads(m.read())
Expand All @@ -222,11 +226,11 @@ def search(self, crashInfo):
@signature_checks
def generate(
self,
crashInfo,
forceCrashAddress=None,
forceCrashInstruction=None,
numFrames=None,
):
crashInfo: CrashInfo,
forceCrashAddress: bool = False,
forceCrashInstruction: bool = False,
numFrames: int = 8,
) -> str | None:
"""
Generates a signature in the local cache directory. It will be deleted when
L{refresh} is called on the same local cache directory.
Expand Down Expand Up @@ -257,7 +261,7 @@ def generate(
return self.__store_signature_hashed(sig)

@remote_checks
def download(self, crashId):
def download(self, crashId: int) -> tuple[str, dict[str, Any]] | None:
"""
Download the testcase for the specified crashId.

Expand Down Expand Up @@ -300,7 +304,7 @@ def download(self, crashId):
return (local_filename, resp_json)

@remote_checks
def download_all(self, bucketId):
def download_all(self, bucketId: int) -> Iterator[str]:
"""
Download all testcases for the specified bucketId.

Expand All @@ -310,8 +314,10 @@ def download_all(self, bucketId):
@rtype: generator
@return: generator of filenames where tests were stored.
"""
params = {"query": json.dumps({"op": "OR", "bucket": bucketId})}
next_url = (
params: dict[str, str] | None = {
"query": json.dumps({"op": "OR", "bucket": bucketId})
}
next_url: str | None = (
f"{self.serverProtocol}://{self.serverHost}:{self.serverPort}"
"/crashmanager/rest/crashes/"
)
Expand Down Expand Up @@ -350,7 +356,7 @@ def download_all(self, bucketId):

yield local_filename

def __store_signature_hashed(self, signature):
def __store_signature_hashed(self, signature: CrashSignature) -> str:
"""
Store a signature, using the sha1 hash hex representation as filename.

Expand All @@ -361,19 +367,17 @@ def __store_signature_hashed(self, signature):
@return: Name of the file that the signature was written to

"""
assert self.sigCacheDir is not None
h = hashlib.new("sha1")
if str is bytes:
h.update(str(signature))
else:
h.update(str(signature).encode("utf-8"))
h.update(str(signature).encode("utf-8"))
sigfile = os.path.join(self.sigCacheDir, h.hexdigest() + ".signature")
with open(sigfile, "w") as f:
f.write(str(signature))

return sigfile

@staticmethod
def read_testcase(testCase):
def read_testcase(testCase: str) -> tuple[bytes, bool]:
"""
Read a testcase file, return the content and indicate if it is binary or not.

Expand All @@ -394,7 +398,7 @@ def read_testcase(testCase):
return (testCaseData, isBinary)


def main(args=None):
def main(args: list[str] | None = None) -> int:
"""Command line options."""
sentry_init()

Expand Down Expand Up @@ -686,7 +690,7 @@ def main(args=None):
if opts.testcase:
(testCaseData, isBinary) = Collector.read_testcase(opts.testcase)
if not isBinary:
crashInfo.testcase = testCaseData
crashInfo.testcase = testCaseData.decode("utf-8")

serverauthtoken = None
if opts.serverauthtokenfile:
Expand All @@ -708,23 +712,26 @@ def main(args=None):
return 0

if opts.submit:
assert crashInfo is not None
testcase = opts.testcase
collector.submit(
crashInfo, testcase, opts.testcasequality, opts.testcasesize, metadata
)
return 0

if opts.search:
(sig, metadata) = collector.search(crashInfo)
assert crashInfo is not None
(sig, sigMetadata) = collector.search(crashInfo)
if sig is None:
print("No match found", file=sys.stderr)
return 3
print(sig)
if metadata:
print(json.dumps(metadata, indent=4))
if sigMetadata:
print(json.dumps(sigMetadata, indent=4))
return 0

if opts.generate:
assert crashInfo is not None
sigFile = collector.generate(
crashInfo, opts.forcecrashaddr, opts.forcecrashinst, opts.numframes
)
Expand All @@ -738,6 +745,7 @@ def main(args=None):
return 0

if opts.autosubmit:
assert configuration is not None
runner = AutoRunner.fromBinaryArgs(opts.rargs[0], opts.rargs[1:], env=env)
if runner.run():
crashInfo = runner.getCrashInfo(configuration)
Expand All @@ -752,10 +760,11 @@ def main(args=None):
return 1

if opts.download:
(retFile, retJSON) = collector.download(opts.download)
if not retFile:
downloadResult = collector.download(opts.download)
if downloadResult is None:
print("Specified crash entry does not have a testcase", file=sys.stderr)
return 1
retFile, retJSON = downloadResult

if retJSON.get("args"):
args = json.loads(retJSON["args"])
Expand Down
Empty file added Collector/py.typed
Empty file.
Loading
Loading