-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
144 lines (128 loc) · 6 KB
/
database.py
File metadata and controls
144 lines (128 loc) · 6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import numpy as np
import psycopg2
import pandas as pd
import sqlalchemy
from psycopg2.extensions import register_adapter, AsIs
import config_file
from transformations import memory_to_gigabytes, time_to_seconds
register_adapter(np.int64, AsIs)
def sqlalc(df, db_config):
engine = sqlalchemy.create_engine(
f'postgresql+psycopg2://{db_config["user"]}:{db_config["password"]}@{db_config["host"]}:{db_config["port"]}/{db_config["dbname"]}')
df.to_sql("every_job", engine, if_exists='append', index=False)
def transform_df(df):
df = df[["JobID", "UID", "Account", "State", "Partition", "TimelimitRaw", "Submit", "Eligible", "Elapsed", "Planned", "Start",
"End", "Priority", "ReqCPUS", "ReqMem", "ReqNodes", "ReqTRES", "QOS"]]
df = df.rename(columns={"JobID": "job_id", "UID": "user_id", "Account": "account", "State": "state",
"Partition": "partition",
"TimelimitRaw": "time_limit_raw", "Submit": "submit",
"Eligible": "eligible", "Elapsed": "elapsed", "Planned": "planned", "Start": "start_time", "End": "end_time",
"Priority": "priority", "ReqCPUS": "req_cpus",
"ReqMem": "req_mem", "ReqNodes": "req_nodes", "ReqTRES": "req_tres", "QOS": "qos"})
# Remove incomplete jobs.
df.loc[df.planned == "INVALID", "planned"] = None
df['job_id'] = pd.to_numeric(df['job_id'], errors='coerce')
df['time_limit_raw'] = pd.to_numeric(df['time_limit_raw'], errors='coerce')
df.loc[df.start_time == "Unknown", "start_time"] = None
df.loc[df.end_time == "Unknown", "end_time"] = None
df['state'] = df['state'].str.partition(' ')[0]
df = df.dropna(subset=['job_id'])
df = df.dropna(subset=['planned'])
df = df.dropna(subset=['elapsed'])
df = df.dropna(subset=['time_limit_raw'])
df = df.dropna(subset=['req_mem'])
df = df.dropna(subset=['start_time'])
# Format fields.
df['req_mem'] = df['req_mem'].apply(memory_to_gigabytes)
df['planned'] = df['planned'].apply(time_to_seconds)
df['elapsed'] = df['elapsed'].apply(time_to_seconds)
partition_enum = ['standard', 'shared', 'wholenode', 'wide', 'gpu', 'highmem', 'azure']
df = df[df['partition'].isin(partition_enum)]
state_enum = ['COMPLETED', 'CANCELLED', 'FAILED', 'REQUEUED', 'NODE_FAIL', 'PENDING', 'OUT_OF_MEMORY', 'TIMEOUT']
df = df[df['state'].isin(state_enum)]
# Change Null values for pandas.NA
df = df.fillna(pd.NA)
return df
def create_enum(conn):
command = """CREATE TYPE partition_enum AS ENUM ('standard', 'shared', 'wholenode', 'wide', 'gpu', 'highmem',
'azure')"""
with conn.cursor() as cursor: cursor.execute(command)
command = """CREATE TYPE state_enum AS ENUM ('COMPLETED', 'CANCELLED', 'FAILED', 'REQUEUED', 'NODE_FAIL', 'PENDING',
'OUT_OF_MEMORY', 'TIMEOUT', 'RUNNING')"""
with conn.cursor() as cursor: cursor.execute(command)
def create_table(conn):
command = """
CREATE TABLE IF NOT EXISTS every_job (
job_id INTEGER PRIMARY KEY,
user_id INTEGER,
account VARCHAR(255),
state state_enum,
partition partition_enum,
time_limit_raw INTEGER,
submit TIMESTAMP,
eligible TIMESTAMP,
elapsed INTEGER,
planned INTEGER,
start_time TIMESTAMP,
end_time TIMESTAMP,
priority INTEGER,
req_cpus INTEGER,
req_mem REAL,
req_nodes INTEGER,
req_tres VARCHAR(255),
QOS VARCHAR(255),
jobs_ahead_queue INTEGER,
cpus_ahead_queue INTEGER,
memory_ahead_queue REAL,
nodes_ahead_queue INTEGER,
time_limit_ahead_queue INTEGER,
jobs_running INTEGER,
cpus_running INTEGER,
memory_running REAL,
nodes_running INTEGER,
time_limit_running INTEGER,
par_jobs_ahead_queue INTEGER,
par_cpus_ahead_queue INTEGER,
par_memory_ahead_queue REAL,
par_nodes_ahead_queue INTEGER,
par_time_limit_ahead_queue INTEGER,
par_jobs_running INTEGER,
par_cpus_running INTEGER,
par_memory_running REAL,
par_nodes_running INTEGER,
par_time_limit_running INTEGER,
jobs_ahead_queue_priority INTEGER,
cpus_ahead_queue_priority INTEGER,
memory_ahead_queue_priority REAL,
nodes_ahead_queue_priority INTEGER,
time_limit_ahead_queue_priority INTEGER,
user_jobs_past_day INTEGER,
user_cpus_past_day INTEGER,
user_memory_past_day INTEGER,
user_nodes_past_day INTEGER,
user_time_limit_past_day INTEGER
)"""
with conn.cursor() as cursor: cursor.execute(command)
def initialize_db(db_config):
with psycopg2.connect(dbname=db_config["dbname"], user=db_config["user"], password=db_config["password"], host=db_config["host"],
port=db_config["port"]) as conn:
print("Connected to database")
# create_enum(conn)
create_table(conn)
if __name__ == "__main__":
db_config = {
"dbname": "sacctdata",
"user": "postgres",
"password": config_file.postgres_password,
"host": "slurm-data-loadbalancer.reu-p4.anvilcloud.rcac.purdue.edu",
"port": "5432"
}
initialize_db(db_config)
csv_path = "/home/austin/start_to_2024-05-01.csv"
df1 = pd.read_csv(csv_path, delimiter="|")
csv_path = "/home/austin/2024-05-01_to_2024-06-26.csv"
df2 = pd.read_csv(csv_path, delimiter="|")
df = pd.concat([df1, df2]).drop_duplicates(['JobID'], keep='last')
# df = df.iloc[-2_000_000:]
df = transform_df(df)
sqlalc(df, db_config)