-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb_load.py
More file actions
417 lines (362 loc) · 15.6 KB
/
db_load.py
File metadata and controls
417 lines (362 loc) · 15.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
#!/usr/bin/env python3
"""
db_load.py — Load extracted records into SQLite.
Three tables:
mechanisms — one row per mechanism, scalar fields
mechanism_properties — key/value for variable-density optional fields
interactions — typed edges between mechanisms
Usage:
python db_load.py # load all extracted/ into db/mechanisms.sqlite
python db_load.py --rebuild # drop and recreate (full reload)
python db_load.py --id loss_aversion # load/update single record
"""
import argparse
import json
import sqlite3
from pathlib import Path
# Paths: everything is resolved relative to this script's directory.
ROOT = Path(__file__).parent
EXTRACTED_DIR = ROOT / "extracted"  # input: one JSON file per extracted mechanism
DB_PATH = ROOT / "db" / "mechanisms.sqlite"  # output SQLite database

# Fields that go into the mechanisms table as columns.
# Container values (lists/dicts) are serialized to JSON strings by json_or_str().
SCALAR_FIELDS = [
    "name",
    "domain",
    "description",
    "summary",
    "mechanism_type",
    "class",
    # triggers / outputs as JSON arrays (stored as JSON strings)
    "triggers",
    "behavioral_outputs",
    "outputs",
    "plain_language_outputs",  # everyday-English effect phrases for vocabulary bridging
    "narrative_outputs",  # clinical/diagnostic register mechanism descriptions for LLM prompts
    # evidence (hoisted from the nested "evidence" object by flatten_evidence)
    "effect_size",
    "replication",
    "replication_status",
    # individual variation
    "individual_variation",
    "variation",
    # cross-cultural
    "cross_cultural",
    "cross_cultural_status",
    # phase
    "phase",
    # verification quality score
    "accuracy_score",
    # raw notes
    "notes",
]
# Fields that go into mechanism_properties as key/value:
# everything not in SCALAR_FIELDS and not in EXCLUDED_FIELDS.
# NOTE: "name"/"domain"/"phase" appear in both sets on purpose — they are
# written as mechanism columns from the record envelope, never as properties.
EXCLUDED_FIELDS = {
    "id",
    "name",
    "domain",
    "phase",
    "sources",
    "interactions",
    "person_moderators",
    "situation_activators",  # normalized into own tables
}
# ─── DB setup ─────────────────────────────────────────────────────────────────
def get_conn() -> sqlite3.Connection:
    """Open (creating if needed) the mechanisms database and return a connection.

    Ensures the db/ directory exists, enables WAL journaling for better
    concurrent read behavior, and turns on foreign-key enforcement (off by
    default in SQLite) so the ON DELETE CASCADE clauses actually fire.
    """
    # parents=True: create missing ancestor directories too, not just the leaf.
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # rows addressable by column name
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    return conn
def create_tables(conn: sqlite3.Connection):
    """Create all tables and indexes if they do not already exist.

    Schema overview:
      mechanisms           — one row per mechanism; container fields stored as JSON TEXT
      mechanism_properties — key/value overflow for fields not promoted to columns
      person_moderators    — normalized person-level moderators (dimension/direction)
      situation_activators — normalized situational features (feature/effect)
      interactions         — typed edges between mechanisms

    Idempotent (IF NOT EXISTS throughout); commits before returning.
    """
    conn.executescript("""
    CREATE TABLE IF NOT EXISTS mechanisms (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        domain TEXT,
        description TEXT,
        summary TEXT,
        mechanism_type TEXT,
        class TEXT,
        triggers TEXT,                 -- JSON array
        behavioral_outputs TEXT,       -- JSON array
        outputs TEXT,                  -- JSON array (alt field name)
        plain_language_outputs TEXT,   -- JSON array of everyday-English effect phrases
        narrative_outputs TEXT,        -- JSON array of clinical-register mechanism descriptions
        effect_size TEXT,
        replication TEXT,
        replication_status TEXT,
        individual_variation TEXT,     -- JSON object or string
        variation TEXT,                -- JSON object or string (alt)
        cross_cultural TEXT,
        cross_cultural_status TEXT,
        accuracy_score REAL,           -- verifier score 0.0-1.0
        phase TEXT,
        notes TEXT,
        loaded_at TEXT DEFAULT (datetime('now'))
    );
    CREATE TABLE IF NOT EXISTS mechanism_properties (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        mechanism_id TEXT NOT NULL REFERENCES mechanisms(id) ON DELETE CASCADE,
        key TEXT NOT NULL,
        value TEXT,                    -- JSON-serialized
        value_type TEXT                -- 'string'|'number'|'boolean'|'array'|'object'|'null'
    );
    -- Normalized person-level moderators: queryable by dimension
    CREATE TABLE IF NOT EXISTS person_moderators (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        mechanism_id TEXT NOT NULL REFERENCES mechanisms(id) ON DELETE CASCADE,
        dimension TEXT NOT NULL,       -- key from dimensions.json person_trait/state vocab
        direction TEXT NOT NULL,       -- '+' | '-' | 'mixed'
        strength TEXT,                 -- 'weak' | 'moderate' | 'strong'
        note TEXT
    );
    -- Normalized situation activators: queryable by feature
    CREATE TABLE IF NOT EXISTS situation_activators (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        mechanism_id TEXT NOT NULL REFERENCES mechanisms(id) ON DELETE CASCADE,
        feature TEXT NOT NULL,         -- key from dimensions.json situation_dimensions vocab
        effect TEXT NOT NULL,          -- 'activates' | 'amplifies' | 'dampens' | 'required'
        note TEXT
    );
    CREATE TABLE IF NOT EXISTS interactions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        mechanism_a TEXT NOT NULL REFERENCES mechanisms(id) ON DELETE CASCADE,
        relationship TEXT NOT NULL,    -- amplifies|suppresses|correlates_with|prerequisite_for|confounded_with
        mechanism_b TEXT NOT NULL,     -- may not exist in DB yet
        strength TEXT,                 -- weak|moderate|strong or numeric
        direction TEXT,                -- bidirectional|unidirectional
        notes TEXT,
        source_record TEXT             -- which extraction this came from
    );
    CREATE INDEX IF NOT EXISTS idx_mech_domain ON mechanisms(domain);
    CREATE INDEX IF NOT EXISTS idx_mech_score ON mechanisms(accuracy_score);
    CREATE INDEX IF NOT EXISTS idx_prop_mech ON mechanism_properties(mechanism_id);
    CREATE INDEX IF NOT EXISTS idx_prop_key ON mechanism_properties(key);
    CREATE INDEX IF NOT EXISTS idx_pm_mech ON person_moderators(mechanism_id);
    CREATE INDEX IF NOT EXISTS idx_pm_dimension ON person_moderators(dimension);
    CREATE INDEX IF NOT EXISTS idx_pm_dir ON person_moderators(direction);
    CREATE INDEX IF NOT EXISTS idx_sa_mech ON situation_activators(mechanism_id);
    CREATE INDEX IF NOT EXISTS idx_sa_feature ON situation_activators(feature);
    CREATE INDEX IF NOT EXISTS idx_sa_effect ON situation_activators(effect);
    CREATE INDEX IF NOT EXISTS idx_inter_a ON interactions(mechanism_a);
    CREATE INDEX IF NOT EXISTS idx_inter_b ON interactions(mechanism_b);
    CREATE INDEX IF NOT EXISTS idx_inter_rel ON interactions(relationship);
    """)
    conn.commit()
def drop_tables(conn: sqlite3.Connection):
    """Drop all tables (used by --rebuild for a full reload).

    Child tables are dropped before mechanisms so the drops succeed even
    with PRAGMA foreign_keys=ON. Commits before returning.
    """
    conn.executescript("""
    DROP TABLE IF EXISTS interactions;
    DROP TABLE IF EXISTS situation_activators;
    DROP TABLE IF EXISTS person_moderators;
    DROP TABLE IF EXISTS mechanism_properties;
    DROP TABLE IF EXISTS mechanisms;
    """)
    conn.commit()
# ─── Value helpers ────────────────────────────────────────────────────────────
def json_or_str(v) -> str | None:
if v is None:
return None
if isinstance(v, dict | list):
return json.dumps(v)
return str(v)
def value_type(v) -> str:
    """Return the type tag stored alongside a property value.

    One of 'null'|'boolean'|'number'|'string'|'array'|'object'; anything
    unrecognized falls back to 'string'.
    """
    if v is None:
        return "null"
    # bool is tested before (int, float) because bool subclasses int.
    dispatch = (
        (bool, "boolean"),
        ((int, float), "number"),
        (str, "string"),
        (list, "array"),
        (dict, "object"),
    )
    for types, tag in dispatch:
        if isinstance(v, types):
            return tag
    return "string"
# ─── Load record ─────────────────────────────────────────────────────────────
def flatten_evidence(extraction: dict) -> dict:
    """Return a copy of *extraction* with selected evidence sub-fields hoisted to the top level.

    Keys already present at the top level are never overwritten, and the
    nested "evidence" dict itself is left untouched in the copy.
    """
    promoted_keys = (
        "effect_size",
        "replication",
        "replication_status",
        "cross_cultural",
        "cross_cultural_status",
        "caveats",
    )
    result = dict(extraction)
    evidence = extraction.get("evidence", {})
    if not isinstance(evidence, dict):
        return result
    for key in promoted_keys:
        if key in evidence and key not in result:
            result[key] = evidence[key]
    return result
def load_record(conn: sqlite3.Connection, rec: dict):
    """Insert or replace one extracted record into the DB.

    Upserts the mechanisms row, then rebuilds (delete + re-insert) this
    mechanism's rows in mechanism_properties, person_moderators,
    situation_activators, and interactions, so re-loading a record never
    leaves stale child rows behind. The caller is responsible for commit().

    Args:
        conn: open connection (tables must already exist).
        rec: record envelope; must have "id", may have "name", "domain",
            "phase", "verification", and an "extraction" dict.
    """
    mid = rec["id"]
    extraction = rec.get("extraction", {})
    if not isinstance(extraction, dict):
        extraction = {}
    extraction = flatten_evidence(extraction)
    _upsert_mechanism(conn, mid, rec, extraction)
    _replace_properties(conn, mid, extraction)
    _replace_person_moderators(conn, mid, extraction)
    _replace_situation_activators(conn, mid, extraction)
    _replace_interactions(conn, mid, extraction)


def _upsert_mechanism(conn, mid, rec, extraction):
    """Build the mechanisms row (envelope fields + SCALAR_FIELDS) and upsert it."""
    # "verification" may be missing OR an explicit JSON null; guard both
    # (the old rec.get("verification", {}) crashed on an explicit null).
    verification = rec.get("verification") or {}
    mech_row = {
        "id": mid,
        "name": rec.get("name", extraction.get("name", mid)),
        "domain": rec.get("domain", extraction.get("domain", "")),
        "phase": rec.get("phase", ""),
        "accuracy_score": verification.get("accuracy_score") if isinstance(verification, dict) else None,
    }
    for field in SCALAR_FIELDS:
        # Envelope-derived fields above win over extraction values.
        if field not in mech_row and field in extraction:
            mech_row[field] = json_or_str(extraction[field])
    cols = list(mech_row.keys())
    placeholders = ", ".join("?" for _ in cols)
    col_list = ", ".join(cols)
    update_set = ", ".join(f"{c}=excluded.{c}" for c in cols if c != "id")
    # Column names come from SCALAR_FIELDS (a trusted constant), not user input,
    # so f-string SQL construction is safe here; values are parameterized.
    conn.execute(
        f"INSERT INTO mechanisms ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT(id) DO UPDATE SET {update_set}",
        [mech_row.get(c) for c in cols],
    )


def _replace_properties(conn, mid, extraction):
    """Store every non-scalar, non-excluded field as key/value rows; expand evidence.* sub-keys."""
    conn.execute("DELETE FROM mechanism_properties WHERE mechanism_id=?", (mid,))
    known = set(SCALAR_FIELDS) | EXCLUDED_FIELDS
    insert_sql = (
        "INSERT INTO mechanism_properties (mechanism_id, key, value, value_type) "
        "VALUES (?, ?, ?, ?)"
    )
    for k, v in extraction.items():
        if k in known:
            continue
        if isinstance(v, dict) and k == "evidence":
            # Expand evidence sub-keys as dotted properties (evidence.<key>).
            for ek, ev in v.items():
                conn.execute(insert_sql, (mid, f"evidence.{ek}", json_or_str(ev), value_type(ev)))
        else:
            conn.execute(insert_sql, (mid, k, json_or_str(v), value_type(v)))


def _replace_person_moderators(conn, mid, extraction):
    """Rebuild normalized person_moderators rows; silently skip malformed entries."""
    conn.execute("DELETE FROM person_moderators WHERE mechanism_id=?", (mid,))
    pm = extraction.get("person_moderators", [])
    if not isinstance(pm, list):
        return
    for item in pm:
        if not isinstance(item, dict):
            continue
        # `or ""` guards against explicit JSON nulls, not just missing keys
        # (the old .get("dimension", "").strip() crashed on null).
        dim = (item.get("dimension") or "").strip()
        direction = (item.get("direction") or "").strip()
        if not dim or not direction:
            continue
        conn.execute(
            "INSERT INTO person_moderators (mechanism_id, dimension, direction, strength, note) "
            "VALUES (?, ?, ?, ?, ?)",
            (mid, dim, direction, item.get("strength"), item.get("note")),
        )


def _replace_situation_activators(conn, mid, extraction):
    """Rebuild normalized situation_activators rows; silently skip malformed entries."""
    conn.execute("DELETE FROM situation_activators WHERE mechanism_id=?", (mid,))
    sa = extraction.get("situation_activators", [])
    if not isinstance(sa, list):
        return
    for item in sa:
        if not isinstance(item, dict):
            continue
        # `or ""` guards against explicit JSON nulls as well as missing keys.
        feature = (item.get("feature") or "").strip()
        effect = (item.get("effect") or "").strip()
        if not feature or not effect:
            continue
        conn.execute(
            "INSERT INTO situation_activators (mechanism_id, feature, effect, note) "
            "VALUES (?, ?, ?, ?)",
            (mid, feature, effect, item.get("note")),
        )


def _replace_interactions(conn, mid, extraction):
    """Rebuild interaction edges; accepts both list-of-dicts and {relationship: targets} shapes."""
    conn.execute("DELETE FROM interactions WHERE mechanism_a=?", (mid,))
    interactions = extraction.get("interactions", [])
    if isinstance(interactions, list):
        for item in interactions:
            if not isinstance(item, dict):
                continue
            # Tolerate the several field names different extractions have used.
            rel = item.get("relationship") or item.get("type") or item.get("relation") or ""
            target = item.get("mechanism") or item.get("target") or item.get("mechanism_b") or ""
            if not rel or not target:
                continue
            conn.execute(
                "INSERT INTO interactions "
                "(mechanism_a, relationship, mechanism_b, strength, direction, notes, source_record) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (
                    mid,
                    rel,
                    target,
                    item.get("strength"),
                    item.get("direction"),
                    item.get("notes"),
                    mid,
                ),
            )
    elif isinstance(interactions, dict):
        # Some extractions use {"amplifies": [...], "suppresses": [...]}
        for rel, targets in interactions.items():
            if not isinstance(targets, list):
                targets = [targets]
            for target in targets:
                if isinstance(target, dict):
                    tname = target.get("mechanism") or target.get("name", "")
                    notes = target.get("notes")
                    strength = target.get("strength")
                else:
                    tname = str(target)
                    notes = None
                    strength = None
                if tname:
                    # This shape carries no direction field; column stays NULL.
                    conn.execute(
                        "INSERT INTO interactions "
                        "(mechanism_a, relationship, mechanism_b, strength, notes, source_record) "
                        "VALUES (?, ?, ?, ?, ?, ?)",
                        (mid, rel, tname, strength, notes, mid),
                    )
# ─── CLI ─────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: load extracted JSON records into the SQLite DB.

    Flags:
        --rebuild  drop and recreate all tables before loading
        --id X     load/update only extracted/X.json
    Prints a per-file status line and a final summary.
    """
    parser = argparse.ArgumentParser(description="Load extracted records into SQLite")
    parser.add_argument(
        "--rebuild", action="store_true", help="Drop and recreate all tables before loading"
    )
    parser.add_argument("--id", help="Load/update single mechanism by ID")
    args = parser.parse_args()
    conn = get_conn()
    if args.rebuild:
        print("Dropping tables...")
        drop_tables(conn)
    create_tables(conn)
    if args.id:
        paths = [EXTRACTED_DIR / f"{args.id}.json"]
    else:
        paths = sorted(EXTRACTED_DIR.glob("*.json"))
    loaded = 0
    skipped = 0
    for path in paths:
        if not path.exists():
            print(f" ✗ {path.name}: not found")
            skipped += 1
            continue
        try:
            # Explicit encoding: extraction files are UTF-8 regardless of the
            # platform locale (bare read_text() breaks on Windows/cp1252).
            rec = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as e:
            print(f" ✗ {path.name}: JSON error: {e}")
            skipped += 1
            continue
        # Skip records whose extraction step produced only unparsed raw_text.
        extraction = rec.get("extraction", {})
        if isinstance(extraction, dict) and "raw_text" in extraction and len(extraction) == 1:
            print(f" ⚠ {path.stem}: skipping (extraction was not parseable JSON)")
            skipped += 1
            continue
        load_record(conn, rec)
        loaded += 1
        print(f" ✓ {path.stem}")
    conn.commit()
    conn.close()
    print(f"\nDone: {loaded} loaded, {skipped} skipped → {DB_PATH}")
if __name__ == "__main__":
main()