-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
418 lines (379 loc) · 18 KB
/
database.py
File metadata and controls
418 lines (379 loc) · 18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
"""Database schema and operations for memory forensics artifacts"""
import aiosqlite
from pathlib import Path
from typing import Optional, List, Dict, Any
import json
class ForensicsDatabase:
    """Manages SQLite database for processed memory artifacts.

    Every public method opens its own short-lived ``aiosqlite`` connection to
    ``db_path``, performs one unit of work, commits (for writes) and closes
    the connection again. No connection is kept on the instance.

    NOTE: the schema declares ``FOREIGN KEY`` constraints, but nothing in this
    class issues ``PRAGMA foreign_keys = ON``. SQLite only enforces foreign
    keys on connections where that pragma is enabled, so the constraints are
    presumably documentation-only here -- confirm against the SQLite build in
    use before relying on them.
    """

    # Table definitions, executed in order by ``initialize``. All use
    # ``IF NOT EXISTS`` so that ``initialize`` can be called repeatedly.
    _SCHEMA_STATEMENTS = (
        # Memory dumps metadata
        """
        CREATE TABLE IF NOT EXISTS dumps (
            dump_id TEXT PRIMARY KEY,
            file_path TEXT NOT NULL,
            file_size INTEGER,
            os_type TEXT,
            os_version TEXT,
            last_processed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            status TEXT DEFAULT 'new'
        )
        """,
        # Processes
        """
        CREATE TABLE IF NOT EXISTS processes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            dump_id TEXT NOT NULL,
            pid INTEGER NOT NULL,
            ppid INTEGER,
            name TEXT,
            path TEXT,
            cmdline TEXT,
            create_time TEXT,
            exit_time TEXT,
            is_hidden BOOLEAN DEFAULT 0,
            is_suspicious BOOLEAN DEFAULT 0,
            FOREIGN KEY (dump_id) REFERENCES dumps(dump_id),
            UNIQUE(dump_id, pid)
        )
        """,
        # Memory regions (for injection detection)
        """
        CREATE TABLE IF NOT EXISTS memory_regions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            dump_id TEXT NOT NULL,
            pid INTEGER NOT NULL,
            base_address TEXT,
            size INTEGER,
            protection TEXT,
            is_file_backed BOOLEAN,
            backing_file TEXT,
            is_suspicious BOOLEAN DEFAULT 0,
            FOREIGN KEY (dump_id) REFERENCES dumps(dump_id)
        )
        """,
        # Network connections
        """
        CREATE TABLE IF NOT EXISTS network_connections (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            dump_id TEXT NOT NULL,
            pid INTEGER,
            local_addr TEXT,
            local_port INTEGER,
            remote_addr TEXT,
            remote_port INTEGER,
            state TEXT,
            protocol TEXT,
            FOREIGN KEY (dump_id) REFERENCES dumps(dump_id),
            UNIQUE(dump_id, pid, local_addr, local_port, remote_addr, remote_port, protocol)
        )
        """,
        # DLLs
        """
        CREATE TABLE IF NOT EXISTS dlls (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            dump_id TEXT NOT NULL,
            pid INTEGER NOT NULL,
            base_address TEXT,
            size INTEGER,
            name TEXT,
            path TEXT,
            FOREIGN KEY (dump_id) REFERENCES dumps(dump_id)
        )
        """,
        # Command execution log
        """
        CREATE TABLE IF NOT EXISTS command_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            dump_id TEXT NOT NULL,
            plugin_name TEXT NOT NULL,
            command_line TEXT NOT NULL,
            parameters TEXT,
            executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            execution_time_ms INTEGER,
            row_count INTEGER,
            success BOOLEAN DEFAULT 1,
            error_message TEXT,
            FOREIGN KEY (dump_id) REFERENCES dumps(dump_id)
        )
        """,
        # File hashes for dumps
        """
        CREATE TABLE IF NOT EXISTS dump_hashes (
            dump_id TEXT PRIMARY KEY,
            md5 TEXT,
            sha1 TEXT,
            sha256 TEXT,
            calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (dump_id) REFERENCES dumps(dump_id)
        )
        """,
        # Extracted files tracking
        """
        CREATE TABLE IF NOT EXISTS extracted_files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            dump_id TEXT NOT NULL,
            extraction_type TEXT NOT NULL,
            source_pid INTEGER,
            source_address TEXT,
            output_path TEXT NOT NULL,
            file_size INTEGER,
            file_hash_sha256 TEXT,
            extracted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (dump_id) REFERENCES dumps(dump_id)
        )
        """,
    )

    # Secondary indexes for the lookups performed by the query methods below.
    _INDEX_STATEMENTS = (
        "CREATE INDEX IF NOT EXISTS idx_processes_dump_pid ON processes(dump_id, pid)",
        "CREATE INDEX IF NOT EXISTS idx_processes_name ON processes(name)",
        "CREATE INDEX IF NOT EXISTS idx_network_dump ON network_connections(dump_id)",
        "CREATE INDEX IF NOT EXISTS idx_memory_regions_dump_pid ON memory_regions(dump_id, pid)",
        "CREATE INDEX IF NOT EXISTS idx_command_log_dump ON command_log(dump_id)",
        "CREATE INDEX IF NOT EXISTS idx_command_log_timestamp ON command_log(executed_at)",
        "CREATE INDEX IF NOT EXISTS idx_extracted_dump ON extracted_files(dump_id)",
    )

    def __init__(self, db_path: Path):
        """Remember where the database file lives; no I/O happens here.

        Args:
            db_path: Location of the SQLite database file.
        """
        self.db_path = db_path

    # ------------------------------------------------------------------
    # Private helpers: one connection per call, mirroring the public API.
    # ------------------------------------------------------------------

    async def _fetch_all(self, query: str, params=()) -> List[Dict[str, Any]]:
        """Run a SELECT and return every row as a plain ``dict``."""
        async with aiosqlite.connect(self.db_path) as db:
            # Row factory gives name-based access so rows convert to dicts.
            db.row_factory = aiosqlite.Row
            async with db.execute(query, params) as cursor:
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]

    async def _fetch_one(self, query: str, params=()) -> Optional[Dict[str, Any]]:
        """Run a SELECT and return the first row as a ``dict``, or ``None``."""
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(query, params) as cursor:
                row = await cursor.fetchone()
                return dict(row) if row else None

    async def _write(self, sql: str, params=()) -> None:
        """Execute a single write statement and commit it."""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(sql, params)
            await db.commit()

    async def _write_many(self, sql: str, rows: List[tuple]) -> None:
        """Execute one write statement for each parameter tuple, then commit.

        All rows go through a single ``executemany`` call and are committed
        together, so either every row is stored or (on error) none is.
        """
        async with aiosqlite.connect(self.db_path) as db:
            await db.executemany(sql, rows)
            await db.commit()

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    async def initialize(self):
        """Create database schema.

        Creates all tables and indexes if they do not exist yet and commits
        once at the end. Existing tables are left untouched.
        """
        async with aiosqlite.connect(self.db_path) as db:
            for statement in self._SCHEMA_STATEMENTS:
                await db.execute(statement)
            # Create indexes for performance
            for statement in self._INDEX_STATEMENTS:
                await db.execute(statement)
            await db.commit()

    # ------------------------------------------------------------------
    # Dumps
    # ------------------------------------------------------------------

    async def add_dump(self, dump_id: str, file_path: str, file_size: int, os_type: Optional[str] = None):
        """Register a new memory dump.

        Uses ``INSERT OR REPLACE``: if ``dump_id`` already exists the old row
        is replaced, which resets the columns not supplied here
        (``os_version``, ``last_processed``, ``status``) to their defaults.

        Args:
            dump_id: Primary key identifying the dump.
            file_path: Path of the dump file.
            file_size: Size of the dump file.
            os_type: Optional operating-system label.
        """
        await self._write(
            "INSERT OR REPLACE INTO dumps (dump_id, file_path, file_size, os_type) VALUES (?, ?, ?, ?)",
            (dump_id, file_path, file_size, os_type),
        )

    async def get_dump(self, dump_id: str) -> Optional[Dict[str, Any]]:
        """Get dump metadata.

        Returns:
            The ``dumps`` row as a dict, or ``None`` if ``dump_id`` is unknown.
        """
        return await self._fetch_one("SELECT * FROM dumps WHERE dump_id = ?", (dump_id,))

    async def list_dumps(self) -> List[Dict[str, Any]]:
        """List all registered dumps, most recently processed first."""
        return await self._fetch_all("SELECT * FROM dumps ORDER BY last_processed DESC")

    # ------------------------------------------------------------------
    # Processes
    # ------------------------------------------------------------------

    async def add_processes(self, dump_id: str, processes: List[Dict[str, Any]]):
        """Bulk insert processes.

        Rows are written with ``INSERT OR REPLACE``; a process that already
        exists for the same ``(dump_id, pid)`` is replaced by the new row.

        NOTE(review): the ``exit_time`` column exists in the schema but is not
        written by this method, so it stays NULL.

        Args:
            dump_id: Dump the processes belong to.
            processes: Dicts read with ``.get`` for the keys ``pid``,
                ``ppid``, ``name``, ``path``, ``cmdline``, ``create_time``,
                ``is_hidden`` and ``is_suspicious``. Missing flags default
                to ``False``; other missing keys become NULL.
        """
        rows = [
            (
                dump_id,
                proc.get('pid'),
                proc.get('ppid'),
                proc.get('name'),
                proc.get('path'),
                proc.get('cmdline'),
                proc.get('create_time'),
                proc.get('is_hidden', False),
                proc.get('is_suspicious', False),
            )
            for proc in processes
        ]
        await self._write_many(
            """
            INSERT OR REPLACE INTO processes
            (dump_id, pid, ppid, name, path, cmdline, create_time, is_hidden, is_suspicious)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )

    async def get_processes(self, dump_id: str, suspicious_only: bool = False) -> List[Dict[str, Any]]:
        """Get processes for a dump, ordered by PID.

        Args:
            dump_id: Dump to query.
            suspicious_only: When true, return only rows with
                ``is_suspicious = 1``.
        """
        query = "SELECT * FROM processes WHERE dump_id = ?"
        if suspicious_only:
            query += " AND is_suspicious = 1"
        query += " ORDER BY pid"
        return await self._fetch_all(query, [dump_id])

    async def get_process_by_pid(self, dump_id: str, pid: int) -> Optional[Dict[str, Any]]:
        """Get specific process details.

        Returns:
            The matching ``processes`` row as a dict, or ``None`` if no
            process with that PID is stored for the dump.
        """
        return await self._fetch_one(
            "SELECT * FROM processes WHERE dump_id = ? AND pid = ?",
            (dump_id, pid),
        )

    # ------------------------------------------------------------------
    # Network connections
    # ------------------------------------------------------------------

    async def clear_network_connections(self, dump_id: str):
        """Clear network connections for a dump before reprocessing."""
        await self._write(
            "DELETE FROM network_connections WHERE dump_id = ?",
            (dump_id,),
        )

    async def add_network_connections(self, dump_id: str, connections: List[Dict[str, Any]]):
        """Bulk insert network connections.

        Uses ``INSERT OR IGNORE``, so a connection that violates the table's
        UNIQUE constraint is silently skipped. SQLite treats NULLs as
        distinct in UNIQUE constraints, so rows with a NULL in any of the
        constrained columns are never considered duplicates.

        Args:
            dump_id: Dump the connections belong to.
            connections: Dicts read with ``.get`` for the keys ``pid``,
                ``local_addr``, ``local_port``, ``remote_addr``,
                ``remote_port``, ``state`` and ``protocol``.
        """
        rows = [
            (
                dump_id,
                conn.get('pid'),
                conn.get('local_addr'),
                conn.get('local_port'),
                conn.get('remote_addr'),
                conn.get('remote_port'),
                conn.get('state'),
                conn.get('protocol'),
            )
            for conn in connections
        ]
        await self._write_many(
            """
            INSERT OR IGNORE INTO network_connections
            (dump_id, pid, local_addr, local_port, remote_addr, remote_port, state, protocol)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )

    async def get_network_connections(self, dump_id: str, pid: Optional[int] = None) -> List[Dict[str, Any]]:
        """Get network connections.

        Args:
            dump_id: Dump to query.
            pid: When given, restrict the result to this process. ``0`` is a
                valid filter value; only ``None`` means "all processes".
        """
        # Compare against None explicitly: a truthiness test would treat
        # PID 0 as "no filter" and return every connection.
        if pid is not None:
            query = "SELECT * FROM network_connections WHERE dump_id = ? AND pid = ?"
            params = (dump_id, pid)
        else:
            query = "SELECT * FROM network_connections WHERE dump_id = ?"
            params = (dump_id,)
        return await self._fetch_all(query, params)

    # ------------------------------------------------------------------
    # Memory regions
    # ------------------------------------------------------------------

    async def clear_memory_regions(self, dump_id: str):
        """Clear memory regions for a dump before reprocessing."""
        await self._write(
            "DELETE FROM memory_regions WHERE dump_id = ?",
            (dump_id,),
        )

    async def add_memory_regions(self, dump_id: str, regions: List[Dict[str, Any]]):
        """Bulk insert memory regions.

        Plain ``INSERT``: the table has no uniqueness constraint besides its
        autoincrement id, so calling this twice with the same data stores the
        regions twice (see ``clear_memory_regions``).

        Args:
            dump_id: Dump the regions belong to.
            regions: Dicts read with ``.get`` for the keys ``pid``,
                ``base_address``, ``size``, ``protection``,
                ``is_file_backed``, ``backing_file`` and ``is_suspicious``.
                Missing flags default to ``False``.
        """
        rows = [
            (
                dump_id,
                region.get('pid'),
                region.get('base_address'),
                region.get('size'),
                region.get('protection'),
                region.get('is_file_backed', False),
                region.get('backing_file'),
                region.get('is_suspicious', False),
            )
            for region in regions
        ]
        await self._write_many(
            """
            INSERT INTO memory_regions
            (dump_id, pid, base_address, size, protection, is_file_backed, backing_file, is_suspicious)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )

    async def get_suspicious_memory_regions(self, dump_id: str, pid: Optional[int] = None) -> List[Dict[str, Any]]:
        """Get suspicious memory regions (potential injection).

        Args:
            dump_id: Dump to query.
            pid: When given, restrict the result to this process. ``0`` is a
                valid filter value; only ``None`` means "all processes".
        """
        query = "SELECT * FROM memory_regions WHERE dump_id = ? AND is_suspicious = 1"
        params = [dump_id]
        # Explicit None check so that PID 0 is honoured as a filter.
        if pid is not None:
            query += " AND pid = ?"
            params.append(pid)
        return await self._fetch_all(query, params)

    # ------------------------------------------------------------------
    # Command log
    # ------------------------------------------------------------------

    async def add_command_log(self, dump_id: str, plugin_name: str, command_line: str,
                              parameters: Optional[str] = None, execution_time_ms: int = 0,
                              row_count: int = 0, success: bool = True,
                              error_message: Optional[str] = None):
        """Log a Volatility command execution.

        Args:
            dump_id: Dump the command was run against.
            plugin_name: Name of the plugin that was executed.
            command_line: Full command line that was run.
            parameters: Optional parameter string stored verbatim.
            execution_time_ms: Execution time in milliseconds.
            row_count: Number of rows the command produced.
            success: Whether the command succeeded.
            error_message: Optional error text for failed commands.
        """
        await self._write(
            """
            INSERT INTO command_log
            (dump_id, plugin_name, command_line, parameters, execution_time_ms, row_count, success, error_message)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (dump_id, plugin_name, command_line, parameters, execution_time_ms, row_count, success, error_message),
        )

    async def get_command_history(self, dump_id: str, limit: int = 50) -> List[Dict[str, Any]]:
        """Get command execution history for a dump, newest first.

        Args:
            dump_id: Dump to query.
            limit: Maximum number of log entries to return.
        """
        # ``executed_at`` defaults to CURRENT_TIMESTAMP, which has one-second
        # resolution; the autoincrement id breaks ties so that entries logged
        # within the same second still come back newest-first.
        return await self._fetch_all(
            """
            SELECT * FROM command_log
            WHERE dump_id = ?
            ORDER BY executed_at DESC, id DESC
            LIMIT ?
            """,
            (dump_id, limit),
        )

    async def get_command_stats(self, dump_id: str) -> Dict[str, Any]:
        """Get command execution statistics for a dump.

        Returns:
            A dict with the keys ``total_commands``, ``total_rows``,
            ``avg_execution_time`` and ``failed_commands``. When no commands
            are logged for the dump the aggregate columns other than
            ``total_commands`` are ``None`` (SQL NULL).
        """
        stats = await self._fetch_one(
            """
            SELECT
                COUNT(*) as total_commands,
                SUM(row_count) as total_rows,
                AVG(execution_time_ms) as avg_execution_time,
                SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) as failed_commands
            FROM command_log
            WHERE dump_id = ?
            """,
            (dump_id,),
        )
        return stats if stats is not None else {}

    # ------------------------------------------------------------------
    # Dump hashes
    # ------------------------------------------------------------------

    async def store_dump_hashes(self, dump_id: str, hashes: Dict[str, str]):
        """Store file hashes for a dump.

        Replaces any previously stored hashes for ``dump_id``.

        Args:
            dump_id: Dump the hashes belong to.
            hashes: Dict read with ``.get`` for the keys ``md5``, ``sha1``
                and ``sha256``; missing keys are stored as NULL.
        """
        await self._write(
            """
            INSERT OR REPLACE INTO dump_hashes (dump_id, md5, sha1, sha256)
            VALUES (?, ?, ?, ?)
            """,
            (dump_id, hashes.get('md5'), hashes.get('sha1'), hashes.get('sha256')),
        )

    async def get_dump_hashes(self, dump_id: str) -> Optional[Dict[str, str]]:
        """Get cached file hashes for a dump.

        Returns:
            A dict with ``md5``, ``sha1``, ``sha256`` and ``calculated_at``,
            or ``None`` if no hashes are stored for the dump.
        """
        return await self._fetch_one(
            """
            SELECT md5, sha1, sha256, calculated_at
            FROM dump_hashes
            WHERE dump_id = ?
            """,
            (dump_id,),
        )

    # ------------------------------------------------------------------
    # Extracted files
    # ------------------------------------------------------------------

    async def add_extracted_file(self, dump_id: str, extraction_type: str,
                                 output_path: str, file_size: int,
                                 file_hash_sha256: str, source_pid: Optional[int] = None,
                                 source_address: Optional[str] = None):
        """Track an extracted file.

        Args:
            dump_id: Dump the file was extracted from.
            extraction_type: Label describing the kind of extraction.
            output_path: Where the extracted file was written.
            file_size: Size of the extracted file.
            file_hash_sha256: SHA-256 hash of the extracted file.
            source_pid: Optional PID the file was extracted from.
            source_address: Optional source address of the extraction.
        """
        await self._write(
            """
            INSERT INTO extracted_files
            (dump_id, extraction_type, source_pid, source_address, output_path, file_size, file_hash_sha256)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (dump_id, extraction_type, source_pid, source_address, output_path, file_size, file_hash_sha256),
        )

    async def get_extracted_files(self, dump_id: str) -> List[Dict[str, Any]]:
        """Get all extracted files for a dump, most recently extracted first."""
        return await self._fetch_all(
            """
            SELECT * FROM extracted_files
            WHERE dump_id = ?
            ORDER BY extracted_at DESC
            """,
            (dump_id,),
        )

    # ------------------------------------------------------------------
    # Suspicious-process flags
    # ------------------------------------------------------------------

    async def mark_process_suspicious(self, dump_id: str, pid: int, is_suspicious: bool = True):
        """Mark a specific process as suspicious or not.

        A PID that is not stored for the dump is silently ignored (the
        UPDATE simply matches no row).
        """
        await self._write(
            """
            UPDATE processes
            SET is_suspicious = ?
            WHERE dump_id = ? AND pid = ?
            """,
            (is_suspicious, dump_id, pid),
        )

    async def mark_processes_suspicious(self, dump_id: str, pids: List[int]):
        """Bulk mark multiple processes as suspicious.

        All updates are committed together. PIDs that are not stored for the
        dump are silently ignored.
        """
        await self._write_many(
            """
            UPDATE processes
            SET is_suspicious = 1
            WHERE dump_id = ? AND pid = ?
            """,
            [(dump_id, pid) for pid in pids],
        )