Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions gvm/protocols/gmp/_gmpnext.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Credentials,
CredentialStoreCredentialType,
CredentialStores,
IntegrationConfigs,
OCIImageTargets,
Tasks,
)
Expand Down Expand Up @@ -927,3 +928,68 @@ def stop_task(self, task_id: EntityID) -> T:
return self._send_request_and_transform_response(
Tasks.stop_task(task_id=task_id)
)

def get_integration_config(
self, integration_config_id: EntityID, *, details: Optional[bool] = None
) -> T:
"""Request a single Integration Configuration.

Args:
integration_config_id: UUID of the integration config to request.
details: Whether to include detail information.
"""
return self._send_request_and_transform_response(
IntegrationConfigs.get_integration_config(
integration_config_id=integration_config_id, details=details
)
)

def get_integration_configs(
self,
*,
filter_string: Optional[str] = None,
filter_id: Optional[EntityID] = None,
) -> T:
"""Request a list of Integration Configurations.

Args:
filter_string: Filter term to use for the query.
filter_id: UUID of an existing filter to use for the query.
"""
return self._send_request_and_transform_response(
IntegrationConfigs.get_integration_configs(
filter_string=filter_string,
filter_id=filter_id,
)
)

def modify_integration_config(
self,
integration_config_id: EntityID,
*,
service_url: Optional[str] = None,
service_cacert: Optional[str] = None,
oidc_provider_url: Optional[str] = None,
oidc_provider_client_id: Optional[str] = None,
oidc_provider_client_secret: Optional[str] = None,
) -> T:
"""Modify an existing Integration Configuration.

Args:
integration_config_id: UUID of configuration to modify.
service_url: Integration Service URL.
service_cacert: Integration Service Certificate.
oidc_provider_url: OIDC Provider URL.
oidc_provider_client_id: OIDC Provider Client ID.
oidc_provider_client_secret: OIDC Provider Client Secret.
"""
return self._send_request_and_transform_response(
IntegrationConfigs.modify_integration_config(
integration_config_id=integration_config_id,
service_url=service_url,
service_cacert=service_cacert,
oidc_provider_url=oidc_provider_url,
oidc_provider_client_id=oidc_provider_client_id,
oidc_provider_client_secret=oidc_provider_client_secret,
)
)
4 changes: 4 additions & 0 deletions gvm/protocols/gmp/requests/next/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
Credentials,
CredentialStoreCredentialType,
)
from gvm.protocols.gmp.requests.next._integration_configs import (
IntegrationConfigs,
)
from gvm.protocols.gmp.requests.next._oci_image_targets import OCIImageTargets
from gvm.protocols.gmp.requests.next._tasks import Tasks

Expand Down Expand Up @@ -118,6 +121,7 @@
"Hosts",
"HostsOrdering",
"InfoType",
"IntegrationConfigs",
"Notes",
"Nvts",
"OCIImageTargets",
Expand Down
112 changes: 112 additions & 0 deletions gvm/protocols/gmp/requests/next/_integration_configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from typing import Optional

from gvm.errors import RequiredArgument
from gvm.protocols.core import Request
from gvm.protocols.gmp.requests import EntityID
from gvm.utils import to_bool
from gvm.xml import XmlCommand


class IntegrationConfigs:
@classmethod
def get_integration_config(
cls, integration_config_id: EntityID, *, details: Optional[bool] = None
) -> Request:
"""Request a single Integration Configuration.

Args:
integration_config_id: UUID of the integration config to request.
details: Whether to include detail information.
"""
if not integration_config_id:
raise RequiredArgument(
function=cls.get_integration_config.__name__,
argument="integration_config_id",
)

cmd = XmlCommand("get_integration_configs")
cmd.set_attribute("integration_config_id", str(integration_config_id))

if details is not None:
cmd.set_attribute("details", to_bool(details))

return cmd

@classmethod
def get_integration_configs(
cls,
*,
filter_string: Optional[str] = None,
filter_id: Optional[EntityID] = None,
) -> Request:
"""Request a list of Integration Configurations.

Args:
filter_string: Filter term to use for the query.
filter_id: UUID of an existing filter to use for the query.
"""
cmd = XmlCommand("get_integration_configs")
cmd.add_filter(filter_string, filter_id)

return cmd

@classmethod
def modify_integration_config(
cls,
integration_config_id: EntityID,
*,
service_url: Optional[str] = None,
service_cacert: Optional[str] = None,
oidc_provider_url: Optional[str] = None,
oidc_provider_client_id: Optional[str] = None,
oidc_provider_client_secret: Optional[str] = None,
) -> Request:
"""Modify an existing Integration Configuration.

Args:
integration_config_id: UUID of configuration to modify.
service_url: Integration Service URL.
service_cacert: Integration Service Certificate.
oidc_provider_url: OIDC Provider URL.
oidc_provider_client_id: OIDC Provider Client ID.
oidc_provider_client_secret: OIDC Provider Client Secret.
"""
if not integration_config_id:
raise RequiredArgument(
function=cls.modify_integration_config.__name__,
argument="integration_config_id",
)

cmd = XmlCommand("modify_integration_config")
cmd.set_attribute("uuid", str(integration_config_id))

# <service> element
service = cmd.add_element("service")

if service_url is not None:
service.add_element("url", service_url)
else:
service.add_element("url")
if service_cacert is not None:
service.add_element("cacert", service_cacert)
else:
service.add_element("cacert")

# <oidc> element
oidc = cmd.add_element("oidc")
if oidc_provider_url is not None:
oidc.add_element("oidc_provider_url", oidc_provider_url)
else:
oidc.add_element("oidc_provider_url")

oidc_client = oidc.add_element("client")
if oidc_provider_client_id is not None:
oidc_client.add_element("id", oidc_provider_client_id)
else:
oidc_client.add_element("id")
if oidc_provider_client_secret is not None:
oidc_client.add_element("secret", oidc_provider_client_secret)
else:
oidc_client.add_element("secret")

return cmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2026 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# SPDX-FileCopyrightText: 2026 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

from gvm.errors import RequiredArgument


class GmpGetIntegrationConfigTestMixin:
def test_get_integration_config(self):
self.gmp.get_integration_config("uuid")

self.connection.send.has_been_called_with(
b'<get_integration_configs integration_config_id="uuid"/>'
)
self.gmp.get_integration_config(integration_config_id="uuid")

self.connection.send.has_been_called_with(
b'<get_integration_configs integration_config_id="uuid"/>'
)

def test_get_integration_config_without_id(self):
with self.assertRaises(RequiredArgument):
self.gmp.get_integration_config(integration_config_id=None)

with self.assertRaises(RequiredArgument):
self.gmp.get_integration_config("")

def test_get_integration_config_with_details(self):
self.gmp.get_integration_config(
integration_config_id="uuid", details=True
)

self.connection.send.has_been_called_with(
b'<get_integration_configs integration_config_id="uuid" details="1"/>'
)

self.gmp.get_integration_config(
integration_config_id="uuid", details=False
)

self.connection.send.has_been_called_with(
b'<get_integration_configs integration_config_id="uuid" details="0"/>'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# SPDX-FileCopyrightText: 2026 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
#


class GmpGetIntegrationConfigsTestMixin:
def test_get_integration_configs(self):
self.gmp.get_integration_configs()

self.connection.send.has_been_called_with(b"<get_integration_configs/>")

def test_get_integration_configs_with_filter_string(self):
self.gmp.get_integration_configs(filter_string="foo=bar")

self.connection.send.has_been_called_with(
b'<get_integration_configs filter="foo=bar"/>'
)

def test_get_integration_configs_with_filter_id(self):
self.gmp.get_integration_configs(filter_id="f1")

self.connection.send.has_been_called_with(
b'<get_integration_configs filt_id="f1"/>'
)
Loading
Loading