Skip to content

Commit da5ee7c

Browse files
committed
NodeTasksBase
1 parent e5b86b0 commit da5ee7c

File tree

9 files changed

+308
-42
lines changed

9 files changed

+308
-42
lines changed

src/cluster_tasks/scenarios/scenario_base.py

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
import asyncio
21
import logging
3-
import time
42
from abc import ABC, abstractmethod
53

64
from ext_api.proxmox_api import ProxmoxAPI
5+
from cluster_tasks.tasks.node_tasks_sync import NodeTasksSync
76

87
logger = logging.getLogger("CT.{__name__}")
98

@@ -12,26 +11,7 @@ class ScenarioBase(ABC):
1211
def __init__(self, api: ProxmoxAPI):
1312
self.name = self.__class__.__name__
1413
self.api: ProxmoxAPI = api
15-
16-
@staticmethod
17-
def get_status(api: ProxmoxAPI, node: str, upid: str):
18-
result = api.nodes(node).tasks(upid).status.get(filter_keys="status")
19-
return result
20-
21-
def wait_task_done(self, api: ProxmoxAPI, node: str, upid: str):
22-
while self.get_status(api, node, upid) != "stopped":
23-
logger.info("Waiting for task to finish...")
24-
time.sleep(2)
25-
26-
@staticmethod
27-
async def async_get_status(api: ProxmoxAPI, node: str, upid: str):
28-
result = await api.nodes(node).tasks(upid).status.get(filter_keys="status")
29-
return result
30-
31-
async def async_wait_task_done(self, api: ProxmoxAPI, node: str, upid: str):
32-
while await self.async_get_status(api, node, upid) != "stopped":
33-
logger.info("Waiting for task to finish...")
34-
await asyncio.sleep(2)
14+
self.node_tasks = NodeTasksSync(api)
3515

3616
@abstractmethod
3717
def run(self):

src/cluster_tasks/scenarios/scenario_clone_template_vm.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,33 @@ def __init__(self, api: ProxmoxAPI):
1515

1616
def configure(self, config):
1717
self.node = config.get("node")
18-
self.vmid = config.get("vmid")
18+
self.vmid = int(config.get("vmid", 0))
1919
self.newid = config.get("newid")
2020
self.name = config.get("name")
2121
self.full = int(config.get("full", True))
2222

2323
def run(self):
2424
print(f"Running Scenario Template: {self.template_name} at {self.source_node}")
2525
# Perform the specific API logic for this scenario
26-
27-
# data = {"newid": int(self.newid), "name": self.name, "full": self.full}
28-
with self.api as api:
29-
present = (
30-
api.nodes(self.node)
31-
.qemu(self.newid)
32-
.status.current.get(filter_keys="vmid")
33-
)
34-
logger.info(f"result: {present} {self.newid} {self.newid==present}")
35-
if present and int(present) == int(self.newid):
36-
logger.info(f"VM {self.newid} already exists - Deleteting...")
37-
upid = api.nodes(self.node).qemu(self.newid).delete()
38-
logger.info(f"result: {upid}")
39-
self.wait_task_done(api, self.node, upid)
40-
data = {"newid": int(self.newid), "name": self.name, "full": self.full}
41-
upid = api.nodes(self.node).qemu(self.vmid).clone.create(data=data)
42-
logger.info(f"result: {upid}, data: {data}")
43-
self.wait_task_done(api, self.node, upid)
44-
logger.info(f"VM {self.newid} created")
26+
try:
27+
# Open a connection session of the Proxmox API
28+
with self.api:
29+
# Check is VM already exists
30+
present = self.node_tasks.vm_status(self.node, self.newid)
31+
if present:
32+
# If VM already exists, delete it
33+
logger.info(f"VM {self.newid} already exists - Deleting...")
34+
is_deleted = self.node_tasks.vm_delete(self.node, self.newid)
35+
if is_deleted:
36+
logger.info(f"VM {self.newid} deleted successfully")
37+
else:
38+
raise Exception(f"Failed to delete VM {self.newid}")
39+
# Clone the VM from the template
40+
data = {"newid": int(self.newid), "name": self.name, "full": self.full}
41+
is_created = self.node_tasks.vm_clone(self.node, self.vmid, data)
42+
if is_created:
43+
logger.info(f"VM {self.newid} cloned successfully")
44+
else:
45+
raise Exception(f"Failed to clone VM {self.newid}")
46+
except Exception as e:
47+
logger.error(f"Failed to run scenario: {e}")

src/cluster_tasks/scenarios_configs.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
API:
2+
backend: "https"
3+
type: "sync"
14
Scenarios:
25
CloneTemplateVM:
36
file: "clone_template_vm"

src/cluster_tasks/tasks/__init__.py

Whitespace-only changes.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from datetime import timedelta
2+
3+
from ext_api.proxmox_api import ProxmoxAPI
4+
5+
6+
class BaseTasks:
7+
TIMEOUT = 60
8+
LOOP_SLEEP = 2
9+
10+
def __init__(self, api: ProxmoxAPI):
11+
self._api: ProxmoxAPI = api
12+
13+
@property
14+
def api(self):
15+
return self._api
16+
17+
@api.setter
18+
def api(self, value):
19+
self._api = value
20+
21+
@staticmethod
22+
def format_duration(seconds: float | int) -> str:
23+
"""Format a duration (in seconds) as HH:MM:SS."""
24+
return str(timedelta(seconds=seconds)).split(".")[0]
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import asyncio
2+
import logging
3+
4+
from cluster_tasks.tasks.base_tasks import BaseTasks
5+
from cluster_tasks.tasks.node_tasks_base import NodeTasksBase
6+
7+
logger = logging.getLogger("CT.{__name__}")
8+
9+
10+
class NodeTasksAsync(NodeTasksBase):
11+
"""
12+
NodeTasks is a class for managing tasks related to virtual machines
13+
on a Proxmox node. This includes operations like checking task status,
14+
waiting for tasks to complete, and performing VM operations (e.g., clone, delete).
15+
16+
Inherits from `NodeTasksBase`, which provides common functionality for API interaction.
17+
18+
Methods:
19+
vm_status(node: str, vm_id: int) -> int:
20+
Retrieves the current status of a virtual machine by its ID.
21+
22+
vm_delete(node: str, vm_id: int, wait: bool = True) -> str | bool | None:
23+
Deletes a virtual machine and optionally waits for the task to finish.
24+
25+
vm_clone(node: str, vm_id: int, data: dict, wait: bool = True) -> str | bool | None:
26+
Clones a virtual machine and optionally waits for the task to finish.
27+
"""
28+
29+
async def vm_status(self, node: str, vm_id: int) -> int:
30+
"""
31+
Retrieves the current status of a virtual machine.
32+
33+
Args:
34+
node (str): The name of the Proxmox node.
35+
vm_id (int): The ID of the virtual machine.
36+
37+
Returns:
38+
int: The status of the virtual machine (e.g., running, stopped).
39+
"""
40+
status_vm = (
41+
await self.api.nodes(node)
42+
.qemu(vm_id)
43+
.status.current.get(filter_keys="vmid")
44+
)
45+
return int(status_vm) if status_vm else 0
46+
47+
async def vm_delete(
48+
self, node: str, vm_id: int, wait: bool = True
49+
) -> str | bool | None:
50+
"""
51+
Deletes a virtual machine and optionally waits for the deletion task to complete.
52+
53+
Args:
54+
node (str): The name of the Proxmox node.
55+
vm_id (int): The ID of the virtual machine.
56+
wait (bool): Whether to wait for the task to complete (default is True).
57+
58+
Returns:
59+
str | bool | None: The task UPID if `wait` is False;
60+
`True` if task is completed successfully,
61+
`False` if task timed out.
62+
"""
63+
upid = await self.api.nodes(node).qemu(vm_id).delete()
64+
if wait:
65+
return await self.wait_task_done_async(node, upid)
66+
return upid
67+
68+
async def vm_clone(
69+
self, node: str, vm_id: int, data: dict, wait: bool = True
70+
) -> str | bool | None:
71+
"""
72+
Clones a virtual machine and optionally waits for the cloning task to complete.
73+
74+
Args:
75+
node (str): The name of the Proxmox node.
76+
vm_id (int): The ID of the virtual machine to clone.
77+
data (dict): The parameters for cloning the virtual machine.
78+
wait (bool): Whether to wait for the task to complete (default is True).
79+
80+
Returns:
81+
str | bool | None: The task UPID if `wait` is False;
82+
`True` if the task finished successfully,
83+
`False` if task timed out.
84+
"""
85+
upid = await self.api.nodes(node).qemu(vm_id).clone.create(data=data)
86+
if wait:
87+
return await self.wait_task_done_async(node, upid)
88+
return upid
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import asyncio
2+
import logging
3+
import time
4+
5+
from cluster_tasks.tasks.base_tasks import BaseTasks
6+
7+
logger = logging.getLogger("CT.{__name__}")
8+
9+
10+
class NodeTasksBase(BaseTasks):
11+
"""
12+
Base class with shared logic for both sync and async task handling.
13+
Contains methods for both sync and async API calls.
14+
Methods:
15+
get_status_sync(node: str, upid: str) -> str | None:
16+
Retrieves the status of a task by its UPID.
17+
18+
wait_task_done_sync(node: str, upid: str) -> bool:
19+
Waits for a task to complete, checking the status periodically.
20+
21+
get_status_async(node: str, upid: str) -> str | None:
22+
Retrieves the status of a task by its UPID.
23+
24+
wait_task_done_async(node: str, upid: str) -> bool:
25+
Waits for a task to complete, checking the status periodically.
26+
"""
27+
28+
def get_status_sync(self, node: str, upid: str) -> str | None:
29+
"""
30+
Synchronously retrieves the status of a task identified by its UPID.
31+
"""
32+
if upid:
33+
result = self.api.nodes(node).tasks(upid).status.get(filter_keys="status")
34+
return result
35+
36+
async def get_status_async(self, node: str, upid: str) -> str | None:
37+
"""
38+
Asynchronously retrieves the status of a task identified by its UPID.
39+
"""
40+
if upid:
41+
result = (
42+
await self.api.nodes(node).tasks(upid).status.get(filter_keys="status")
43+
)
44+
return result
45+
46+
def wait_task_done_sync(self, node: str, upid: str) -> bool:
47+
"""
48+
Synchronously waits for a task to complete.
49+
"""
50+
start_time = time.time()
51+
while (result := self.get_status_sync(node, upid)) is not None:
52+
if result == "stopped":
53+
return True
54+
duration = time.time() - start_time
55+
formatted_duration = self.format_duration(duration)
56+
formatted_timeout = self.format_duration(self.TIMEOUT)
57+
logger.info(
58+
f"Waiting for task to finish... [ {formatted_duration} / {formatted_timeout} ]"
59+
)
60+
time.sleep(self.LOOP_SLEEP)
61+
if time.time() - start_time > self.TIMEOUT:
62+
logger.warning(
63+
f"Timeout reached while waiting for task to finish. {upid=}"
64+
)
65+
break
66+
return False
67+
68+
async def wait_task_done_async(self, node: str, upid: str) -> bool:
69+
"""
70+
Asynchronously waits for a task to complete.
71+
"""
72+
start_time = time.time()
73+
while (result := await self.get_status_async(node, upid)) is not None:
74+
if result == "stopped":
75+
return True
76+
duration = time.time() - start_time
77+
formatted_duration = self.format_duration(duration)
78+
formatted_timeout = self.format_duration(self.TIMEOUT)
79+
logger.info(
80+
f"Waiting for task to finish... [ {formatted_duration} / {formatted_timeout} ]"
81+
)
82+
await asyncio.sleep(self.LOOP_SLEEP)
83+
if time.time() - start_time > self.TIMEOUT:
84+
logger.warning(
85+
f"Timeout reached while waiting for task to finish. {upid=}"
86+
)
87+
break
88+
return False
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import logging
2+
import time
3+
4+
from cluster_tasks.tasks.base_tasks import BaseTasks
5+
from cluster_tasks.tasks.node_tasks_base import NodeTasksBase
6+
7+
# Creating a logger instance specific to the current module
8+
logger = logging.getLogger("CT.{__name__}")
9+
"""
10+
The logger is used for logging messages related to task status monitoring and
11+
task operations within the NodeTasks class. It is named with the format "CT.{module_name}"
12+
to reflect the source module where the log entries are generated.
13+
"""
14+
15+
16+
class NodeTasksSync(NodeTasksBase):
17+
"""
18+
NodeTasks class provides functionality for interacting with tasks on a Proxmox node.
19+
It includes methods to check task status, wait for tasks to finish, and perform
20+
operations on virtual machines (VMs), such as deleting and cloning them.
21+
22+
Inherits from `NodeTasksBase` for common API functionality.
23+
"""
24+
25+
def vm_status(self, node: str, vm_id: int) -> int:
26+
"""
27+
Retrieves the current status of a virtual machine.
28+
29+
Args:
30+
node (str): The name of the Proxmox node.
31+
vm_id (int): The ID of the virtual machine.
32+
33+
Returns:
34+
int: The status of the virtual machine (e.g., running, stopped).
35+
"""
36+
status_vm = (
37+
self.api.nodes(node).qemu(vm_id).status.current.get(filter_keys="vmid")
38+
)
39+
return int(status_vm) if status_vm else 0
40+
41+
def vm_delete(self, node: str, vm_id: int, wait: bool = True) -> str | bool | None:
42+
"""
43+
Deletes a virtual machine and optionally waits for the deletion task to complete.
44+
45+
Args:
46+
node (str): The name of the Proxmox node.
47+
vm_id (int): The ID of the virtual machine.
48+
wait (bool): Whether to wait for the task to complete (default is True).
49+
50+
Returns:
51+
str | bool | None: The task UPID if `wait` is False;
52+
`True` if task is completed successfully,
53+
`False` if task timed out.
54+
"""
55+
upid = self.api.nodes(node).qemu(vm_id).delete()
56+
if wait:
57+
return self.wait_task_done_sync(node, upid)
58+
return upid
59+
60+
def vm_clone(
61+
self, node: str, vm_id: int, data: dict, wait: bool = True
62+
) -> str | bool | None:
63+
"""
64+
Clones a virtual machine and optionally waits for the cloning task to complete.
65+
66+
Args:
67+
node (str): The name of the Proxmox node.
68+
vm_id (int): The ID of the virtual machine to clone.
69+
data (dict): The parameters for cloning the virtual machine.
70+
wait (bool): Whether to wait for the task to complete (default is True).
71+
72+
Returns:
73+
str | bool | None: The task UPID if `wait` is False;
74+
`True` if the task finished successfully,
75+
`False` if task timed out.
76+
"""
77+
upid = self.api.nodes(node).qemu(vm_id).clone.create(data=data)
78+
if wait:
79+
return self.wait_task_done_sync(node, upid)
80+
return upid

src/ext_api/utils/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)