-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaa
More file actions
133 lines (109 loc) · 4.34 KB
/
Copy pathaa
File metadata and controls
133 lines (109 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
import json
from decimal import Decimal
from pathlib import Path

from google.cloud import bigquery, storage
# BigQuery destination: main() inserts into f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}".
PROJECT_ID = "operating-aria-468908-b6"
DATASET_ID = "alpspr_dataset_01"
TABLE_ID = "jsonl_dump_4"

# Cloud Storage source object, read by main() as JSON Lines.
# NOTE(review): the object name contains a space before "1.jsonl" — confirm it is intended.
BUCKET_NAME = "my-bucket-sysco"
BLOB_NAME = "item-1d80b0e7-4c10-4b48-b1bd-47828f6bdc1d 1.jsonl"

# NOTE(review): LOCATION is not passed to either client in main() — confirm whether it is needed.
LOCATION = "asia-south2"

# Number of rows sent per insert_rows_json() call.
BATCH_SIZE = 100

# Local JSON Lines file, next to this script, that receives rows from batches that reported insert errors.
FAILED_FILE = Path(__file__).parent.joinpath("failed_batches.jsonl")
# Case-insensitive spellings accepted for BOOLEAN columns; anything else is an error.
_TRUE_STRINGS = frozenset({"true", "1", "yes"})
_FALSE_STRINGS = frozenset({"false", "0", "no"})


def _convert_scalar(value, col_type):
    """Coerce one JSON value to the representation used for a *col_type* column.

    None is passed through unchanged. Raises ValueError, TypeError or
    ArithmeticError when the value cannot be represented in the column type.
    """
    if value is None:
        return None
    if col_type in ("INTEGER", "INT64"):
        # int() would silently truncate 3.7 -> 3 and raise OverflowError on inf;
        # is_integer() is False for fractional values, NaN and +/-inf.
        if isinstance(value, float) and not value.is_integer():
            raise ValueError("non-integral number for INTEGER column")
        return int(value)
    if col_type in ("FLOAT", "FLOAT64"):
        return float(value)
    if col_type in ("NUMERIC", "BIGNUMERIC"):
        # Parse through Decimal(str(...)) so large or high-precision values are not
        # rounded the way a float would round them; emit a plain decimal string.
        number = Decimal(str(value))
        if not number.is_finite():
            raise ValueError("NUMERIC value must be finite")
        return format(number, "f")
    if col_type in ("BOOLEAN", "BOOL"):
        if isinstance(value, bool):
            return value
        token = str(value).strip().lower()
        if token in _TRUE_STRINGS:
            return True
        if token in _FALSE_STRINGS:
            return False
        raise ValueError("unrecognised boolean value")
    if col_type == "TIMESTAMP":
        return value  # let BigQuery parse timestamp
    if col_type in ("RECORD", "STRUCT"):
        # Nested sub-fields are not validated: the schema map only holds top-level columns.
        if not isinstance(value, dict):
            raise TypeError("RECORD value must be a JSON object")
        return value
    if col_type == "JSON":
        # json.dumps gives valid JSON text; str() of a dict would give a Python repr.
        return value if isinstance(value, str) else json.dumps(value)
    if isinstance(value, (dict, list)):
        return json.dumps(value)
    return str(value)


def validate_and_clean(record, schema, line_num, errors):
    """Validate and clean a single JSON record against a BigQuery schema.

    Args:
        record: Parsed JSON object (dict) for one input line.
        schema: Mapping of column name -> (field_type, mode), e.g. as built in main().
        line_num: 1-based input line number; used only in error messages.
        errors: List that receives ``(line_num, message)`` tuples for values that
            cannot be coerced; it is mutated in place.

    Returns:
        A new dict containing only the keys of *record* that are columns in *schema*.
        Each value is coerced to its column type. REPEATED columns always get a list:
        None or "" become [], a scalar is wrapped, and each element is coerced. A value
        that cannot be coerced is reported in *errors* and replaced by None, or by []
        for REPEATED columns.
    """
    cleaned = {}
    for key, value in record.items():
        if key not in schema:
            continue  # columns not in the table schema are dropped
        col_type, col_mode = schema[key]
        repeated = col_mode == "REPEATED"
        try:
            if repeated:
                # Only None/"" count as "no values"; 0 and False are real elements.
                if value is None or value == "":
                    items = []
                elif isinstance(value, list):
                    items = value
                else:
                    items = [value]
                cleaned[key] = [_convert_scalar(item, col_type) for item in items]
            else:
                cleaned[key] = _convert_scalar(value, col_type)
        except (ValueError, TypeError, ArithmeticError):
            # ArithmeticError covers decimal.InvalidOperation and OverflowError.
            errors.append((line_num, f"Invalid value for field '{key}': {value} (expected {col_type})"))
            cleaned[key] = [] if repeated else None  # fallback if invalid
    return cleaned
def main():
    """Load BLOB_NAME from BUCKET_NAME into the BigQuery table PROJECT_ID.DATASET_ID.TABLE_ID.

    Steps:
      1. Fetch the destination table's schema.
      2. Download the object as text and split it into lines.
      3. Parse each non-blank line as JSON and clean it with validate_and_clean().
         Malformed lines and lines that are not JSON objects are reported and skipped.
      4. Insert the cleaned rows with insert_rows_json() in chunks of BATCH_SIZE.
      5. Write every row of a failed batch to FAILED_FILE, one JSON object per line.

    Progress and problems are reported with print(); nothing is returned.
    """
    bq_client = bigquery.Client(project=PROJECT_ID)
    storage_client = storage.Client(project=PROJECT_ID)

    # Get BigQuery table schema as column name -> (type, mode).
    table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    table = bq_client.get_table(table_ref)
    schema = {field.name: (field.field_type, field.mode) for field in table.schema}

    # Read JSONL file from GCS (the whole object is held in memory).
    print(f"Downloading {BLOB_NAME} from bucket {BUCKET_NAME}...")
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(BLOB_NAME)
    content = blob.download_as_text().splitlines()
    if not content:
        print("❌ The JSONL file is empty, nothing to load.")
        return
    print(f"📄 Total lines read: {len(content)}")

    rows = []
    errors = []
    for idx, line in enumerate(content, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError as e:
            errors.append((idx, f"Invalid JSON format: {e}"))
            continue
        # A well-formed line can still be an array, number, string or null.
        # validate_and_clean() calls record.items(), so report and skip non-objects
        # instead of crashing the whole run.
        if not isinstance(record, dict):
            errors.append((idx, f"Expected a JSON object, got {type(record).__name__}"))
            continue
        rows.append(validate_and_clean(record, schema, idx, errors))

    if errors:
        print(f"\n⚠️ Validation finished with {len(errors)} issues:")
        for line_num, msg in errors[:20]:  # show first 20 only
            print(f" Line {line_num}: {msg}")
        if len(errors) > 20:
            print(f"... and {len(errors)-20} more errors.\n")

    if not rows:
        print("❌ No valid rows found to insert.")
        return

    # Insert rows in batches.
    print(f"\n🚀 Inserting {len(rows)} rows into {table_ref} in batches of {BATCH_SIZE}...")
    failed_batches = []
    failed_rows = []
    for i in range(0, len(rows), BATCH_SIZE):
        batch_num = i // BATCH_SIZE + 1
        batch = rows[i:i + BATCH_SIZE]
        try:
            errors_bq = bq_client.insert_rows_json(table, batch)
        except Exception as exc:
            # Batch boundary: a failed request (network, quota, payload size, ...) is
            # logged and its rows saved, so the remaining batches and the already
            # collected failures are not lost.
            print(f"⚠️ Batch {batch_num} request failed: {exc}")
            failed_batches.append(batch_num)
            failed_rows.extend(batch)
            continue
        if errors_bq:
            print(f"⚠️ Batch {batch_num} had errors.")
            # Surface the first reported error so the cause is visible without a re-run.
            print(f"   First error: {errors_bq[0]}")
            failed_batches.append(batch_num)
            failed_rows.extend(batch)  # save the whole batch for a retry
        else:
            print(f"✅ Batch {batch_num} inserted successfully ({len(batch)} rows).")

    # Summary
    if failed_batches:
        print(f"\n❌ The following batches had issues: {failed_batches}")
        with open(FAILED_FILE, "w", encoding="utf-8") as f:
            f.writelines(json.dumps(row) + "\n" for row in failed_rows)
        print(f"💾 Failed rows written to {FAILED_FILE}")
    else:
        print("\n🎉 All batches inserted into BigQuery successfully!")
if __name__ == "__main__":
    # Start the load only when executed as a script; importing the module runs nothing.
    main()