Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 118 additions & 72 deletions cereeberus/cereeberus/compute/computereeb.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from .unionfind import UnionFind
from ..reeb.lowerstar import LowerStar
from itertools import groupby as _groupby

import numpy as np

from ..reeb.lowerstar import LowerStar
from .unionfind import UnionFind


def is_face(sigma, tau):
"""
Expand Down Expand Up @@ -72,120 +75,163 @@ def computeReeb(K: LowerStar, verbose=False):

funcVals = [(i, K.filtration([i])) for i in K.iter_vertices()]
funcVals.sort(key=lambda x: x[1]) # Sort by filtration value
# Group vertices that share the same filtration value into batches.
# A horizontal edge (both endpoints at the same height) must be processed
# within one batch so it properly merges its endpoints into a single Reeb node.
grouped = [
(filt, list(grp))
for filt, grp in _groupby(funcVals, key=lambda x: x[1])
]

R = ReebGraph()

currentLevelSet = []
components = {}
half_edge_index = 0

# This will keep track of the components represented by every vertex in the graph so far.
# It will be vertName: connected_component (given as a list of lists) represented by that vertex
vert_to_component = {}

edges_at_prev_level = []

for i, (vert, filt) in enumerate(funcVals):
if verbose:
print(f"\n---\n Processing {vert} at func val {filt:.2f}")
def _dedup(lst):
seen = set()
out = []
for s in lst:
key = tuple(sorted(s))
if key not in seen:
seen.add(key)
out.append(s)
return out

for group_idx, (filt, group_verts) in enumerate(grouped):
now_min = filt
now_max = funcVals[i + 1][1] if i + 1 < len(funcVals) else np.inf
star = K.get_star([vert])
lower_star = [s[0] for s in star if s[1] <= filt and len(s[0]) > 1]
upper_star = [s[0] for s in star if s[1] > filt and len(s[0]) > 1]
now_max = (
grouped[group_idx + 1][0] if group_idx + 1 < len(grouped) else np.inf
)
vert_names = [v for v, _ in group_verts]

if verbose:
print(f" Lower star simplices: {lower_star}")
print(f" Upper star simplices: {upper_star}")
print(f"\n---\n Processing group at func val {filt:.2f}: {vert_names}")

# Classify all simplex stars for this batch into three groups:
#
# lower_nonhoriz: s_filt == filt AND at least one vertex strictly below filt.
# These were added to currentLevelSet by an earlier vertex; remove them now.
#
# horizontal: s_filt == filt AND ALL vertices are at filt.
# These connect same-height vertices in the same batch.
# Add them temporarily so they link the endpoints into one component.
#
# upper: s_filt > filt.
# Add persistently; they carry the level set upward to the next critical point.
#
# Note: because filt(simplex) = max(vertex filtrations) >= filt for any simplex
# in the star of a vertex at height filt, s_filt < filt is impossible here.
all_lower_nonhoriz = []
all_upper = []
all_horizontal = []

for vert in vert_names:
for s in K.get_star([vert]):
simplex, s_filt = s[0], s[1]
if len(simplex) <= 1:
continue
if s_filt > filt:
all_upper.append(simplex)
elif all(K.filtration([u]) == filt for u in simplex):
all_horizontal.append(simplex)
else:
all_lower_nonhoriz.append(simplex)

all_lower_nonhoriz = _dedup(all_lower_nonhoriz)
all_upper = _dedup(all_upper)
all_horizontal = _dedup(all_horizontal)

# ----
# Update the levelset list
# ----
if verbose:
print(f" Lower (non-horiz) simplices: {all_lower_nonhoriz}")
print(f" Horizontal simplices: {all_horizontal}")
print(f" Upper simplices: {all_upper}")

for s in lower_star:
# Remove from current level set
# Step 1: Remove the lower non-horizontal simplices from the active level set.
for s in all_lower_nonhoriz:
if s in currentLevelSet:
currentLevelSet.remove(s)

currentLevelSet.append([vert]) # Add the vertex itself to the level set
components_at_vertex = get_levelset_components(currentLevelSet)
# Step 2: Add this batch's vertices and its horizontal simplices to the level set.
for vert in vert_names:
currentLevelSet.append([vert])
for s in all_horizontal:
if s not in currentLevelSet:
currentLevelSet.append(s)

if verbose:
print(f" Current level set simplices: {currentLevelSet}")
print(f" Level set components at vertex {vert} (func val {filt:.2f}):")
for comp in components_at_vertex.values():
print(f" Component: {comp}")
print(f" Current level set: {currentLevelSet}")

# Step 3: Compute connected components; create one Reeb node per component.
components_at_level = get_levelset_components(currentLevelSet)

if verbose:
print(f" Level set components:")
for comp in components_at_level.values():
print(f" {comp}")

verts_at_level = []
for rep, comp in components_at_vertex.items():
# Add a vertex for each component in this levelset
for rep, comp in components_at_level.items():
nextNodeName = R.get_next_vert_name()
R.add_node(nextNodeName, now_min)
vert_to_component[nextNodeName] = (
comp # Store the component represented by this vertex
)
vert_to_component[nextNodeName] = comp
verts_at_level.append(nextNodeName)

# Check if any simplex in vertex component is a subset of any of simplices in a previous edge's component
# Connect incoming half-edge sentinels whose component contains a face
# of a simplex in this component.
for e in edges_at_prev_level:
prev_comp = vert_to_component[e]
if any(
[
is_face(prev_simp, simp)
for simp in comp
for prev_simp in prev_comp
]
is_face(prev_simp, simp)
for simp in comp
for prev_simp in prev_comp
):
R.add_edge(e, nextNodeName)

# ----
# Add the edge vertices for after the vertex is passed
# ----

# Remove the vertex from the level set
if [vert] in currentLevelSet:
currentLevelSet.remove([vert])
# Step 4: Remove vertices and horizontal simplices – they live only at this exact height.
for vert in vert_names:
if [vert] in currentLevelSet:
currentLevelSet.remove([vert])
for s in all_horizontal:
if s in currentLevelSet:
currentLevelSet.remove(s)

# Add the upper star to the current level set
for s in upper_star:
# Step 5: Add upper-star simplices to carry the level set forward.
for s in all_upper:
if s not in currentLevelSet:
currentLevelSet.append(s)

components = get_levelset_components(currentLevelSet)
if verbose:
print(f"\n Updated current level set simplices: {currentLevelSet}")
print(f" Level set components after vertex {vert} (func val {filt:.2f}):")
for comp in components.values():
print(f" Component: {comp}")
# ----
# Set up a vertex in the Reeb graph for each connected component
# These will represent edges
# These are at height (now_min + now_max)/2
# ----
print(f"\n Updated level set: {currentLevelSet}")

# Step 6: Compute components above this level; create half-edge sentinel nodes
# at height (now_min + now_max) / 2 and connect them downward to the Reeb nodes
# created in Step 3.
components_above = get_levelset_components(currentLevelSet)

if verbose:
print(f" Level set components above:")
for comp in components_above.values():
print(f" {comp}")

edges_at_prev_level = []
for comp in components.values():
# Create a new vertex in the Reeb graph
for comp in components_above.values():
e_name = "e_" + str(half_edge_index)
R.add_node(e_name, (now_min + now_max) / 2)
vert_to_component[e_name] = (
comp # Store the component represented by this half edge top
)
vert_to_component[e_name] = comp
half_edge_index += 1
edges_at_prev_level.append(e_name)

# Now connect to the vertices at this level
# Connect downward to Reeb nodes at this level.
for v in verts_at_level:

# Get the component represented by vertex v
prev_comp = vert_to_component[v]

if any(
[
is_face(simp, prev_simp)
for simp in comp
for prev_simp in prev_comp
]
is_face(simp, prev_simp)
for simp in comp
for prev_simp in prev_comp
):
R.add_edge(v, e_name)

return R

86 changes: 74 additions & 12 deletions doc_source/notebooks/compute_reeb.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "cereeberus"
version = "0.1.11"
version = "0.1.12"
authors = [
{ name="Liz Munch", email="muncheli@msu.edu" },
]
Expand Down
32 changes: 32 additions & 0 deletions tests/test_lowerstar_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,38 @@ def test_computeReeb(self):
self.assertGreater(len(R.nodes), 0)
self.assertGreater(len(R.edges), 0)

def test_computeReeb_horizontal_edge(self):
# Regression test: when two vertices share the same filtration value and
# are connected by an edge (a "horizontal" edge), the level-set connected
# component is that entire edge, so the Reeb graph must collapse them to
# a single node. Previously this raised ValueError.
#
# Complex: triangle with vertices 0,1,2 and edges [0,1],[0,2],[1,2].
# Filtration (x-coordinate): f(0)=0, f(1)=1, f(2)=0.
# Edge [0,2] is horizontal. The Reeb graph should have exactly 1 node at
# height 0 (not 2 separate nodes) and 2 edges leading up to vertex 1.
K = LowerStar()
K.insert([0, 1])
K.insert([0, 2])
K.insert([1, 2])

K.assign_filtration(0, 0.0)
K.assign_filtration(1, 1.0)
K.assign_filtration(2, 0.0)

R = computeReeb(K)

self.assertIsInstance(R, ReebGraph)

# Exactly one node at height 0 (the collapsed horizontal component)
nodes_at_zero = [v for v in R.nodes if R.f[v] == 0.0]
self.assertEqual(len(nodes_at_zero), 1,
"Horizontal edge should collapse to a single Reeb node")

# The graph should be connected
import networkx as nx
self.assertTrue(nx.is_weakly_connected(R))

def test_torus_example_class(self):
# Test the torus example from the documentation.
T = Torus()
Expand Down
Loading