Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master (unreleased)

### New Features
- freeze: add `--reproducible` flag that zeros dynamic header metadata

### Breaking Changes

Expand Down Expand Up @@ -131,6 +132,7 @@
- ci: use explicit and per job permissions @mike-hunhoff #3002
- replace black/isort/flake8 with ruff @mike-hunhoff #2992
- ci: update GitHub Actions to support Node.js 24 (deprecate Node.js 20) @mr-tz #2984
- tests: add snapshot tests for feature extraction @williballenthin #3069

### Raw diffs
- [capa v9.4.0...master](https://github.com/mandiant/capa/compare/v9.4.0...master)
Expand Down
2 changes: 1 addition & 1 deletion capa/features/extractors/dotnetfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def extract_file_namespace_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple
# namespaces may be empty, discard
namespaces.discard("")

for namespace in namespaces:
for namespace in sorted(namespaces):
# namespace do not have an associated token, so we yield 0x0
yield Namespace(namespace), NO_ADDRESS

Expand Down
93 changes: 77 additions & 16 deletions capa/features/freeze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ def from_capa(cls, a: capa.features.address.Address) -> "Address":
return cls(type=AddressType.THREAD, value=(a.process.ppid, a.process.pid, a.tid))

elif isinstance(a, capa.features.address.DynamicCallAddress):
return cls(
type=AddressType.CALL,
value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id),
)
return cls(type=AddressType.CALL, value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id))

elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress):
return cls(type=AddressType.NO_ADDRESS, value=None)
Expand Down Expand Up @@ -346,9 +343,14 @@ class Freeze(BaseModel):
model_config = ConfigDict(populate_by_name=True)


def dumps_static(extractor: StaticFeatureExtractor) -> str:
def dumps_static(extractor: StaticFeatureExtractor, reproducible: bool = False) -> str:
"""
serialize the given extractor to a string

When `reproducible` is true, the freeze's dynamic header metadata (e.g. the
embedded capa version) is zeroed out so that output is identical across
capa versions for a given extractor. This is used by the feature snapshot
tests to keep fixtures stable across version bumps.
"""
global_features: list[GlobalFeature] = []
for feature, _ in extractor.extract_global_features():
Expand All @@ -357,6 +359,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
feature=feature_from_capa(feature),
)
)
global_features.sort(key=lambda gf: gf.feature.model_dump_json())

file_features: list[FileFeature] = []
for feature, address in extractor.extract_file_features():
Expand All @@ -366,6 +369,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
address=Address.from_capa(address),
)
)
file_features.sort(key=lambda ff: (ff.address, ff.feature.model_dump_json()))

function_features: list[FunctionFeatures] = []
for f in extractor.get_functions():
Expand All @@ -378,6 +382,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_function_features(f)
]
ffeatures.sort(key=lambda ff: (ff.address, ff.feature.model_dump_json()))

basic_blocks = []
for bb in extractor.get_basic_blocks(f):
Expand All @@ -390,6 +395,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_basic_block_features(f, bb)
]
bbfeatures.sort(key=lambda bf: (bf.address, bf.feature.model_dump_json()))

instructions = []
for insn in extractor.get_instructions(f, bb):
Expand All @@ -402,6 +408,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_insn_features(f, bb, insn)
]
ifeatures.sort(key=lambda i: (i.address, i.feature.model_dump_json()))

instructions.append(
InstructionFeatures(
Expand All @@ -410,6 +417,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
)

instructions.sort(key=lambda i: i.address)
basic_blocks.append(
BasicBlockFeatures(
address=bbaddr,
Expand All @@ -418,6 +426,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
)

basic_blocks.sort(key=lambda bb: bb.address)
function_features.append(
FunctionFeatures(
address=faddr,
Expand All @@ -426,28 +435,33 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
)

function_features.sort(key=lambda ff: ff.address)

features = StaticFeatures(
global_=global_features, # type: ignore[call-arg] # pydantic alias "global" not recognized by type checkers
file=tuple(file_features),
functions=tuple(function_features),
)

extractor_version = "" if reproducible else capa.version.__version__
freeze = Freeze(
version=CURRENT_VERSION,
base_address=Address.from_capa(extractor.get_base_address()), # type: ignore[call-arg] # pydantic alias "base address" not recognized by type checkers
sample_hashes=extractor.get_sample_hashes(),
flavor="static",
extractor=Extractor(name=extractor.__class__.__name__),
extractor=Extractor(name=extractor.__class__.__name__, version=extractor_version),
features=features,
)
# type checkers are unable to recognise `base_address` as an argument due to alias

return freeze.model_dump_json()


def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
def dumps_dynamic(extractor: DynamicFeatureExtractor, reproducible: bool = False) -> str:
"""
serialize the given extractor to a string

See `dumps_static` for `reproducible`.
"""
global_features: list[GlobalFeature] = []
for feature, _ in extractor.extract_global_features():
Expand All @@ -456,6 +470,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
feature=feature_from_capa(feature),
)
)
global_features.sort(key=lambda gf: gf.feature.model_dump_json())

file_features: list[FileFeature] = []
for feature, address in extractor.extract_file_features():
Expand All @@ -465,6 +480,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
address=Address.from_capa(address),
)
)
file_features.sort(key=lambda ff: (ff.address, ff.feature.model_dump_json()))

process_features: list[ProcessFeatures] = []
for p in extractor.get_processes():
Expand All @@ -478,6 +494,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_process_features(p)
]
pfeatures.sort(key=lambda pf: (pf.address, pf.feature.model_dump_json()))

threads = []
for t in extractor.get_threads(p):
Expand All @@ -490,6 +507,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_thread_features(p, t)
]
tfeatures.sort(key=lambda tf: (tf.address, tf.feature.model_dump_json()))

calls = []
for call in extractor.get_calls(p, t):
Expand All @@ -503,6 +521,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_call_features(p, t, call)
]
cfeatures.sort(key=lambda cf: (cf.address, cf.feature.model_dump_json()))

calls.append(
CallFeatures(
Expand All @@ -512,6 +531,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
)

calls.sort(key=lambda c: c.address)
threads.append(
ThreadFeatures(
address=taddr,
Expand All @@ -520,6 +540,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
)

threads.sort(key=lambda t: t.address)
process_features.append(
ProcessFeatures(
address=paddr,
Expand All @@ -529,6 +550,8 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
)

process_features.sort(key=lambda pf: pf.address)

features = DynamicFeatures(
global_=global_features, # type: ignore[call-arg] # pydantic alias "global" not recognized by type checkers
file=tuple(file_features),
Expand All @@ -539,12 +562,13 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
get_base_addr = getattr(extractor, "get_base_address", None)
base_addr = get_base_addr() if get_base_addr else capa.features.address.NO_ADDRESS

extractor_version = "" if reproducible else capa.version.__version__
freeze = Freeze(
version=CURRENT_VERSION,
base_address=Address.from_capa(base_addr), # type: ignore[call-arg] # pydantic alias "base address" not recognized by type checkers
sample_hashes=extractor.get_sample_hashes(),
flavor="dynamic",
extractor=Extractor(name=extractor.__class__.__name__),
extractor=Extractor(name=extractor.__class__.__name__, version=extractor_version),
features=features,
)
# type checkers are unable to recognise `base_address` as an argument due to alias
Expand Down Expand Up @@ -627,28 +651,28 @@ def loads_dynamic(s: str) -> DynamicFeatureExtractor:
MAGIC = "capa0000".encode("ascii")
Comment thread
mr-tz marked this conversation as resolved.


def dumps(extractor: FeatureExtractor) -> str:
def dumps(extractor: FeatureExtractor, reproducible: bool = False) -> str:
"""serialize the given extractor to a string."""
if isinstance(extractor, StaticFeatureExtractor):
doc = dumps_static(extractor)
doc = dumps_static(extractor, reproducible=reproducible)
elif isinstance(extractor, DynamicFeatureExtractor):
doc = dumps_dynamic(extractor)
doc = dumps_dynamic(extractor, reproducible=reproducible)
else:
raise ValueError("Invalid feature extractor")

return doc


def dump(extractor: FeatureExtractor) -> bytes:
def dump(extractor: FeatureExtractor, reproducible: bool = False) -> bytes:
"""serialize the given extractor to a byte array."""
return MAGIC + zlib.compress(dumps(extractor).encode("utf-8"))
return MAGIC + zlib.compress(dumps(extractor, reproducible=reproducible).encode("utf-8"))


def is_freeze(buf: bytes) -> bool:
return buf[: len(MAGIC)] == MAGIC


def loads(s: str):
def loads(s: str) -> FeatureExtractor:
doc = json.loads(s)

if doc["version"] != CURRENT_VERSION:
Expand All @@ -662,7 +686,7 @@ def loads(s: str):
raise ValueError(f"unsupported freeze format flavor: {doc['flavor']}")


def load(buf: bytes):
def load(buf: bytes) -> FeatureExtractor:
"""deserialize a set of features (as a NullFeatureExtractor) from a byte array."""
if not is_freeze(buf):
raise ValueError("missing magic header")
Expand All @@ -685,6 +709,11 @@ def main(argv=None):
parser = argparse.ArgumentParser(description="save capa features to a file")
capa.main.install_common_args(parser, {"input_file", "format", "backend", "os", "signatures"})
parser.add_argument("output", type=str, help="Path to output file")
parser.add_argument(
"--reproducible",
action="store_true",
help="zero out dynamic header metadata (e.g. capa version) so output is stable across capa versions",
)
args = parser.parse_args(args=argv)

try:
Expand All @@ -696,11 +725,43 @@ def main(argv=None):
except capa.main.ShouldExitError as e:
return e.status_code

Path(args.output).write_bytes(dump(extractor))
output_path = Path(args.output)
output_path.write_bytes(dump(extractor, reproducible=args.reproducible))

# Log a manifest entry for the feature snapshot tests at INFO level. This
# makes it easy to copy/paste into
# `tests/fixtures/snapshots/features/manifest.json` when adding a new
# fixture or refreshing an existing one.
entry: dict[str, str] = {
"name": output_path.stem,
"sample": str(args.input_file),
"freeze": output_path.name,
}
if args.format and args.format != "auto":
entry["format"] = args.format
if args.backend and args.backend != "auto":
entry["backend"] = args.backend
if args.os and args.os != "auto":
entry["os"] = args.os
commit = _git_head_commit()
if commit:
entry["generated_at_commit"] = commit
logger.info("manifest entry: %s", json.dumps(entry))

return 0


def _git_head_commit() -> str:
"""Return the HEAD commit, or empty string if this isn't a git checkout."""
import subprocess

try:
out = subprocess.check_output(["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL)
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
return ""
return out.decode("ascii", errors="replace").strip()


if __name__ == "__main__":
import sys

Expand Down
19 changes: 19 additions & 0 deletions capa/features/freeze/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys

from capa.features.freeze import main

sys.exit(main())
Loading
Loading