core/imageroot/usr/local/agent/pypkg/agent/tasks/run.py (26 changes: 18 additions & 8 deletions)

@@ -72,10 +72,9 @@ def runp_brief(tasks, **kwargs):

 async def _runp(tasks, **kwargs):
 
-    if 'progress_callback' in kwargs and not kwargs['progress_callback'] is None:
+    parent_cbk = kwargs.pop('progress_callback', None)
+    if parent_cbk:
         # Equally distribute the progress weight of each task
-        parent_cbk = kwargs['progress_callback']
-        del kwargs['progress_callback']
         runp_progress = [0] * len(tasks)
         last_value = -1
         def create_task_cbk(idx):
@@ -87,11 +86,11 @@ def task_progress_callback(p, idx):
                 if curr_value > last_value:
                     last_value = curr_value
                     parent_cbk(curr_value)
-    else:
-        parent_cbk = None
 
     nowait = kwargs.pop('nowait', False)
     kwargs.setdefault('check_idle_time', 0 if nowait else 8) # Check the client connection is alive
+    task_extra_arg = kwargs.pop('extra', {})
+    on_sigterm_tasks = kwargs.pop('on_sigterm_tasks', [])
 
     with AsyncSignalHandler(asyncio.get_running_loop(), signal.SIGTERM) as cancel_handler:
         runners = []
@@ -102,8 +101,8 @@ def task_progress_callback(p, idx):
         if not 'parent' in taskrq:
             taskrq['parent'] = os.getenv("AGENT_TASK_ID", "")
 
-        if 'extra' in kwargs:
-            taskrq['extra'] = kwargs['extra']
+        if task_extra_arg:
+            taskrq['extra'] = task_extra_arg
 
         if parent_cbk:
             task_cbk = create_task_cbk(idx)
@@ -117,8 +116,19 @@ def task_progress_callback(p, idx):
 
             runners.append(asyncio.create_task(tcoro, name=taskrq['agent_id'] + '/' + taskrq['action']))
 
-    return await asyncio.gather(*runners, return_exceptions=(len(tasks) > 1))
+        # Register a list of non-blocking callback tasks, triggered if the
+        # process receives SIGTERM.
+        for st in on_sigterm_tasks:
+            sigterm_task = st.copy() # avoid mutating the caller's dict
+            sigterm_task.setdefault('parent', os.getenv("AGENT_TASK_ID", ""))
+            sigterm_task.setdefault('data', {})
+            def generate_sigterm_task_handler(taskhd=sigterm_task): # bind the current task into the closure
+                kwargs_copy = kwargs.copy()
+                kwargs_copy["check_idle_time"] = 0
+                return _run_with_protocol_nowait(taskhd, **kwargs_copy)
+            cancel_handler.add_callback(generate_sigterm_task_handler)
+
+    return await asyncio.gather(*runners, return_exceptions=(len(tasks) > 1))
 
 async def _run_with_protocol(taskrq, **pconn):
     pconn.setdefault('progress_callback', None)
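With this change, agent.tasks.run() and runp() accept a new on_sigterm_tasks keyword: a list of task requests that are submitted only if the agent process receives SIGTERM while the main tasks are running. A minimal caller sketch follows; the kwarg and the endpoint come from this diff, while the module ID and data payloads are hypothetical:

```python
import agent.tasks

# Hypothetical cleanup request, fired only if the process gets SIGTERM:
cleanup_on_sigterm = {
    "agent_id": "cluster",
    "action": "remove-module",
    "data": {"module_id": "mymodule1", "force": True, "preserve_data": False},
}

result = agent.tasks.run("module/mymodule1", "restore-module",
    data={"repository": "original", "snapshot": "latest"},  # illustrative payload
    endpoint="redis://cluster-leader",
    on_sigterm_tasks=[cleanup_on_sigterm],  # new kwarg added by this change
)
```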
@@ -24,15 +24,15 @@ import os
 import sys
 import json
 import agent
-import subprocess
+import agent.volumes
 
 request = json.load(sys.stdin)
 rdb = agent.redis_connect(privileged=False)
 
-try:
-    host = subprocess.run(['hostname', '-f'], text=True, capture_output=True, check=True).stdout.strip()
-except:
-    host = "myserver.example.org"
-ret = { "vpn": {"host": host, "port": 55820, "network": "10.5.4.0/24"} }
+ret = {
+    "has_additional_disks": len(agent.volumes.get_base_paths()) > 0,
+    "vpn": {
+        "host": agent.get_hostname(),
+        "port": 55820,
+        "network": "10.5.4.0/24"
+    },
+}
 
 json.dump(ret, fp=sys.stdout)
@@ -1,10 +1 @@
-{
-    "$schema": "http://json-schema.org/draft-07/schema#",
-    "title": "get-defaults input",
-    "$id": "http://schema.nethserver.org/cluster/get-defaults-input.json",
-    "description": "The action expects a null value as input",
-    "examples": [
-        null
-    ],
-    "type": "null"
-}
+{}
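An empty schema ({}) places no constraints on the instance, so the action now accepts any JSON input, including the null value it required before. A quick check with the jsonschema package (hypothetical snippet, not part of the diff):

```python
import jsonschema  # assumes the jsonschema package is available

jsonschema.validate(instance=None, schema={})      # passes: {} accepts anything
jsonschema.validate(instance={"x": 1}, schema={})  # also passes
```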
@@ -5,6 +5,7 @@
     "description": "Output schema of the get-defaults action",
     "examples": [
         {
+            "has_additional_disks": false,
             "vpn": {
                 "host": "server.nethserver.org",
                 "port": 55820,
@@ -14,6 +15,10 @@
     ],
     "type": "object",
     "properties": {
+        "has_additional_disks": {
+            "description": "True if the OS has additional disks for application data",
+            "type": "boolean"
+        },
         "vpn": {
             "type": "object",
             "description": "WireGuard VPN defaults",
@@ -0,0 +1,74 @@
#!/usr/bin/env python3

#
# Copyright (C) 2026 Nethesis S.r.l.
# SPDX-License-Identifier: GPL-3.0-or-later
#

import base64
import json
import os
import gzip
import subprocess
import sys
import agent

def import_destinations(cluster_backup):
    """Import backup destination configurations into Redis"""
    rdb = agent.redis_connect(privileged=True)
    imported_count = 0
    skipped_count = 0
    local_cluster_uuid = rdb.get("cluster/uuid") or "unknown_local"
    import_cluster_uuid = cluster_backup["cluster"].get("uuid", "unknown_import")
    for dkey, odest in cluster_backup["cluster"]["backup_repository"].items():
        # Check if the destination UUID key exists. Historically, a backup
        # destination was named "backup repository".
        if odest["provider"] == "cluster" and local_cluster_uuid != import_cluster_uuid:
            skipped_count += 1
            continue # internal destination of another cluster, ignore
        elif rdb.exists(f"cluster/backup_repository/{dkey}"):
            skipped_count += 1
            continue
        print("Importing backup destination", dkey, odest["name"], file=sys.stderr)
        # FIXME: validate configuration before storing odest in Redis
        rdb.hset(f"cluster/backup_repository/{dkey}", mapping=odest)
        imported_count += 1
    return imported_count, skipped_count

def main():
    request = json.load(sys.stdin)
    try:
        cipher_data = base64.b64decode(request["backup_data"], validate=True)
    except Exception as ex:
        print(agent.SD_ERR + "backup_data decode error:", ex, file=sys.stderr)
        agent.set_status("validation-failed")
        json.dump([{"error":"invalid_base64","field":"backup_data","parameter":"backup_data","value":""}], fp=sys.stdout)
        sys.exit(2)

    passphrase_fd, wfd = os.pipe()
    os.write(wfd, request["backup_password"].encode("utf-8") + b"\n")
    # NOTE: close wfd now, assuming for simplicity that the password is
    # only a few bytes long and fits within the pipe buffer:
    os.close(wfd)
    gpg_cmd = [
        "gpg", "--decrypt", "--batch",
        "--pinentry-mode", "loopback",
        "--passphrase-fd", str(passphrase_fd),
    ]
    gpg_proc = subprocess.run(gpg_cmd,
        input=cipher_data,
        stdout=subprocess.PIPE,
        pass_fds=[passphrase_fd]
    )
    os.close(passphrase_fd)
    if gpg_proc.returncode != 0:
        agent.set_status("validation-failed")
        json.dump([{"error":"invalid_passphrase","field":"backup_password","parameter":"backup_password","value":""}], fp=sys.stdout)
        sys.exit(3)
    json_data = gzip.decompress(gpg_proc.stdout)
    cluster_backup = json.loads(json_data)
    imported_count, skipped_count = import_destinations(cluster_backup)
    json.dump({"imported_destinations": imported_count, "skipped_destinations": skipped_count}, fp=sys.stdout)

if __name__ == "__main__":
    main()
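For reference, the action expects backup_data to be produced by gzip-compressing the JSON cluster backup, encrypting it with a GPG passphrase, and base64-encoding the result. Below is a sketch of a matching producer, assuming symmetric (passphrase-based) encryption; the passphrase and backup content are placeholders:

```python
import base64, gzip, json, subprocess

cluster_backup = {"cluster": {"uuid": "...", "backup_repository": {}}}  # placeholder
plaintext = gzip.compress(json.dumps(cluster_backup).encode("utf-8"))
ciphertext = subprocess.run(
    ["gpg", "--symmetric", "--batch",
     "--pinentry-mode", "loopback", "--passphrase", "mypassword",
     "--output", "-"],
    input=plaintext, stdout=subprocess.PIPE, check=True,
).stdout
request = {
    "backup_password": "mypassword",
    "backup_data": base64.b64encode(ciphertext).decode("ascii"),
}
print(json.dumps(request))  # feed this to the import-backup-destinations action
```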
@@ -0,0 +1,30 @@
{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "import-backup-destinations input",
    "$id": "http://schema.nethserver.org/cluster/import-backup-destinations-input.json",
    "description": "Import backup destination configurations from raw cluster backup data",
    "examples": [
        {
            "backup_password": "mypassword",
            "backup_data": "jA0ECQMC2c...94FL/8y2KV"
        }
    ],
    "type": "object",
    "required": [
        "backup_password",
        "backup_data"
    ],
    "properties": {
        "backup_password": {
            "description": "GPG passphrase to decrypt backup_data field",
            "type": "string",
            "maxLength": 4096
        },
        "backup_data": {
            "description": "base64-encoded string of GPG-encrypted, gzip-compressed backup data",
            "type": "string",
            "contentEncoding": "base64",
            "contentMediaType": "application/octet-stream"
        }
    }
}
@@ -0,0 +1,29 @@
{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "import-backup-destinations output",
    "$id": "http://schema.nethserver.org/cluster/import-backup-destinations-output.json",
    "description": "Import backup destination configurations from raw cluster backup data",
    "examples": [
        {
            "imported_destinations": 2,
            "skipped_destinations": 1
        }
    ],
    "type": "object",
    "required": [
        "imported_destinations",
        "skipped_destinations"
    ],
    "properties": {
        "imported_destinations": {
            "description": "Number of backup destinations successfully imported",
            "type": "integer",
            "minimum": 0
        },
        "skipped_destinations": {
            "description": "Number of backup destinations not imported because already configured in the cluster DB",
            "type": "integer",
            "minimum": 0
        }
    }
}
@@ -89,6 +89,18 @@ else:
     },
 })
 
+# If restore-module receives a SIGTERM, remove the half-restored module:
+remove_module_on_sigterm = {
+    "agent_id": "cluster",
+    "action": "remove-module",
+    "data": {
+        "module_id": module_id,
+        "force": True,
+        "preserve_data": False,
+    },
+    "extra": {'isNotificationHidden': True},
+}
+
 restore_task_result = agent.tasks.run("module/" + module_id, "restore-module",
     data={
         "repository": repository,
@@ -99,10 +111,12 @@ restore_task_result = agent.tasks.run("module/" + module_id, "restore-module",
         "replace": replace_requested or len(remove_tasks) == 0
     },
     endpoint="redis://cluster-leader",
+    on_sigterm_tasks=[remove_module_on_sigterm],
     progress_callback=agent.get_progress_callback(16, 99 if len(remove_tasks) == 0 else 94),
 )
 
-agent.assert_exp(restore_task_result['exit_code'] == 0)
+if restore_task_result['exit_code'] != 0:
+    sys.exit(1)
 
 if len(remove_tasks) > 0 and replace_requested:
     # Remove existing modules with the same MODULE_UUID:
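Taken together, these changes close a cleanup gap: the restore runs inside _runp's AsyncSignalHandler context, so a SIGTERM delivered while restore-module is in flight now triggers the queued remove-module request instead of leaving a half-restored module behind. A rough sketch of that control flow in plain asyncio (simplified names, not the actual agent implementation):

```python
import asyncio
import signal

async def run_with_sigterm_cleanup(work, cleanup):
    # Schedule the cleanup coroutine when SIGTERM arrives, then let the
    # main work be cancelled and unwound as usual.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM,
                            lambda: asyncio.ensure_future(cleanup()))
    try:
        return await work()
    finally:
        loop.remove_signal_handler(signal.SIGTERM)
```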