Commit 32b65b9

Task Runner E2E: Simplify the components' start and stop processes (#1514)
* Run resiliency 3 times
* Run 50 rounds
* Extra debugging
* Testing
* Modified
* Remove extra logging
* Kill process as a one-liner
* Upgraded pytest from 8.3.4 to 8.3.5
* Specific functions to fetch pids and kill processes
* Multiple changes done
* File changes as part of lint fixing
* Minor logging fixes and copyright year correction
* Review comments addressed
* Revert pytest version upgrade change
* Check if start_process is present

Signed-off-by: noopur <noopur@intel.com>
1 parent 4521a4a commit 32b65b9

File tree

8 files changed (+169, -103 lines)


tests/end_to_end/models/aggregator.py

Lines changed: 35 additions & 11 deletions
@@ -1,13 +1,13 @@
-# Copyright 2020-2023 Intel Corporation
+# Copyright 2020-2025 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
 import logging
 import os
 import tempfile
 
-import tests.end_to_end.utils.constants as constants
 import tests.end_to_end.utils.exceptions as ex
 import tests.end_to_end.utils.federation_helper as fh
+import tests.end_to_end.utils.ssh_helper as ssh
 
 
 log = logging.getLogger(__name__)
@@ -37,6 +37,7 @@ def __init__(self, agg_domain_name, workspace_path, eval_scope=False, container_
         self.container_id = container_id
         self.tensor_db_file = os.path.join(self.workspace_path, "local_state", "tensor.db")
         self.res_file = None  # Result file to track the logs
+        self.start_process = None  # Process associated with the aggregator start command
 
     def generate_sign_request(self):
         """
@@ -70,16 +71,24 @@ def start(self):
             log_file = os.path.join("logs", "aggregator.log")
             self.res_file = os.path.join(self.workspace_path, log_file)
 
-            command = f"LOG_FILE={log_file} {constants.AGG_START_CMD}"
+            command = ["fx", "aggregator", "start"]
             if self.eval_scope:
-                command += " --task_group evaluation"
-            fh.run_command(
-                command,
-                error_msg=error_msg,
-                container_id=self.container_id,
-                workspace_path=self.workspace_path,
-                run_in_background=True,
-                bg_file=os.path.join(tempfile.mkdtemp(), "tmp.log"),  # this file is simply to keep the process running
+                command.append("--task_group")
+                command.append("evaluation")
+            log.info(f"Command for {self.name}: {command}")
+
+            # Set the log file path for the aggregator process
+            env = os.environ.copy()
+            env["LOG_FILE"] = log_file
+
+            # open file in append mode, so that restarting scenarios can be handled
+            bg_file = open(os.path.join(tempfile.mkdtemp(), "tmp.log"), "a", buffering=1)
+            self.start_process = ssh.run_command_background(
+                cmd=command,
+                work_dir=self.workspace_path,
+                redirect_to_file=bg_file,
+                check_sleep=60,
+                env=env
             )
 
             log.info(
@@ -90,6 +99,21 @@ def start(self):
             raise e
         return True
 
+    def kill_process(self):
+        """
+        Kill the process of the aggregator and wait for it to finish
+        """
+        try:
+            if self.start_process:
+                self.start_process.kill()
+                self.start_process.wait()
+                self.start_process = None
+            else:
+                log.warning("No process found for aggregator")
+        except Exception as e:
+            log.error(f"Failed to kill the process: {e}")
+            raise ex.ProcessKillException(f"Failed to kill the process: {e}")
+
     def modify_data_file(self, data_file, col_name, index):
         """
         Modify the data.yaml file for the model
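The `ssh.run_command_background` helper used above is not part of this diff. As a rough sketch only, assuming it is a thin `subprocess.Popen` wrapper whose parameter names mirror the call site (`cmd`, `work_dir`, `redirect_to_file`, `check_sleep`, `env`), it could look roughly like this; the returned handle is what `start_process` later uses for `kill()`, `wait()`, and `poll()`:

# Hypothetical sketch; the real helper lives in tests/end_to_end/utils/ssh_helper.py
# and may differ in signature and behaviour.
import subprocess
import time


def run_command_background(cmd, work_dir=None, redirect_to_file=None, check_sleep=0, env=None):
    """Start cmd in the background and return its Popen handle."""
    process = subprocess.Popen(  # nosec B603 - command list is built by the test harness
        cmd,
        cwd=work_dir,
        env=env,
        stdout=redirect_to_file,
        stderr=subprocess.STDOUT,
    )
    if check_sleep:
        # Give the command time to come up, then confirm it has not already exited.
        time.sleep(check_sleep)
        if process.poll() is not None:
            raise RuntimeError(f"{cmd} exited early with return code {process.returncode}")
    return process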

tests/end_to_end/models/collaborator.py

Lines changed: 34 additions & 9 deletions
@@ -1,13 +1,13 @@
-# Copyright 2020-2023 Intel Corporation
+# Copyright 2020-2025 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
 import os
 import logging
 import tempfile
 
-import tests.end_to_end.utils.constants as constants
 import tests.end_to_end.utils.exceptions as ex
 import tests.end_to_end.utils.federation_helper as fh
+import tests.end_to_end.utils.ssh_helper as ssh
 
 log = logging.getLogger(__name__)
 
@@ -38,6 +38,7 @@ def __init__(self, collaborator_name=None, data_directory_path=None, workspace_p
         self.workspace_path = workspace_path
         self.container_id = container_id
         self.res_file = None  # Result file to track the logs
+        self.start_process = None  # Process associated with the collaborator start command
 
     def generate_sign_request(self):
         """
@@ -130,13 +131,21 @@ def start(self):
             log_file = os.path.join("logs", f"{self.collaborator_name}.log")
             self.res_file = os.path.join(self.workspace_path, log_file)
 
-            fh.run_command(
-                command=f"LOG_FILE={log_file} {constants.COL_START_CMD.format(self.collaborator_name)}",
-                error_msg=error_msg,
-                container_id=self.container_id,
-                workspace_path=self.workspace_path,
-                run_in_background=True,
-                bg_file=os.path.join(tempfile.mkdtemp(), "tmp.log"),  # this file is simply to keep the process running
+            command = ["fx", "collaborator", "start", "-n", self.collaborator_name]
+            log.info(f"Command for {self.name}: {command}")
+
+            # Set the log file path for the collaborator process
+            env = os.environ.copy()
+            env["LOG_FILE"] = log_file
+
+            # open file in append mode, so that restarting scenarios can be handled
+            bg_file = open(os.path.join(tempfile.mkdtemp(), "tmp.log"), "a", buffering=1)
+            self.start_process = ssh.run_command_background(
+                cmd=command,
+                work_dir=self.workspace_path,
+                redirect_to_file=bg_file,
+                check_sleep=60,
+                env=env
            )
 
             log.info(
@@ -147,6 +156,22 @@ def start(self):
             raise e
         return True
 
+    def kill_process(self):
+        """
+        Kill the process of the collaborator and wait for it to finish
+        """
+        try:
+            if self.start_process:
+                self.start_process.kill()
+                self.start_process.wait()
+                self.start_process = None
+            else:
+                log.warning(f"No process found for {self.collaborator_name}")
+
+        except Exception as e:
+            log.error(f"Failed to kill the process: {e}")
+            raise ex.ProcessKillException(f"Failed to kill the process: {e}")
+
     def import_workspace(self):
         """
         Import the workspace
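With `start()` keeping the process handle and `kill_process()` tearing it down, the participant lifecycle can be driven directly from test code. A hedged usage sketch, assuming the class in this file is named `Collaborator` (import path and constructor arguments are illustrative):

# Illustrative only: class name, import path, and constructor arguments are assumptions.
from tests.end_to_end.models.collaborator import Collaborator

collaborator = Collaborator(
    collaborator_name="collaborator1",
    data_directory_path="/tmp/data/1",
    workspace_path="/tmp/workspace/collaborator1",
)

collaborator.start()                               # spawns: fx collaborator start -n collaborator1
assert collaborator.start_process.poll() is None   # poll() returns None while the process is running

collaborator.kill_process()                        # kill(), wait(), then clear the handle
assert collaborator.start_process is None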

tests/end_to_end/test_suites/tr_resiliency_tests.py

Lines changed: 1 addition & 0 deletions
@@ -279,6 +279,7 @@ def _perform_restart_validate_rounds(fed_obj, db_file, total_rounds):
     """
 
     init_round = fed_helper.get_current_round(db_file)
+    log.info(f"Round number is {init_round} before restarts")
 
     # Restart aggregator
     assert int_helper.restart_participants([fed_obj.aggregator])

tests/end_to_end/utils/exceptions.py

Lines changed: 6 additions & 1 deletion
@@ -1,4 +1,4 @@
-# Copyright 2020-2023 Intel Corporation
+# Copyright 2020-2025 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
 """Module consists of custom exceptions for end to end testing"""
@@ -124,3 +124,8 @@ class GaNDLFConfigSegException(Exception):
 class FlowerAppException(Exception):
     """Exception for Flower app"""
     pass
+
+
+class ProcessKillException(Exception):
+    """Exception for process kill"""
+    pass
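The new `ProcessKillException` lets callers separate a failed kill from other participant errors. A short, hedged sketch of how a caller might handle it (the helper function here is hypothetical):

import logging

import tests.end_to_end.utils.exceptions as ex

log = logging.getLogger(__name__)


def stop_participant_or_fail(participant):
    """Hypothetical helper: surface a failed kill as an explicit test failure."""
    try:
        participant.kill_process()
    except ex.ProcessKillException as e:
        log.error(f"Could not stop {participant.name} cleanly: {e}")
        raise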

tests/end_to_end/utils/federation_helper.py

Lines changed: 29 additions & 55 deletions
@@ -1,4 +1,4 @@
-# Copyright 2020-2023 Intel Corporation
+# Copyright 2020-2025 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
 import time
@@ -17,6 +17,7 @@
 import tests.end_to_end.utils.db_helper as db_helper
 import tests.end_to_end.utils.docker_helper as dh
 import tests.end_to_end.utils.exceptions as ex
+import tests.end_to_end.utils.interruption_helper as intr_helper
 import tests.end_to_end.utils.ssh_helper as ssh
 from tests.end_to_end.models import collaborator as col_model
 
@@ -356,20 +357,18 @@ def _verify_completion_for_participant(
 
         time.sleep(45)
 
-        # Verify that the process is completed successfully
-        get_process_id = constants.AGG_START_CMD if participant.name == "aggregator" else constants.COL_START_CMD.format(participant.name)
-
-        # Find the process ID
-        pids = []
-        for line in os.popen(f"ps ax | grep '{get_process_id}' | grep -v grep"):
-            fields = line.split()
-            pids.append(fields[0])
-
-        if not pids:
-            log.info(f"No processes found for participant {participant.name}")
-            break
+        # If process.poll() has a value, it means the process has completed
+        # If None, it means the process is still running
+        # This is applicable for native process only
+        if participant.start_process:
+            if participant.start_process.poll():
+                log.info(f"No processes found for participant {participant.name}")
+                break
+            else:
+                log.info(f"Process is yet to complete for {participant.name}")
         else:
-            log.info(f"Process is yet to complete for {participant.name}")
+            # Dockerized workspace scenario
+            log.info(f"No process found for participant {participant.name}")
 
         # Read tensor.db file for aggregator to check if the process is completed
         if participant.name == "aggregator" and num_rounds > 1:
@@ -1077,52 +1076,27 @@ def set_keras_backend(model_name):
     return [f"KERAS_BACKEND={backend}"]
 
 
-def remove_stale_processes(num_collaborators=0, envoys=[], director=False):
+def remove_stale_processes(aggregator=None, collaborators=[], director=None, envoys=[]):
     """
     Remove stale processes
+    Args:
+        aggregator (object): Aggregator object
+        collaborators (list): List of collaborator objects
+        director (object): Director object
+        envoys (list): List of envoy objects
     """
-    if num_collaborators > 0:
-        log.info("Removing stale processes..")
-        # Remove any stale processes
-        try:
-            for i in range(1, num_collaborators + 1):
-                subprocess.run(
-                    f"sudo kill -9 $(ps -ef | grep 'collaborator{i}' | awk '{{print $2}}')",
-                    shell=True,
-                    check=True,
-                    stdout=subprocess.DEVNULL,
-                    stderr=subprocess.DEVNULL
-                )
-            subprocess.run(
-                "sudo kill -9 $(ps -ef | grep 'aggregator' | awk '{{print $2}}')",
-                shell=True,
-                check=True,
-                stdout=subprocess.DEVNULL,
-                stderr=subprocess.DEVNULL
-            )
-        except subprocess.CalledProcessError as e:
-            log.warning(f"Failed to kill processes: {e}")
+    if aggregator:
+        intr_helper.kill_processes(aggregator.name)
+
+    for collaborators in collaborators:
+        intr_helper.kill_processes(collaborators.name)
 
     if director:
-        try:
-            subprocess.run(
-                "sudo kill -9 $(ps -ef | grep 'director' | awk '{{print $2}}')",
-                shell=True,
-                check=True,
-            )
-        except subprocess.CalledProcessError as e:
-            log.warning(f"Failed to kill processes: {e}")
-
-    if envoys:
-        for envoy in envoys:
-            try:
-                subprocess.run(
-                    f"sudo kill -9 $(ps -ef | grep '{envoy}' | awk '{{print $2}}')",
-                    shell=True,
-                    check=True,
-                )
-            except subprocess.CalledProcessError as e:
-                log.warning(f"Failed to kill processes: {e}")
+        intr_helper.kill_processes("director")
+
+    for envoy in envoys:
+        intr_helper.kill_processes(envoy)
 
     log.info("Stale processes (if any) removed successfully")
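Two notes on the changes above. `Popen.poll()` returns `None` while the child is still running and the exit code once it has finished, which is what the native-process completion check relies on. And `remove_stale_processes` now takes participant objects instead of a collaborator count; a hedged sketch of how it might be called from setup or teardown (the `fed_obj` attribute names and envoy names are illustrative):

import tests.end_to_end.utils.federation_helper as fh


def cleanup_stale_processes(fed_obj):
    """Illustrative cleanup step; fed_obj attribute names are assumptions."""
    # Task Runner workspaces: pass the participant objects themselves.
    fh.remove_stale_processes(
        aggregator=fed_obj.aggregator,
        collaborators=fed_obj.collaborators,
    )

    # Director-based workspaces: director only needs to be truthy, envoys are names.
    fh.remove_stale_processes(director=True, envoys=["envoy_one", "envoy_two"])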

tests/end_to_end/utils/interruption_helper.py

Lines changed: 56 additions & 21 deletions
@@ -4,7 +4,7 @@
 import logging
 import concurrent.futures
 import time
-import os
+import psutil
 import subprocess  # nosec B404
 
 import tests.end_to_end.utils.constants as constants
@@ -75,30 +75,65 @@ def stop_start_native_participant(participant, action):
     if action not in ["stop", "start"]:
         raise ex.ParticipantStopException(f"Invalid action {action}")
 
-    if action == "stop":
-        log.info(f"Stopping participant {participant.name}")
-        cmd_for_process_kill = constants.AGG_START_CMD if participant.name == "aggregator" else constants.COL_START_CMD.format(participant.name)
-        pids = []
-        # Find the process ID
-        for line in os.popen(f"ps ax | grep '{cmd_for_process_kill}' | grep -v grep"):
-            fields = line.split()
-            pids.append(fields[0])
-
-        if not pids:
-            raise RuntimeError(f"No processes found for command '{cmd_for_process_kill}'")
-
-        # Kill all processes using sudo
-        for pid in pids:
-            try:
-                subprocess.run(['sudo', 'kill', '-9', pid], check=True)
-            except subprocess.CalledProcessError as e:
-                raise RuntimeError(f"Failed to kill process '{pid}': {e}")
+    # Irrespective of the action, kill the processes to ensure clean state
+    log.info(f"Kill the processes for {participant.name} if running to avoid conflicts")
+    participant.kill_process()
 
+    if action == "stop":
+        log.info(f"Stopped {participant.name} successfully")
     else:
         try:
-            log.info(f"Starting participant {participant.name}")
             participant.start()
+            log.info(f"Started {participant.name} successfully")
         except Exception as e:
-            raise ex.ParticipantStartException(f"Error starting participant: {e}")
+            raise ex.ParticipantStartException(f"Error starting participant {participant.name}: {e}")
 
     return True
+
+
+def get_pids_for_active_command(command):
+    """
+    Get the process IDs of the given command if it is running.
+
+    Args:
+        command (str): The command to check.
+
+    Returns:
+        list: List of process IDs if the command is running, otherwise an empty list.
+    """
+    pids = []
+    for proc in psutil.process_iter(['pid', 'cmdline']):
+        try:
+            cmdline = proc.info['cmdline']
+            if isinstance(cmdline, list):
+                cmdline = ' '.join(cmdline)
+            if command in cmdline:
+                pids.append(proc.info['pid'])
+        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
+            continue
+    return pids
+
+
+def kill_processes(command_to_kill, fail_if_not_found=False):
+    """
+    Kill all processes for the given command.
+
+    Args:
+        command_to_kill (str): The command to kill.
+        fail_if_not_found (bool): Fail if given process is not found.
+
+    Returns:
+        bool: True if processes were killed, False otherwise.
+    """
+    try:
+        pids = get_pids_for_active_command(command_to_kill)
+        log.info(f"PIDs for command '{command_to_kill}': {pids}")
+        # Kill each process
+        for pid in pids:
+            subprocess.run(['sudo', 'kill', '-9', str(pid)], check=fail_if_not_found)
+            log.info(f"Killed process with PID {pid}")
+        return True
+    except subprocess.CalledProcessError:
+        if fail_if_not_found:
+            raise RuntimeError(f"Failed to kill process with PID {pid}")
+        return False
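A hedged usage sketch of the two new helpers; the command substrings are illustrative, and matching is done against each process's full command line via `psutil`:

import tests.end_to_end.utils.interruption_helper as intr_helper

# Collect PIDs whose command line contains the given substring.
pids = intr_helper.get_pids_for_active_command("fx aggregator start")
print(f"Aggregator PIDs still alive: {pids}")

# Best-effort cleanup: with the default fail_if_not_found=False the underlying
# `sudo kill -9` runs with check=False, so a missing process does not raise.
intr_helper.kill_processes("fx aggregator start")

# Strict cleanup: check=True on the kill, and any CalledProcessError is re-raised
# as a RuntimeError.
intr_helper.kill_processes("fx collaborator start", fail_if_not_found=True)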
