Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions scripts/artifacts/ConnectedDeviceInformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,13 @@ def connected_device_info_device_history(context):
return data_headers, data_list, source_path


import sqlite3 # <--- IMPORTANT: add this at the very top of the file if it is not already there

@artifact_processor
def connected_device_info_consolidated_connected_device_history(context):
def connected_device_info_device_history(context):
"""
Extracts and consolidates device connection history from
'healthdb_secure.sqlite'.
Extracts and processes historical information about connected devices from
the `healthdb_secure.sqlite` database.
"""

files_found = context.get_files_found()
Expand All @@ -120,33 +122,49 @@ def connected_device_info_consolidated_connected_device_history(context):
SELECT
MIN(objects.creation_date),
MAX(objects.creation_date),
data_provenances.origin_product_type
data_provenances.origin_product_type,
source_id,
data_provenances.origin_build
FROM objects
LEFT OUTER JOIN data_provenances ON objects.provenance = ''' +\
'''data_provenances.ROWID
WHERE data_provenances.origin_product_type != "UnknownDevice" and ''' +\
'''data_provenances.origin_product_type != "iPhone0,0" AND ''' +\
'''objects.creation_date > 1
GROUP BY data_provenances.origin_product_type
LEFT OUTER JOIN data_provenances ON objects.provenance
= data_provenances.ROWID
WHERE data_provenances.origin_product_type != "iPhone0,0" AND
data_provenances.origin_product_type != "UnknownDevice"
AND objects.creation_date > 0
GROUP BY origin_product_type, origin_build
HAVING MIN(objects.creation_date) != MAX(objects.creation_date)
ORDER BY creation_date;
'''

data_headers = (
('Start Time', 'datetime'), ('End Time', 'datetime'),
'Origin Product Type', 'Device Model')

db_records = get_sqlite_db_records(source_path, query)
'Origin Product Type', 'Device Model', 'Source ID', 'Origin Build',
'OS Version')

for record in db_records:
start_timestamp = convert_cocoa_core_data_ts_to_utc(record[0])
end_timestamp = convert_cocoa_core_data_ts_to_utc(record[1])
device_model = context.get_device_model(record[2])
data_list.append(
(start_timestamp, end_timestamp, record[2], device_model))
# --- BUG FIX SECTION ---
try:
# Try to open the database and run the query
db_records = get_sqlite_db_records(source_path, query)
except (sqlite3.DatabaseError, sqlite3.OperationalError) as e:
# If it fails (file is corrupt / not a DB), log the error and return empty data
# logfunc is iLEAPP's built-in logging function, or use a plain print
print(f" [!] Error: Gagal membaca database {source_path}. Kemungkinan file rusak atau bukan SQLite. Detail: {e}")
return data_headers, [], source_path
# ----------------------------------

# If successful (we reach this block), process the data
if db_records:
for record in db_records:
start_timestamp = convert_cocoa_core_data_ts_to_utc(record[0])
end_timestamp = convert_cocoa_core_data_ts_to_utc(record[1])
device_model = context.get_device_model(record[2])
os_version = context.get_os_version(record[4], record[2])
data_list.append(
(start_timestamp, end_timestamp, record[2], device_model,
record[3], record[4], os_version))

return data_headers, data_list, source_path


@artifact_processor
def connected_device_information_current_device_info(context):
"""
Expand Down
47 changes: 29 additions & 18 deletions scripts/artifacts/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@
}
}

import sqlite3
from packaging import version
from scripts.ilapfuncs import artifact_processor, get_sqlite_db_records, \
attach_sqlite_db_readonly, does_table_exist_in_db, convert_cocoa_core_data_ts_to_utc
Expand Down Expand Up @@ -528,25 +529,35 @@ def health_workouts(context):
'Device ID', 'Device Model', 'Source', 'Software Version', 'Timezone',
('Timestamp added to Health', 'datetime'))

db_records = get_sqlite_db_records(data_source, query, attach_query)

for record in db_records:
start_timestamp = convert_cocoa_core_data_ts_to_utc(record[0])
end_timestamp = convert_cocoa_core_data_ts_to_utc(record[1])
added_timestamp = convert_cocoa_core_data_ts_to_utc(record[26])
device_model = context.get_device_model(record[22])
try:
# Try to access the database. If the file is corrupt/encrypted, this is the line that raises.
db_records = get_sqlite_db_records(data_source, query, attach_query)
except (sqlite3.DatabaseError, sqlite3.OperationalError) as e:
# Catch SQLite-specific errors
print(f" [!] Error executing query on {data_source}: {e}")
# Return an empty list so the iLEAPP run does not die (crash)
return data_headers, [], data_source

if db_records:
for record in db_records:
start_timestamp = convert_cocoa_core_data_ts_to_utc(record[0])
end_timestamp = convert_cocoa_core_data_ts_to_utc(record[1])
added_timestamp = convert_cocoa_core_data_ts_to_utc(record[26])
device_model = context.get_device_model(record[22])

# Reset the temp variable on each iteration
celcius_temp = None
if record[16]:
celcius_temp = round(((record[16] - 32) * (5 / 9)), 2)

if record[16]:
celcius_temp = round(((record[16] - 32) * (5 / 9)), 2)

data_list.append(
(start_timestamp, end_timestamp, str(record[2]).title(), record[3],
record[4], record[5], record[6], record[7], record[8], record[9],
record[10], record[11], record[12], record[13], record[14],
record[15], celcius_temp, record[16], record[17], record[18],
record[19], record[20], record[21], record[22], device_model,
record[23], record[24], record[25], added_timestamp)
)
data_list.append(
(start_timestamp, end_timestamp, str(record[2]).title(), record[3],
record[4], record[5], record[6], record[7], record[8], record[9],
record[10], record[11], record[12], record[13], record[14],
record[15], celcius_temp, record[16], record[17], record[18],
record[19], record[20], record[21], record[22], device_model,
record[23], record[24], record[25], added_timestamp)
)

return data_headers, data_list, data_source

Expand Down
11 changes: 9 additions & 2 deletions scripts/artifacts/mailprotect.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,19 @@ def get_mailprotect(files_found, report_folder, seeker, wrap_text, timezone_offs
db.close()

with open_sqlite_db_readonly(os.path.join(head, "Envelope Index")) as db:
# [IMPORTANT] Initialize a new cursor from the freshly opened DB connection
cursor = db.cursor()

# Only then perform the ATTACH
attach_query = attach_sqlite_db_readonly(f"{head}/Protected Index", 'PI')
cursor.execute(attach_query)

attach_query = attach_sqlite_db_readonly(f"{report_folder}/emails.db", 'emails')
cursor.execute(attach_query)

cursor = db.cursor()

# The line below (old line 73) can be deleted or left in place
# (but it is redundant since the cursor is already defined above)
# cursor = db.cursor()
cursor.execute(
"""
select
Expand Down
62 changes: 46 additions & 16 deletions scripts/artifacts/netusage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ def pad_mac_adr(adr):
return ':'.join([i.zfill(2) for i in adr.split(':')]).upper()

def get_netusage(files_found, report_folder, seeker, wrap_text, timezone_offset):
# --- DEBUG MARKER ---
logfunc("DEBUG: Memulai script netusage.py versi PATCHED (Anti-Year-0)...")

for file_found in files_found:
file_found = str(file_found)
if not file_found.endswith('.sqlite'):
continue # Skip all other files

if 'netusage' in file_found:
db = open_sqlite_db_readonly(file_found)

# --- PART 1: App Data ---
cursor = db.cursor()
cursor.execute('''
select
Expand Down Expand Up @@ -43,21 +48,31 @@ def get_netusage(files_found, report_folder, seeker, wrap_text, timezone_offset)
report = ArtifactHtmlReport('Network Usage (netusage) - App Data')
report.start_artifact_report(report_folder, 'Network Usage (netusage) - App Data')
report.add_script()
data_headers = ('Last Connect Timestamp','First Usage Timestamp','Last Usage Timestamp','Bundle Name','Process Name','Type','Wifi In (Bytes)','Wifi Out (Bytes)','Mobile/WWAN In (Bytes)','Mobile/WWAN Out (Bytes)','Wired In (Bytes)','Wired Out (Bytes)') # Don't remove the comma, that is required to make this a tuple as there is only 1 element
data_headers = ('Last Connect Timestamp','First Usage Timestamp','Last Usage Timestamp','Bundle Name','Process Name','Type',
'Wifi In (Bytes)','Wifi Out (Bytes)','Mobile/WWAN In (Bytes)','Mobile/WWAN Out (Bytes)','Wired In (Bytes)','Wired Out (Bytes)')
data_list = []
for row in all_rows:
if row[0] is None:
lastconnected = ''
else:
lastconnected = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[0]),timezone_offset)
if row[1] is None:
firstused = ''
else:
firstused = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[1]),timezone_offset)
if row[2] is None:
lastused = ''
else:
lastused = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[2]),timezone_offset)
try:
if row[0] is None:
lastconnected = ''
else:
lastconnected = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[0]),timezone_offset)
except (ValueError, TypeError):
lastconnected = str(row[0])
try:
if row[1] is None:
firstused = ''
else:
firstused = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[1]),timezone_offset)
except (ValueError, TypeError):
firstused = str(row[1])
try:
if row[2] is None:
lastused = ''
else:
lastused = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[2]),timezone_offset)
except (ValueError, TypeError):
lastused = str(row[2])

data_list.append((lastconnected,firstused,lastused,row[3],row[4],row[5],row[6],row[7],row[8],row[9],row[10],row[11]))

Expand All @@ -72,6 +87,7 @@ def get_netusage(files_found, report_folder, seeker, wrap_text, timezone_offset)
else:
logfunc('No Network Usage (netusage) - App Data data available')

# --- PART 2: Connections ---
cursor = db.cursor()
cursor.execute('''
select
Expand All @@ -98,11 +114,25 @@ def get_netusage(files_found, report_folder, seeker, wrap_text, timezone_offset)
report = ArtifactHtmlReport('Network Usage (netusage) - Connections')
report.start_artifact_report(report_folder, 'Network Usage (netusage) - Connections')
report.add_script()
data_headers = ('First Connection Timestamp','Last Connection Timestamp','Network Name','Cell Tower ID/Wifi MAC','Network Type','Bytes In','Bytes Out','Connection Attempts','Connection Successes','Packets In','Packets Out') # Don't remove the comma, that is required to make this a tuple as there is only 1 element
data_headers = ('First Connection Timestamp','Last Connection Timestamp','Network Name','Cell Tower ID/Wifi MAC','Network Type','Bytes In','Bytes Out','Connection Attempts','Connection Successes','Packets In','Packets Out')
data_list = []
for row in all_rows:
firstconncted = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[0]),timezone_offset)
lastconnected = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[1]),timezone_offset)
# FIX: try/except is also applied to the Connections section
try:
if row[0] is None:
firstconncted = ''
else:
firstconncted = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[0]),timezone_offset)
except (ValueError, TypeError):
firstconncted = str(row[0])

try:
if row[1] is None:
lastconnected = ''
else:
lastconnected = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[1]),timezone_offset)
except (ValueError, TypeError):
lastconnected = str(row[1])

if row[2] == None:
data_list.append((firstconncted,lastconnected,'','',row[3],row[4],row[5],row[6],row[7],row[8],row[9]))
Expand Down
27 changes: 20 additions & 7 deletions scripts/artifacts/systemVersionPlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,28 @@ def system_version_plist(context):
sysdiagnose_archive = context.get_source_file_path("sysdiagnose_*.tar.gz")

if plist_file:
data_source = system_version_plist
# FIX: use the 'plist_file' variable (a string path), NOT the function name 'system_version_plist'
data_source = plist_file
pl = get_plist_file_content(data_source)
elif 'sysdiagnose_' in sysdiagnose_archive and "IN_PROGRESS_" not in sysdiagnose_archive:
tar = tarfile.open(sysdiagnose_archive)
root = tar.getmembers()[0].name.split('/')[0]

elif sysdiagnose_archive and 'sysdiagnose_' in sysdiagnose_archive and "IN_PROGRESS_" not in sysdiagnose_archive:
try:
data_source = tar.extractfile(f"{root}/logs/SystemVersion/SystemVersion.plist")
pl = get_plist_file_content(data_source)
except KeyError:
tar = tarfile.open(sysdiagnose_archive)
# Validate so we don't crash if the tar is empty
members = tar.getmembers()
if members:
root = members[0].name.split('/')[0]
try:
# Set data_source to the archive's string path for reporting purposes
data_source = sysdiagnose_archive

# Extract the specific file from inside the tar
extracted_file = tar.extractfile(f"{root}/logs/SystemVersion/SystemVersion.plist")
if extracted_file:
pl = get_plist_file_content(extracted_file)
except KeyError:
pl = None
except tarfile.ReadError:
pl = None

if pl is not None:
Expand Down
Loading