diff --git a/pm4py/__init__.py b/pm4py/__init__.py index 4852e1207..58aa2bca8 100644 --- a/pm4py/__init__.py +++ b/pm4py/__init__.py @@ -146,6 +146,7 @@ discover_eventually_follows_graph, discover_directly_follows_graph, discover_bpmn_inductive, + discover_bpmn_split_miner, discover_performance_dfg, discover_transition_system, discover_prefix_tree, diff --git a/pm4py/algo/discovery/__init__.py b/pm4py/algo/discovery/__init__.py index 4b1864954..65e764495 100644 --- a/pm4py/algo/discovery/__init__.py +++ b/pm4py/algo/discovery/__init__.py @@ -39,6 +39,7 @@ ocel, performance_spectrum, powl, + split_miner, temporal_profile, - transition_system + transition_system, ) diff --git a/pm4py/algo/discovery/split_miner/__init__.py b/pm4py/algo/discovery/split_miner/__init__.py new file mode 100644 index 000000000..b5ea09952 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/__init__.py @@ -0,0 +1,36 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . 
+ +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner import ( + algorithm, + bpmn_export, + bpmn_init, + sese, + concurrency, + dfg_discovery, + dtypes, + filtering, + heuristics, + joins, + or_min, + splits, + variants, +) diff --git a/pm4py/algo/discovery/split_miner/algorithm.py b/pm4py/algo/discovery/split_miner/algorithm.py new file mode 100644 index 000000000..f90b370ac --- /dev/null +++ b/pm4py/algo/discovery/split_miner/algorithm.py @@ -0,0 +1,77 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +"""Top-level dispatcher for Split Miner. + +Two variants are exposed: + +* :data:`CLASSIC` — the classic Split Miner pipeline. +* :data:`SM2` — Split Miner 2.0, with a lifecycle-aware refined DFG, + a lifecycle-overlap concurrency oracle, and two heuristics for + improper-completion repair and OR-split identification. + +Both variants return a :class:`pm4py.objects.bpmn.obj.BPMN`. 
class Variants(Enum):
    """Available Split Miner variants.

    Each member's value is the module implementing that variant.
    """

    CLASSIC = classic
    SM2 = sm2


# Module-level aliases so callers can write ``algorithm.CLASSIC``.
CLASSIC = Variants.CLASSIC
SM2 = Variants.SM2
DEFAULT_VARIANT = CLASSIC

VERSIONS = {CLASSIC, SM2}


def apply(
    log: Union[
        EventLog, EventStream, pd.DataFrame, Dict[Tuple[str, str], int]
    ],
    parameters: Optional[Dict[Any, Any]] = None,
    variant: Variants = DEFAULT_VARIANT,
) -> BPMN:
    """Run the selected Split Miner variant on ``log``.

    Parameters
    ----------
    log
        An ``EventLog``, ``EventStream`` or ``pandas.DataFrame``; a
        precomputed DFG (``{(a, b): frequency}``) is accepted by the
        classic variant only.
    parameters
        Variant-specific parameters, forwarded unchanged to the variant's
        own ``apply``; see ``classic.Parameters`` and ``sm2.Parameters``
        for the supported keys (``EPSILON``, ``ETA``, ``OR_MINIMISE``,
        ``ACTIVITY_KEY``, …).
    variant
        Either :data:`CLASSIC` (default) or :data:`SM2`.

    Returns
    -------
    BPMN
        Whatever the selected variant's ``apply`` returns.
    """
    # Resolve the enum member to its implementing module, then delegate.
    implementation = exec_utils.get_variant(variant)
    return implementation.apply(log, parameters=parameters)
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.bpmn_export import abc, classic diff --git a/pm4py/algo/discovery/split_miner/bpmn_export/abc.py b/pm4py/algo/discovery/split_miner/bpmn_export/abc.py new file mode 100644 index 000000000..bf10dbf18 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/bpmn_export/abc.py @@ -0,0 +1,40 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . 
class BPMNExporter(ABC):
    """Convert the internal :class:`WorkingGraph` into a pm4py BPMN object.

    This is the interface of the BPMN-export phase. Concrete exporters
    subclass it and implement :meth:`apply` as a classmethod, so no
    exporter instance is needed to run the phase.
    """

    @classmethod
    @abstractmethod
    def apply(
        cls,
        wg: WorkingGraph,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> BPMN:
        """Build and return the :class:`BPMN` model for ``wg``.

        Parameters
        ----------
        wg
            The working graph to export.
        parameters
            Optional exporter-specific settings; ``None`` by default.
        """
        ...
def _make_node(kind: str, label: str, node_id: str) -> BPMN.BPMNNode:
    """Instantiate the BPMN node that corresponds to a working-graph kind.

    Only tasks receive ``label`` as their name; events and gateways are
    created with an empty name.

    Raises
    ------
    ValueError
        If ``kind`` is none of ``start``, ``end``, ``task``, ``xor``,
        ``and`` or ``or``.
    """
    if kind == "task":
        return BPMN.Task(id=node_id, name=label)

    unnamed_types = {
        "start": BPMN.StartEvent,
        "end": BPMN.EndEvent,
        "xor": BPMN.ExclusiveGateway,
        "and": BPMN.ParallelGateway,
        "or": BPMN.InclusiveGateway,
    }
    node_type = unnamed_types.get(kind)
    if node_type is None:
        raise ValueError(f"Unknown node kind: {kind}")
    return node_type(id=node_id, name="")


class ClassicBPMNExporter(BPMNExporter):
    """Turn a working graph into a pm4py :class:`BPMN` diagram."""

    @classmethod
    def apply(
        cls,
        wg: WorkingGraph,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> BPMN:
        """Export ``wg``: nodes first, then flows, then self-loops.

        ``parameters`` is accepted for interface compatibility and is not
        read here.
        """
        model = BPMN()
        created: Dict[str, BPMN.BPMNNode] = {}
        for node_id, wg_node in wg.nodes.items():
            bpmn_node = _make_node(wg_node.kind, wg_node.label, node_id)
            model.add_node(bpmn_node)
            created[node_id] = bpmn_node

        for source_id, target_id in wg.edges():
            flow = BPMN.SequenceFlow(created[source_id], created[target_id])
            model.add_flow(flow)

        # ``wg.self_loops`` is sorted so that the order in which loops
        # are attached does not depend on hash randomization; the model
        # is semantically the same either way, but node/flow ids and
        # rendering order become reproducible.
        sentinels = {START_LABEL, END_LABEL}
        loop_tasks = [
            task_id
            for task_id in sorted(wg.self_loops, reverse=True)
            if task_id in created and task_id not in sentinels
        ]
        for task_id in loop_tasks:
            cls._attach_self_loop(model, created, task_id)
        return model

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _attach_self_loop(
        bpmn: BPMN,
        node_map: Dict[str, BPMN.BPMNNode],
        task_id: str,
    ) -> None:
        """Wrap a task between an XOR-join and an XOR-split that loops back.

        Every flow entering the task is redirected to the join and every
        flow leaving it is re-sourced from the split; the split is then
        connected back to the join.
        """
        task = node_map[task_id]
        # Snapshot the flows before rewiring: flows are removed and added
        # below, so the live collection must not be iterated.
        incoming = [f for f in bpmn.get_flows() if f.get_target() is task]
        outgoing = [f for f in bpmn.get_flows() if f.get_source() is task]

        entry = BPMN.ExclusiveGateway(id=f"{task_id}__loop_join", name="")
        leave = BPMN.ExclusiveGateway(id=f"{task_id}__loop_split", name="")
        for gateway in (entry, leave):
            bpmn.add_node(gateway)

        for flow in incoming:
            origin = flow.get_source()
            bpmn.remove_flow(flow)
            bpmn.add_flow(BPMN.SequenceFlow(origin, entry))
        for flow in outgoing:
            destination = flow.get_target()
            bpmn.remove_flow(flow)
            bpmn.add_flow(BPMN.SequenceFlow(leave, destination))

        # join -> task -> split, plus the back edge split -> join.
        for source, target in ((entry, task), (task, leave), (leave, entry)):
            bpmn.add_flow(BPMN.SequenceFlow(source, target))
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.bpmn_init import abc, classic diff --git a/pm4py/algo/discovery/split_miner/bpmn_init/abc.py b/pm4py/algo/discovery/split_miner/bpmn_init/abc.py new file mode 100644 index 000000000..bd327881f --- /dev/null +++ b/pm4py/algo/discovery/split_miner/bpmn_init/abc.py @@ -0,0 +1,46 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . 
class BPMNInitializer(ABC):
    """Materialise a :class:`WorkingGraph` from the filtered PDFG.

    This is the interface of the BPMN-initialisation phase. Concrete
    initialisers subclass it and implement :meth:`apply` as a classmethod.
    """

    @classmethod
    @abstractmethod
    def apply(
        cls,
        filtered: FilterResult,
        concurrency: ConcurrencyResult,
        loops: LoopInfo,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> WorkingGraph:
        """Return a fresh working graph ready for the splits phase.

        Parameters
        ----------
        filtered
            Result of the filtering phase.
        concurrency
            Result of the concurrency-discovery phase.
        loops
            Self-loop / short-loop summary.
        parameters
            Optional implementation-specific settings; ``None`` by default.
        """
class ClassicBPMNInitializer(BPMNInitializer):
    """Create the initial working graph from the filtered PDFG."""

    @classmethod
    def apply(
        cls,
        filtered: FilterResult,
        concurrency: ConcurrencyResult,
        loops: LoopInfo,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> WorkingGraph:
        """Build nodes and edges, then attach concurrency and loop data.

        The sentinel start / end labels become the ``start`` / ``end``
        nodes and are recorded as ``start_id`` / ``end_id``; every other
        label becomes a ``task`` node. ``parameters`` is not read here.
        """
        graph = WorkingGraph()

        # ``filtered.edges`` is a set, so its iteration order depends on
        # ``PYTHONHASHSEED``. Sorting once here gives all later phases a
        # stable edge order and a stable node-insertion order across
        # processes.
        ordered_edges = sorted(filtered.edges, reverse=True)

        labels: Set[str] = {
            endpoint for a, b in ordered_edges for endpoint in (a, b)
        }
        # Source and sink are always present, even without any edge.
        labels.update((filtered.source, filtered.sink))

        for label in sorted(labels, reverse=True):
            if label == START_LABEL:
                graph.add_node("start", label="start", node_id=label)
                graph.start_id = label
                continue
            if label == END_LABEL:
                graph.add_node("end", label="end", node_id=label)
                graph.end_id = label
                continue
            graph.add_node("task", label=label, node_id=label)

        for source, target in ordered_edges:
            graph.add_edge(source, target)

        # Copies, so the working graph does not alias the phase results.
        graph.concurrency = set(concurrency.concurrent_pairs)
        graph.self_loops = set(loops.self_loops)
        return graph
+ +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.concurrency import abc, classic, refined diff --git a/pm4py/algo/discovery/split_miner/concurrency/abc.py b/pm4py/algo/discovery/split_miner/concurrency/abc.py new file mode 100644 index 000000000..e8f61c2d6 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/concurrency/abc.py @@ -0,0 +1,53 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +"""Abstract base class for the concurrency-discovery phase. + +A :class:`ConcurrencyOracle` takes a DFG (and, optionally, the underlying +trace list) and returns both the set of unordered concurrent pairs and +the *pruned* DFG with the concurrent arcs removed. 
# Placeholder for the trace representation a concrete oracle consumes.
TraceT = TypeVar("TraceT")


class ConcurrencyOracle(ABC):
    """Detect concurrent activity pairs and prune the DFG accordingly.

    This is the interface of the concurrency-discovery phase. Concrete
    oracles subclass it and implement :meth:`apply` as a classmethod.
    """

    @classmethod
    @abstractmethod
    def apply(
        cls,
        dfg: DFG,
        traces: Optional[List[TraceT]],
        loops: LoopInfo,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> ConcurrencyResult:
        """Return the pruned DFG together with the concurrency relation.

        Parameters
        ----------
        dfg
            Directly-follows graph to analyse.
        traces
            Underlying trace list; may be ``None`` for oracles that only
            need the DFG.
        loops
            Self-loop / short-loop summary for the same log.
        parameters
            Optional oracle-specific settings; ``None`` by default.
        """
class Parameters(Enum):
    """Parameter keys read by the classic concurrency oracle."""

    EPSILON = "split_miner_epsilon"


DEFAULT_EPSILON = 0.1


class ClassicConcurrencyOracle(ConcurrencyOracle):
    """Frequency-balance concurrency test on bidirectional DFG arcs.

    A pair ``{a, b}`` with ``a != b`` is concurrent when both arcs have a
    positive frequency, the pair is not a short loop, and
    ``|f_ab - f_ba| / (f_ab + f_ba) <= eps``. The comparison is ``<=``
    rather than ``<`` to mirror the Java reference implementation, so the
    boundary case at exactly ``eps`` counts as concurrent.
    """

    @classmethod
    def apply(
        cls,
        dfg: DFG,
        traces: Optional[List[Any]],  # unused (kept to share signature)
        loops: LoopInfo,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> ConcurrencyResult:
        """Classify bidirectional arcs and return the pruned DFG.

        Both arcs of a concurrent pair are removed from the result. For an
        imbalanced bidirectional pair that is not a short loop, only the
        less frequent arc is removed.
        """
        threshold = exec_utils.get_param_value(
            Parameters.EPSILON, parameters or {}, DEFAULT_EPSILON
        )

        parallel: Set[FrozenSet[str]] = set()
        weaker_arcs: Set[Tuple[str, str]] = set()
        examined: Set[FrozenSet[str]] = set()

        for (src, tgt), forward in list(dfg.items()):
            if src == tgt:
                continue
            couple = frozenset((src, tgt))
            # Each unordered pair is judged once, from whichever of its
            # two arcs is met first.
            if couple in examined:
                continue
            examined.add(couple)

            backward = dfg.get((tgt, src), 0)
            if forward <= 0 or backward <= 0 or couple in loops.short_loops:
                continue

            # Both frequencies are positive here, so the sum is non-zero.
            skew = abs(forward - backward) / (forward + backward)
            if skew <= threshold:
                parallel.add(couple)
            elif forward < backward:
                weaker_arcs.add((src, tgt))
            else:
                weaker_arcs.add((tgt, src))

        pruned: DFG = {
            (a, b): freq
            for (a, b), freq in dfg.items()
            if frozenset((a, b)) not in parallel
            and (a, b) not in weaker_arcs
        }
        return ConcurrencyResult(pdfg=pruned, concurrent_pairs=parallel)
class Parameters(Enum):
    """Parameter keys read by the lifecycle-overlap concurrency oracle."""

    EPSILON = "split_miner_epsilon"


DEFAULT_EPSILON = 0.1


def _build_intervals(
    trace: RefinedTrace,
) -> List[Tuple[str, int, int]]:
    """Pair lifecycle events of a trace into ``(label, start, end)`` spans.

    Positions are indices into ``trace``. An event whose lifecycle is
    ``"start"`` opens an execution; any other event closes the earliest
    still-open execution of the same label (FIFO). A closing event with
    no open start yields a zero-length span ``(label, idx, idx)``.
    Starts that are never closed produce no span.
    """
    # Local import keeps this block self-contained; a deque gives O(1)
    # removal from the front, where ``list.pop(0)`` is O(n).
    from collections import deque

    intervals: List[Tuple[str, int, int]] = []
    pending = defaultdict(deque)
    for position, (label, lifecycle, _) in enumerate(trace):
        if lifecycle == "start":
            pending[label].append(position)
            continue
        queue = pending[label]
        begin = queue.popleft() if queue else position
        intervals.append((label, begin, position))
    return intervals


class RefinedConcurrencyOracle(ConcurrencyOracle):
    """Concurrency test based on lifecycle overlaps."""

    @classmethod
    def apply(
        cls,
        dfg: DFG,
        traces: Optional[List[RefinedTrace]],
        loops: LoopInfo,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> ConcurrencyResult:
        """Flag overlapping activity pairs and drop their arcs from the DFG.

        A pair ``{a, b}`` is concurrent when
        ``2 * overlaps / (count(a) + count(b)) >= eps``, it is not a
        short loop, and neither activity is a self-loop. Here ``count``
        is the number of spans built for a label over all traces.

        Raises
        ------
        ValueError
            If ``traces`` is ``None``; this oracle needs the refined log.
        """
        if traces is None:
            raise ValueError(
                "RefinedConcurrencyOracle requires the refined log"
            )
        eps = exec_utils.get_param_value(
            Parameters.EPSILON, parameters or {}, DEFAULT_EPSILON
        )

        counts: Dict[str, int] = defaultdict(int)
        overlaps: Dict[FrozenSet[str], int] = defaultdict(int)
        for trace in traces:
            intervals = _build_intervals(trace)
            for label, _, _ in intervals:
                counts[label] += 1
            for i, (l1, s1, e1) in enumerate(intervals):
                for l2, s2, e2 in intervals[i + 1:]:
                    if l1 == l2:
                        continue
                    # Strict comparisons: spans that merely touch at an
                    # endpoint do not count as overlapping.
                    if s1 < e2 and s2 < e1:
                        overlaps[frozenset((l1, l2))] += 1

        concurrent: Set[FrozenSet[str]] = set()
        for pair, ov in overlaps.items():
            if ov == 0:
                continue
            if pair in loops.short_loops:
                continue
            # ``pair`` always holds two distinct labels (l1 != l2 above).
            a, b = tuple(pair)
            if a in loops.self_loops or b in loops.self_loops:
                continue
            total = counts.get(a, 0) + counts.get(b, 0)
            if total == 0:
                continue
            score = 2.0 * ov / total
            if score >= eps:
                concurrent.add(pair)

        # Both directions of every concurrent pair are removed.
        pdfg: DFG = {
            (a, b): f
            for (a, b), f in dfg.items()
            if frozenset((a, b)) not in concurrent
        }
        return ConcurrencyResult(pdfg=pdfg, concurrent_pairs=concurrent)
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.dfg_discovery import abc, classic, refined diff --git a/pm4py/algo/discovery/split_miner/dfg_discovery/abc.py b/pm4py/algo/discovery/split_miner/dfg_discovery/abc.py new file mode 100644 index 000000000..97d6b4de5 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/dfg_discovery/abc.py @@ -0,0 +1,43 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . 
# Trace representation consumed by a concrete discoverer.
TraceT = TypeVar("TraceT")


class DFGDiscoverer(ABC, Generic[TraceT]):
    """Build a DFG and the corresponding ``LoopInfo`` from a list of traces.

    This is the interface of the DFG-discovery phase; it is generic in
    the trace type. Concrete discoverers subclass it and implement
    :meth:`apply` as a classmethod.
    """

    @classmethod
    @abstractmethod
    def apply(
        cls,
        traces: List[TraceT],
        parameters: Optional[Dict[str, Any]] = None,
    ) -> Tuple[DFG, LoopInfo]:
        """Return the directly-follows graph and its self/short-loop summary.

        Parameters
        ----------
        traces
            The traces to mine.
        parameters
            Optional discoverer-specific settings; ``None`` by default.
        """
+ +Builds the DFG from a list of activity-label traces and detects: + + * self-loops — activities ``a`` with ``|a -> a| > 0``; + * short-loops — pairs ``{a, b}`` for which an ``a, b, a`` sub-sequence + appears in some trace while neither ``a`` nor ``b`` is a self-loop. +""" +from collections import defaultdict +from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple + +from pm4py.algo.discovery.split_miner.dfg_discovery.abc import DFGDiscoverer +from pm4py.algo.discovery.split_miner.dtypes.dfg import DFG +from pm4py.algo.discovery.split_miner.dtypes.log import LabelTrace +from pm4py.algo.discovery.split_miner.dtypes.loops import LoopInfo + + +def _build_dfg(traces: List[LabelTrace]) -> Tuple[DFG, Set[str]]: + dfg: Dict[Tuple[str, str], int] = defaultdict(int) + labels: Set[str] = set() + for trace in traces: + for label in trace: + labels.add(label) + for a, b in zip(trace, trace[1:]): + dfg[(a, b)] += 1 + return dict(dfg), labels + + +def short_loop_frequencies( + traces: List[LabelTrace], +) -> Dict[Tuple[str, str], int]: + """Number of (a, b, a) sub-sequences over the supplied traces.""" + freq: Dict[Tuple[str, str], int] = defaultdict(int) + for trace in traces: + for i in range(len(trace) - 2): + a, b, c = trace[i], trace[i + 1], trace[i + 2] + if a == c and a != b: + freq[(a, b)] += 1 + return dict(freq) + + +def _discover_loops(dfg: DFG, traces: List[LabelTrace]) -> LoopInfo: + self_loops: Set[str] = { + a for (a, b), f in dfg.items() if a == b and f > 0 + } + short_freq = short_loop_frequencies(traces) + short_loops: Set[FrozenSet[str]] = set() + for (a, b), f in short_freq.items(): + if f == 0: + continue + if a in self_loops or b in self_loops: + continue + if short_freq.get((a, b), 0) + short_freq.get((b, a), 0) == 0: + continue + short_loops.add(frozenset((a, b))) + return LoopInfo( + self_loops=self_loops, + short_loops=short_loops, + short_loop_freq=short_freq, + ) + + +class ClassicDFGDiscoverer(DFGDiscoverer[LabelTrace]): + 
def strip_self_loops(dfg: DFG) -> DFG:
    """Return a copy of ``dfg`` without its ``a -> a`` arcs.

    The input mapping is left untouched.  According to the original note
    on this function, the dropped self-loops are re-attached later,
    during BPMN export.
    """
    stripped: DFG = {}
    for arc, freq in dfg.items():
        src, tgt = arc
        if src == tgt:
            continue
        stripped[arc] = freq
    return stripped
+""" +from collections import defaultdict +from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple + +from pm4py.algo.discovery.split_miner.dfg_discovery.abc import DFGDiscoverer +from pm4py.algo.discovery.split_miner.dfg_discovery.classic import ( + short_loop_frequencies, +) +from pm4py.algo.discovery.split_miner.dtypes.dfg import DFG +from pm4py.algo.discovery.split_miner.dtypes.log import RefinedTrace +from pm4py.algo.discovery.split_miner.dtypes.loops import LoopInfo + + +def _build_refined_dfg( + refined_traces: List[RefinedTrace], +) -> Tuple[DFG, Set[str]]: + """Build the refined DFG: ``a -> b`` iff ``a_end`` is followed by + ``b_start`` in the same trace with no intervening ``end`` event.""" + dfg: Dict[Tuple[str, str], int] = defaultdict(int) + labels: Set[str] = set() + for trace in refined_traces: + for i, (a, lc_a, _) in enumerate(trace): + labels.add(a) + if lc_a != "end": + continue + for j in range(i + 1, len(trace)): + b, lc_b, _ = trace[j] + if lc_b == "end": + break + if lc_b == "start": + dfg[(a, b)] += 1 + return dict(dfg), labels + + +def _discover_loops_refined( + dfg: DFG, refined_traces: List[RefinedTrace] +) -> LoopInfo: + """Short-loop detection on the end-event projection of the refined log.""" + self_loops = {a for (a, b), f in dfg.items() if a == b and f > 0} + end_projection = [ + [lbl for lbl, lc, _ in trace if lc == "end"] + for trace in refined_traces + ] + short_freq = short_loop_frequencies(end_projection) + short_loops: Set[FrozenSet[str]] = set() + for (a, b), f in short_freq.items(): + if f == 0: + continue + if a in self_loops or b in self_loops: + continue + if short_freq.get((a, b), 0) + short_freq.get((b, a), 0) == 0: + continue + short_loops.add(frozenset((a, b))) + return LoopInfo( + self_loops=self_loops, + short_loops=short_loops, + short_loop_freq=short_freq, + ) + + +class RefinedDFGDiscoverer(DFGDiscoverer[RefinedTrace]): + """Lifecycle-aware refined directly-follows graph.""" + + @classmethod + def 
apply( + cls, + traces: List[RefinedTrace], + parameters: Optional[Dict[str, Any]] = None, + ) -> Tuple[DFG, LoopInfo]: + dfg, _ = _build_refined_dfg(traces) + loops = _discover_loops_refined(dfg, traces) + return dfg, loops diff --git a/pm4py/algo/discovery/split_miner/dtypes/__init__.py b/pm4py/algo/discovery/split_miner/dtypes/__init__.py new file mode 100644 index 000000000..1f4ba5253 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/dtypes/__init__.py @@ -0,0 +1,29 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . 
+ +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.dtypes import ( + working_graph, + dfg, + loops, + concurrency, + filtering, + log, +) diff --git a/pm4py/algo/discovery/split_miner/dtypes/concurrency.py b/pm4py/algo/discovery/split_miner/dtypes/concurrency.py new file mode 100644 index 000000000..b0cc44902 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/dtypes/concurrency.py @@ -0,0 +1,32 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . 
@dataclass
class ConcurrencyResult:
    """Output of the concurrency phase: pruned DFG + concurrency relation.

    Both fields default to fresh empty containers, so a bare
    ``ConcurrencyResult()`` is a valid empty result.
    """

    # Pruned directly-follows graph, keyed ``(a, b) -> frequency``.
    pdfg: DFG = field(default_factory=dict)
    # Concurrency relation, stored as frozensets of labels.
    # NOTE(review): presumably each frozenset is an unordered pair
    # {a, b} -- confirm against the oracle that fills this field.
    concurrent_pairs: Set[FrozenSet[str]] = field(default_factory=set)
@dataclass
class FilterResult:
    """Output of the PDFG filtering phase.

    Bundles the arcs that survived filtering with the entry and exit
    nodes the filter identified.
    """

    # Directly-follows arcs ``(a, b)`` kept by the filter.
    edges: Set[Tuple[str, str]] = field(default_factory=set)
    # Label of the graph's source node; empty string until set.
    source: str = ""
    # Label of the graph's sink node; empty string until set.
    sink: str = ""
# A flat label trace consumed by the classic Split Miner pipeline: the
# activity labels of one trace, in order.
LabelTrace = List[str]
# A log in the flat representation: a list of label traces.
LabelLog = List[LabelTrace]

# A refined event keeps the activity label, the lifecycle phase
# (``start`` or ``end``) and the timestamp, in that order. The
# lifecycle-aware variant of the pipeline operates on lists of these.
RefinedEvent = Tuple[str, str, Any]
# One trace / one whole log in the lifecycle-aware representation.
RefinedTrace = List[RefinedEvent]
RefinedLog = List[RefinedTrace]

# Sentinel labels added to every trace so the resulting BPMN has a single
# start event and a single end event.
START_LABEL = "__start__"
END_LABEL = "__end__"
@dataclass
class LoopInfo:
    """Self-loops, short-loops, and the underlying frequency map.

    All three fields default to fresh empty containers.
    """

    # Activity labels ``a`` whose ``a -> a`` arc has a positive frequency.
    self_loops: Set[str] = field(default_factory=set)
    # Unordered label pairs ``{a, b}`` recognised as short-loops.
    short_loops: Set[FrozenSet[str]] = field(default_factory=set)
    # ``(a, b) -> number of a, b, a sub-sequences`` found in the log.
    short_loop_freq: Dict[Tuple[str, str], int] = field(default_factory=dict)
NodeKind = Literal["task", "xor", "and", "or", "start", "end"]


@dataclass
class Node:
    """A vertex of the working graph: an id, a kind and a display label."""

    id: str
    kind: NodeKind
    label: str = ""


@dataclass
class WorkingGraph:
    """Adjacency-list representation of a BPMN graph in construction.

    ``out_edges`` and ``in_edges`` mirror each other: ``t`` is listed in
    ``out_edges[s]`` exactly when ``s`` is listed in ``in_edges[t]``.
    Every mutation helper below preserves that invariant, also when it
    raises.
    """

    # Registered nodes, keyed by node id.
    nodes: Dict[str, Node] = field(default_factory=dict)
    # Successor / predecessor ids per node id, in insertion order.
    out_edges: Dict[str, List[str]] = field(default_factory=dict)
    in_edges: Dict[str, List[str]] = field(default_factory=dict)

    # NOTE(review): presumably the ids of the start and end event nodes;
    # nothing in this class reads or writes them -- confirm with callers.
    start_id: str = ""
    end_id: str = ""

    # Unordered pairs consulted by ``is_concurrent``.
    concurrency: Set[FrozenSet[str]] = field(default_factory=set)
    # NOTE(review): presumably the activities carrying a self-loop; not
    # used inside this class -- confirm with callers.
    self_loops: Set[str] = field(default_factory=set)

    # Last number handed out by ``fresh_id``.
    _id_counter: int = 0

    # ------------------------------------------------------------------
    # mutation helpers
    # ------------------------------------------------------------------

    def fresh_id(self, prefix: str) -> str:
        """Return an unused node id of the form ``<prefix>_<n>``.

        The counter only grows.  Numbers whose id is already registered
        (for instance through an explicit ``node_id``) are skipped so a
        generated id can never clobber an existing node.
        """
        while True:
            self._id_counter += 1
            candidate = f"{prefix}_{self._id_counter}"
            if candidate not in self.nodes:
                return candidate

    def add_node(
        self,
        kind: NodeKind,
        label: str = "",
        node_id: Optional[str] = None,
    ) -> str:
        """Register a node and return its id.

        Args:
            kind: Node kind, one of the ``NodeKind`` literals.
            label: Display label; defaults to the empty string.
            node_id: Explicit id.  When omitted, a fresh id derived from
                ``kind`` is generated.  Re-using an existing id replaces
                the stored ``Node`` but keeps the edges already attached
                to that id.

        Returns:
            The id under which the node is stored.
        """
        if node_id is None:
            node_id = self.fresh_id(kind)
        self.nodes[node_id] = Node(id=node_id, kind=kind, label=label)
        self.out_edges.setdefault(node_id, [])
        self.in_edges.setdefault(node_id, [])
        return node_id

    def add_edge(self, src: str, tgt: str) -> None:
        """Add the edge ``src -> tgt`` unless it is already present.

        Raises:
            KeyError: If either endpoint has no adjacency entry.  Both
                endpoints are checked before anything is modified, so a
                failed call leaves the graph unchanged.
        """
        if src not in self.out_edges:
            raise KeyError(src)
        if tgt not in self.in_edges:
            raise KeyError(tgt)
        if tgt not in self.out_edges[src]:
            self.out_edges[src].append(tgt)
        if src not in self.in_edges[tgt]:
            self.in_edges[tgt].append(src)

    def remove_edge(self, src: str, tgt: str) -> None:
        """Remove the edge ``src -> tgt``; a missing edge is ignored."""
        if tgt in self.out_edges.get(src, []):
            self.out_edges[src].remove(tgt)
        if src in self.in_edges.get(tgt, []):
            self.in_edges[tgt].remove(src)

    def remove_node(self, node_id: str) -> None:
        """Remove a node together with every edge touching it.

        Unknown ids are ignored.
        """
        # Iterate over copies: remove_edge edits the very lists we walk.
        for s in list(self.in_edges.get(node_id, [])):
            self.remove_edge(s, node_id)
        for t in list(self.out_edges.get(node_id, [])):
            self.remove_edge(node_id, t)
        self.in_edges.pop(node_id, None)
        self.out_edges.pop(node_id, None)
        self.nodes.pop(node_id, None)

    # ------------------------------------------------------------------
    # queries
    # ------------------------------------------------------------------

    def successors(self, node_id: str) -> List[str]:
        """Return a copy of the successor ids (empty for unknown ids)."""
        return list(self.out_edges.get(node_id, []))

    def predecessors(self, node_id: str) -> List[str]:
        """Return a copy of the predecessor ids (empty for unknown ids)."""
        return list(self.in_edges.get(node_id, []))

    def edges(self) -> List[Tuple[str, str]]:
        """Return every edge as a ``(source, target)`` tuple."""
        return [(s, t) for s, ts in self.out_edges.items() for t in ts]

    def is_concurrent(self, a: str, b: str) -> bool:
        """Tell whether the unordered pair ``{a, b}`` is in ``concurrency``."""
        return frozenset((a, b)) in self.concurrency
If not, see this software project's root or +visit https://www.gnu.org/licenses/. + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.filtering import abc, max_min diff --git a/pm4py/algo/discovery/split_miner/filtering/abc.py b/pm4py/algo/discovery/split_miner/filtering/abc.py new file mode 100644 index 000000000..cbde5389d --- /dev/null +++ b/pm4py/algo/discovery/split_miner/filtering/abc.py @@ -0,0 +1,40 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit https://www.gnu.org/licenses/. 
class Filterer(ABC):
    """Reduce a pruned DFG to a sound, low-complexity edge set.

    This is the interface of the PDFG filtering phase.  It is used
    exclusively through the ``apply`` classmethod; the class itself
    declares no attributes.
    """

    @classmethod
    @abstractmethod
    def apply(
        cls,
        pdfg: DFG,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> FilterResult:
        """Return the source/sink and the kept edges.

        Args:
            pdfg: Pruned directly-follows graph, keyed
                ``(a, b) -> frequency``.
            parameters: Optional settings dictionary; defaults to ``None``.
                NOTE(review): the recognised keys are defined by each
                concrete filter -- confirm against the implementation.

        Returns:
            A ``FilterResult`` carrying the retained arcs together with
            the source and sink labels.
        """
The output is the union +of each node's best-incoming and best-outgoing edges plus every edge +with frequency above the eta-percentile threshold. +""" +import math +from collections import deque +from enum import Enum +from typing import Any, Dict, List, Optional, Set, Tuple + +import numpy as np + +from pm4py.algo.discovery.split_miner.dtypes.dfg import DFG +from pm4py.algo.discovery.split_miner.dtypes.filtering import FilterResult +from pm4py.algo.discovery.split_miner.dtypes.log import END_LABEL, START_LABEL +from pm4py.algo.discovery.split_miner.filtering.abc import Filterer +from pm4py.util import exec_utils + + +class Parameters(Enum): + ETA = "split_miner_eta" + + +DEFAULT_ETA = 0.4 + + +def _node_set(dfg: DFG) -> Set[str]: + s: Set[str] = set() + for a, b in dfg.keys(): + s.add(a) + s.add(b) + return s + + +def _find_source_sink(dfg: DFG, nodes: Set[str]) -> Tuple[str, str]: + has_in = {b for (_, b) in dfg.keys()} + has_out = {a for (a, _) in dfg.keys()} + sources = [n for n in nodes if n not in has_in] + sinks = [n for n in nodes if n not in has_out] + if len(sources) != 1 or len(sinks) != 1: + if START_LABEL in nodes and END_LABEL in nodes: + return START_LABEL, END_LABEL + raise ValueError( + f"Filtered PDFG must have exactly one source/sink; " + f"got sources={sources}, sinks={sinks}" + ) + return sources[0], sinks[0] + + +def _best_incoming( + dfg: DFG, source: str, nodes: Set[str] +) -> Tuple[Dict[str, float], Dict[str, Tuple[str, str]]]: + capacity: Dict[str, float] = {n: 0 for n in nodes} + capacity[source] = math.inf + best: Dict[str, Tuple[str, str]] = {} + + out_adj: Dict[str, List[Tuple[str, int]]] = {n: [] for n in nodes} + for (a, b), f in dfg.items(): + out_adj[a].append((b, f)) + + in_queue: Set[str] = {source} + unexplored: Set[str] = set(nodes) - {source} + queue = deque([source]) + while queue: + p = queue.popleft() + in_queue.discard(p) + for n, f_e in out_adj[p]: + c_max = min(capacity[p], f_e) + updated = False + if c_max > 
capacity[n]: + capacity[n] = c_max + best[n] = (p, n) + updated = True + if updated: + if n in unexplored: + unexplored.discard(n) + if n not in in_queue: + queue.append(n) + in_queue.add(n) + elif n in unexplored: + unexplored.discard(n) + if n not in in_queue: + queue.append(n) + in_queue.add(n) + return capacity, best + + +def _best_outgoing( + dfg: DFG, sink: str, nodes: Set[str] +) -> Tuple[Dict[str, float], Dict[str, Tuple[str, str]]]: + capacity: Dict[str, float] = {n: 0 for n in nodes} + capacity[sink] = math.inf + best: Dict[str, Tuple[str, str]] = {} + + in_adj: Dict[str, List[Tuple[str, int]]] = {n: [] for n in nodes} + for (a, b), f in dfg.items(): + in_adj[b].append((a, f)) + + in_queue: Set[str] = {sink} + unexplored: Set[str] = set(nodes) - {sink} + queue = deque([sink]) + while queue: + n = queue.popleft() + in_queue.discard(n) + for p, f_e in in_adj[n]: + c_max = min(capacity[n], f_e) + updated = False + if c_max > capacity[p]: + capacity[p] = c_max + best[p] = (p, n) + updated = True + if updated: + if p in unexplored: + unexplored.discard(p) + if p not in in_queue: + queue.append(p) + in_queue.add(p) + elif p in unexplored: + unexplored.discard(p) + if p not in in_queue: + queue.append(p) + in_queue.add(p) + return capacity, best + + +class MaxMinFilterer(Filterer): + """Dijkstra-style BFS that retains each node's best in/out edges.""" + + @classmethod + def apply( + cls, + pdfg: DFG, + parameters: Optional[Dict[str, Any]] = None, + ) -> FilterResult: + eta = exec_utils.get_param_value( + Parameters.ETA, parameters or {}, DEFAULT_ETA + ) + nodes = _node_set(pdfg) + source, sink = _find_source_sink(pdfg, nodes) + + fmax_in: Dict[str, int] = {n: 0 for n in nodes} + fmax_out: Dict[str, int] = {n: 0 for n in nodes} + for (a, b), f in pdfg.items(): + if f > fmax_out[a]: + fmax_out[a] = f + if f > fmax_in[b]: + fmax_in[b] = f + + frequencies: List[int] = [] + for n in nodes: + if n != source: + frequencies.append(fmax_in[n]) + if n != sink: + 
frequencies.append(fmax_out[n]) + + f_th = ( + float(np.percentile(frequencies, eta * 100.0)) if frequencies else 0.0 + ) + + _, best_in = _best_incoming(pdfg, source, nodes) + _, best_out = _best_outgoing(pdfg, sink, nodes) + kept_best: Set[Tuple[str, str]] = set(best_in.values()) | set( + best_out.values() + ) + + edges_out: Set[Tuple[str, str]] = set() + for (a, b), f in pdfg.items(): + if (a, b) in kept_best or f > f_th: + edges_out.add((a, b)) + return FilterResult(edges=edges_out, source=source, sink=sink) diff --git a/pm4py/algo/discovery/split_miner/heuristics/__init__.py b/pm4py/algo/discovery/split_miner/heuristics/__init__.py new file mode 100644 index 000000000..d0b6e0107 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/heuristics/__init__.py @@ -0,0 +1,26 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . 
+ +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.heuristics import ( + abc, + improper_completion, + or_split, +) diff --git a/pm4py/algo/discovery/split_miner/heuristics/abc.py b/pm4py/algo/discovery/split_miner/heuristics/abc.py new file mode 100644 index 000000000..341a63044 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/heuristics/abc.py @@ -0,0 +1,45 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit https://www.gnu.org/licenses/. 
class Heuristic(ABC):
    """A post-processing pass that mutates the working graph in-place.

    Heuristics may inspect the refined log (lifecycle-aware trace list)
    to decide what to change.
    """

    @classmethod
    @abstractmethod
    def apply(
        cls,
        wg: WorkingGraph,
        refined_traces: Optional[List[RefinedTrace]] = None,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Run the heuristic on ``wg``.

        Args:
            wg: The working graph; implementations edit it in place.
            refined_traces: Optional lifecycle-aware traces the heuristic
                may consult; defaults to ``None``.
            parameters: Optional settings dictionary; defaults to ``None``.
                NOTE(review): the recognised keys are defined by each
                concrete heuristic -- confirm against the implementation.

        Returns:
            ``None``; the result is the mutated ``wg``.
        """
        ...
def _to_digraph(wg: WorkingGraph) -> nx.DiGraph:
    """Build a NetworkX ``DiGraph`` with the node ids and edges of ``wg``.

    :param wg: working graph exposing ``nodes`` (mapping keyed by node
        id) and ``edges()`` (iterable of ``(source, target)`` pairs).
    :return: a new directed graph; ``wg`` is not modified.
    """
    g = nx.DiGraph()
    g.add_nodes_from(wg.nodes.keys())
    for s, t in wg.edges():
        g.add_edge(s, t)
    return g


def _splice_if_trivial(wg: WorkingGraph, node: str) -> None:
    """Remove ``node`` if it is a gateway with one incoming and one
    outgoing edge, reconnecting its predecessor to its successor.

    Nothing happens when ``node`` is unknown, is not a gateway (kind
    ``xor`` / ``and`` / ``or``) or has more than one incoming or
    outgoing edge.
    """
    n = wg.nodes.get(node)
    if n is None or n.kind not in {"xor", "and", "or"}:
        return
    ins = wg.predecessors(node)
    outs = wg.successors(node)
    if len(ins) == 1 and len(outs) == 1:
        p, s = ins[0], outs[0]
        wg.remove_edge(p, node)
        wg.remove_edge(node, s)
        # When predecessor and successor are the same node the bypass
        # edge would be a self-loop, so it is not added.
        if s != p:
            wg.add_edge(p, s)
        wg.remove_node(node)


class ImproperCompletionHeuristic(Heuristic):
    """Relocate an AND-split's loop-closing back-edge onto a new
    preceding XOR-split."""

    @classmethod
    def apply(
        cls,
        wg: WorkingGraph,
        refined_traces: Optional[List[RefinedTrace]] = None,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Repair every AND-split of ``wg`` that takes part in a loop.

        :param wg: working graph, modified in place.
        :param refined_traces: accepted for interface compatibility;
            not read by this heuristic.
        :param parameters: accepted for interface compatibility; not
            read by this heuristic.
        """
        # Iterate over a snapshot of the AND-split ids: new nodes
        # created by the heuristic must not be re-processed.
        for and_id in [
            nid for nid, n in list(wg.nodes.items()) if n.kind == "and"
        ]:
            if and_id not in wg.nodes:
                continue
            if len(wg.successors(and_id)) <= 1:
                continue

            # The graph may have been rewritten by a previous iteration,
            # so the snapshot and the back-edges are recomputed each time.
            graph = _to_digraph(wg)
            back_edges = analyse(wg).back_edges

            try:
                and_descendants = nx.descendants(graph, and_id)
            except (nx.NodeNotFound, nx.NetworkXError):
                # NetworkX documents ``NetworkXError`` for a missing
                # source node; both exception types are tolerated.
                continue

            # A loop-closing back-edge of this AND-split is an edge
            # (u, v) such that the AND-split can reach u and v can reach
            # the AND-split — following it therefore re-enters the
            # parallel block.
            closing: List[Tuple[str, str]] = []
            # ``back_edges`` is a ``set``; sort it so the order in which
            # edges are relocated (and hence the resulting edge order in
            # ``wg``) does not depend on hash randomisation.
            for (u, v) in sorted(back_edges):
                reaches_u = u == and_id or u in and_descendants
                if not reaches_u:
                    continue
                v_reaches_and = v == and_id or (
                    v in graph and nx.has_path(graph, v, and_id)
                )
                if v_reaches_and:
                    closing.append((u, v))
            if not closing:
                continue

            # The repair is only defined for an AND-split with exactly
            # one parent.
            preds = wg.predecessors(and_id)
            if len(preds) != 1:
                continue
            parent = preds[0]

            # Insert the new XOR-split between the parent and the
            # AND-split, keeping every parallel branch on the AND-split.
            xor_id = wg.add_node("xor", label="xor_lc")
            wg.remove_edge(parent, and_id)
            wg.add_edge(parent, xor_id)
            wg.add_edge(xor_id, and_id)

            # Relocate every loop-closing back-edge so its source is the
            # new XOR-split; the parallel block then completes properly.
            for (u, v) in closing:
                wg.remove_edge(u, v)
                wg.add_edge(xor_id, v)
                _splice_if_trivial(wg, u)
def _collect_intervals(
    trace: RefinedTrace,
) -> Dict[str, List[Tuple[int, int]]]:
    """Turn one refined trace into per-label index intervals.

    Each event is unpacked as ``(label, lifecycle, _)``. A ``"start"``
    event opens an interval at its position; any other lifecycle value
    closes the oldest still-open start of the same label (FIFO). An
    event with no open start yields the zero-length interval
    ``(idx, idx)``. Starts that are never closed produce no interval.

    :return: mapping label -> list of ``(start_index, end_index)``.
    """
    open_starts: Dict[str, List[int]] = defaultdict(list)
    intervals: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
    for idx, (label, lc, _) in enumerate(trace):
        if lc == "start":
            open_starts[label].append(idx)
        else:
            if open_starts[label]:
                s = open_starts[label].pop(0)
                intervals[label].append((s, idx))
            else:
                intervals[label].append((idx, idx))
    return intervals


def _pair_observation(
    refined_traces: List[RefinedTrace],
) -> Tuple[Dict[FrozenSet[str], int], Dict[FrozenSet[str], int]]:
    """Count, per unordered label pair, concurrent and exclusive traces.

    * ``concurrent[pair]`` is the number of traces in which at least one
      interval of one label strictly overlaps an interval of the other.
    * ``exclusive[pair]`` is the number of traces in which exactly one
      of the two labels occurs (the other occurs elsewhere in the log).

    :return: ``(concurrent, exclusive)`` dictionaries keyed by
        ``frozenset((a, b))``.
    """
    concurrent: Dict[FrozenSet[str], int] = defaultdict(int)
    exclusive: Dict[FrozenSet[str], int] = defaultdict(int)

    # Every label seen anywhere in the log.
    universe: Set[str] = set()
    for trace in refined_traces:
        for label, _, _ in trace:
            universe.add(label)

    for trace in refined_traces:
        intervals = _collect_intervals(trace)
        labels = list(intervals.keys())
        for i, a in enumerate(labels):
            for b in labels[i + 1:]:
                pair = frozenset((a, b))
                # Strict overlap test: zero-length intervals only count
                # when they fall strictly inside the other interval.
                if any(
                    s1 < e2 and s2 < e1
                    for (s1, e1) in intervals[a]
                    for (s2, e2) in intervals[b]
                ):
                    concurrent[pair] += 1
        present = {label for label, _, _ in trace}
        absent = universe - present
        for a in present:
            for b in absent:
                exclusive[frozenset((a, b))] += 1
    return concurrent, exclusive


def _pair_eligible(conc: int, excl: int) -> bool:
    """Return ``True`` when both counts are non-zero and within a factor
    of two of each other."""
    if conc == 0 or excl == 0:
        return False
    return 2 * conc >= excl and 2 * excl >= conc


def _resolve_to_task(
    wg: WorkingGraph, node: str, depth: int = 0
) -> Optional[str]:
    """Return the label of the first task found by a depth-first walk
    along successors starting at ``node``.

    ``start`` / ``end`` nodes and unknown ids are dead ends. Successors
    are explored in the order ``wg.successors`` returns them. Each node
    is expanded at most once, so gateway-only cycles or reconverging
    gateway chains cannot cause repeated exploration.

    :param wg: working graph to walk.
    :param node: id of the node to start from.
    :param depth: retained for backward compatibility with the former
        recursive implementation; it is ignored.
    :return: the task label, or ``None`` if no task is reachable.
    """
    seen: Set[str] = set()
    stack: List[str] = [node]
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        n = wg.nodes.get(current)
        if n is None or n.kind in {"start", "end"}:
            continue
        if n.kind == "task":
            return n.label
        # Push in reverse so the first successor is popped first, which
        # keeps the usual depth-first, left-to-right visiting order.
        stack.extend(reversed(list(wg.successors(current))))
    return None


class OrSplitHeuristic(Heuristic):
    """Relabel AND-splits as OR-splits when the log supports it."""

    @classmethod
    def apply(
        cls,
        wg: WorkingGraph,
        refined_traces: Optional[List[RefinedTrace]] = None,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Rewrite qualifying AND-splits of ``wg`` as OR-splits.

        An AND-split qualifies when strictly more than half of the pairs
        of tasks resolved from its successors are eligible according to
        :func:`_pair_eligible`.

        :param wg: working graph, modified in place.
        :param refined_traces: refined log; nothing is done when it is
            ``None`` or empty.
        :param parameters: accepted for interface compatibility; not
            read by this heuristic.
        """
        if not refined_traces:
            return
        conc_counts, excl_counts = _pair_observation(refined_traces)

        for and_id in [
            nid for nid, n in list(wg.nodes.items()) if n.kind == "and"
        ]:
            succs = wg.successors(and_id)
            if len(succs) < 2:
                continue
            resolved = [
                lbl
                for lbl in (_resolve_to_task(wg, s) for s in succs)
                if lbl is not None
            ]
            if len(resolved) < 2:
                continue
            eligible = 0
            total = 0
            for i, a in enumerate(resolved):
                for b in resolved[i + 1:]:
                    pair = frozenset((a, b))
                    total += 1
                    if _pair_eligible(
                        conc_counts.get(pair, 0), excl_counts.get(pair, 0)
                    ):
                        eligible += 1
            if total > 0 and eligible * 2 > total:
                node = wg.nodes[and_id]
                # Keep the label in step with the kind when it merely
                # mirrored the old kind, so the gateway does not end up
                # as kind "or" with label "and".
                if getattr(node, "label", None) == node.kind:
                    node.label = "or"
                node.kind = "or"
See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit https://www.gnu.org/licenses/. + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.joins import abc, classic diff --git a/pm4py/algo/discovery/split_miner/joins/abc.py b/pm4py/algo/discovery/split_miner/joins/abc.py new file mode 100644 index 000000000..fbe719980 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/joins/abc.py @@ -0,0 +1,39 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit https://www.gnu.org/licenses/. 
class JoinsDiscoverer(ABC):
    """Interface of the phase that places join gateways in front of
    nodes reached by several incoming edges."""

    @classmethod
    @abstractmethod
    def apply(
        cls,
        wg: WorkingGraph,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Insert the discovered joins directly into ``wg``.

        :param wg: working graph, modified in place.
        :param parameters: optional implementation-specific settings.
        """
_SPLIT_KINDS = {"xor", "and", "or"}


def _is_split(wg: WorkingGraph, node: str) -> bool:
    """Return ``True`` when ``node`` is a gateway (kind ``xor``, ``and``
    or ``or``) with more than one outgoing edge."""
    n = wg.nodes.get(node)
    if n is None:
        return False
    return n.kind in _SPLIT_KINDS and len(wg.out_edges.get(node, [])) > 1


def _split_origins(
    wg: WorkingGraph,
    edge_source: str,
    skip: Set[str],
    back_edges: Set[Tuple[str, str]],
) -> Set[str]:
    """Collect every split that is the first split on a backward path.

    Walks backward from ``edge_source`` (inclusive). When a split
    gateway that is not in ``skip`` is reached, it is recorded and the
    walk does not continue past it. Every other node — tasks, join
    gateways and skipped splits alike — is walked through to all of its
    predecessors. Edges listed in ``back_edges`` are never followed.

    Each node is expanded at most once, so the walk is linear in the
    size of the graph and does not recurse; the returned set is the same
    as the one obtained by enumerating every simple backward path.

    :param wg: working graph to inspect (not modified).
    :param edge_source: node the backward walk starts from.
    :param skip: splits that must be treated as ordinary nodes.
    :param back_edges: ``(source, target)`` edges to ignore.
    :return: ids of the nearest splits found.
    """
    origins: Set[str] = set()
    seen: Set[str] = set()
    pending: List[str] = [edge_source]
    while pending:
        node = pending.pop()
        if node in seen:
            continue
        seen.add(node)
        if _is_split(wg, node) and node not in skip:
            origins.add(node)
            continue
        pending.extend(
            p for p in wg.predecessors(node) if (p, node) not in back_edges
        )
    return origins


def _add_single_join(
    wg: WorkingGraph,
    t: str,
    kind: str,
    sources: List[str],
) -> None:
    """Insert one join gateway of ``kind`` in front of ``t``.

    Every edge ``(p, t)`` with ``p`` in ``sources`` is redirected to the
    new gateway, which gets a single outgoing edge to ``t``. The gateway
    label is set to ``kind``.
    """
    g = wg.add_node(kind, label=kind)
    for p in sources:
        wg.remove_edge(p, t)
        wg.add_edge(p, g)
    wg.add_edge(g, t)


def _join_one(
    wg: WorkingGraph,
    t: str,
    back_edges: Set[Tuple[str, str]],
) -> None:
    """Insert the join gateway(s) needed in front of target ``t``.

    * If any incoming edge of ``t`` is a back-edge, all predecessors are
      merged by a single XOR-join.
    * Otherwise predecessors are grouped by their split-origin set;
      groups of two or more that share exactly one origin split are
      merged by a join of that split's kind, and the origin is then
      skipped in later rounds.
    * When a round makes no progress, the remaining predecessors (if
      more than one) are merged by a single OR-join.
    """
    if any((p, t) in back_edges for p in wg.predecessors(t)):
        _add_single_join(wg, t, "xor", list(wg.predecessors(t)))
        return

    skip: Set[str] = set()
    # Upper bound on the number of grouping rounds.
    max_rounds = len(wg.nodes) + 4
    for _ in range(max_rounds):
        preds = list(wg.predecessors(t))
        if len(preds) <= 1:
            return

        # Group predecessors by their full origin set. Only predecessors
        # whose origin sets are identical (and contain exactly one
        # split) can collapse into a homogeneous join.
        pred_origins: Dict[str, FrozenSet[str]] = {
            p: frozenset(_split_origins(wg, p, skip, back_edges))
            for p in preds
        }
        groups: Dict[FrozenSet[str], List[str]] = {}
        for p in preds:
            key = pred_origins[p]
            if not key:
                # No split gates this predecessor; it can only end up in
                # the fallback OR-join.
                continue
            groups.setdefault(key, []).append(p)

        progress = False
        for origin_set, group in groups.items():
            if len(group) < 2:
                continue
            if len(origin_set) != 1:
                # Heterogeneous origin set — leave for the fallback
                # OR-join below; trying to merge here would silently
                # synchronise tokens from unrelated splits.
                continue
            single_origin = next(iter(origin_set))
            kind = wg.nodes[single_origin].kind
            _add_single_join(wg, t, kind, group)
            skip.add(single_origin)
            progress = True

        if not progress:
            remaining = list(wg.predecessors(t))
            if len(remaining) > 1:
                _add_single_join(wg, t, "or", remaining)
            return


class ClassicJoinsDiscoverer(JoinsDiscoverer):
    """Bottom-up join insertion guided by split-origin grouping."""

    @classmethod
    def apply(
        cls,
        wg: WorkingGraph,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Insert joins in front of every ``task`` / ``end`` node of
        ``wg`` that has more than one incoming edge.

        :param wg: working graph, modified in place.
        :param parameters: accepted for interface compatibility; not
            read by this implementation.
        """
        # Back-edges are computed once, before any join is inserted.
        info = analyse(wg)
        targets = [
            nid
            for nid, n in list(wg.nodes.items())
            if n.kind in {"task", "end"}
            and len(wg.in_edges.get(nid, [])) > 1
        ]
        for t in targets:
            _join_one(wg, t, info.back_edges)
+ +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.or_min import abc, classic diff --git a/pm4py/algo/discovery/split_miner/or_min/abc.py b/pm4py/algo/discovery/split_miner/or_min/abc.py new file mode 100644 index 000000000..4678acf25 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/or_min/abc.py @@ -0,0 +1,39 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit https://www.gnu.org/licenses/. 
class OrJoinMinimizer(ABC):
    """Interface of the phase that swaps trivial OR-joins for the
    equivalent XOR- or AND-join."""

    @classmethod
    @abstractmethod
    def apply(
        cls,
        wg: WorkingGraph,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Rewrite the OR-joins of ``wg`` directly on the graph.

        :param wg: working graph, modified in place.
        :param parameters: optional implementation-specific settings.
        """
def _to_digraph(wg: WorkingGraph) -> "nx.DiGraph":
    """Build a NetworkX ``DiGraph`` with the node ids and edges of ``wg``."""
    g = nx.DiGraph()
    g.add_nodes_from(wg.nodes.keys())
    for s, t in wg.edges():
        g.add_edge(s, t)
    return g


def _check_or_semantic(
    wg: WorkingGraph,
    g: "nx.DiGraph",
    j: str,
    info,
) -> str:
    """Decide which gateway kind the OR-join ``j`` can be replaced by.

    :param wg: working graph that owns ``j``.
    :param g: directed-graph view of ``wg`` used for reachability; the
        caller in this module passes it with back-edges removed.
    :param j: id of the OR-join under inspection.
    :param info: analysis result; only ``info.dominator`` is read here.
    :return: ``"xor"`` or ``"and"`` when every relevant split agrees on
        that kind, otherwise ``"or"`` (meaning: leave the join as is).
    """
    # Without a recorded dominator there is no region to analyse.
    d = info.dominator.get(j)
    if d is None:
        return "or"

    # Nodes lying on some path from the dominator to the join.
    forward = nx.descendants(g, d) | {d}
    backward = nx.ancestors(g, j) | {j}
    between = forward & backward
    # ``between`` is a Python ``set``; sort before iterating so the
    # eventual semantic decision is independent of hash randomisation.
    splits = [
        n
        for n in sorted(between, reverse=True)
        if wg.nodes[n].kind in {"xor", "and", "or"}
        and len(wg.out_edges.get(n, [])) > 1
        and n != j
    ]
    if not splits:
        return "or"

    incoming_of_j = set(wg.in_edges.get(j, []))

    def reaches(x: str) -> Set[Tuple[str, str]]:
        # Incoming edges ``(p, j)`` of the join whose source ``p`` is
        # ``x`` itself or a descendant of ``x``. A branch that targets
        # the join directly yields the empty set.
        if x == j:
            return set()
        try:
            descendants_x = nx.descendants(g, x) | {x}
        except nx.NetworkXError:
            return set()
        return {(p, j) for p in incoming_of_j if p in descendants_x}

    # Kind agreed on so far; the empty string means "nothing decided yet".
    semantic: str = ""
    for g_s in splits:
        outs = list(wg.out_edges.get(g_s, []))
        reach: Dict[str, Set[Tuple[str, str]]] = {
            x: reaches(x) for x in outs
        }
        g_kind = wg.nodes[g_s].kind
        # An OR-split in the region always keeps the join an OR-join.
        if g_kind == "or":
            return "or"

        # Compare the reach sets of every ordered pair of branches.
        for i in range(len(outs)):
            for k in range(len(outs)):
                if i == k:
                    continue
                t1, t2 = reach[outs[i]], reach[outs[k]]
                if t1 == t2:
                    continue
                inter = t1 & t2
                s1 = t1 - inter
                s2 = t2 - inter
                # Both branches own incoming edges the other cannot reach.
                fully_separating = bool(s1 and s2)
                # Exactly one branch owns such private incoming edges.
                asymmetric = bool(s1) ^ bool(s2)
                if fully_separating or (asymmetric and g_kind == "and"):
                    # Two splits voting for different kinds: stay "or".
                    if semantic and semantic != g_kind:
                        return "or"
                    semantic = g_kind

        # An XOR-split with a branch that reaches no incoming edge of
        # the join also votes for "xor".
        if g_kind == "xor" and any(not reach[x] for x in outs):
            if semantic and semantic != "xor":
                return "or"
            semantic = "xor"

    return semantic if semantic else "or"


class ClassicOrJoinMinimizer(OrJoinMinimizer):
    """Replace trivial OR-joins in-place."""

    @classmethod
    def apply(
        cls,
        wg: WorkingGraph,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Rewrite every OR-join of ``wg`` that resolves to XOR or AND.

        :param wg: working graph, modified in place.
        :param parameters: accepted for interface compatibility; not
            read by this implementation.
        """
        # Candidates: nodes of kind "or" with more than one incoming edge.
        or_joins = [
            nid
            for nid, n in list(wg.nodes.items())
            if n.kind == "or" and len(wg.in_edges.get(nid, [])) > 1
        ]
        if not or_joins:
            return

        info = analyse(wg)
        g = _to_digraph(wg)
        # Reachability is evaluated on the graph without back-edges.
        for be in info.back_edges:
            if g.has_edge(*be):
                g.remove_edge(*be)

        for j in or_joins:
            new_kind = _check_or_semantic(wg, g, j, info)
            if new_kind in {"xor", "and"}:
                # Kind and label are updated together.
                wg.nodes[j].kind = new_kind
                wg.nodes[j].label = new_kind
b/pm4py/algo/discovery/split_miner/sese/__init__.py new file mode 100644 index 000000000..3878b7984 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/sese/__init__.py @@ -0,0 +1,22 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit https://www.gnu.org/licenses/. + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.sese import rpst diff --git a/pm4py/algo/discovery/split_miner/sese/rpst.py b/pm4py/algo/discovery/split_miner/sese/rpst.py new file mode 100644 index 000000000..90e714044 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/sese/rpst.py @@ -0,0 +1,104 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. 
@dataclass
class SeseInfo:
    """Result bundle produced by :func:`analyse`."""

    # Edges classified as back-edges by the depth-first search.
    back_edges: Set[Tuple[str, str]]
    # Immediate dominator per node; nodes that map to themselves are
    # left out.
    dominator: Dict[str, str]


def _to_digraph(wg: WorkingGraph):
    """Copy the node ids and edges of ``wg`` into a fresh directed graph."""
    graph = nx_utils.DiGraph()
    graph.add_nodes_from(wg.nodes.keys())
    graph.add_edges_from((src, dst) for src, dst in wg.edges())
    return graph


def _back_edges(g, source: str) -> Set[Tuple[str, str]]:
    """Return the back-edges found by an iterative depth-first search.

    The search starts at ``source`` and is then restarted from every
    node of ``g`` that is still unvisited, in ``g.nodes`` order. An edge
    ``(u, v)`` is a back-edge when ``v`` is still on the search stack at
    the moment the edge is examined.
    """
    unseen, active, done = 0, 1, 2
    state: Dict[str, int] = {node: unseen for node in g.nodes}
    found: Set[Tuple[str, str]] = set()

    def walk(root: str) -> None:
        state[root] = active
        frames = [(root, list(g.successors(root)))]
        while frames:
            node, pending = frames[-1]
            if pending:
                # Successors are consumed from the end of the list.
                nxt = pending.pop()
                status = state[nxt]
                if status == active:
                    found.add((node, nxt))
                elif status == unseen:
                    state[nxt] = active
                    frames.append((nxt, list(g.successors(nxt))))
            else:
                state[node] = done
                frames.pop()

    walk(source)
    for node in g.nodes:
        if state[node] == unseen:
            walk(node)
    return found


def analyse(wg: WorkingGraph) -> SeseInfo:
    """Return the back-edges and immediate dominators of ``wg``.

    Dominators are computed on the graph with back-edges removed,
    restricted to the nodes reachable from ``wg.start_id``.

    :raises ValueError: if ``wg.start_id`` is not set.
    """
    g = _to_digraph(wg)
    if not wg.start_id:
        raise ValueError(
            "WorkingGraph.start_id must be set before SESE analysis"
        )
    start = wg.start_id
    back = _back_edges(g, start)

    # Back-edge-free skeleton over the same node set.
    acyclic = nx_utils.DiGraph()
    acyclic.add_nodes_from(g.nodes)
    acyclic.add_edges_from(edge for edge in g.edges if edge not in back)

    reachable = nx_utils.descendants(acyclic, start) | {start}
    imm = nx.immediate_dominators(acyclic.subgraph(reachable), start)
    dom: Dict[str, str] = {
        node: idom for node, idom in imm.items() if node != idom
    }
    return SeseInfo(back_edges=back, dominator=dom)
+ +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.splits import abc, classic diff --git a/pm4py/algo/discovery/split_miner/splits/abc.py b/pm4py/algo/discovery/split_miner/splits/abc.py new file mode 100644 index 000000000..32c96e02f --- /dev/null +++ b/pm4py/algo/discovery/split_miner/splits/abc.py @@ -0,0 +1,39 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit https://www.gnu.org/licenses/. 
class SplitsDiscoverer(ABC):
    """Interface of the phase that places split gateways after tasks
    that have several successors."""

    @classmethod
    @abstractmethod
    def apply(
        cls,
        wg: WorkingGraph,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Write the discovered split hierarchy directly into ``wg``.

        :param wg: working graph, modified in place.
        :param parameters: optional implementation-specific settings.
        """
Cover and future sets are tracked per
+successor and per newly inserted gateway; the iteration stops once only
+one root remains. A fallback OR-split is inserted when no further XOR or
+AND grouping can be discovered.
+"""
+from typing import Any, Dict, List, Optional, Set
+
+from pm4py.algo.discovery.split_miner.dtypes.working_graph import WorkingGraph
+from pm4py.algo.discovery.split_miner.splits.abc import SplitsDiscoverer
+
+
+def _initial_cover_future(
+    wg: WorkingGraph, d_successors: List[str]
+):
+    """Build the initial cover / future sets for the direct successors.
+
+    The cover of a successor is the singleton containing itself; its
+    future is the set of the *other* successors that ``wg`` reports as
+    concurrent with it.
+
+    :return: the pair ``(cover, future)``, both keyed by successor id.
+    """
+    cover: Dict[str, Set[str]] = {}
+    future: Dict[str, Set[str]] = {}
+    for s in d_successors:
+        cover[s] = {s}
+        future[s] = {
+            other
+            for other in d_successors
+            if other != s and wg.is_concurrent(s, other)
+        }
+    return cover, future
+
+
+def _discover_xor_split(
+    wg: WorkingGraph,
+    s_set: List[str],
+    cover: Dict[str, Set[str]],
+    future: Dict[str, Set[str]],
+) -> Optional[str]:
+    """Group the first set of roots sharing an identical future under XOR.
+
+    On success the grouped roots are removed from ``s_set``, the new
+    gateway is appended to it, and ``cover`` / ``future`` receive an entry
+    for the gateway (union of the members' covers, copy of the shared
+    future).
+
+    :return: the id of the new gateway, or ``None`` when no two roots
+        share a future.
+    """
+    for s1 in s_set:
+        # Collect the members in ``s_set`` order (not in a ``set``): the
+        # order in which the gateway's outgoing edges are created is then
+        # reproducible across runs instead of depending on string hashing.
+        members = list(
+            dict.fromkeys(
+                s for s in s_set if s == s1 or future[s] == future[s1]
+            )
+        )
+        if len(members) < 2:
+            continue
+        g = wg.add_node("xor", label="xor")
+        c_union: Set[str] = set()
+        for s in members:
+            wg.add_edge(g, s)
+            s_set.remove(s)
+            c_union |= cover[s]
+        # ``s_set`` was mutated while being iterated; this is safe only
+        # because we return immediately.
+        s_set.append(g)
+        cover[g] = c_union
+        future[g] = set(future[s1])
+        return g
+    return None
+
+
+def _discover_and_split(
+    wg: WorkingGraph,
+    s_set: List[str],
+    cover: Dict[str, Set[str]],
+    future: Dict[str, Set[str]],
+) -> Optional[str]:
+    """Group the first set of roots with equal ``cover | future`` under AND.
+
+    On success the grouped roots are removed from ``s_set``, the new
+    gateway is appended to it, and the gateway gets the union of the
+    members' covers and the intersection of their futures.
+
+    :return: the id of the new gateway, or ``None`` when no two roots
+        have the same ``cover | future``.
+    """
+    for s1 in s_set:
+        cf_s1 = cover[s1] | future[s1]
+        # Same deterministic, ``s_set``-ordered grouping as the XOR case.
+        members = list(
+            dict.fromkeys(
+                s
+                for s in s_set
+                if s == s1 or (cover[s] | future[s]) == cf_s1
+            )
+        )
+        if len(members) < 2:
+            continue
+        g = wg.add_node("and", label="and")
+        c_union: Set[str] = set()
+        f_inter: Set[str] = set(future[s1])
+        for s in members:
+            wg.add_edge(g, s)
+            s_set.remove(s)
+            c_union |= cover[s]
+            f_inter &= future[s]
+        s_set.append(g)
+        cover[g] = c_union
+        future[g] = f_inter
+        return g
+    return None
+
+
+def _fallback_or_split(
+    wg: WorkingGraph,
+    s_set: List[str],
+    cover: Dict[str, Set[str]],
+    future: Dict[str, Set[str]],
+) -> str:
+    """Put every remaining root under a single OR gateway.
+
+    ``s_set`` is reduced to the new gateway alone; the gateway's cover is
+    the union of all members' covers and its future is empty.
+
+    :return: the id of the new OR gateway.
+    """
+    g = wg.add_node("or", label="or")
+    c_union: Set[str] = set()
+    for s in list(s_set):
+        wg.add_edge(g, s)
+        c_union |= cover[s]
+    s_set.clear()
+    s_set.append(g)
+    cover[g] = c_union
+    future[g] = set()
+    return g
+
+
+def _split_one(wg: WorkingGraph, t: str) -> None:
+    """Replace the outgoing edges of ``t`` with a gateway hierarchy.
+
+    The direct edges ``t -> successor`` are removed, roots are merged
+    under XOR (tried first) or AND gateways until one root is left, and
+    ``t`` is finally connected to that root. When neither grouping makes
+    progress, the remaining roots are collected under an OR gateway.
+    """
+    # Snapshot the successors: the edges are removed below, and the
+    # snapshot must not change underneath us if ``successors`` hands out
+    # a live collection.
+    d_succs = list(wg.successors(t))
+    cover, future = _initial_cover_future(wg, d_succs)
+    s_set: List[str] = list(d_succs)
+
+    for s in d_succs:
+        wg.remove_edge(t, s)
+
+    # Iteration guard: bail out into the OR fallback if the loop runs
+    # longer than a bound derived from the number of successors.
+    safety = 0
+    max_iter = 4 * len(d_succs) + 8
+    while len(s_set) > 1:
+        progress = False
+        if _discover_xor_split(wg, s_set, cover, future) is not None:
+            progress = True
+        elif _discover_and_split(wg, s_set, cover, future) is not None:
+            progress = True
+        if not progress:
+            _fallback_or_split(wg, s_set, cover, future)
+            break
+        safety += 1
+        if safety > max_iter:
+            _fallback_or_split(wg, s_set, cover, future)
+            break
+
+    if s_set:
+        wg.add_edge(t, s_set[0])
+
+
+class ClassicSplitsDiscoverer(SplitsDiscoverer):
+    """Build a hierarchy of XOR/AND gateways at every split-task."""
+
+    @classmethod
+    def apply(
+        cls,
+        wg: WorkingGraph,
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Rewrite every ``task`` / ``start`` node with several successors.
+
+        The candidate nodes are collected up front because the rewrite
+        adds gateway nodes to ``wg`` while it runs. ``parameters`` is
+        accepted for interface compatibility and not read here.
+        """
+        split_tasks = [
+            nid
+            for nid, n in list(wg.nodes.items())
+            if n.kind in {"task", "start"}
+        ]
+        for t in split_tasks:
+            if len(wg.successors(t)) <= 1:
+                continue
+            _split_one(wg, t)
diff --git a/pm4py/algo/discovery/split_miner/variants/__init__.py b/pm4py/algo/discovery/split_miner/variants/__init__.py
new file mode 100644
index 000000000..dff435a00
--- /dev/null
+++ b/pm4py/algo/discovery/split_miner/variants/__init__.py
@@ -0,0 +1,22 @@
+'''
+PM4Py – A Process Mining Library for Python
+Copyright (C) 2026 Process Intelligence Solutions GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as
+published by the Free Software
Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +from pm4py.algo.discovery.split_miner.variants import abc, classic, sm2 diff --git a/pm4py/algo/discovery/split_miner/variants/abc.py b/pm4py/algo/discovery/split_miner/variants/abc.py new file mode 100644 index 000000000..f4f519714 --- /dev/null +++ b/pm4py/algo/discovery/split_miner/variants/abc.py @@ -0,0 +1,293 @@ +''' +PM4Py – A Process Mining Library for Python +Copyright (C) 2026 Process Intelligence Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see this software project's root or +visit . + +Website: https://processintelligence.solutions +Contact: info@processintelligence.solutions +''' +"""Base class implementing the Split Miner framework. + +How to extend: + +1. Subclass :class:`SplitMinerFramework`. +2. Override :meth:`do_extract_traces` and any of the other ``do_*`` phase + methods whose behaviour differs from the default classic pipeline. +3. 
Expose a top-level ``apply`` function that instantiates the subclass + and forwards to :meth:`apply`. + +The :meth:`apply` driver runs the canonical Split Miner pipeline: +(1) trace extraction, (2) DFG + loop discovery, (3) concurrency, +(4) PDFG filtering, (5) initial BPMN, (6) split discovery, +(7) optional heuristics, (8) join discovery, (9) OR-join minimisation, +(10) BPMN export. The default implementations of every ``do_*`` method +match the classic Split Miner; subclasses change only the phases that +genuinely differ from the classic flow. +""" +from abc import ABC +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple, Union + +import pandas as pd + +from pm4py.algo.discovery.split_miner.bpmn_export.classic import ( + ClassicBPMNExporter, +) +from pm4py.algo.discovery.split_miner.bpmn_init.classic import ( + ClassicBPMNInitializer, +) +from pm4py.algo.discovery.split_miner.concurrency.classic import ( + ClassicConcurrencyOracle, +) +from pm4py.algo.discovery.split_miner.dfg_discovery.classic import ( + ClassicDFGDiscoverer, + strip_self_loops, +) +from pm4py.algo.discovery.split_miner.dtypes.concurrency import ( + ConcurrencyResult, +) +from pm4py.objects.bpmn.util import reduction +from pm4py.algo.discovery.split_miner.dtypes.dfg import DFG +from pm4py.algo.discovery.split_miner.dtypes.filtering import FilterResult +from pm4py.algo.discovery.split_miner.dtypes.log import ( + END_LABEL, + LabelTrace, + START_LABEL, +) +from pm4py.algo.discovery.split_miner.dtypes.loops import LoopInfo +from pm4py.algo.discovery.split_miner.dtypes.working_graph import WorkingGraph +from pm4py.algo.discovery.split_miner.filtering.max_min import MaxMinFilterer +from pm4py.algo.discovery.split_miner.joins.classic import ( + ClassicJoinsDiscoverer, +) +from pm4py.algo.discovery.split_miner.or_min.classic import ( + ClassicOrJoinMinimizer, +) +from pm4py.algo.discovery.split_miner.splits.classic import ( + ClassicSplitsDiscoverer, +) +from 
pm4py.objects.bpmn.obj import BPMN
+from pm4py.objects.conversion.log import converter as log_conversion
+from pm4py.objects.log.obj import EventLog, EventStream
+from pm4py.util import constants, exec_utils
+from pm4py.util import xes_constants as xes_util
+
+
+class Parameters(Enum):
+    # When true (the default), the OR-join minimisation phase is run.
+    OR_MINIMISE = "split_miner_or_minimise"
+
+
+DEFAULT_OR_MINIMISE = True
+
+
+class SplitMinerFramework(ABC):
+    """Pipeline runner shared by every Split Miner variant."""
+
+    # ------------------------------------------------------------------
+    # Phase 0 — log extraction
+    # ------------------------------------------------------------------
+
+    def do_extract_traces(
+        self,
+        log: Union[EventLog, EventStream, pd.DataFrame],
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> List[LabelTrace]:
+        """Project a pm4py log onto label sequences wrapped with sentinels.
+
+        Each trace becomes ``[START_LABEL, *activity_labels, END_LABEL]``
+        so the resulting BPMN has a single source and a single sink.
+        Subclasses that need richer per-event data (e.g. lifecycle phases)
+        override this method.
+
+        Events without the activity attribute are skipped, and traces
+        left without any label are dropped.
+        """
+        parameters = parameters or {}
+        activity_key = exec_utils.get_param_value(
+            constants.PARAMETER_CONSTANT_ACTIVITY_KEY,
+            parameters,
+            xes_util.DEFAULT_NAME_KEY,
+        )
+        # Forward ``parameters`` to the converter so that caller-supplied
+        # conversion settings (notably a non-default case-id column for
+        # DataFrame input) are honoured rather than silently ignored.
+        event_log = (
+            log
+            if isinstance(log, EventLog)
+            else log_conversion.apply(
+                log,
+                variant=log_conversion.Variants.TO_EVENT_LOG,
+                parameters=parameters,
+            )
+        )
+        traces: List[LabelTrace] = []
+        for trace in event_log:
+            labels: LabelTrace = []
+            for ev in trace:
+                if activity_key in ev:
+                    labels.append(str(ev[activity_key]))
+            if labels:
+                traces.append([START_LABEL, *labels, END_LABEL])
+        return traces
+
+    # ------------------------------------------------------------------
+    # Phase 1 — DFG + loop discovery
+    # ------------------------------------------------------------------
+
+    def do_dfg_discovery(
+        self,
+        traces: List[Any],
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> Tuple[DFG, LoopInfo]:
+        """Delegate to :class:`ClassicDFGDiscoverer`."""
+        return ClassicDFGDiscoverer.apply(traces, parameters)
+
+    # ------------------------------------------------------------------
+    # Phase 2 — concurrency
+    # ------------------------------------------------------------------
+
+    def do_concurrency(
+        self,
+        dfg: DFG,
+        traces: Optional[List[Any]],
+        loops: LoopInfo,
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> ConcurrencyResult:
+        """Delegate to :class:`ClassicConcurrencyOracle`."""
+        return ClassicConcurrencyOracle.apply(dfg, traces, loops, parameters)
+
+    # ------------------------------------------------------------------
+    # Phase 3 — filter the pruned DFG
+    # ------------------------------------------------------------------
+
+    def do_filter(
+        self,
+        pdfg: DFG,
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> FilterResult:
+        """Delegate to :class:`MaxMinFilterer`."""
+        return MaxMinFilterer.apply(pdfg, parameters)
+
+    # ------------------------------------------------------------------
+    # Phase 4 — initialise working BPMN
+    # ------------------------------------------------------------------
+
+    def do_build_initial_bpmn(
+        self,
+        filtered: FilterResult,
+        concurrency: ConcurrencyResult,
+        loops: LoopInfo,
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> WorkingGraph:
+        """Delegate to :class:`ClassicBPMNInitializer`."""
+        return ClassicBPMNInitializer.apply(
+            filtered, concurrency, loops, parameters
+        )
+
+    # ------------------------------------------------------------------
+    # Phase 5 — split discovery
+    # ------------------------------------------------------------------
+
+    def do_discover_splits(
+        self,
+        wg: WorkingGraph,
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Delegate to :class:`ClassicSplitsDiscoverer` (mutates ``wg``)."""
+        ClassicSplitsDiscoverer.apply(wg, parameters)
+
+    # ------------------------------------------------------------------
+    # Phase 6 — variant-specific heuristics (no-op by default)
+    # ------------------------------------------------------------------
+
+    def do_apply_heuristics(
+        self,
+        wg: WorkingGraph,
+        traces: List[Any],
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Run any variant-specific heuristics. Default: nothing to do."""
+
+    # ------------------------------------------------------------------
+    # Phase 7 — join discovery
+    # ------------------------------------------------------------------
+
+    def do_discover_joins(
+        self,
+        wg: WorkingGraph,
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Delegate to :class:`ClassicJoinsDiscoverer` (mutates ``wg``)."""
+        ClassicJoinsDiscoverer.apply(wg, parameters)
+
+    # ------------------------------------------------------------------
+    # Phase 8 — OR-join minimisation
+    # ------------------------------------------------------------------
+
+    def do_minimize_or_joins(
+        self,
+        wg: WorkingGraph,
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Delegate to :class:`ClassicOrJoinMinimizer` (mutates ``wg``)."""
+        ClassicOrJoinMinimizer.apply(wg, parameters)
+
+    # ------------------------------------------------------------------
+    # Phase 9 — export
+    # ------------------------------------------------------------------
+
+    def do_export_bpmn(
+        self,
+        wg: WorkingGraph,
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> BPMN:
+        """Export ``wg`` to BPMN and pass the result through ``reduction``."""
+        bpmn = ClassicBPMNExporter.apply(wg, parameters)
+        return reduction.apply(bpmn)
+
+    # ------------------------------------------------------------------
+    # Pipeline driver
+    # ------------------------------------------------------------------
+
+    def apply(
+        self,
+        log: Union[EventLog, EventStream, pd.DataFrame, DFG, str],
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> BPMN:
+        """Run the whole pipeline and return the discovered BPMN model.
+
+        :param log: an event log / event stream / DataFrame, a path to
+            a XES file (``str``), or a pre-computed DFG given as a
+            ``dict`` keyed by ``(source, target)`` pairs.
+        :param parameters: settings forwarded to every phase; the
+            ``OR_MINIMISE`` entry toggles phase 8.
+        :raises ValueError: if no trace can be extracted from ``log``.
+        """
+        parameters = parameters or {}
+
+        if isinstance(log, str):
+            # A file path was supplied directly — read it via pm4py so
+            # both ``classic`` and ``sm2`` variants accept paths.
+            from pm4py.objects.log.importer.xes import importer as xes_importer
+
+            log = xes_importer.apply(log)
+
+        if isinstance(log, dict):
+            # Pre-computed DFG path — phases 0 and 1 are bypassed.
+            dfg = log
+            loops = LoopInfo(
+                self_loops={a for (a, b) in dfg.keys() if a == b},
+            )
+            # No traces are available on this path; later phases that
+            # take ``traces`` receive an empty list.
+            traces: List[Any] = []
+        else:
+            traces = self.do_extract_traces(log, parameters)
+            if not traces:
+                raise ValueError(
+                    "Cannot run Split Miner: the supplied log is empty"
+                )
+            dfg, loops = self.do_dfg_discovery(traces, parameters)
+
+        dfg_no_self = strip_self_loops(dfg)
+        conc = self.do_concurrency(dfg_no_self, traces, loops, parameters)
+        filt = self.do_filter(conc.pdfg, parameters)
+        wg = self.do_build_initial_bpmn(filt, conc, loops, parameters)
+
+        self.do_discover_splits(wg, parameters)
+        self.do_apply_heuristics(wg, traces, parameters)
+        self.do_discover_joins(wg, parameters)
+
+        or_minimise = exec_utils.get_param_value(
+            Parameters.OR_MINIMISE, parameters, DEFAULT_OR_MINIMISE
+        )
+        if or_minimise:
+            self.do_minimize_or_joins(wg, parameters)
+
+        return self.do_export_bpmn(wg, parameters)
diff --git a/pm4py/algo/discovery/split_miner/variants/classic.py b/pm4py/algo/discovery/split_miner/variants/classic.py
new file mode 100644
index 000000000..0c21f8ce0
--- /dev/null
+++ b/pm4py/algo/discovery/split_miner/variants/classic.py
@@ -0,0 +1,65 @@
+'''
+PM4Py – A Process Mining Library for Python
+Copyright (C) 2026 Process Intelligence Solutions GmbH
+
+This program is free software: you can redistribute it and/or
modify
+it under the terms of the GNU Affero General Public License as
+published by the Free Software Foundation, either version 3 of the
+License, or any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see this software project's root or
+visit .
+
+Website: https://processintelligence.solutions
+Contact: info@processintelligence.solutions
+'''
+"""Classic Split Miner.
+
+Inherits the full default pipeline from :class:`SplitMinerFramework`
+without further overrides — every default ``do_*`` method already
+implements the classic behaviour.
+"""
+from enum import Enum
+from typing import Any, Dict, Optional, Union
+
+import pandas as pd
+
+from pm4py.algo.discovery.split_miner.concurrency.classic import (
+    Parameters as ConcParameters,
+)
+from pm4py.algo.discovery.split_miner.dtypes.dfg import DFG
+from pm4py.algo.discovery.split_miner.filtering.max_min import (
+    Parameters as FilterParameters,
+)
+from pm4py.algo.discovery.split_miner.variants.abc import (
+    Parameters as FrameworkParameters,
+    SplitMinerFramework,
+)
+from pm4py.objects.bpmn.obj import BPMN
+from pm4py.objects.log.obj import EventLog, EventStream
+from pm4py.util import constants
+
+
+class Parameters(Enum):
+    # Single entry point for the settings of this variant: every member
+    # re-uses the value of the phase-level parameter it stands for.
+    EPSILON = ConcParameters.EPSILON.value
+    ETA = FilterParameters.ETA.value
+    OR_MINIMISE = FrameworkParameters.OR_MINIMISE.value
+    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
+
+
+class ClassicSplitMiner(SplitMinerFramework):
+    """Classic Split Miner — default pipeline."""
+
+
+def apply(
+    log: Union[EventLog, EventStream, pd.DataFrame, DFG],
+    parameters: Optional[Dict[str, Any]] = None,
+) -> BPMN:
+    """Discover a BPMN model using classic Split Miner.
+
+    Instantiates :class:`ClassicSplitMiner` and forwards ``log`` and
+    ``parameters`` unchanged to its ``apply`` method.
+    """
+    return ClassicSplitMiner().apply(log, parameters)
diff --git a/pm4py/algo/discovery/split_miner/variants/sm2.py b/pm4py/algo/discovery/split_miner/variants/sm2.py
new file mode 100644
index 000000000..98129e6f9
--- /dev/null
+++ b/pm4py/algo/discovery/split_miner/variants/sm2.py
@@ -0,0 +1,241 @@
+'''
+PM4Py – A Process Mining Library for Python
+Copyright (C) 2026 Process Intelligence Solutions GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as
+published by the Free Software Foundation, either version 3 of the
+License, or any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see this software project's root or
+visit .
+
+Website: https://processintelligence.solutions
+Contact: info@processintelligence.solutions
+'''
+"""Split Miner 2.0.
+
+Differs from the classic pipeline in four phases:
+
+    * trace extraction is lifecycle-aware — each event keeps its
+      ``start`` / ``end`` phase and its timestamp;
+    * the directly-follows graph uses the refined definition that
+      requires a ``start`` of ``b`` after the ``end`` of ``a`` with no
+      intervening end event;
+    * the concurrency oracle compares lifecycle overlaps rather than
+      directly-follows frequencies;
+    * two heuristics run between split and join discovery: an
+      improper-completion fix and an OR-split identification.
+"""
+from enum import Enum
+from typing import Any, Dict, List, Optional, Union
+
+import pandas as pd
+
+from pm4py.algo.discovery.split_miner.concurrency.refined import (
+    Parameters as ConcParameters,
+    RefinedConcurrencyOracle,
+)
+from pm4py.algo.discovery.split_miner.dfg_discovery.refined import (
+    RefinedDFGDiscoverer,
+)
+from pm4py.algo.discovery.split_miner.dtypes.concurrency import (
+    ConcurrencyResult,
+)
+from pm4py.algo.discovery.split_miner.dtypes.dfg import DFG
+from pm4py.algo.discovery.split_miner.dtypes.log import (
+    END_LABEL,
+    RefinedEvent,
+    RefinedTrace,
+    START_LABEL,
+)
+from pm4py.algo.discovery.split_miner.dtypes.loops import LoopInfo
+from pm4py.algo.discovery.split_miner.dtypes.working_graph import WorkingGraph
+from pm4py.algo.discovery.split_miner.filtering.max_min import (
+    Parameters as FilterParameters,
+)
+from pm4py.algo.discovery.split_miner.heuristics.improper_completion import (
+    ImproperCompletionHeuristic,
+)
+from pm4py.algo.discovery.split_miner.heuristics.or_split import (
+    OrSplitHeuristic,
+)
+from pm4py.algo.discovery.split_miner.variants.abc import (
+    Parameters as FrameworkParameters,
+    SplitMinerFramework,
+)
+from pm4py.objects.bpmn.obj import BPMN
+from pm4py.objects.conversion.log import converter as log_conversion
+from pm4py.objects.log.obj import EventLog, EventStream
+from pm4py.objects.log.util import interval_lifecycle
+from pm4py.util import constants, exec_utils
+from pm4py.util import xes_constants as xes_util
+
+
+class Parameters(Enum):
+    # Single entry point for the settings of this variant: every member
+    # re-uses the value of the phase-level parameter it stands for.
+    EPSILON = ConcParameters.EPSILON.value
+    ETA = FilterParameters.ETA.value
+    OR_MINIMISE = FrameworkParameters.OR_MINIMISE.value
+    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
+    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
+
+
+class SM2SplitMiner(SplitMinerFramework):
+    """Split Miner 2.0 — lifecycle-aware variant with post-split heuristics."""
+
+    # ------------------------------------------------------------------
+    # Phase 0 — lifecycle-aware trace extraction
+    # ------------------------------------------------------------------
+
+    def do_extract_traces(
+        self,
+        log: Union[EventLog, EventStream, pd.DataFrame],
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> List[RefinedTrace]:
+        """Turn a pm4py log into traces of ``(label, phase, timestamp)``.
+
+        Every activity instance contributes a ``"start"`` and an
+        ``"end"`` event; each non-empty trace is wrapped with the
+        ``START_LABEL`` / ``END_LABEL`` sentinels (timestamp ``None``).
+        Events are ordered by timestamp, events without a timestamp
+        first, ties keeping their original relative order.
+        """
+        parameters = parameters or {}
+        activity_key = exec_utils.get_param_value(
+            constants.PARAMETER_CONSTANT_ACTIVITY_KEY,
+            parameters,
+            xes_util.DEFAULT_NAME_KEY,
+        )
+        timestamp_key = exec_utils.get_param_value(
+            constants.PARAMETER_CONSTANT_TIMESTAMP_KEY,
+            parameters,
+            xes_util.DEFAULT_TIMESTAMP_KEY,
+        )
+        start_timestamp_key = xes_util.DEFAULT_START_TIMESTAMP_KEY
+
+        # Forward ``parameters`` to the converter so that caller-supplied
+        # conversion settings (notably a non-default case-id column for
+        # DataFrame input) are honoured rather than silently ignored.
+        event_log = (
+            log
+            if isinstance(log, EventLog)
+            else log_conversion.apply(
+                log,
+                variant=log_conversion.Variants.TO_EVENT_LOG,
+                parameters=parameters,
+            )
+        )
+
+        # Delegate the standard XES lifecycle handling to pm4py: this
+        # pairs ``start``/``complete`` events into interval events that
+        # expose both a ``start_timestamp`` and a ``time:timestamp``,
+        # short-circuits when the log is already in interval form, and
+        # honours the same parameter conventions as the rest of pm4py.
+        interval_log = interval_lifecycle.to_interval(
+            event_log, parameters=parameters
+        )
+
+        traces: List[RefinedTrace] = []
+        # NOTE(review): pairing by position assumes ``to_interval`` keeps
+        # the number and order of traces — confirm against pm4py.
+        for raw_trace, conv_trace in zip(event_log, interval_log):
+            events: List[RefinedEvent] = self._refined_from_interval(
+                conv_trace, activity_key, start_timestamp_key, timestamp_key
+            )
+            if not events:
+                # Fall back to the raw trace and treat every event as
+                # instantaneous — SM 2.0 then degenerates to the classic
+                # pipeline rather than crashing on the empty log.
+                events = self._refined_from_raw(
+                    raw_trace, activity_key, timestamp_key
+                )
+
+            # Stable sort keeps the synthesised start before its matching
+            # end when both share a timestamp. The leading boolean keeps
+            # events without a timestamp in a group of their own (sorted
+            # first), so a ``None`` is never compared with a real
+            # timestamp — substituting ``0`` for ``None`` would raise
+            # ``TypeError`` as soon as the other timestamps are datetimes.
+            events_idx = sorted(
+                enumerate(events),
+                key=lambda p: (
+                    p[1][2] is not None,
+                    p[1][2] if p[1][2] is not None else 0,
+                    p[0],
+                ),
+            )
+            events = [e for _, e in events_idx]
+            if events:
+                wrapped: RefinedTrace = [
+                    (START_LABEL, "start", None),
+                    (START_LABEL, "end", None),
+                    *events,
+                    (END_LABEL, "start", None),
+                    (END_LABEL, "end", None),
+                ]
+                traces.append(wrapped)
+        return traces
+
+    @staticmethod
+    def _refined_from_interval(
+        trace,
+        activity_key: str,
+        start_timestamp_key: str,
+        timestamp_key: str,
+    ) -> List[RefinedEvent]:
+        """Convert a pm4py interval-format trace into refined events.
+
+        Events lacking ``activity_key`` are skipped; a missing start
+        timestamp falls back to the end timestamp.
+        """
+        events: List[RefinedEvent] = []
+        for ev in trace:
+            if activity_key not in ev:
+                continue
+            label = str(ev[activity_key])
+            end_ts = ev.get(timestamp_key)
+            start_ts = ev.get(start_timestamp_key, end_ts)
+            events.append((label, "start", start_ts))
+            events.append((label, "end", end_ts))
+        return events
+
+    @staticmethod
+    def _refined_from_raw(
+        trace,
+        activity_key: str,
+        timestamp_key: str,
+    ) -> List[RefinedEvent]:
+        """Fallback: every raw event becomes an instantaneous interval."""
+        events: List[RefinedEvent] = []
+        for ev in trace:
+            if activity_key not in ev:
+                continue
+            label = str(ev[activity_key])
+            ts = ev.get(timestamp_key)
+            events.append((label, "start", ts))
+            events.append((label, "end", ts))
+        return events
+
+    # ------------------------------------------------------------------
+    # Phase 1 — refined DFG
+    # ------------------------------------------------------------------
+
+    def do_dfg_discovery(
+        self,
+        traces: List[RefinedTrace],
+        parameters: Optional[Dict[str, Any]] = None,
+    ):
+        """Delegate to :class:`RefinedDFGDiscoverer`."""
+        return RefinedDFGDiscoverer.apply(traces, parameters)
+
+    # ------------------------------------------------------------------
+    # Phase 2 — lifecycle-overlap concurrency oracle
+    # ------------------------------------------------------------------
+
+    def do_concurrency(
+        self,
+        dfg: DFG,
+        traces: Optional[List[RefinedTrace]],
+        loops: LoopInfo,
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> ConcurrencyResult:
+        """Delegate to :class:`RefinedConcurrencyOracle`."""
+        return RefinedConcurrencyOracle.apply(dfg, traces, loops, parameters)
+
+    # ------------------------------------------------------------------
+    # Phase 6 — lifecycle-driven heuristics
+    # ------------------------------------------------------------------
+
+    def do_apply_heuristics(
+        self,
+        wg: WorkingGraph,
+        traces: List[RefinedTrace],
+        parameters: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Run the improper-completion fix, then the OR-split heuristic."""
+        ImproperCompletionHeuristic.apply(wg, traces, parameters)
+        OrSplitHeuristic.apply(wg, traces, parameters)
+
+
+def apply(
+    log: Union[EventLog, EventStream, pd.DataFrame],
+    parameters: Optional[Dict[str, Any]] = None,
+) -> BPMN:
+    """Discover a BPMN model using Split Miner 2.0."""
+    return SM2SplitMiner().apply(log, parameters)
diff --git a/pm4py/discovery.py b/pm4py/discovery.py
index a08526864..9f093b02e 100644
--- a/pm4py/discovery.py
+++ b/pm4py/discovery.py
@@ -1044,6 +1044,94 @@ def discover_bpmn_inductive(
     return convert_to_bpmn(pt)
 
 
+def discover_bpmn_split_miner(
+    log: Union[EventLog, pd.DataFrame],
+    epsilon: float = 0.1,
+    eta: float = 0.4,
+    minimize_or_joins: bool = True,
+    variant: str = "classic",
+    activity_key: str = "concept:name",
+    timestamp_key: str = "time:timestamp",
+    case_id_key: str = "case:concept:name",
+) -> BPMN:
+    """Discover a BPMN model using Split Miner.
+
+    Two variants are available:
+
+    * ``"classic"`` — the original Split Miner of Augusto, Conforti,
+      Dumas, La Rosa, Polyvyanyy (KAIS, 2019).
+    * ``"sm2"`` — Split Miner 2.0 (Augusto, Dumas, La Rosa, 2021): uses
+      activity lifecycle information to detect true concurrency and
+      inclusive (OR) choices.
+
+    :param log: Event log or Pandas DataFrame.
+    :param epsilon: Concurrency threshold ε ∈ [0, 1] (default 0.1). With a
+        lower ε more pairs of activities are considered concurrent.
+    :param eta: Filtering percentile η ∈ [0, 1] (default 0.4). Lower η
+        keeps more edges (higher fitness, more complex model).
+    :param minimize_or_joins: Replace trivial OR-joins with their XOR/AND
+        equivalent (Algorithm 9 of the SM 1.x paper). Default ``True``.
+    :param variant: ``"classic"`` (default) or ``"sm2"``.
+    :param activity_key: XES attribute holding the activity label
+        (default ``"concept:name"``).
+    :param timestamp_key: XES attribute holding the event timestamp
+        (default ``"time:timestamp"``). Used only by the ``sm2`` variant.
+    :param case_id_key: Attribute used as case identifier in pandas inputs
+        (default ``"case:concept:name"``).
+    :return: A :class:`BPMN` model.
+    :raises ValueError: if ``variant`` is neither ``"classic"`` nor
+        ``"sm2"``.
+    :rtype: ``BPMN``
+
+    .. code-block:: python3
+
+        import pm4py
+
+        bpmn_graph = pm4py.discover_bpmn_split_miner(
+            log,
+            epsilon=0.1,
+            eta=0.4,
+            variant="sm2",
+        )
+    """
+    __event_log_deprecation_warning(log)
+
+    if check_is_pandas_dataframe(log):
+        check_pandas_dataframe_columns(
+            log,
+            activity_key=activity_key,
+            timestamp_key=timestamp_key,
+            case_id_key=case_id_key,
+        )
+
+    from pm4py.algo.discovery.split_miner import algorithm as sm_alg
+    from pm4py.util import constants as sm_constants
+
+    if variant == "sm2":
+        sm_variant = sm_alg.SM2
+        from pm4py.algo.discovery.split_miner.variants.sm2 import (
+            Parameters as SmParameters,
+        )
+    elif variant == "classic":
+        sm_variant = sm_alg.CLASSIC
+        from pm4py.algo.discovery.split_miner.variants.classic import (
+            Parameters as SmParameters,
+        )
+    else:
+        raise ValueError(
+            f"Unknown Split Miner variant: {variant!r} "
+            f"(expected 'classic' or 'sm2')"
+        )
+
+    parameters = {
+        SmParameters.EPSILON: epsilon,
+        SmParameters.ETA: eta,
+        SmParameters.OR_MINIMISE: minimize_or_joins,
+        SmParameters.ACTIVITY_KEY: activity_key,
+        # ``case_id_key`` is part of the public signature and is validated
+        # above, so it must also travel with the parameters; otherwise a
+        # DataFrame with a custom case column cannot be grouped correctly.
+        sm_constants.PARAMETER_CONSTANT_CASEID_KEY: case_id_key,
+    }
+    if variant == "sm2":
+        parameters[SmParameters.TIMESTAMP_KEY] = timestamp_key
+
+    return sm_alg.apply(log, parameters=parameters, variant=sm_variant)
+
+
 def discover_transition_system(
+    log: Union[EventLog, pd.DataFrame],
+    direction: str = "forward",
diff --git a/tests/model.bpmn b/tests/model.bpmn
new file mode 100644
index 000000000..f1cb84e47
--- /dev/null
+++ b/tests/model.bpmn
@@ -0,0 +1,251 @@ + + + + + idad08a959-c4d1-4546-98b1-78d15ccf5788 + id73565065-b86f-4938-a927-3eab72ba59e7 + + + id233690b5-a08e-44e4-a645-445f953f071f + id1c759be0-4735-4cc2-af4c-885f89815be1 + + + id6d1098a9-75d0-4669-ab50-9cad4629d827 + id8e696043-76ee-4a73-abca-3604457fe362 + + + id09ad0ca9-a062-4fe4-b34e-72eaea8ef956 + id3bcb0f93-1a9c-46b4-a318-ba60c2142eae + + + id3bcb0f93-1a9c-46b4-a318-ba60c2142eae + + + id1c759be0-4735-4cc2-af4c-885f89815be1 + idad08a959-c4d1-4546-98b1-78d15ccf5788 + id844a5ed3-103d-4bcd-b771-8c29c5fd70d5 + + + id7b3dd1b0-ba73-4ef0-aa09-24950a551824 + idddf0408c-26fa-4f26-92c9-d0a94e0aefea + + + id0c79509a-433c-439a-b7a0-99d93e9a49e5 + id96e5f736-08c9-460f-b959-2f62221c2b8e + + + idddf0408c-26fa-4f26-92c9-d0a94e0aefea + id32eb976a-7950-44db-849f-4b1f5a912c68 + id233690b5-a08e-44e4-a645-445f953f071f + + + id32eb976a-7950-44db-849f-4b1f5a912c68 + id0c79509a-433c-439a-b7a0-99d93e9a49e5 + id6d1098a9-75d0-4669-ab50-9cad4629d827 + + + id7b3dd1b0-ba73-4ef0-aa09-24950a551824 + + + id17c8d479-dcfd-45fa-a89a-ec5dcadc3725 + id73565065-b86f-4938-a927-3eab72ba59e7 + id09ad0ca9-a062-4fe4-b34e-72eaea8ef956 + + + id8e696043-76ee-4a73-abca-3604457fe362 + id96e5f736-08c9-460f-b959-2f62221c2b8e + idd52a3dd9-60d5-4303-93d7-fccf5487a046 + + + id844a5ed3-103d-4bcd-b771-8c29c5fd70d5 + ideeda887d-5746-464c-890a-d902748ae369 + + + ideeda887d-5746-464c-890a-d902748ae369 + idd52a3dd9-60d5-4303-93d7-fccf5487a046 + id3211b9c6-9320-4c4b-b97c-31fc777f3f50 + + + id3211b9c6-9320-4c4b-b97c-31fc777f3f50 + id17c8d479-dcfd-45fa-a89a-ec5dcadc3725 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/split_miner_2_heuristics_test.py 
b/tests/split_miner_2_heuristics_test.py new file mode 100644 index 000000000..275359af3 --- /dev/null +++ b/tests/split_miner_2_heuristics_test.py @@ -0,0 +1,364 @@ +"""Regression test for the SM 2.0 improper-completion heuristic. + +The reproduced example has just four activities — ``A``, ``B``, ``C``, +``D`` — arranged as + + --> A --> AND-split --> { B, C, D } + ^ | + +----- loop edge ----+ + +i.e. one of the parallel branches loops back to ``A``. Heuristic 1 of +SM 2.0 must split this loop branch off the AND-split via a new +preceding XOR-split so the AND only carries the two forward branches: + + --> A --> XOR --> AND-split --> { B, C } + | + +------ loop branch ----> D ----> back to A + +The test asserts that: + + * the discovered AND-split contains only the forward branches (no + loop branch); + * a fresh XOR-split sits between the AND's parent and the AND + itself, owning the loop branch as one of its outgoing edges; + * without Heuristic 1 (verified via a subclass that skips the + heuristics phase) the AND-split still carries the loop branch. + +Only the four activities from the paper appear in the resulting BPMN +— the framework's sentinel start / end events and any auto-inserted +gateways are all that surrounds them. +""" +from collections import Counter +import datetime +import os +import sys + +# Make sure we import the local pm4py source, not whatever is in site-packages.
+_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + +import pandas as pd + +import pm4py +from pm4py.objects.bpmn.obj import BPMN + +assert pm4py.__file__.startswith(_REPO_ROOT), ( + f"SM 2.0 improper-completion test must run against the local pm4py " + f"copy in {_REPO_ROOT}, but pm4py was imported from {pm4py.__file__}" +) + + +# ---------------------------------------------------------------------- +# Log construction +# ---------------------------------------------------------------------- +# +# Every iteration is structured: +# +# A --> { B || C } --> D +# +# i.e. ``A`` is followed by a concurrent block over ``{B, C}`` (recorded +# as overlapping start / complete lifecycle pairs) and then by ``D``, +# which runs *after* the parallel block has finished (its lifecycle does +# not overlap B or C). Some cases iterate the whole structure once more +# (``D`` is followed by ``A`` again) before terminating, so the refined +# directly-follows graph contains the loop arc ``D -> A``. +# +# Because ``D`` is sequential (not part of the parallel block), the +# discovered model has an AND-split over ``{B, C}`` only — and the loop +# closes back to ``A`` through ``D``. Heuristic 1 must therefore give the +# parallel block a preceding XOR-split with a loop-back to ``A`` so that +# ``A`` can be repeated without entering (and having to complete) the +# parallel block. + +_MINUTE = datetime.timedelta(minutes=1) + +# In one iteration B and C overlap (concurrent); the ``last`` branch is +# the one whose completion is observed last, which fixes the +# directly-follows arc into D. D always runs strictly after both. 
_PARALLEL_END_OPTIONS = {
    "B": [("B", 2, 6), ("C", 2, 5)],  # B finishes last
    "C": [("B", 2, 5), ("C", 2, 6)],  # C finishes last
}
_D_START_MIN = 7  # D starts after both B and C have completed
_D_END_MIN = 8


# (pattern_name, number_of_iterations, last-completing branch of the block)
# Loop patterns iterate four times, so every looping case records the
# directly-follows arc ``D -> A`` three times against a single
# ``D -> __end__``; the intent is for ``D -> A`` to dominate and survive
# the source-to-sink filter as D's best outgoing edge. A couple of
# single-iteration patterns keep the terminal arc ``D -> __end__`` alive.
PATTERNS = [
    ("loop_B", 4, "B"),
    ("loop_C", 4, "C"),
    ("term_B", 1, "B"),
    ("term_C", 1, "C"),
]


def _emit_activity(rows, case_id, label, start, end):
    """Append the ``start`` and ``complete`` lifecycle events of one
    activity to ``rows`` (mutated in place), in that order."""
    for transition, timestamp in (("start", start), ("complete", end)):
        rows.append(
            {
                "case:concept:name": case_id,
                "concept:name": label,
                "lifecycle:transition": transition,
                "time:timestamp": timestamp,
            }
        )


def _emit_iteration(rows, case_id, iter_origin, last_branch):
    """Emit one ``A -> {B || C} -> D`` iteration whose parallel block is
    finished last by ``last_branch``. Returns the origin for the next
    iteration.

    ``last_branch`` must be a key of ``_PARALLEL_END_OPTIONS``; all
    offsets are minutes relative to ``iter_origin``.
    """

    def at(minutes):
        return iter_origin + datetime.timedelta(minutes=minutes)

    # A occupies minute 0..1 of the iteration.
    _emit_activity(rows, case_id, "A", at(0), at(1))
    # Concurrent block B || C.
    for label, s_off, e_off in _PARALLEL_END_OPTIONS[last_branch]:
        _emit_activity(rows, case_id, label, at(s_off), at(e_off))
    # D runs strictly after the parallel block.
    _emit_activity(rows, case_id, "D", at(_D_START_MIN), at(_D_END_MIN))
    # Leave a one-minute gap before the next iteration's A.
    return at(_D_END_MIN + 1)


def build_log() -> pd.DataFrame:
    """Sixteen cases — every pattern repeated four times so the
    directly-follows frequencies survive the percentile filter.

    Returns one row per lifecycle event; each case starts one day after
    the previous one.
    """
    base = datetime.datetime(2026, 1, 1)
    rows: list[dict] = []
    case_index = 0
    for _ in range(4):
        for pattern_name, n_iter, last_branch in PATTERNS:
            case_id = f"c{case_index:02d}_{pattern_name}"
            origin = base + datetime.timedelta(days=case_index)
            case_index += 1

            # Each iteration is ``A -> {B || C} -> D``; consecutive
            # iterations produce the loop arc ``D -> A``.
            for _i in range(n_iter):
                origin = _emit_iteration(rows, case_id, origin, last_branch)
    return pd.DataFrame(rows)
def _task_named(bpmn: BPMN, name: str):
    """Return the first task node of ``bpmn`` labelled ``name``.

    Raises ``StopIteration`` when no task carries that label.
    """
    return next(
        n for n in bpmn.get_nodes()
        if isinstance(n, BPMN.Task) and n.get_name() == name
    )


def _flow_graph_without(bpmn: BPMN, removed_node):
    """Return the flows of ``bpmn`` as a directed graph over node ids,
    with ``removed_node`` (and its incident edges) taken out."""
    import networkx as nx

    graph = nx.DiGraph()
    for f in bpmn.get_flows():
        graph.add_edge(f.source.get_id(), f.target.get_id())
    graph.remove_node(removed_node.get_id())
    return graph


def main() -> int:
    """Discover the model with and without heuristic 1 and assert the
    structural properties described in the inline comments.

    Returns 0 on success; any failed check raises ``AssertionError``.
    Also renders the discovered BPMN to ``tests/sm2_improper_completion.png``.
    """
    df = build_log()
    print(
        f"log: {len(df)} events, {df['case:concept:name'].nunique()} cases"
    )

    # ---- 1. SM 2.0 with Heuristic 1 enabled ----------------------------
    bpmn = pm4py.discover_bpmn_split_miner(
        df,
        epsilon=0.2,
        eta=0.0,
        variant="sm2",
        minimize_or_joins=False,
    )
    counts = gateway_counts(bpmn)
    print(
        f"SM 2.0 with heuristic 1: nodes={dict(counts)} "
        f"edges={len(list(bpmn.get_flows()))}"
    )
    assert {n.get_name() for n in bpmn.get_nodes() if isinstance(n, BPMN.Task)} == {
        "A",
        "B",
        "C",
        "D",
    }, "Only the four paper activities should appear in the BPMN"

    splits = _and_split_branch_names(bpmn)
    assert len(splits) == 1, (
        f"Expected exactly one AND-split, got {len(splits)}: {splits}"
    )
    and_branches = splits[0]
    # The parallel block stays {B, C}; D is sequential after it, so it
    # is not one of the AND-split's branches.
    assert and_branches == {"B", "C"}, (
        f"The AND-split should carry exactly the parallel branches "
        f"B and C, got {and_branches}"
    )

    # The AND-split's sole predecessor must be the XOR-split that
    # heuristic 1 inserts (Fig 4b — preceding XOR-split).
    and_node = next(
        n for n in bpmn.get_nodes() if isinstance(n, BPMN.ParallelGateway)
        and len([f for f in bpmn.get_flows() if f.source is n]) > 1
    )
    parents = [
        f.source for f in bpmn.get_flows() if f.target is and_node
    ]
    assert len(parents) == 1 and isinstance(
        parents[0], BPMN.ExclusiveGateway
    ), (
        "AND-split's predecessor should be the new XOR-split inserted "
        f"by heuristic 1, got {[type(p).__name__ for p in parents]}"
    )
    new_xor = parents[0]
    xor_targets = [f.target for f in bpmn.get_flows() if f.source is new_xor]
    assert and_node in xor_targets, (
        "The new XOR-split should feed the AND-split (forward branch)"
    )
    assert len(xor_targets) >= 2, (
        "The new XOR-split must own a loop-back edge besides the "
        f"forward edge into the AND-split, got {len(xor_targets)} outputs"
    )

    # Crucially, the new XOR-split must be able to reach ``A`` *without*
    # passing through the parallel block or ``D`` — that is what lets
    # ``A`` be repeated directly (the property missing before this fix).
    import networkx as nx

    a_node = _task_named(bpmn, "A")
    d_node = _task_named(bpmn, "D")
    g_without_d = _flow_graph_without(bpmn, d_node)
    assert nx.has_path(g_without_d, new_xor.get_id(), a_node.get_id()), (
        "A must be repeatable from the new XOR-split without going "
        "through D, but every path from the XOR-split back to A passes "
        "through D"
    )
    print(
        "A is repeatable directly from the new XOR-split (no need to "
        "execute D)"
    )

    # ---- 2. Sanity check: without Heuristic 1 there is no preceding ----
    # XOR-split, so A can only be repeated by going through D.
    from pm4py.algo.discovery.split_miner.variants.sm2 import SM2SplitMiner

    class _NoH1(SM2SplitMiner):
        # Overrides the heuristics hook with a no-op.
        def do_apply_heuristics(self, wg, traces, parameters=None):
            return

    bpmn_no_h1 = _NoH1().apply(
        df,
        {
            "split_miner_epsilon": 0.2,
            "split_miner_eta": 0.0,
            "split_miner_or_minimise": False,
        },
    )
    a2 = _task_named(bpmn_no_h1, "A")
    d2 = _task_named(bpmn_no_h1, "D")
    g2_without_d = _flow_graph_without(bpmn_no_h1, d2)
    # Without heuristic 1, every loop back to A must traverse D, so once
    # D is removed A no longer lies on any cycle. Only strongly
    # connected components with more than one node are considered.
    a2_id = a2.get_id()
    a_on_cycle_without_d = any(
        a2_id in comp
        for comp in nx.strongly_connected_components(g2_without_d)
        if len(comp) > 1
    )
    assert not a_on_cycle_without_d, (
        "Without heuristic 1, A should only be repeatable by going "
        "through D (no D-free loop), but a D-free cycle through A exists"
    )
    print(
        "without heuristic 1 : A is only repeatable through D "
        "(no direct loop-back)"
    )

    # Render the corrected BPMN for visual inspection.
    png_path = os.path.join(
        _REPO_ROOT, "tests", "sm2_improper_completion.png"
    )
    pm4py.save_vis_bpmn(bpmn, png_path)
    print(f"rendered {os.path.relpath(png_path, _REPO_ROOT)}")

    print("OK — SM 2.0 Heuristic 1 reproduces the paper Fig. 4b fix.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
+ +Reproduces the L_rho_y running example used in the SM 2.0 paper to +motivate the OR-split heuristic: three branches B, C, D after a single +entry activity A, with the following pairwise lifecycle observations: + + pair (B, C): 3 concurrent / 3 mutually exclusive + pair (B, D): 4 concurrent / 2 mutually exclusive + pair (C, D): 5 concurrent / 1 mutually exclusive + +Two of the three pairs satisfy the eligibility predicate (``2*conc >= +excl`` and ``2*excl >= conc``), so a majority of pairs are "eligible +for inclusiveness". The SM 2.0 heuristic must therefore promote the +AND-split discovered over {B, C, D} into an OR-split. The classic +Split Miner does not see lifecycle information at all and is expected +to produce an AND-split on the same log. +""" +from collections import Counter +import datetime +import os +import sys + +# Make sure we import the local pm4py source, not whatever is in site-packages. +_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + +import pandas as pd + +import pm4py +from pm4py.objects.bpmn.obj import BPMN + +assert pm4py.__file__.startswith(_REPO_ROOT), ( + f"SM 2.0 test must run against the local pm4py copy in {_REPO_ROOT}, " + f"but pm4py was imported from {pm4py.__file__}" +) + + +# ---------------------------------------------------------------------- +# Log construction +# ---------------------------------------------------------------------- +# +# Six cases — three with all of B, C, D, two with only C and D, one +# with only B and D — each preceded by A and followed by E. The +# concurrent block is recorded as start / complete lifecycle pairs +# with deliberately staggered end times: this both makes the +# intervals overlap (so SM 2.0's concurrency oracle fires) and lets +# every branch be the *last-ending* one in some case (so the refined +# DFG records ``B -> E``, ``C -> E`` and ``D -> E``, not just one of +# them). 
#
# Pairwise totals across the six cases:
#
#   B,C concurrent : 3 (three "all" cases)
#   B,C exclusive  : 3 (two "no-B" + one "no-C")
#   B,D concurrent : 4 (three "all" + one "no-C")
#   B,D exclusive  : 2 (two "no-B")
#   C,D concurrent : 5 (three "all" + two "no-B")
#   C,D exclusive  : 1 (one "no-C")
#
# This matches the paper's L_rho_y example.
#
# Each entry is ``(pattern_name, [(label, start_offset, end_offset)])``
# in minutes relative to the case's starting timestamp.

PATTERNS = (
    # "all" cases — B, C and D all overlap. Each case picks a
    # different branch to be the last-ending one so the refined DFG
    # ends up with B->E, C->E, and D->E.
    ("all_B_last", [("B", 2, 14), ("C", 2, 10), ("D", 2, 12)]),
    ("all_C_last", [("B", 2, 10), ("C", 2, 14), ("D", 2, 12)]),
    ("all_D_last", [("B", 2, 10), ("C", 2, 12), ("D", 2, 14)]),
    # "no B" cases — C and D overlap; rotate the last-ending branch
    # so we end up with both C->E and D->E.
    ("no_b_C_last", [("C", 2, 14), ("D", 2, 12)]),
    ("no_b_D_last", [("C", 2, 12), ("D", 2, 14)]),
    # "no C" case — B and D overlap.
    ("no_c_B_last", [("B", 2, 14), ("D", 2, 12)]),
)


def _emit_activity(rows, case_id, label, start, end):
    """Emit a (start, complete) lifecycle pair for a single activity.

    The two event dicts are appended to ``rows`` in place, ``start``
    first.
    """
    for transition, timestamp in (("start", start), ("complete", end)):
        rows.append(
            {
                "case:concept:name": case_id,
                "concept:name": label,
                "lifecycle:transition": transition,
                "time:timestamp": timestamp,
            }
        )


def build_log() -> pd.DataFrame:
    """Build the lifecycle event log described by ``PATTERNS``.

    Each pattern becomes one case (one day apart) shaped as ``A``, the
    pattern's overlapping branches, then ``E``. Returns one row per
    lifecycle event.
    """
    base = datetime.datetime(2026, 1, 1)
    minute = datetime.timedelta(minutes=1)
    rows: list[dict] = []

    for case_index, (pattern_name, branches) in enumerate(PATTERNS):
        case_id = f"c_{pattern_name}"
        t0 = base + datetime.timedelta(days=case_index)

        # A : sequential prefix occupying the first minute.
        _emit_activity(rows, case_id, "A", t0, t0 + minute)

        # Concurrent block — every branch starts during minute 2 and
        # ends at a branch-specific offset so the intervals overlap
        # pairwise yet have distinct closing times.
        for label, start_off, end_off in branches:
            _emit_activity(
                rows,
                case_id,
                label,
                t0 + start_off * minute,
                t0 + end_off * minute,
            )
        # Latest closing offset of the block, floored at 0.
        block_end = max([0] + [end_off for _, _, end_off in branches])

        # E : sequential suffix, scheduled strictly after the last
        # branch finishes so the refined DFG records ``X -> E`` for
        # whichever branch was the last to close.
        _emit_activity(
            rows,
            case_id,
            "E",
            t0 + (block_end + 1) * minute,
            t0 + (block_end + 2) * minute,
        )

    return pd.DataFrame(rows)
the paper: " + f"B,C={get('B','C')}, B,D={get('B','D')}, C,D={get('C','D')}" + ) + + +def main() -> int: + df = build_log() + print( + f"log: {len(df)} events, {df['case:concept:name'].nunique()} cases" + ) + + _assert_pair_observations(df) + + # ---- Classic Split Miner: no lifecycle awareness ------------------- + # The classic oracle only inspects directly-follows frequencies in + # the flat event sequence; because our synthetic log emits the + # concurrent block in a fixed lifecycle order (B_s, C_s, D_s, then + # B_e, C_e, D_e), the resulting DFG is highly asymmetric and the + # classic concurrency test cannot recover the mutual parallelism + # that the lifecycle structure encodes. This is precisely the + # situation SM 2.0 was designed to address, so we only assert that + # classic SM does *not* invent an OR-split here. + classic = pm4py.discover_bpmn_split_miner( + df, + epsilon=0.2, + eta=0.0, + variant="classic", + minimize_or_joins=False, + ) + classic_counts = gateway_counts(classic) + print( + f"classic SM 1.x: nodes={dict(classic_counts)} " + f"edges={len(list(classic.get_flows()))}" + ) + assert classic_counts["or"] == 0, ( + "Classic Split Miner must not produce OR-splits — " + f"got {dict(classic_counts)}" + ) + + # ---- Split Miner 2.0: heuristic 2 must fire ------------------------ + sm2 = pm4py.discover_bpmn_split_miner( + df, + epsilon=0.2, + eta=0.0, + variant="sm2", + minimize_or_joins=False, + ) + sm2_counts = gateway_counts(sm2) + print( + f"SM 2.0 : nodes={dict(sm2_counts)} " + f"edges={len(list(sm2.get_flows()))}" + ) + assert sm2_counts["or"] == 2, ( + "SM 2.0 should produce an OR-split over {B, C, D} (heuristic 2) " + "and the matching OR-join — " + f"got {dict(sm2_counts)}" + ) + assert sm2_counts["and"] == 0, ( + "After heuristic 2 the AND-split must be gone — " + f"got {dict(sm2_counts)}" + ) + + print("OK — SM 2.0 OR-split heuristic matches the paper example.") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git 
# The ten distinct traces of the running example, one character per
# activity label.
_PAPER_VARIANTS = (
    "abcgeh",
    "abcfgh",
    "abdgeh",
    "abdegh",
    "abecgh",
    "abedgh",
    "acbegh",
    "acbfgh",
    "adbegh",
    "adbfgh",
)

# Every variant occurs ten times, grouped by variant. Each trace is its
# own list object (``[trace] * 10`` would repeat one shared list).
PAPER_EXAMPLE = [
    list(variant) for variant in _PAPER_VARIANTS for _ in range(10)
]


def build_log() -> pd.DataFrame:
    """Turn ``PAPER_EXAMPLE`` into an event log data frame.

    Case ``i`` is named ``c{i:03d}``; the ``j``-th event of every case
    is stamped ``10 * j`` minutes after 2026-01-01 00:00. Returns one
    row per event.
    """
    base = datetime.datetime(2026, 1, 1)
    rows = [
        {
            "case:concept:name": f"c{i:03d}",
            "concept:name": label,
            "time:timestamp": base + datetime.timedelta(minutes=10 * j),
        }
        for i, trace in enumerate(PAPER_EXAMPLE)
        for j, label in enumerate(trace)
    ]
    return pd.DataFrame(rows)
def main() -> int:
    """Run both Split Miner variants on the example log through the
    pm4py top-level API and check the discovered node counts.

    Returns 0 on success; a failed check raises ``AssertionError`` with
    the offending counts attached.
    """
    df = build_log()
    print(f"log: {len(df)} events, {df['case:concept:name'].nunique()} cases")

    bpmn = pm4py.discover_bpmn_split_miner(
        df,
        epsilon=0.2,
        eta=0.4,
        variant="classic",
        minimize_or_joins=False,
    )

    counts = gateway_counts(bpmn)
    edges = len(bpmn.get_flows())
    print(f"classic SM 1.x : nodes={dict(counts)} edges={edges}")
    assert counts["task"] == 8, counts
    assert counts["and"] == 1, counts
    assert counts["xor"] == 3, counts
    assert counts["or"] == 2, counts

    # The SM 2.0 run only checks that all eight activities appear.
    bpmn2 = pm4py.discover_bpmn_split_miner(
        df,
        epsilon=0.2,
        eta=0.0,
        variant="sm2",
    )
    counts2 = gateway_counts(bpmn2)
    print(f"SM 2.0 : nodes={dict(counts2)} edges={len(bpmn2.get_flows())}")
    # Attach the counts so a failure is as diagnosable as the ones above.
    assert counts2["task"] == 8, counts2

    print("OK — Split Miner integration works through pm4py top-level API.")
    return 0


if __name__ == "__main__":
    sys.exit(main())