Commit 4634073

feat: per-file size tracking for ntuples and monitoring improvements (#31)
* track per-file sizes for ntuples
* use per-file sizes for analysis workload
* track rate in xrdcp notebook
1 parent 882dda3 commit 4634073

File tree: 5 files changed, +155 -97 lines changed

atlas/ntuple_production/cache_everything.ipynb

Lines changed: 0 additions & 85 deletions
This file was deleted.

atlas/ntuple_production/collect_file_metadata.py

Lines changed: 13 additions & 8 deletions
@@ -82,19 +82,22 @@ def rucio_file_paths(name, num_files_expected):
     """file paths from rucio list-file-replicas call"""
     cmd = f"rucio list-file-replicas --protocols root {name}"
     output = subprocess.check_output(cmd, shell=True)
-    rses_and_paths = re.findall(r"(\w+): (root:\/\/.*?)\s", output.decode())
+    size_unit_rse_path = re.findall(r"(\d+\.\d+)\s(\wB).+?([\w-]+): (root:\/\/.*?)\s", output.decode())
 
     # select a single RSE for each file
-    filenames = sorted(set([rp[1].split("/")[-1] for rp in rses_and_paths]))
+    filenames = sorted(set([rp[-1].split("/")[-1] for rp in size_unit_rse_path]))
     unique_paths = []
+    sizes_GB = []
     for filename in filenames:
-        fpaths = [rp for rp in rses_and_paths if filename in rp[1]]
+        matches = [m for m in size_unit_rse_path if filename in m[-1]]
         # pick MWT2_UC_LOCALGROUPDISK match by default, otherwise first in the list
-        fpath = next((fp for fp in fpaths if fp[0] == "MWT2_UC_LOCALGROUPDISK"), fpaths[0])[1]
-        unique_paths.append(fpath)
+        match = next((m for m in matches if m[2] == "MWT2_UC_LOCALGROUPDISK"), matches[0])
+        unique_paths.append(match[3])
+        size_to_GB = lambda num, unit: float(num) * {"kB": 1e-6, "MB": 1e-3, "GB": 1}[unit]
+        sizes_GB.append(size_to_GB(*match[:2]))
 
     assert len(unique_paths) == num_files_expected
-    return unique_paths
+    return unique_paths, sizes_GB
 
 
 def process_one_category(category, container_list, production_map):
@@ -132,8 +135,10 @@ def process_one_category(category, container_list, production_map):
         metadata[container]["size_output_GB"] = info_output["size_GB"]
 
         # add xrootd file paths
-        paths = rucio_file_paths(production_map[container]["output"], info_output["nfiles"])
+        paths, sizes = rucio_file_paths(production_map[container]["output"], info_output["nfiles"])
         metadata[container]["files_output"] = paths
+        metadata[container]["sizes_output_GB"] = sizes
+        assert abs(sum(sizes) - info_output["size_GB"]) < 0.01  # agree within 10 MB
 
     return {category: metadata}
 
@@ -168,4 +173,4 @@ def save_full_metadata(production_map, fname, max_workers=8):
     production_map = parse_job_json(fname_bigpanda)
 
     fname_full = "file_metadata.json.gz"
-    metadata = save_full_metadata(production_map, fname_full)
+    metadata = save_full_metadata(production_map, fname_full, max_workers=8)
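
For reference, a minimal sketch (not part of the commit) of what the updated regex in rucio_file_paths extracts; the replica line below is hypothetical and only approximates the rucio list-file-replicas table output:

import re

# hypothetical replica line, only approximating the `rucio list-file-replicas` table layout
sample = (
    "| user.abc | file.root | 1.234 GB | ad:12345678 | "
    "MWT2_UC_LOCALGROUPDISK: root://xrootd.example.org:1094//path/to/file.root |"
)

# groups per match: (size, unit, RSE, xrootd path)
size_unit_rse_path = re.findall(r"(\d+\.\d+)\s(\wB).+?([\w-]+): (root:\/\/.*?)\s", sample)
print(size_unit_rse_path)
# [('1.234', 'GB', 'MWT2_UC_LOCALGROUPDISK', 'root://xrootd.example.org:1094//path/to/file.root')]

# convert (size, unit) to GB, mirroring the commit's size_to_GB lambda
size_to_GB = lambda num, unit: float(num) * {"kB": 1e-6, "MB": 1e-3, "GB": 1}[unit]
print(size_to_GB(*size_unit_rse_path[0][:2]))  # 1.234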
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ff6ca0fb-e358-4dbe-9951-e5167758d030",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import datetime\n",
+    "import gzip\n",
+    "import json\n",
+    "import time\n",
+    "import os\n",
+    "\n",
+    "import dask\n",
+    "from dask.distributed import Client\n",
+    "import numpy as np\n",
+    "import matplotlib.dates as mdates\n",
+    "import matplotlib.pyplot as plt\n",
+    "\n",
+    "client = Client(\"tls://localhost:8786\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d55ec891-e0c2-4f43-ba35-2077ecafcc07",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def get_input(max_size_GB = None):\n",
+    "    with gzip.open(\"file_metadata.json.gz\") as f:\n",
+    "        dataset_info = json.loads(f.read().decode())\n",
+    "\n",
+    "    all_files = []\n",
+    "    all_sizes_GB = []\n",
+    "    for containers_for_category in dataset_info.values():\n",
+    "        for container, metadata in containers_for_category.items():\n",
+    "            if metadata[\"files_output\"] is None:\n",
+    "                continue\n",
+    "            for fname, size in zip(metadata[\"files_output\"], metadata[\"sizes_output_GB\"]):\n",
+    "                all_files.append(fname)\n",
+    "                all_sizes_GB.append(size)\n",
+    "                if max_size_GB and sum(all_sizes_GB) > max_size_GB:\n",
+    "                    return all_files, all_sizes_GB\n",
+    "    return all_files, all_sizes_GB\n",
+    "\n",
+    "all_files, all_sizes_GB = get_input(max_size_GB = None) # limit list to specific total size\n",
+    "print(f\"list of {len(all_files)} files with total size {sum(all_sizes_GB):.2f} GB\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "1c5cb8c1-ebf6-41f7-a12d-feff9a0456b8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def run_xrdcp(fname, size):\n",
+    "    t0 = time.time()\n",
+    "    os.system(f\"xrdcp {fname} /dev/null -f\")\n",
+    "    t1 = time.time()\n",
+    "    return {\"t0\": t0, \"t1\": t1, \"GBread\": size}\n",
+    "\n",
+    "t0 = time.time()\n",
+    "tasks = [dask.delayed(run_xrdcp)(fname, size) for fname, size in zip(all_files, all_sizes_GB)]\n",
+    "res = dask.compute(*tasks)\n",
+    "t1 = time.time()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c04faacc-8136-4ef9-910a-98b8660cb4d6",
+   "metadata": {},
+   "source": [
+    "track egress: [link](https://grafana.mwt2.org/d/EKefjM-Sz/af-network-200gbps-challenge?orgId=1&from=now-1h&to=now&viewPanel=panel-205&refresh=5s)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "dd03f327-857a-4d80-a013-3a13549de37f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "total_runtime_sum = sum(r[\"t1\"] - r[\"t0\"] for r in res)\n",
+    "\n",
+    "print(f\"processtime: {total_runtime_sum:.2f} s\")\n",
+    "print(f\" -> data rate per worker: {sum(all_sizes_GB) * 8 / total_runtime_sum:.2f} Gbps\")\n",
+    "\n",
+    "print(f\"walltime: {t1-t0:.2f} s\")\n",
+    "print(f\" -> total data rate: {sum(all_sizes_GB) * 8 / (t1-t0):.2f} Gbps\")\n",
+    "\n",
+    "starts = np.asarray([r[\"t0\"] for r in res])\n",
+    "ends = np.asarray([r[\"t1\"] for r in res])\n",
+    "GBread = [r[\"GBread\"] for r in res]\n",
+    "rates_per_chunk = GBread / (ends - starts)\n",
+    "\n",
+    "t_samples = np.linspace(t0, t1, 100)\n",
+    "rate_samples = []\n",
+    "for t in t_samples:\n",
+    "    mask = np.logical_and((starts <= t), (t < ends))\n",
+    "    rate_samples.append(float(sum(rates_per_chunk[mask]) * 8))\n",
+    "\n",
+    "print(f\"total data read from data rate integral: {sum((t_samples[1] - t_samples[0]) * np.asarray(rate_samples)) / 8:.2f} GB\")\n",
+    "t_samples = [datetime.datetime.fromtimestamp(t) for t in t_samples.tolist()]\n",
+    "\n",
+    "fig, ax = plt.subplots()\n",
+    "ax.plot(t_samples, rate_samples, marker=\"v\", linewidth=0)\n",
+    "ax.set_xlabel(\"time\")\n",
+    "ax.tick_params(axis=\"x\", labelrotation=45)\n",
+    "ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))\n",
+    "ax.set_ylabel(\"data rate [Gbps]\")\n",
+    "ax.set_ylim([0, ax.get_ylim()[1] * 1.1])\n",
+    "fig.savefig(\"xrdcp_rate.png\")"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.12.11"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
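
The last cell of the notebook above estimates the aggregate data rate by sampling, at evenly spaced times, the sum of the average per-transfer rates of all transfers in flight at that moment. A toy illustration of that sampling step (all numbers hypothetical, not from the commit):

import numpy as np

# hypothetical transfer records: start/end timestamps [s] and volume read [GB]
starts = np.array([0.0, 1.0])
ends = np.array([4.0, 3.0])
GB_read = np.array([4.0, 1.0])

# average rate of each transfer over its own lifetime
rates_per_chunk = GB_read / (ends - starts)  # [1.0, 0.5] GB/s

# at a given sample time, sum the rates of all transfers that are in flight
t = 2.0
in_flight = (starts <= t) & (t < ends)
print(rates_per_chunk[in_flight].sum() * 8)  # aggregate rate in Gbps -> 12.0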
36.1 KB
Binary file not shown.

atlas/utils.py

Lines changed: 1 addition & 4 deletions
@@ -238,9 +238,6 @@ def get_fileset(campaign_filter: list | None = None, dsid_filter: list | None =
     with gzip.open(fname) as f:
         dataset_info = json.loads(f.read().decode())
 
-    if max_files_per_sample is not None:
-        print(f"[WARNING] limiting files per sample to {max_files_per_sample}, input size estimate is invalid")
-
     # construct fileset
     fileset = {}
     input_size_GB = 0
@@ -266,7 +263,7 @@
             "files": dict((path, "reco") for path in metadata["files_output"][:num_files]),
             "metadata": {"dsid": dsid, "campaign": campaign, "category": category, "weight_xs": weight_xs, "lumi": lumi}
         }
-        input_size_GB += metadata["size_output_GB"]
+        input_size_GB += sum(metadata["sizes_output_GB"][:num_files])
 
     print(f"[INFO] fileset has {len(fileset)} categories with {sum([len(f["files"]) for f in fileset.values()])} files total, size is {input_size_GB:.2f} GB")
     return fileset, input_size_GB
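
A small sketch of why the [WARNING] about an invalid size estimate could be dropped: with the per-file sizes written by collect_file_metadata.py, the estimate stays correct even when max_files_per_sample truncates the file list (the metadata values below are hypothetical):

# hypothetical container metadata in the layout written by collect_file_metadata.py
metadata = {
    "size_output_GB": 6.0,               # total container size
    "sizes_output_GB": [1.0, 2.0, 3.0],  # per-file sizes (new in this commit)
}
num_files = 2  # e.g. truncated by max_files_per_sample

old_estimate = metadata["size_output_GB"]                    # 6.0: overstates the truncated input
new_estimate = sum(metadata["sizes_output_GB"][:num_files])  # 3.0: matches the files actually used
print(old_estimate, new_estimate)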
