Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
limiter = Limiter(key_func=get_remote_address)

# Import routers after limiter is defined so they can use it
from .routers import open_academic_analytics, datasets, auth, wikimedia, annotations, dark_data_survey, scisciDB, datalakes, interdisciplinarity
from .routers import open_academic_analytics, datasets, auth, wikimedia, annotations, dark_data_survey, scisciDB, datalakes, interdisciplinarity, storywrangler

app = FastAPI(
title=settings.app_name,
Expand Down Expand Up @@ -89,6 +89,7 @@ async def shutdown_event():
app.include_router(scisciDB.router, prefix="/scisciDB", tags=["scisciDB"])
app.include_router(dark_data_survey.router, prefix="", tags=["dark-data-survey"])
app.include_router(interdisciplinarity.router, prefix="", tags=["interdisciplinarity"])
app.include_router(storywrangler.router, prefix="/storywrangler", tags=["storywrangler"])
app.include_router(admin.router, prefix="/admin", tags=["admin"], include_in_schema=False)

# Admin endpoints (secured with admin authentication)
Expand Down
328 changes: 315 additions & 13 deletions backend/app/routers/datalakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,304 @@ async def register_datalake(
}
}

@router.get("/search-terms2")
async def search_terms_batch(
    types: str = Query(..., description="Comma-separated list of ngram terms"),
    date: Optional[str] = Query(None, description="First system focus date (YYYY-MM-DD)"),
    date2: Optional[str] = Query(None, description="Second system focus date (YYYY-MM-DD)"),
    location: str = Query("wikidata:Q30", description="First system location entity ID"),
    location2: Optional[str] = Query(None, description="Second system location entity ID (defaults to location)"),
    granularity: str = Query("daily", description="Granularity: daily, weekly, monthly"),
    window_size: int = Query(7, description="Number of granularity periods before/after each focus date"),
    db: AsyncSession = Depends(get_db_session)
):
    """
    Batch sparkline lookup for multiple ngram terms across one or two systems.

    Two comparison modes:
    - Temporal (date + date2, same location): ONE DuckDB scan — both windows' paths merged.
    - Geographic (date, location + location2): TWO DuckDB scans — paths live in separate geo dirs.

    Results are keyed as system1/system2 so the frontend can render both sides
    without coordinating parallel calls.

    Query params:
        types: comma-separated ngram terms; blank entries are dropped.
        date / date2: focus dates in ISO YYYY-MM-DD; at least one is required.
        location / location2: entity IDs resolved to local geo dirs via the adapter
            parquet; location2 falls back to location when omitted.
        granularity: one of daily/weekly/monthly; selects table and time column.
        window_size: number of granularity periods on EACH side of a focus date.

    Returns:
        Dict with one entry per requested system ("system1"/"system2"), each holding
        date, location, sparkData (term -> list of {time, rank, counts} points) and
        topArticles (term -> top_articles value at the focus partition; daily only),
        plus a "duration" key with total wall time in ms.

    Raises:
        HTTPException 400: bad granularity/date/terms, unknown location, missing table.
        HTTPException 404: wikigrams datalake not registered, or no data in a window.
        HTTPException 500: missing metadata or any unexpected query failure.
    """
    # --- Input validation (cheap checks before touching the DB) ---
    if granularity not in ["daily", "weekly", "monthly"]:
        raise HTTPException(status_code=400, detail="granularity must be one of: daily, weekly, monthly")

    # Validate date strings up front so DuckDB never sees malformed input.
    for d_str in [date, date2]:
        if d_str:
            try:
                datetime.fromisoformat(d_str)
            except ValueError as e:
                raise HTTPException(status_code=400, detail=f"Invalid date format. Use YYYY-MM-DD: {e}")

    terms = [t.strip() for t in types.split(",") if t.strip()]
    if not terms:
        raise HTTPException(status_code=400, detail="At least one term is required")

    # Map each requested (date, location) pair to a fixed system key so the
    # response shape is predictable for the frontend.
    systems_input: Dict[str, Dict] = {}
    if date:
        systems_input["system1"] = {"date": date, "location": location}
    if date2:
        systems_input["system2"] = {"date": date2, "location": location2 or location}
    if not systems_input:
        raise HTTPException(status_code=400, detail="At least one of date or date2 must be provided")

    # Look up the registered wikigrams datalake row (paths + metadata live there).
    query = select(Datalake).where(Datalake.dataset_id == "wikigrams")
    result = await db.execute(query)
    datalake = result.scalar_one_or_none()

    if not datalake:
        raise HTTPException(status_code=404, detail="Wikigrams datalake not found")

    # granularity -> (parquet table name, hive partition/time column name)
    granularity_mapping = {
        "daily": ("wikigrams", "date"),
        "weekly": ("wikigrams_weekly", "week"),
        "monthly": ("wikigrams_monthly", "month")
    }
    table_name, time_column = granularity_mapping[granularity]

    # top_articles is only queried for the daily table, and only when the
    # registered schema says the column exists.
    has_top_articles = (
        granularity == "daily"
        and bool(datalake.data_schema and "top_articles" in datalake.data_schema)
    )

    try:
        duckdb_client = get_duckdb_client()
        conn = duckdb_client.connect()

        if not datalake.tables_metadata:
            raise HTTPException(status_code=500, detail="Datalake metadata is missing.")

        if table_name not in datalake.tables_metadata:
            available = [k for k in datalake.tables_metadata.keys() if k.startswith("wikigrams")]
            raise HTTPException(status_code=400, detail=f"Table '{table_name}' not found. Available: {available}.")

        t_paths = time.time()
        wikigrams_path_all, adapter_path = get_parquet_paths(datalake, table_name)
        t_paths_ms = (time.time() - t_paths) * 1000

        # Build a prefix index once: parent_dir → [paths].
        # Lookup is O(N_paths) to build, O(1) per partition — avoids the
        # O(N_paths × N_partitions) linear scan of the old list comprehension.
        path_prefix_index: Dict[str, List[str]] = {}
        for p in wikigrams_path_all:
            dir_path = p.rsplit("/", 1)[0]
            path_prefix_index.setdefault(dir_path, []).append(p)

        # One "?" per term for the IN (...) clauses below.
        placeholders = ",".join(["?" for _ in terms])
        start_time = time.time()

        # --- Resolve unique locations to local_geo (one adapter lookup per unique location) ---
        unique_locations = {s["location"] for s in systems_input.values()}
        geo_map: Dict[str, str] = {}
        t_adapter = time.time()
        for loc in unique_locations:
            row = conn.execute(
                "SELECT local_id FROM read_parquet(?) WHERE entity_id = ? LIMIT 1",
                [adapter_path, loc]
            ).fetchone()
            if not row:
                raise HTTPException(status_code=400, detail=f"Location '{loc}' not found in adapter")
            # URL-encode (safe='') so the geo value matches the hive dir name
            # exactly, including any '/' in the local id.
            geo_map[loc] = quote(row[0], safe='')
        t_adapter_ms = (time.time() - t_adapter) * 1000

        # --- Compute per-system window metadata ---
        # NOTE(review): monthly uses a fixed 30-day approximation per period —
        # windows around month boundaries may be slightly off; confirm intended.
        window_unit_days = {"daily": 1, "weekly": 7, "monthly": 30}[granularity]
        effective_window = window_size * window_unit_days

        per_system: Dict[str, Dict] = {}
        t_filter = time.time()
        for sys_key, system in systems_input.items():
            loc = system["location"]
            local_geo = geo_map[loc]
            focus_date = datetime.fromisoformat(system["date"])
            w_start = (focus_date - timedelta(days=effective_window)).strftime("%Y-%m-%d")
            w_end = (focus_date + timedelta(days=effective_window)).strftime("%Y-%m-%d")
            # Partition start strings covering the window, plus the single
            # partition containing the focus date itself (for top_articles).
            window_partitions = compute_partition_starts(w_start, w_end, granularity)
            focus_partition = compute_partition_starts(system["date"], system["date"], granularity)[0]

            # O(1) lookup per partition using prefix index
            base = f"{datalake.data_location}/{table_name}/geo={local_geo}"
            query_paths = []
            for ps in window_partitions:
                query_paths.extend(path_prefix_index.get(f"{base}/{time_column}={ps}", []))

            if not query_paths:
                raise HTTPException(status_code=404, detail=f"No data found for {sys_key} ({system['date']}, {loc})")

            # focus_paths may legitimately be empty (focus date has no files);
            # the top_articles step below tolerates that.
            focus_paths = path_prefix_index.get(f"{base}/{time_column}={focus_partition}", [])

            per_system[sys_key] = {
                "loc": loc,
                "focus_date_str": system["date"],
                "window_partitions": window_partitions,
                "window_set": set(window_partitions),
                "focus_partition": focus_partition,
                "query_paths": query_paths,
                "focus_paths": focus_paths,
            }
        t_filter_ms = (time.time() - t_filter) * 1000

        # --- Choose scan strategy ---
        # Same geo for both systems => windows share one hive dir tree, so a
        # single merged scan is cheaper than two.
        all_geos = {geo_map[s["location"]] for s in systems_input.values()}
        temporal_comparison = len(systems_input) == 2 and len(all_geos) == 1
        print(f" setup: get_paths={t_paths_ms:.0f}ms, adapter={t_adapter_ms:.0f}ms, filter={t_filter_ms:.0f}ms | total_paths={len(wikigrams_path_all)}")

        system_results: Dict[str, Dict] = {}

        if temporal_comparison:
            # TEMPORAL: same geo, different dates → merge paths, single DuckDB scan.
            # top_articles is split into a separate query against focus-date files only
            # so the large text column is not read from all 111 sparkline files.
            s1 = per_system["system1"]
            s2 = per_system["system2"]
            combined_paths = sorted(set(s1["query_paths"]) | set(s2["query_paths"]))
            range_start = min(s1["window_partitions"][0], s2["window_partitions"][0])
            range_end = max(s1["window_partitions"][-1], s2["window_partitions"][-1])

            # 1. Sparklines query — no top_articles column (small columns only)
            spark_sql = f"""
                SELECT
                    w.types,
                    w.{time_column},
                    MIN(w.rank) AS rank,
                    SUM(w.counts) AS counts
                FROM read_parquet(?) w
                WHERE w.{time_column} BETWEEN ? AND ?
                    AND w.types IN ({placeholders})
                GROUP BY w.types, w.{time_column}
                ORDER BY w.types, w.{time_column}
            """
            t_query = time.time()
            cursor = conn.execute(spark_sql, [combined_paths, range_start, range_end] + terms)
            t_spark_ms = (time.time() - t_query) * 1000

            rows = cursor.fetchall()
            cols = [desc[0] for desc in cursor.description]

            # Initialize both systems so every term key exists even with no rows.
            for sys_key, meta in per_system.items():
                system_results[sys_key] = {
                    "date": meta["focus_date_str"],
                    "location": meta["loc"],
                    "sparkData": {t: [] for t in terms},
                    "topArticles": {},
                }

            # Split rows by window membership (a date can fall in both windows if they overlap)
            for row in rows:
                d = dict(zip(cols, row))
                term = d["types"]
                date_val = str(d[time_column])
                point = {time_column: d[time_column], "rank": d["rank"], "counts": d["counts"]}
                if date_val in s1["window_set"]:
                    system_results["system1"]["sparkData"][term].append(point)
                if date_val in s2["window_set"]:
                    system_results["system2"]["sparkData"][term].append(point)

            # 2. top_articles query — focus-date files only (1-2 files)
            t_articles = time.time()
            if has_top_articles:
                focus_paths = sorted(set(s1["focus_paths"]) | set(s2["focus_paths"]))
                if focus_paths:
                    try:
                        # FILTER pins each output column to its own focus
                        # partition, so one grouped pass serves both systems.
                        art_cursor = conn.execute(f"""
                            SELECT
                                w.types,
                                ANY_VALUE(w.top_articles) FILTER (WHERE w.{time_column} = ?) AS top_articles_s1,
                                ANY_VALUE(w.top_articles) FILTER (WHERE w.{time_column} = ?) AS top_articles_s2
                            FROM read_parquet(?) w
                            WHERE w.types IN ({placeholders})
                            GROUP BY w.types
                        """, [s1["focus_partition"], s2["focus_partition"], focus_paths] + terms)
                        for row in art_cursor.fetchall():
                            d = dict(zip([c[0] for c in art_cursor.description], row))
                            if d.get("top_articles_s1") is not None:
                                system_results["system1"]["topArticles"][d["types"]] = d["top_articles_s1"]
                            if d.get("top_articles_s2") is not None:
                                system_results["system2"]["topArticles"][d["types"]] = d["top_articles_s2"]
                    except Exception:
                        # Deliberate best-effort: top_articles unavailable;
                        # sparklines already populated.
                        pass
            t_articles_ms = (time.time() - t_articles) * 1000

            print(f" temporal: {len(combined_paths)} paths, spark={t_spark_ms:.0f}ms, articles={t_articles_ms:.0f}ms ({len(s1['focus_paths'])+len(s2['focus_paths'])} focus files)")

        else:
            # GEOGRAPHIC (or single system): separate scan per system.
            # Same split as temporal: sparklines without top_articles, then tiny articles query.
            for sys_key, meta in per_system.items():
                query_paths = meta["query_paths"]

                # 1. Sparklines — small columns only
                t_query = time.time()
                cursor = conn.execute(f"""
                    SELECT
                        w.types,
                        w.{time_column},
                        MIN(w.rank) AS rank,
                        SUM(w.counts) AS counts
                    FROM read_parquet(?) w
                    WHERE w.{time_column} BETWEEN ? AND ?
                        AND w.types IN ({placeholders})
                    GROUP BY w.types, w.{time_column}
                    ORDER BY w.types, w.{time_column}
                """, [query_paths, meta["window_partitions"][0], meta["window_partitions"][-1]] + terms)
                t_query_ms = (time.time() - t_query) * 1000

                rows = cursor.fetchall()
                cols = [desc[0] for desc in cursor.description]

                spark_data: Dict[str, List[Dict]] = {t: [] for t in terms}
                for row in rows:
                    d = dict(zip(cols, row))
                    spark_data[d["types"]].append({
                        time_column: d[time_column],
                        "rank": d["rank"],
                        "counts": d["counts"],
                    })

                # 2. top_articles — focus-date file(s) only
                top_articles: Dict[str, Any] = {}
                t_articles = time.time()
                if has_top_articles and meta["focus_paths"]:
                    try:
                        art_cursor = conn.execute(f"""
                            SELECT
                                w.types,
                                ANY_VALUE(w.top_articles) AS top_articles
                            FROM read_parquet(?) w
                            WHERE w.{time_column} = ?
                                AND w.types IN ({placeholders})
                            GROUP BY w.types
                        """, [meta["focus_paths"], meta["focus_partition"]] + terms)
                        for row in art_cursor.fetchall():
                            d = dict(zip([c[0] for c in art_cursor.description], row))
                            if d.get("top_articles") is not None:
                                top_articles[d["types"]] = d["top_articles"]
                    except Exception:
                        # Deliberate best-effort: top_articles unavailable;
                        # sparklines already populated.
                        pass
                t_articles_ms = (time.time() - t_articles) * 1000

                system_results[sys_key] = {
                    "date": meta["focus_date_str"],
                    "location": meta["loc"],
                    "sparkData": spark_data,
                    "topArticles": top_articles,
                }
                print(f" {sys_key}: {len(query_paths)} paths, spark={t_query_ms:.0f}ms, articles={t_articles_ms:.0f}ms ({len(meta['focus_paths'])} focus files)")

        duration = (time.time() - start_time) * 1000
        print(f"searchTermsBatch total={duration:.2f}ms — {'temporal' if temporal_comparison else 'geographic'} for {len(terms)} terms × {len(systems_input)} systems")

        return {**system_results, "duration": duration}

    except HTTPException:
        # Re-raise intentional HTTP errors unchanged (don't wrap as 500).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}")


@router.get("/{dataset_id}")
async def get_datalake_info(
dataset_id: str,
Expand Down Expand Up @@ -294,16 +592,22 @@ async def get_adapter_info(
detail="Datalake metadata is missing. Please re-register the datalake with proper tables_metadata."
)

adapter_fnames = datalake.tables_metadata.get("adapter")

if not adapter_fnames:
raise HTTPException(
status_code=500,
detail="Missing babynames or adapter file paths. Required: tables_metadata.babynames and tables_metadata.adapter"
)

adapter_path = [
f"{datalake.data_location}/{datalake.ducklake_data_path}/main/adapter/{fname}" for fname in adapter_fnames
if datalake.data_format == "parquet_hive":
if not datalake.entity_mapping or not datalake.entity_mapping.get("path"):
raise HTTPException(
status_code=500,
detail="Missing entity_mapping.path for parquet_hive format."
)
adapter_path = [datalake.entity_mapping["path"]]
else:
adapter_fnames = datalake.tables_metadata.get("adapter")
if not adapter_fnames:
raise HTTPException(
status_code=500,
detail="Missing adapter file paths. Required: tables_metadata.adapter"
)
adapter_path = [
f"{datalake.data_location}/{datalake.ducklake_data_path}/main/adapter/{fname}" for fname in adapter_fnames
]

# Execute comparative queries
Expand Down Expand Up @@ -626,15 +930,13 @@ async def get_wikigrams_top_ngrams(
w.types,
SUM(w.counts) as counts
FROM read_parquet(?) w
LEFT JOIN read_parquet(?) a ON w.geo = a.local_id
WHERE w.{time_column} BETWEEN ? AND ?
AND a.entity_id = ?
GROUP BY w.types
ORDER BY counts DESC
LIMIT ?
"""

params = [filtered_wikigrams_path, adapter_path, date_range[0], date_range[1], location, limit]
params = [filtered_wikigrams_path, date_range[0], date_range[1], limit]

cursor = conn.execute(sql_query, params)
query_results = cursor.fetchall()
Expand Down
Loading