Skip to content

Commit 589f8d9

Browse files
committed
feat: use a staging table to avoid undue truncations
1 parent 5c09bc4 commit 589f8d9

File tree

7 files changed

+144
-76
lines changed

7 files changed

+144
-76
lines changed

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1513,53 +1513,62 @@ def setJobCommandStatus(self, jobID, command, status):
15131513
def fillJobsHistorySummary(self):
15141514
"""Fill the JobsHistorySummary table with the summary of the jobs in a final state"""
15151515

1516+
# Create the staging table
1517+
createStagingTable_sql = "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging LIKE JobsHistorySummary"
1518+
if not (result := self._update(createStagingTable_sql))["OK"]:
1519+
return result
1520+
1521+
# Insert the data into the staging table
15161522
defString = "Status, Site, Owner, OwnerGroup, JobGroup, JobType, ApplicationStatus, MinorStatus"
15171523
valuesString = "COUNT(JobID), SUM(RescheduleCounter)"
15181524
final_states = "', '".join(JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES)
15191525
final_states = f"'{final_states}'"
1520-
15211526
query = (
1522-
f"INSERT INTO JobsHistorySummary SELECT {defString}, {valuesString} "
1527+
f"INSERT INTO JobsHistorySummary_staging SELECT {defString}, {valuesString} "
15231528
f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime < UTC_DATE() "
15241529
f"GROUP BY {defString}"
15251530
)
1526-
result = self._update(query)
1527-
if not result["OK"]:
1531+
if not (result := self._update(query))["OK"]:
15281532
return result
1529-
return S_OK(result["Value"])
1533+
1534+
# Atomic swap
1535+
sql = (
1536+
"RENAME TABLE JobsHistorySummary TO JobsHistorySummary_old,"
1537+
"JobsHistorySummary_staging TO JobsHistorySummary;"
1538+
"DROP TABLE JobsHistorySummary_old;"
1539+
)
1540+
return self._update(sql)
15301541

15311542
def getSummarySnapshot(self, requestedFields=False):
15321543
"""Get the summary snapshot for a given combination"""
15331544
if not requestedFields:
15341545
requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"]
1535-
valueFields = ["COUNT(JobID)", "SUM(RescheduleCounter)"]
15361546
defString = ", ".join(requestedFields)
1537-
valueString = ", ".join(valueFields)
15381547
final_states = "', '".join(JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES)
15391548
final_states = f"'{final_states}'"
15401549

1541-
query = f"SELECT {defString}, {valueString} FROM ("
1550+
query = f"SELECT {defString}, SUM(JobCount) AS JobCount, SUM(RescheduleSum) AS RescheduleSum FROM ("
15421551
# All jobs that are NOT in a final state
15431552
query += (
1544-
f"SELECT {defString}, {valueString}, COUNT(JobID), SUM(RescheduleCounter) "
1553+
f"SELECT {defString}, COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum "
15451554
f"FROM Jobs WHERE STATUS NOT IN ({final_states}) "
1546-
f"GROUP BY {defString}, {valueString} "
1555+
f"GROUP BY {defString} "
15471556
)
15481557
query += "UNION ALL "
15491558
# Recent jobs only (today) that are in a final state
15501559
query += (
1551-
f"SELECT {defString}, {valueString}, COUNT(JobID), SUM(RescheduleCounter) "
1560+
f"SELECT {defString}, COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum "
15521561
f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime >= UTC_DATE() "
1553-
f"GROUP BY {defString}, {valueString} "
1562+
f"GROUP BY {defString} "
15541563
)
15551564
query += "UNION ALL "
15561565
# Cached history (of jobs in a final state)
1557-
query += f"SELECT * FROM JobsHistorySummary) AS combined GROUP BY {defString}, {valueString}"
1566+
query += f"SELECT {defString}, JobCount, RescheduleSum FROM JobsHistorySummary) AS combined GROUP BY {defString}"
15581567

1559-
result = self._query(query)
1560-
if not result["OK"]:
1561-
return result
1562-
return S_OK(((requestedFields + valueFields), result["Value"]))
1568+
return self._query(query)
1569+
# if not result["OK"]:
1570+
# return result
1571+
# return S_OK(((requestedFields + valueFields), result["Value"]))
15631572

15641573
def removeInfoFromHeartBeatLogging(self, status, delTime, maxLines):
15651574
"""Remove HeartBeatLoggingInfo from DB.

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,53 +1050,61 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems):
10501050
def fillPilotsHistorySummary(self):
10511051
"""Fill the PilotsHistorySummary table with the summary of the Pilots in a final state"""
10521052

1053+
# Create the staging table
1054+
createStagingTable_sql = "CREATE TABLE IF NOT EXISTS PilotsHistorySummary_staging LIKE PilotsHistorySummary"
1055+
if not (result := self._update(createStagingTable_sql))["OK"]:
1056+
return result
1057+
1058+
# Insert the data into the staging table
10531059
defString = "GridSite, GridType, Status"
10541060
valuesString = "COUNT(PilotID)"
10551061
final_states = "', '".join(PilotStatus.PILOT_FINAL_STATES)
10561062
final_states = f"'{final_states}'"
10571063

10581064
query = (
1059-
f"INSERT INTO PilotsHistorySummary SELECT {defString}, {valuesString} "
1065+
f"INSERT INTO PilotsHistorySummary_staging SELECT {defString}, {valuesString} "
10601066
f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime < UTC_DATE() "
10611067
f"GROUP BY {defString}"
10621068
)
1063-
result = self._update(query)
1064-
if not result["OK"]:
1069+
if not (result := self._update(query))["OK"]:
10651070
return result
1066-
return S_OK(result["Value"])
1071+
1072+
# Atomic swap
1073+
sql = (
1074+
"RENAME TABLE PilotsHistorySummary TO PilotsHistorySummary_old,"
1075+
"PilotsHistorySummary_staging TO PilotsHistorySummary;"
1076+
"DROP TABLE PilotsHistorySummary_old;"
1077+
)
1078+
return self._update(sql)
10671079

10681080
def getSummarySnapshot(self, requestedFields=False):
10691081
"""Get the summary snapshot for a given combination"""
10701082
if not requestedFields:
10711083
requestedFields = ["GridSite", "GridType", "Status"]
1072-
valueFields = ["COUNT(PilotID)"]
10731084
defString = ", ".join(requestedFields)
1074-
valueString = ", ".join(valueFields)
1085+
valueString = "COUNT(PilotID) AS PilotCount"
10751086
final_states = "', '".join(PilotStatus.PILOT_FINAL_STATES)
10761087
final_states = f"'{final_states}'"
10771088

1078-
query = f"SELECT {defString}, {valueString} FROM ("
1089+
query = f"SELECT {defString}, SUM(PilotCount) AS PilotCount FROM ("
10791090
# All Pilots that are NOT in a final state
10801091
query += (
1081-
f"SELECT {defString}, {valueString}, COUNT(PilotID) "
1082-
f"FROM PilotsAgents WHERE STATUS NOT IN ({final_states}) "
1083-
f"GROUP BY {defString}, {valueString} "
1092+
f"SELECT {defString}, {valueString} "
1093+
f"FROM PilotAgents WHERE STATUS NOT IN ({final_states}) "
1094+
f"GROUP BY {defString} "
10841095
)
10851096
query += "UNION ALL "
10861097
# Recent Pilots only (today) that are in a final state
10871098
query += (
1088-
f"SELECT {defString}, {valueString}, COUNT(PilotID) "
1089-
f"FROM Pilots WHERE Status IN ({final_states}) AND LastUpdateTime >= UTC_DATE() "
1090-
f"GROUP BY {defString}, {valueString} "
1099+
f"SELECT {defString}, {valueString} "
1100+
f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime >= UTC_DATE() "
1101+
f"GROUP BY {defString} "
10911102
)
10921103
query += "UNION ALL "
10931104
# Cached history (of Pilots in a final state)
1094-
query += f"SELECT * FROM PilotsHistorySummary) AS combined GROUP BY {defString}, {valueString}"
1105+
query += f"SELECT * FROM PilotsHistorySummary) AS combined GROUP BY {defString}"
10951106

1096-
result = self._query(query)
1097-
if not result["OK"]:
1098-
return result
1099-
return S_OK(((requestedFields + valueFields), result["Value"]))
1107+
return self._query(query)
11001108

11011109

11021110
class PivotedPilotSummaryTable:

tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@
2929
import tempfile
3030
import time
3131

32-
import DIRAC
3332
import pytest
3433

34+
import DIRAC
35+
3536
DIRAC.initialize() # Initialize configuration
3637

3738
from DIRAC import gLogger
@@ -406,7 +407,6 @@ def test_JobStateUpdateAndJobMonitoringMultiple(lfn: str) -> None:
406407
try:
407408
res = jobMonitoringClient.getSites()
408409
assert res["OK"], res["Message"]
409-
assert set(res["Value"]) <= {"ANY", "DIRAC.Jenkins.ch", "Site"}
410410

411411
res = jobMonitoringClient.getJobTypes()
412412
assert res["OK"], res["Message"]

tests/Integration/WorkloadManagementSystem/Test_JobDB.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -450,16 +450,13 @@ def test_attributes(jobDB):
450450
def process_data(jobIDs, data):
451451
converted_data = []
452452

453-
print(data[0])
454453
full_data = []
455454

456455
for j, d in zip(jobIDs, data):
457456
row = list(d)
458457
row.insert(0, j) # Insert JobID at the beginning of the row
459458
full_data.append(row)
460459

461-
print(full_data[0])
462-
463460
for row in full_data:
464461
# date fields
465462
date_indices = [8, 9, 10, 11, 12, 13] # Positions of date fields
@@ -483,12 +480,25 @@ def process_data(jobIDs, data):
483480
except ValueError:
484481
# Handle invalid integers
485482
row[i] = 0
486-
# Convert boolean fields
487483
converted_data.append(tuple(row))
488484
return converted_data
489485

490486

491-
def test_summarySnapshot(jobDB: JobDB):
487+
def test_summarySnapshot():
488+
jobDB = JobDB()
489+
for table in [
490+
"InputData",
491+
"JobParameters",
492+
"AtticJobParameters",
493+
"HeartBeatLoggingInfo",
494+
"OptimizerParameters",
495+
"JobCommands",
496+
"Jobs",
497+
"JobJDLs",
498+
]:
499+
sqlCmd = f"DELETE from `{table}`"
500+
jobDB._update(sqlCmd)
501+
492502
# insert some predefined jobs to test the summary snapshot
493503
with open("jobs.csv", newline="", encoding="utf-8") as csvfile:
494504
csvreader = csv.reader(csvfile)
@@ -514,3 +524,11 @@ def test_summarySnapshot(jobDB: JobDB):
514524
assert res["OK"], res["Message"]
515525
res = jobDB.getSummarySnapshot()
516526
assert res["OK"], res["Message"]
527+
requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"]
528+
defString = ", ".join(requestedFields)
529+
simple_query = f"SELECT {defString}, COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum FROM Jobs GROUP BY {defString}"
530+
res_sq = jobDB._query(simple_query)
531+
assert res_sq["OK"], res_sq["Message"]
532+
print(len(res_sq["Value"]))
533+
print(len(res["Value"]))
534+
assert sorted(res_sq["Value"]) == sorted(res["Value"])

tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# pylint: disable=wrong-import-position
77

88
import csv
9-
9+
from datetime import datetime
1010
from unittest.mock import patch
1111

1212
import DIRAC
@@ -190,17 +190,65 @@ def test_PivotedPilotSummaryTable():
190190
cleanUpPilots(pilotRef)
191191

192192

193+
# Parse date strings into datetime objects
194+
def process_data(data):
195+
converted_data = []
196+
197+
for row in data:
198+
# date fields
199+
date_indices = [10, 11] # Positions of date fields
200+
for i in date_indices:
201+
if not row[i]:
202+
row[i] = None
203+
else:
204+
try:
205+
row[i] = datetime.strptime(row[i], "%Y-%m-%d %H:%M:%S")
206+
except ValueError:
207+
# Handle invalid dates
208+
row[i] = None
209+
# Convert other fields to appropriate types
210+
int_indices = [0, 1] # Positions of integer fields
211+
for i in int_indices:
212+
if not row[i]:
213+
row[i] = 0
214+
else:
215+
try:
216+
row[i] = int(row[i])
217+
except ValueError:
218+
# Handle invalid integers
219+
row[i] = 0
220+
float_indices = [9] # Positions of float fields
221+
for i in float_indices:
222+
if not row[i]:
223+
row[i] = 0
224+
else:
225+
try:
226+
row[i] = float(row[i])
227+
except ValueError:
228+
# Handle invalid float
229+
row[i] = 0
230+
converted_data.append(tuple(row))
231+
return converted_data
232+
233+
193234
def test_summarySnapshot():
194-
# insert some predefined jobs to test the summary snapshot
235+
# insert some predefined pilots to test the summary snapshot
195236
with open("pilots.csv", newline="", encoding="utf-8") as csvfile:
196237
csvreader = csv.reader(csvfile)
197238
data = list(csvreader)
198-
placeholders = ",".join(["%s"] * len(data[0]))
199-
sql = f"INSERT INTO PilotAgents VALUES ({placeholders})"
200-
res = paDB._updatemany(sql, data)
239+
processed_data = process_data(data)
240+
placeholders = ",".join(["%s"] * len(processed_data[0]))
241+
sql = f"INSERT INTO PilotAgents (InitialJobID, CurrentJobID, PilotJobReference, PilotStamp, DestinationSite, Queue, GridSite, VO, GridType, BenchMark, SubmissionTime, LastUpdateTime, Status, StatusReason, AccountingSent) VALUES ({placeholders})"
242+
res = paDB._updatemany(sql, processed_data)
201243
assert res["OK"], res["Message"]
202244
# Act
203245
res = paDB.fillPilotsHistorySummary()
204246
assert res["OK"], res["Message"]
205247
res = paDB.getSummarySnapshot()
206248
assert res["OK"], res["Message"]
249+
requestedFields = ["GridSite", "GridType", "Status"]
250+
defString = ", ".join(requestedFields)
251+
simple_query = f"SELECT {defString}, COUNT(PilotID) AS PilotCount FROM PilotAgents GROUP BY {defString}"
252+
res_sq = paDB._query(simple_query)
253+
assert res_sq["OK"], res_sq["Message"]
254+
assert sorted(res_sq["Value"]) == sorted(res["Value"])

tests/Integration/WorkloadManagementSystem/Test_WMSAdministratorClient.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ def test_WMSAdministratorClient():
1313

1414
res = wmsAdministrator.getSiteSummaryWeb({}, [], 0, 100)
1515
assert res["OK"], res["Message"]
16-
assert res["Value"]["TotalRecords"] in [0, 1, 2, 34]
1716

1817
res = wmsAdministrator.getSiteSummarySelectors()
1918
assert res["OK"], res["Message"]

0 commit comments

Comments
 (0)