diff --git a/core/imageroot/usr/local/agent/pypkg/agent/tasks/run.py b/core/imageroot/usr/local/agent/pypkg/agent/tasks/run.py index 542d596e4..4b3767d13 100644 --- a/core/imageroot/usr/local/agent/pypkg/agent/tasks/run.py +++ b/core/imageroot/usr/local/agent/pypkg/agent/tasks/run.py @@ -72,10 +72,9 @@ def runp_brief(tasks, **kwargs): async def _runp(tasks, **kwargs): - if 'progress_callback' in kwargs and not kwargs['progress_callback'] is None: + parent_cbk = kwargs.pop('progress_callback', None) + if parent_cbk: # Equally distribute the progress weight of each task - parent_cbk = kwargs['progress_callback'] - del kwargs['progress_callback'] runp_progress = [0] * len(tasks) last_value = -1 def create_task_cbk(idx): @@ -87,11 +86,11 @@ def task_progress_callback(p, idx): if curr_value > last_value: last_value = curr_value parent_cbk(curr_value) - else: - parent_cbk = None nowait = kwargs.pop('nowait', False) kwargs.setdefault('check_idle_time', 0 if nowait else 8) # Check the client connection is alive + task_extra_arg = kwargs.pop('extra', {}) + on_sigterm_tasks = kwargs.pop('on_sigterm_tasks', []) with AsyncSignalHandler(asyncio.get_running_loop(), signal.SIGTERM) as cancel_handler: runners = [] @@ -102,8 +101,8 @@ def task_progress_callback(p, idx): if not 'parent' in taskrq: taskrq['parent'] = os.getenv("AGENT_TASK_ID", "") - if 'extra' in kwargs: - taskrq['extra'] = kwargs['extra'] + if task_extra_arg: + taskrq['extra'] = task_extra_arg if parent_cbk: task_cbk = create_task_cbk(idx) @@ -117,8 +116,19 @@ def task_progress_callback(p, idx): runners.append(asyncio.create_task(tcoro, name=taskrq['agent_id'] + '/' + taskrq['action'])) - return await asyncio.gather(*runners, return_exceptions=(len(tasks) > 1)) + # Register a list of callback non-blocking tasks, + # triggered if the process receives SIGTERM. + for st in on_sigterm_tasks: + sigterm_task = st.copy() # avoid mutating caller dict + sigterm_task.setdefault('parent', os.getenv("AGENT_TASK_ID", "")) + sigterm_task.setdefault('data', {}) + def generate_sigterm_task_handler(taskhd=sigterm_task): # bind current task into closure + kwargs_copy = kwargs.copy() + kwargs_copy["check_idle_time"] = 0 + return _run_with_protocol_nowait(taskhd, **kwargs_copy) + cancel_handler.add_callback(generate_sigterm_task_handler) + return await asyncio.gather(*runners, return_exceptions=(len(tasks) > 1)) async def _run_with_protocol(taskrq, **pconn): pconn.setdefault('progress_callback', None) diff --git a/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/50read b/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/50read index 6cb4deb13..806d46588 100755 --- a/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/50read +++ b/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/50read @@ -24,15 +24,15 @@ import os import sys import json import agent -import subprocess +import agent.volumes -request = json.load(sys.stdin) -rdb = agent.redis_connect(privileged=False) - -try: - host = subprocess.run(['hostname', '-f'], text=True, capture_output=True, check=True).stdout.strip() -except: - host = "myserver.example.org" -ret = { "vpn": {"host": host, "port": 55820, "network": "10.5.4.0/24"} } +ret = { + "has_additional_disks": len(agent.volumes.get_base_paths()) > 0, + "vpn": { + "host": agent.get_hostname(), + "port": 55820, + "network": "10.5.4.0/24" + }, +} json.dump(ret, fp=sys.stdout) diff --git a/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/validate-input.json b/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/validate-input.json index ef6547451..0967ef424 100644 --- a/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/validate-input.json +++ b/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/validate-input.json @@ -1,10 +1 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "get-defaults input", - "$id": "http://schema.nethserver.org/cluster/get-defaults-input.json", - "description": "The action expects a null value as input", - "examples": [ - null - ], - "type": "null" -} +{} diff --git a/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/validate-output.json b/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/validate-output.json index 4217f59f8..33aae7d05 100644 --- a/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/validate-output.json +++ b/core/imageroot/var/lib/nethserver/cluster/actions/get-defaults/validate-output.json @@ -5,6 +5,7 @@ "description": "Output schema of the get-defaults action", "examples": [ { + "has_additional_disks": false, "vpn": { "host": "server.nethserver.org", "port": 55820, @@ -14,6 +15,10 @@ ], "type": "object", "properties": { + "has_additional_disks": { + "description": "True if the OS has additional disks for application data", + "type": "boolean" + }, "vpn": { "type": "object", "description": "WireGuard VPN defaults", diff --git a/core/imageroot/var/lib/nethserver/cluster/actions/import-backup-destinations/10import_backup_destinations b/core/imageroot/var/lib/nethserver/cluster/actions/import-backup-destinations/10import_backup_destinations new file mode 100755 index 000000000..f2cdf7a43 --- /dev/null +++ b/core/imageroot/var/lib/nethserver/cluster/actions/import-backup-destinations/10import_backup_destinations @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 + +# +# Copyright (C) 2026 Nethesis S.r.l. +# SPDX-License-Identifier: GPL-3.0-or-later +# + +import base64 +import json +import os +import gzip +import subprocess +import sys +import agent + +def import_destinations(cluster_backup): + """Import backup destination configurations into Redis""" + rdb = agent.redis_connect(privileged=True) + imported_count = 0 + skipped_count = 0 + local_cluster_uuid = rdb.get("cluster/uuid") or "unknown_local" + import_cluster_uuid = cluster_backup["cluster"].get("uuid", "unknown_import") + for dkey, odest in cluster_backup["cluster"]["backup_repository"].items(): + # Check if the destination UUID key exists. Historically, a backup + # destination was named "backup repository". + if odest["provider"] == "cluster" and local_cluster_uuid != import_cluster_uuid: + skipped_count += 1 + continue # internal destination of another cluster, ignore + elif rdb.exists(f"cluster/backup_repository/{dkey}"): + skipped_count += 1 + continue + print("Importing backup destination", dkey, odest["name"], file=sys.stderr) + # FIXME: validate configuration before storing odest in Redis + rdb.hset(f"cluster/backup_repository/{dkey}", mapping=odest) + imported_count += 1 + return imported_count, skipped_count + +def main(): + request = json.load(sys.stdin) + try: + cipher_data = base64.b64decode(request["backup_data"], validate=True) + except Exception as ex: + print(agent.SD_ERR + "backup_data decode error:", ex, file=sys.stderr) + agent.set_status("validation-failed") + json.dump([{"error":"invalid_base64","field":"backup_data","parameter":"backup_data","value":""}], fp=sys.stdout) + sys.exit(2) + + passphrase_fd, wfd = os.pipe() + os.write(wfd, request["backup_password"].encode("utf-8") + b"\n") + # NOTE: close wfd, assuming for simplicity that password is few bytes + # short and fits the pipe buffer size: + os.close(wfd) + gpg_cmd = [ + "gpg", "--decrypt", "--batch", + "--pinentry-mode", "loopback", + "--passphrase-fd", str(passphrase_fd), + ] + gpg_proc = subprocess.run(gpg_cmd, + input=cipher_data, + stdout=subprocess.PIPE, + pass_fds=[passphrase_fd] + ) + os.close(passphrase_fd) + if gpg_proc.returncode != 0: + agent.set_status("validation-failed") + json.dump([{"error":"invalid_passphrase","field":"backup_password","parameter":"backup_password","value":""}], fp=sys.stdout) + sys.exit(3) + json_data = gzip.decompress(gpg_proc.stdout) + cluster_backup = json.loads(json_data) + imported_count, skipped_count = import_destinations(cluster_backup) + json.dump({"imported_destinations": imported_count, "skipped_destinations": skipped_count}, fp=sys.stdout) + +if __name__ == "__main__": + main() diff --git a/core/imageroot/var/lib/nethserver/cluster/actions/import-backup-destinations/validate-input.json b/core/imageroot/var/lib/nethserver/cluster/actions/import-backup-destinations/validate-input.json new file mode 100644 index 000000000..5155f48fe --- /dev/null +++ b/core/imageroot/var/lib/nethserver/cluster/actions/import-backup-destinations/validate-input.json @@ -0,0 +1,30 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "import-backup-destinations input", + "$id": "http://schema.nethserver.org/cluster/import-backup-destinations-input.json", + "description": "Import backup destination configurations from raw cluster backup data", + "examples": [ + { + "backup_password": "mypassword", + "backup_data": "jA0ECQMC2c...94FL/8y2KV" + } + ], + "type": "object", + "required": [ + "backup_password", + "backup_data" + ], + "properties": { + "backup_password": { + "description": "GPG passphrase to decrypt backup_data field", + "type": "string", + "maxLength": 4096 + }, + "backup_data": { + "description": "base64-encoded string of GPG-encrypted, gzip-compressed backup data", + "type": "string", + "contentEncoding": "base64", + "contentMediaType": "application/octet-stream" + } + } +} diff --git a/core/imageroot/var/lib/nethserver/cluster/actions/import-backup-destinations/validate-output.json b/core/imageroot/var/lib/nethserver/cluster/actions/import-backup-destinations/validate-output.json new file mode 100644 index 000000000..f9b13d4ad --- /dev/null +++ b/core/imageroot/var/lib/nethserver/cluster/actions/import-backup-destinations/validate-output.json @@ -0,0 +1,29 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "import-backup-destinations output", + "$id": "http://schema.nethserver.org/cluster/import-backup-destinations-output.json", + "description": "Import backup destination configurations from raw cluster backup data", + "examples": [ + { + "imported_destinations": 2, + "skipped_destinations": 1 + } + ], + "type": "object", + "required": [ + "imported_destinations", + "skipped_destinations" + ], + "properties": { + "imported_destinations": { + "description": "Number of backup destinations successfully imported", + "type": "integer", + "minimum": 0 + }, + "skipped_destinations": { + "description": "Number of backup destinations not imported because already configured in the cluster DB", + "type": "integer", + "minimum": 0 + } + } +} diff --git a/core/imageroot/var/lib/nethserver/cluster/actions/restore-module/50restore_module b/core/imageroot/var/lib/nethserver/cluster/actions/restore-module/50restore_module index ee9e14041..260756c2c 100755 --- a/core/imageroot/var/lib/nethserver/cluster/actions/restore-module/50restore_module +++ b/core/imageroot/var/lib/nethserver/cluster/actions/restore-module/50restore_module @@ -89,6 +89,18 @@ else: }, }) +# If restore-module receives a SIGTERM, remove the half-restored module: +remove_module_on_sigterm = { + "agent_id": "cluster", + "action": "remove-module", + "data": { + "module_id": module_id, + "force": True, + "preserve_data": False, + }, + "extra": {'isNotificationHidden': True}, +} + restore_task_result = agent.tasks.run("module/" + module_id, "restore-module", data={ "repository": repository, @@ -99,10 +111,12 @@ restore_task_result = agent.tasks.run("module/" + module_id, "restore-module", "replace": replace_requested or len(remove_tasks) == 0 }, endpoint="redis://cluster-leader", + on_sigterm_tasks=[remove_module_on_sigterm], progress_callback=agent.get_progress_callback(16, 99 if len(remove_tasks) == 0 else 94), ) -agent.assert_exp(restore_task_result['exit_code'] == 0) +if restore_task_result['exit_code'] != 0: + sys.exit(1) if len(remove_tasks) > 0 and replace_requested: # Remove existing modules with the same MODULE_UUID: