Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions VascularFlow/Network/FlatTopHexagonalNetworkGeometry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import numpy as np


def flat_top_hexagonal_microfluidic_network(
outer_radius: float,
num_rows: int,
num_cols: int,
center: bool = False,
):
"""
Generate node coordinates and channel connectivity for a flat-top hexagonal microfluidic network.

Parameters
----------
outer_radius : float
Distance between the center of a hexagon cell and any of its vertices.
Also used as the edge length for channel spacing.
num_rows : int
Number of vertical hexagonal unit cells in the structure.
num_cols : int
Number of horizontal hexagonal unit cells in the structure.
center : bool
True → Hexagonal nodes + center inlet + outlets
False → Hexagonal nodes

Returns
-------
nodes : np.ndarray of shape (N, 2)
Array containing (x, y) coordinates of each network node.
connectivity_ci : np.ndarray of shape (M, 2)
Directed connectivity list. Each row (i, j) represents a channel
connecting inlet node i to outlet node j.

Notes
-----
* num_cols must be an odd integer.
* The generated geometry uses a flat-top hexagonal orientation.
* Connectivity is automatically constructed by identifying pairs of nodes
whose separation equals the outer_radius (i.e., nearest hex neighbors).
* Channel direction is assigned from left to right (smaller x to larger x)
to maintain consistent inlet/outlet order.
"""

# Compute hexagon spacing geometry
width = 2 * outer_radius
height = np.sqrt(3) * outer_radius

# Horizontal lattice positions using a repeating spacing pattern
horizontal_spacing = np.linspace(0, 0.75 * num_cols + 0.25, 3 * num_cols + 2)
pattern = np.array([True, True, False])
num_repeats = (len(horizontal_spacing) + 2) // 3
mask = np.tile(pattern, num_repeats)[: len(horizontal_spacing)]
new_horizontal_spacing = horizontal_spacing[mask]

# Vertical lattice positions
vertical_spacing = np.linspace(0, num_rows, 2 * num_rows + 1)

# Assemble node coordinates
nodes = []
val_list = [num_rows, num_rows + 1, num_rows + 1, num_rows]
min_val = min(val_list)

# First left column of nodes
# y_positions_left = vertical_spacing[1::2] * height
# x_left = -outer_radius
# for y in y_positions_left:
# nodes.append((x_left, y))

# Remaining columns, alternating number of nodes vertically
for q in range(2 * num_cols + 2):
val = val_list[q % 4]

if val == min_val:
y_positions = vertical_spacing[1::2] * height
else:
y_positions = vertical_spacing[::2] * height

x = new_horizontal_spacing[q] * width
for y in y_positions:
nodes.append((x, y))
# Convert to NumPy array
nodes = np.array(nodes)
nb_nodes = len(nodes)

# Build edge connectivity by checking nearest neighbors
edges = []

for i in range(nb_nodes):
for j in range(i + 1, nb_nodes):
dist = np.linalg.norm(nodes[i] - nodes[j])

# If distance equals outer_radius → they are connected
if abs(dist - outer_radius) < 1e-6:

# Orient edges left → right for flow direction consistency
if nodes[i, 0] < nodes[j, 0]:
edges.append((i, j))
elif nodes[j, 0] < nodes[i, 0]:
edges.append((j, i))
else:
# If same x-direction, break tie by y-coordinate
if nodes[i, 1] <= nodes[j, 1]:
edges.append((i, j))
else:
edges.append((j, i))

# Convert edges list to array
connectivity_ci = np.array(edges, dtype=int)

# ----------------------------------------------------
# Add center inlet/outlets if requested
# ----------------------------------------------------

if center:
x_min = np.min(nodes[:, 0])
x_max = np.max(nodes[:, 0])

tol = 1e-9

left_ids = np.where(np.abs(nodes[:, 0] - x_min) < tol)[0]
right_ids = np.where(np.abs(nodes[:, 0] - x_max) < tol)[0]

left_ids = left_ids[np.argsort(nodes[left_ids, 1])]
right_ids = right_ids[np.argsort(nodes[right_ids, 1])]

nodes_list = nodes.tolist()
edges_list = connectivity_ci.tolist()

# ------------------
# LEFT CENTRAL INLET
# ------------------

mid = len(left_ids) // 2
i = left_ids[mid - 1]
j = left_ids[mid]

p1 = nodes[i]
p2 = nodes[j]

merge_x = x_min - 0.45 * outer_radius
merge_y = 0.5 * (p1[1] + p2[1])

inlet_x = merge_x - outer_radius
inlet_y = merge_y

merge_node = len(nodes_list)
inlet_node = merge_node + 1

nodes_list.append((merge_x, merge_y))
nodes_list.append((inlet_x, inlet_y))

edges_list.append((inlet_node, merge_node))
edges_list.append((merge_node, i))
edges_list.append((merge_node, j))

# ------------------
# RIGHT OUTLETS
# ------------------

for k in range(0, len(right_ids) - 1, 2):
i = right_ids[k]
j = right_ids[k + 1]

p1 = nodes[i]
p2 = nodes[j]

merge_x = x_max + 0.45 * outer_radius
merge_y = 0.5 * (p1[1] + p2[1])

outlet_x = merge_x + outer_radius
outlet_y = merge_y

merge_node = len(nodes_list)
outlet_node = merge_node + 1

nodes_list.append((merge_x, merge_y))
nodes_list.append((outlet_x, outlet_y))

edges_list.append((i, merge_node))
edges_list.append((j, merge_node))
edges_list.append((merge_node, outlet_node))

nodes = np.array(nodes_list)
connectivity_ci = np.array(edges_list, dtype=int)

return nodes, connectivity_ci
147 changes: 147 additions & 0 deletions VascularFlow/Network/FlowNetworkPressureBCs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import numpy as np
from scipy.sparse import coo_array, csr_array



from VascularFlow.Network.NewtonSolver import newton


def flow_network_1d_pressure_boundary_condition(
connectivity_ci: np.ndarray,
boundary_nodes: np.ndarray,
boundary_pressures: np.ndarray,
resistance: float,
) -> np.ndarray:
"""
Solve fluid pressure distribution in a 1D network with linear or nonlinear pressure-flow relation.

Parameters
----------
connectivity_ci : np.ndarray of shape (C, 2)
Directed connectivity array where each row (i, j) indicates a channel from
inlet node i to outlet node j.
boundary_nodes : np.ndarray of shape (B,)
Node indices where pressure is prescribed (Dirichlet boundary nodes).
boundary_pressures : np.ndarray of shape (B,)
Corresponding fixed pressure values for each boundary node.
resistance : float
Hydraulic resistance of every channel.

Returns
-------
np.ndarray
Pressure solution for all nodes in the network.

Notes
-----
This function:
* Forms a linear or nonlinear system enforcing mass conservation at interior nodes.
* Enforces fixed pressure boundary conditions directly in the residual.
* Solves via Newton iteration (1 step for system is linear).
"""
# Extract inlet and outlet indices
inlet_id, outlet_id = np.transpose(connectivity_ci)

# Determine number of nodes
nb_nodes = max(np.max(inlet_id), np.max(outlet_id)) + 1
print(f"Network size: {nb_nodes} nodes, {len(boundary_nodes)} fixed boundaries.")

# Mark interior nodes (all except boundary)
interior_mask = np.ones(nb_nodes, dtype=bool)
interior_mask[boundary_nodes] = False

# Initial pressure guess
pressure = np.arange(nb_nodes, dtype=float)

def channel_flow(pin: np.ndarray, pout: np.ndarray) -> np.ndarray:
dp = pin - pout
#return resistance * (dp)
#return resistance * dp ** 2
return resistance * np.arctan(dp)

def dq_dpin(pin: np.ndarray, pout: np.ndarray) -> np.ndarray:
dp = pin - pout
#return resistance * np.ones_like(pin) # ∂Q/∂p_in = R
#return 2 * resistance * dp
return resistance / (1.0 + dp**2)

def dq_dpout(pin: np.ndarray, pout: np.ndarray) -> np.ndarray:
dp = pin - pout
#return -resistance * np.ones_like(pout) # ∂Q/∂p_out = -R
#return - 2 * resistance * dp
return -resistance / (1.0 + dp**2)
# --------------------------------------------------------------------------
# Node flow residuals: must equal zero at interior nodes
# --------------------------------------------------------------------------
def node_flow(p_vec: np.ndarray) -> np.ndarray:
pin = p_vec[inlet_id]
pout = p_vec[outlet_id]
flow = channel_flow(pin, pout)

# Flow imbalance: inflow - outflow = 0
return (
-np.bincount(inlet_id, weights=flow, minlength=nb_nodes) +
np.bincount(outlet_id, weights=flow, minlength=nb_nodes)
)

# --------------------------------------------------------------------------
# Jacobian Assembly
# --------------------------------------------------------------------------
def dnode_flow_dpressure(p_vec: np.ndarray) -> csr_array:
pin = p_vec[inlet_id]
pout = p_vec[outlet_id]
dQ_in = dq_dpin(pin, pout)
dQ_out = dq_dpout(pin, pout)

# Sparse assembly of Jacobian components
jac = (
coo_array((dQ_in, (outlet_id, inlet_id)), shape=(nb_nodes, nb_nodes)) -
coo_array((dQ_out, (inlet_id, outlet_id)), shape=(nb_nodes, nb_nodes)) -
coo_array((dQ_in, (inlet_id, inlet_id)), shape=(nb_nodes, nb_nodes)) +
coo_array((dQ_out, (outlet_id, outlet_id)), shape=(nb_nodes, nb_nodes))
).tocsr()

return jac

# --------------------------------------------------------------------------
# Global residual including pressure constraints at boundaries
# --------------------------------------------------------------------------
def residual(p_vec: np.ndarray) -> np.ndarray:
r = node_flow(p_vec)
r[boundary_nodes] = p_vec[boundary_nodes] - boundary_pressures
return r

# --------------------------------------------------------------------------
# Global Jacobian with BC row enforcement
# --------------------------------------------------------------------------
def jacobian(p_vec: np.ndarray) -> csr_array:
j = dnode_flow_dpressure(p_vec)

# Override BC rows
j[boundary_nodes, :] = 0
j[boundary_nodes, boundary_nodes] = 1

return j

# Progress callback for debugging / convergence info
def callback(iter_num: int, x: np.ndarray, f: np.ndarray, j) -> None:
_ = x # explicitly unused
_ = j # explicitly unused

if f is None:
print(f"Iteration {iter_num}: Initial guess assigned.")
return

residual_norm = np.linalg.norm(f, ord=np.inf)
print(f"Iteration {iter_num}: ||residual||∞ = {residual_norm:.3e}")

# Solve the system
pressure = newton(
fun=residual,
x0=pressure,
jac=jacobian,
callback=callback,
)
pressure = np.asarray(pressure, dtype=float) # ensure consistent return type
print("Pressure solution computed successfully.")
return pressure
64 changes: 64 additions & 0 deletions VascularFlow/Network/LatinHypercubeSampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import numpy as np
from scipy.stats import qmc

d = 5
n_samples = 10

x_channel_right_min, x_channel_right_max = 0.0 , 1.0
x_channel_left_min, x_channel_left_max = 2.0 , 3.0
dp_min, dp_max = 0.0 , 100.0
channel_height_min, channel_height_max = 1.0 , 5.0
p_external_min, p_external_max = 0.0 , 10

lower_bounds = np.array([
x_channel_right_min,
x_channel_left_min,
dp_min,
channel_height_min,
p_external_min,
])

upper_bounds = np.array([
x_channel_right_max,
x_channel_left_max,
dp_max,
channel_height_max,
p_external_max,
])

# LHS sampler generation
sampler = qmc.LatinHypercube(d=d)

sample_unit = sampler.random(n=n_samples)

X_design = qmc.scale(sample_unit, lower_bounds, upper_bounds)

param_names = ["idx", "x_right", "x_left", "dp", "channel_height", "p_external"]
widths = [4, 12, 12, 14, 17, 14]

# Header
header = "".join(f"{name:<{w}}" for name, w in zip(param_names, widths))
print(header)

# Rows
for i, row in enumerate(X_design):
print(f"{i:<{widths[0]}}"
f"{row[0]:<{widths[1]}.6f}"
f"{row[1]:<{widths[2]}.6f}"
f"{row[2]:<{widths[3]}.6f}"
f"{row[3]:<{widths[4]}.6f}"
f"{row[4]:<{widths[5]}.6f}")


n_samples = X_design.shape[0]
Q_values = np.array([0.01265, 0.944, 2.0998, 3.3346, 10.8706])

for i in range(n_samples):
x_right, x_left, dp, channel_height, p_external = X_design[i,:]

########################################################################################################################





Loading
Loading