Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cave/range.py
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to also change how ipv4 is used with Interfaces so the api is consistent in that matter. So that Interfaces work with the cidr representation as well instead of prefix_length and ipv4

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would probably be wise also do this for ipv6 in general as well right?

Copy link
Copy Markdown
Owner

@sn0ja sn0ja Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah but ipv6 is not even fully implemented yet, i open an issue for that #2

Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ def add_pool(self, name: str, path: str):
self.pool.create(self.libvirt_connection)
return self.pool

def add_network(self, name: str, mode: str, ipv4: str, ipv4_subnet: str, ingress_route_subnet=None, ingress_route_gateway=None):
def add_network(self, name: str, mode: str, ipv4_cidr: str, ingress_route_subnet: str="", ingress_route_gateway: str="") -> Network:

n = Network(name=name,
host_isolated=True if mode == "" else False,
ipv4=ipv4, ipv4_subnet=ipv4_subnet,
ipv4_cidr=ipv4_cidr,
isolate_guests=False,
ipv6="", ipv6_prefix="",
ipv6_cidr="",
mode=mode,
ingress_route_subnet=ingress_route_subnet, ingress_route_gateway=ingress_route_gateway)

Expand All @@ -68,18 +68,18 @@ def add_network(self, name: str, mode: str, ipv4: str, ipv4_subnet: str, ingress
self.networks.append(n)
return n

def add_management_network(self, name: str, ipv4: str, ipv4_subnet: str):
def add_management_network(self, name: str, ipv4_cidr: str, ):
if self.management_network:
logger.error("Multiple management networks for a single range is not supported for now")
return

n = Network(name=name,
host_isolated=False,
ipv4=ipv4, ipv4_subnet=ipv4_subnet,
ipv4_cidr=ipv4_cidr,
isolate_guests=False,
ipv6="", ipv6_prefix="",
ipv6_cidr="",
mode="open",
ingress_route_subnet=None, ingress_route_gateway=None)
ingress_route_subnet="", ingress_route_gateway="")

n.create(self.libvirt_connection)
self.networks.append(n)
Expand Down Expand Up @@ -287,7 +287,7 @@ def cleanup(self):
for domain in self.domains:
logger.info(f"removing management network interface for domain {domain.name}")
domain.remove_interface_for_network_name(self.management_network.name)
self.management_network.rm()
self.management_network.remove()
self.networks.remove(self.management_network)
logger.info(f"done removing management network {self.management_network.name}")

Expand Down
3 changes: 1 addition & 2 deletions cave/src/domains/interface.py
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other components of cave (e.g. this line) use the ipv4 and prefix_length properties, so we cannot remove them.
Maybe change Interface to a normal class and do the calculation of ipv4 and prefix_length from ipv4_cidr in the constructor like done in network.py?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could then also include sanity checks in the constructor e.g. assert "/" in ipv4_cidr, this is not a clean solution but we are just building a prototype here for now.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to introduce a utility module to ease the introduction of functionalities, which are reused throughout cave? This would also ease the introduction of argument validation etc. in the future.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for now it is not really necessary

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
class Interface:
mac: str
network: Network
ipv4: str
prefix_length: int
ipv4_cidr: str
is_mngmt: bool

139 changes: 120 additions & 19 deletions cave/src/networks/network/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,100 @@
import jinja2
import re

from libvirt import virConnect


def cidr_to_netmask(cidr: str) -> tuple[str, str]:
"""
Converts an IPv4 address in CIDR notation into a tuple of IP address and network mask.

Parameters
----------
cidr : str
IPv4 address in CIDR notation.

Returns
-------
tuple[str, str]
Tuple of IP address and netmask.
"""
interface = ipaddress.IPv4Interface(cidr)
address, netmask = interface.with_netmask.split("/")
return address, netmask

class Network(object):
"""

Args:
conn: libvirt connection object
mode: set this to empty string for isolated virt-network
ipv4: first ip in the network, not !net-id!, if dhcp or dns is used this will be the address of the dhcp/dns server
ipv4_subnetmask: subnetmask e.g. 255.255.255.0
ipv6: first ipv6 in ipv6 range of network
ipv6_prefix: e.g. 64
Object defining a network in the libvirt cyber range.

Parameters
----------
name : str
Name of the network to be defined
host_isolated : bool, default: False
If the libvirt host should be isolated from the systems in the range.
ipv4_cidr : str, optional
IPv4 address in CIDR notation which the libvirt host should use.
isolate_guest : bool, default: False
If the guests on this network should be isolated from eachother.
ipv6_cidr: str, optional
The IPv6 addrtess for the libvirt host in CIDR notation.
mode : str, optional
Network mode for the network.
ingress_route_subnet : str, optional
Subnet the libvirt host should route into the cyber range.
ingress_route_gateway : str, optional
Nexthop to use for routing traffic into the cyber range.
"""
RELATIVE_TEMPLATE_PATH = "network.jinja.xml"

def __init__(self, name: str, host_isolated=None, ipv4="", ipv4_subnet="", isolate_guests=None, ipv6="", ipv6_prefix="64", mode="", ingress_route_subnet=None, ingress_route_gateway=None ):
def __init__(
self,
name: str,
host_isolated: bool = False,
ipv4_cidr: str = "",
isolate_guests: bool = False,
ipv6_cidr: str = "",
mode: str = "",
ingress_route_subnet: str = "",
ingress_route_gateway: str = ""
):

if ipv4_cidr:
ipv4_tuple = cidr_to_netmask(ipv4_cidr)
self.ipv4 = ipv4_tuple[0]
self.ipv4_subnet = ipv4_tuple[1]
else:
self.ipv4 = ""
self.ipv4_subnet = ""

if ipv6_cidr:
ipv6_tuple = ipv6_cidr.split("/")
self.ipv6 = ipv6_tuple[0]
self.ipv6_prefix = ipv6_tuple[1]
else:
self.ipv6 = ""
self.ipv6_subnet = ""
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean self.ipv6_prefix = "" instead of self.ipv6_subnet = "" ?



self.name = name
self.mode = mode
self.ipv4 = ipv4
self.ipv4_subnet = ipv4_subnet
self.ipv6 = ipv6
self.ipv6_prefix = ipv6_prefix
self.host_isolated = True if host_isolated else False
self.host_isolated = host_isolated
self.isolate_guests = "yes" if isolate_guests else "no"
self.host_mac = None
self.ingress_route_subnet = ipaddress.ip_network(ingress_route_subnet) if ingress_route_subnet else None
self.ingress_route_gateway = ingress_route_gateway if ingress_route_gateway else None



def _get_config(self) -> dict:
"""
Turns the current configuration into a dictionary.

Returns
-------
dict
The curent configuration.
"""
config = {
"name": self.name,
"mode": self.mode,
Expand All @@ -47,19 +113,43 @@ def _get_config(self) -> dict:
}
return config

def to_dict(self):
def to_dict(self) -> dict:
"""
Returns dict of name and mode.

Returns
-------
dict
Name and mode of the network.
"""
return {"name":self.name,
"mode":self.mode}

def get_host_mac_from_xml(self):
def get_host_mac_from_xml(self) -> str:
"""
Retrieves the MAC address of the network from the definition XML.

Returns
-------
str
MAC address of the network.
"""
#<mac address='52:54:00:0e:4b:74'/>
assert self.libvirt_network
matches = re.search(r"<mac address='(?P<mac>.*)'/>", self.libvirt_network.XMLDesc())
if not matches:
raise Exception(f"no mac found in in network definition for {self.name} unable to set isolate host iptables rules")
return matches.group('mac')

def create(self, conn):
def create(self, conn: virConnect):
"""
Creates the network defined by this object.

Parameters
----------
conn : virConnect
The connection to the libvirt host, where the network should be created.
"""
# lookupByName throws if the name is not found
template_path = f"{os.path.dirname(
__file__)}/{Network.RELATIVE_TEMPLATE_PATH}"
Expand All @@ -70,15 +160,26 @@ def create(self, conn):
self.libvirt_network.create()
self.host_mac = self.get_host_mac_from_xml()

def rm(self):
def remove(self):
"""Removes the network."""
assert self.libvirt_network
if self.libvirt_network.isPersistent():
self.libvirt_network.undefine()
if self.libvirt_network.isActive():
self.libvirt_network.destroy()

@staticmethod
def destroy_by_name(conn, name):
def destroy_by_name(conn: virConnect, name: str):
"""
Destroys a network by a given name.

Parameters
----------
conn : virConnect
Libvirt conection where the network should be destroyed.
name : str
Name of the network which should be destroyed.
"""
network = conn.networkLookupByName(name)
network.destroy()
network.undefine()
Expand Down
24 changes: 8 additions & 16 deletions test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,20 @@

# isolated network
mngt = range.add_management_network(name="mngmt",
ipv4="10.10.0.1",
ipv4_subnet="255.255.255.0")
ipv4_cidr="10.10.0.1/24")

n1 = range.add_network(name="network1",
ipv4="",
ipv4_subnet="",
ipv4_cidr="",
mode="")

i1 = Interface(mac="",
network=n1,
ipv4="10.10.1.2",
prefix_length=24,
ipv4_cidr="10.10.1.2/24",
is_mngmt=False)

im = Interface(mac="",
network=mngt,
ipv4="10.10.0.2",
prefix_length=24,
ipv4_cidr="10.10.0.2/24",
is_mngmt=True)

ubuntu_img = f"{REMOTE_IMAGES_DIR}/{os.path.basename(UBUNTU_IMAGE)}"
Expand All @@ -87,28 +83,24 @@

i1 = Interface(mac="",
network=n1,
ipv4="10.10.1.4",
prefix_length=24,
ipv4_cidr="10.10.1.4/24",
is_mngmt=False)

im = Interface(mac="",
network=mngt,
ipv4="10.10.0.4",
prefix_length=24,
ipv4_cidr="10.10.0.4/24",
is_mngmt=True)

ld = range.add_linux_domain("linux02", "lin02", ubuntu_img, [i1, im], "pw", 8, 1024, 2, "6001", "no", "0.0.0.0", "10.10.1.1", "1.1.1.1", "10.10.0.1")

i1 = Interface(mac="",
network=n1,
ipv4="10.10.1.3",
prefix_length=24,
ipv4_cidr="10.10.1.3/24",
is_mngmt=False)

im = Interface(mac="",
network=mngt,
ipv4="10.10.0.3",
prefix_length=24,
ipv4_cidr="10.10.0.3",
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is here a /24 missing?

is_mngmt=True)

windows_server22_iso = f"{REMOTE_IMAGES_DIR}/{os.path.basename(WINDOWS_SERVER_ISO)}"
Expand Down