diff --git a/src/openrxn/compartments/ID.py b/src/openrxn/compartments/ID.py index d0b2693..2ba4f9f 100644 --- a/src/openrxn/compartments/ID.py +++ b/src/openrxn/compartments/ID.py @@ -1,16 +1,16 @@ -def join_tup(tup): +def join_tuple(tup): return '_'.join([str(a) for a in tup]) -def makeID(array_ID,comp_ID): +def make_ID(array_ID,comp_ID): if array_ID is not None: tag = array_ID + '-' else: tag = "" - if type(comp_ID) is tuple or type(comp_ID) is list: - return tag + join_tup(comp_ID) - elif type(comp_ID) is str: + if isinstance(comp_ID, tuple) or isinstance(comp_ID, list): + return tag + join_tuple(comp_ID) + elif isinstance(comp_ID, str): return tag + comp_ID - elif type(comp_ID) is int: + elif isinstance(comp_ID, int): return tag + str(comp_ID) diff --git a/src/openrxn/compartments/compartment.py b/src/openrxn/compartments/compartment.py index 0ba4438..68d38a0 100644 --- a/src/openrxn/compartments/compartment.py +++ b/src/openrxn/compartments/compartment.py @@ -1,7 +1,7 @@ """Compartments hold a set of reactions and govern the transport of material between compartments through connections.""" -from openrxn.compartments.ID import makeID +from openrxn.compartments.ID import make_ID from openrxn import unit import logging @@ -22,20 +22,22 @@ class Compartment(object): Upon initialization, both of these lists are empty. """ - def __init__(self, ID, pos=[], array_ID=None, volume=None): + def __init__(self, ID, pos=None, array_ID=None, volume=None): self.ID = ID + self._rxn_ids = set() self.reactions = [] self.connections = {} - self.pos = pos + self.pos = [] if pos is None else list(pos) self.array_ID = array_ID self.volume = volume def add_rxn_to_compartment(self, rxn): """Adds a reaction to a compartment.""" - if rxn.ID in [r.ID for r in self.reactions]: - logging.warn("Reaction {0} already in compartment {1}".format(rxn.ID,self.ID)) + if rxn.ID in self._rxn_ids: + logging.warning("Reaction {0} already in compartment {1}".format(rxn.ID,self.ID)) else: self.reactions.append(rxn) + self._rxn_ids.add(rxn.ID) def add_rxns_to_compartment(self, rxns): """Adds a list of reactions to a compartment.""" @@ -54,20 +56,20 @@ def connect(self, other_compartment, conn_type, warn_overwrite=True): """Make a connection from this compartment to another one using the conn_type connection type.""" - conn_tag = makeID(other_compartment.array_ID,other_compartment.ID) + conn_tag = make_ID(other_compartment.array_ID,other_compartment.ID) if conn_tag in self.connections and warn_overwrite: - self_tag = makeID(self.array_ID,self.ID) - logging.warn("Warning: overwriting connection between {0} and {1}".format(self_tag,conn_tag)) + self_tag = make_ID(self.array_ID,self.ID) + logging.warning("Overwriting connection between {0} and {1}".format(self_tag,conn_tag)) self.connections[conn_tag] = (other_compartment, conn_type) def remove_connection(self, other_compartment): """Remove the connection with the other_compartment""" - conn_tag = makeID(other_compartment.array_ID,other_compartment.ID) + conn_tag = make_ID(other_compartment.array_ID,other_compartment.ID) if conn_tag not in self.connections: - self_tag = makeID(self.array_ID,self.ID) - logging.warn("Warning: connection to remove between {0} and {1} does not exist".format(self_tag,conn_tag)) + self_tag = make_ID(self.array_ID,self.ID) + logging.warning("Connection to remove between {0} and {1} does not exist".format(self_tag,conn_tag)) val = self.connections.pop(conn_tag) @@ -89,11 +91,22 @@ def copy(self,ID=None,delete_array_ID=False): new_comp = type(self)(newID, pos=self.pos, array_ID=new_aID) new_comp.volume = self.volume - new_comp.connections = self.connections - new_comp.reactions = self.reactions + new_comp.connections = dict(self.connections) + new_comp.reactions = list(self.reactions) + new_comp._rxn_ids = {r.ID for r in new_comp.reactions} return new_comp + def __repr__(self): + vol_str = str(self.volume) if self.volume is not None else "None" + pos_str = tuple(self.pos) if self.pos else None + + return ( + f"Compartment(ID={self.ID!r}, " + f"volume={vol_str}, pos={pos_str}, " + f"n_rxns={len(self.reactions)}, n_connections={len(self.connections)})" + ) + class Compartment1D(Compartment): def __init__(self, *args, **kwargs): diff --git a/src/openrxn/connections.py b/src/openrxn/connections.py index f5ba299..1a39ee3 100644 --- a/src/openrxn/connections.py +++ b/src/openrxn/connections.py @@ -1,4 +1,5 @@ -"""Connections govern the transport between compartments. +""" +Connections govern the transport between compartments. In some cases the transport can be described by a first-order rate equation, e.g.: @@ -15,6 +16,7 @@ """ from . import unit +import logging class Connection(object): """Basis class for connections. The argument species_rates @@ -24,72 +26,126 @@ class Connection(object): class AnisotropicConnection(Connection): def __init__(self, species_rates,dim=3): - """AnisotropicConnections are initialized with a dictionary - of species_rates, where the keys are Species IDs and the - values are tuples of transport rates (k_out,k_in). + """ + Directional (anisotropic) transport connection between two compartments. + + Parameters + ---------- + species_rates : dict + Dictionary of Species, where the keys are Species IDs and the + values are tuples of transport rates (k_out,k_in). + k_out corresponds to transport from compartment 1 to 2. + k_in corresponds to transport from compartment 2 to 1. + + - A tuple (k_out, k_in) specifies directional transport rates. + - A scalar quantity k is interpreted as symmetric transport + and internally turn into tuples of transport rates (k, k). + - All rates must be specified in units of 1/s. - Care should be taken to make sure these are applied in the - right direction! + dim : int, optional + Spatial dimension. - Rates should be specified in units of 1/s. """ self.species_rates = species_rates + self.dim = dim + + for s,r in self.species_rates.items(): + if not isinstance(r,tuple): + logging.warning(f"Species {s}: one scalar rate provided. Assigning k_out == k_in.") + r = (r,r) + self.species_rates[s] = r + + elif len(r) != 2: + raise ValueError(f"Species {s}: rate tuple must be length 2, got length {len(r)}.") - for s in self.species_rates: - if type(self.species_rates[s]) is not tuple or len(self.species_rates[s]) != 2: - raise ValueError("Error! Elements of species_rates dictionary should be tuples of length 2") self.species_rates[s][0].ito(1/unit.sec) self.species_rates[s][1].ito(1/unit.sec) + @staticmethod def _flip_tuple(t): return (t[1],t[0]) def reverse(self): rev_species_rates = {} - for s in self.species_rates: - rev_species_rates[s] = self._flip_tuple(self.species_rates[s]) + for s,r in self.species_rates.items(): + rev_species_rates[s] = self._flip_tuple(r) return AnisotropicConnection(rev_species_rates) - + + def __repr__(self): + rates = {s: (k[0].magnitude, k[1].magnitude) for s, k in self.species_rates.items()} + unit_str = str(next(iter(self.species_rates.values()))[0].units) + return (f"AnisotropicConnection(dim={self.dim}," + f"n_species={len(self.species_rates)}," + f"transport_rates={rates}{unit_str})" + ) + class IsotropicConnection(Connection): def __init__(self, species_rates,dim=3): - """IsotropicConnections are initialized with a dictionary - of species_rates, where the keys are Species IDs and the - values are transport rates. + """ + Symmetric (isotropic) transport connection between two compartments. + + Parameters + ---------- + species_rates : dict + Dictionary of Species, where the keys are Species IDs and the + value is a transport rate constant (k). + + - A scalar quantity k is interpreted as symmetric transport + and internally turn into tuples of transport rates (k, k). + - All rates must be convertible to units of 1/s. + + dim : int, optional + Spatial dimension. - Rates should be specified in units of 1/s. """ self.species_rates = species_rates self.dim = dim - for s in self.species_rates: - k = self.species_rates[s] - if type(k) is not tuple: + for s,k in self.species_rates.items(): + if not isinstance(k,tuple): self.species_rates[s] = (k,k) + self.species_rates[s][0].ito(1/unit.sec) self.species_rates[s][1].ito(1/unit.sec) + def __repr__(self): + rates = {s: (k[0].magnitude, k[1].magnitude) for s, k in self.species_rates.items()} + unit_str = str(next(iter(self.species_rates.values()))[0].units) + return (f"IsotropicConnection(dim={self.dim}," + f"n_species={len(self.species_rates)}," + f"transport_rates={rates}{unit_str})" + ) + class DivByVConnection(Connection): def __init__(self, species_rates,dim=3): - """DivByVConnections are initialized with a dictionary - of species_rates, where the keys are Species IDs and the - values are transport rates. + """ + Volume-based transport connection between two compartments. + + This connection stores transport coefficients proportional to compartment volume. + Rates should be specified in units of L^d/s, where L is length and d is the + compartment volume. - Rates should be specified in units of L^d/s, where L is length - and d is the compartment volume. + Parameters + ---------- + species_rates : dict + Dictionary of Species, where the keys are Species IDs and the + value is a transport rate constant (k). + + dim : int, optional + Spatial dimension. - These connections are divided by the compartment volume - when constructing a system. """ + self.species_rates = species_rates self.dim = dim - for s in self.species_rates: - k = self.species_rates[s] - if type(k) is not tuple: - self.species_rates[s] = (k,k) + for s,k in self.species_rates.items(): + if not isinstance(k, tuple): + k = (k,k) + self.species_rates[s] = k self.species_rates[s][0].ito(unit.nm**self.dim/unit.sec) self.species_rates[s][1].ito(unit.nm**self.dim/unit.sec) @@ -127,6 +183,19 @@ def __init__(self, species_d_constants, surface_area=None, ic_distance=None, dim If either surface_area or ic_distance is left undefined, they will be automatically calculated using compartment positions. + + Parameters + ---------- + species_d_constants : dict + Dictionary of Species, where the keys are Species IDs and the + value is a diffusion constant (D). (units convertible to L^2 / s). + surface_area : Quantity, optional + Interface area A between compartments (units convertible to L^(dim-1)). + ic_distance : Quantity, optional + Center-to-center distance DeltaX (units convertible to L). + dim : int, optional + Spatial dimension (default 3). + """ self.species_d_constants = species_d_constants @@ -139,15 +208,13 @@ def resolve(self): require any information about the Species, or the arrays""" if self.surface_area is None or self.ic_distance is None: - raise ValueError("Error! This connection is not ready to be resolved.") - species_list = self.species_d_constants.keys() + raise ValueError("Error! This connection is not ready to be resolved.") + rates = {} for s,d in self.species_d_constants.items(): - - # how are you going to get this volume? - - rates[s] = d*self.surface_area/self.ic_distance - rates[s].ito(unit.nm**self.dim/unit.sec) + kV = d*self.surface_area/self.ic_distance + kV.ito(unit.nm**self.dim/unit.sec) + rates[s] = kV return DivByVConnection(rates,self.dim) @@ -158,8 +225,20 @@ def __init__(self, species_d_constants, surface_area=None, ic_distance=None, dim ResConnections are special connections from a compartment to a reservoir. They are resolved similarly to a FicksConnection. - face is a string input, equal to either 'x', 'y' or 'z', - denoting along which axis the ResConnection takes place. + Parameters + ---------- + species_d_constants : dict + Dictionary of Species, where the keys are Species IDs and the + value is a diffusion constant (D). (units convertible to L^2/s). + surface_area : Quantity, optional + Reservoir interface area A (units convertible to L^(dim-1)). + ic_distance : Quantity, optional + Distance DeltaX from compartment center to reservoir boundary (units convertible to L). + dim : int, optional + Spatial dimension. + face : str({'x','y','z'}) or None, optional + Axis normal to the reservoir interface. + """ self.species_d_constants = species_d_constants @@ -173,11 +252,12 @@ def resolve(self): require any information about the Species, or the arrays""" if self.surface_area is None or self.ic_distance is None: - raise ValueError("Error! This connection is not ready to be resolved.") - species_list = self.species_d_constants.keys() + raise ValueError("Error! This connection is not ready to be resolved.") + rates = {} for s,d in self.species_d_constants.items(): - rates[s] = d*self.surface_area/self.ic_distance - rates[s].ito(unit.nm**self.dim/unit.sec) + kV = d*self.surface_area/self.ic_distance + kV.ito(unit.nm**self.dim/unit.sec) + rates[s] = kV return DivByVConnection(rates,self.dim) diff --git a/src/openrxn/model.py b/src/openrxn/model.py index 3ef9c94..c2d8103 100644 --- a/src/openrxn/model.py +++ b/src/openrxn/model.py @@ -6,7 +6,7 @@ from openrxn.compartments.compartment import Compartment from openrxn.reactions import Reaction -from openrxn.compartments.ID import makeID +from openrxn.compartments.ID import make_ID from openrxn.connections import FicksConnection, ResConnection import numpy as np @@ -205,7 +205,7 @@ def n_compartments(self): return len(self.compartments) def add_compartment(self,compartment): - newID = makeID(compartment.array_ID,compartment.ID) + newID = make_ID(compartment.array_ID,compartment.ID) if newID in self.compartments.keys(): raise ValueError("Error! Duplicate compartment ID in model ({0})".format(newID)) self.compartments[newID] = compartment.copy(ID=newID,delete_array_ID=True) @@ -251,14 +251,14 @@ def to_graph(self,scale=10): # add all the nodes for c_name, c in self.compartments.items(): graph.add_node(c_name) - graph.node[c_name]['viz'] = {} + graph.nodes[c_name]['viz'] = {} x = scale*0.5*(c.pos[0][0] + c.pos[0][1]).magnitude y = scale*0.5*(c.pos[1][0] + c.pos[1][1]).magnitude z = scale*0.5*(c.pos[2][0] + c.pos[2][1]).magnitude vis_x,vis_y = self._project_xy((x,y,z)) - graph.node[c_name]['viz']['position'] = {'x': float(vis_x), 'y': float(vis_y)} + graph.nodes[c_name]['viz']['position'] = {'x': float(vis_x), 'y': float(vis_y)} # build an edges list edges = [] diff --git a/src/openrxn/reactions.py b/src/openrxn/reactions.py index c61e781..c8662ae 100644 --- a/src/openrxn/reactions.py +++ b/src/openrxn/reactions.py @@ -18,56 +18,119 @@ r1 = Reaction('combustion',[m,o2],[co2,w],[1,2],[1,2],kf=kf,kr=kr) """ +from openrxn import unit import logging class Species(object): - """Species can be either reactants or products and can - interconvert through Reactions. Each species object must - be given a unique name.""" + """ + Species object models an individual chemical entity (reactants or product) that can participate and interconvert + through Reactions. Each species object must be given a unique name ('ID'). + + Parameters + ---------- + ID : str + Unique identifier for the species. + **kwargs : dict, optional + Arbitrary key-value pairs defining additional species properties. + """ def __init__(self, ID, **kwargs): - self.ID = ID + for key, value in kwargs.items(): - self.key = value + setattr(self, key, value) + + def __repr__(self): + return f"Species({self.ID})" class Reaction(object): - """A reaction describes how reactants and products are - related""" + """ + A reaction describes how reactants and products are related. + + Parameters + ---------- + ID: str + Unique identifier for the species. + reactants: set of Species + Reactant Species objects participate in the reaction + products: set of Species + Products Species objects participate in the reaction + stoich_r: int + Stoichiometric coefficients for reactants. Must be positive integers + stoich_p: int + Stoichiometric coefficients for products. Must be positive integers + kf: float, optional + Forward rate constant. + kr: float, optional + Reverse rate constant. + """ def __init__(self, ID, reactants, products, stoich_r, stoich_p, kf=0, kr=0): self.ID = ID - for r in reactants: - assert isinstance(r,Species), "Error! Reactants must be Species objects" - - self.reactants = reactants - assert len(reactants) == len(stoich_r), "Error! Stoichiometry list for reactants must have same length as reactants list" - self.stoich_r = stoich_r + if len(reactants) != len(stoich_r): + raise ValueError("Stoichiometry list for reactants must have same length as reactants list") + if len(products) != len(stoich_p): + raise ValueError("Stoichiometry list for products must have same length as products list") - for p in products: - assert isinstance(p,Species), "Error! Products must be Species objects" - - self.products = products - assert len(products) == len(stoich_p), "Error! Stoichiometry list for products must have same length as products list" - self.stoich_p = stoich_p + if not all(isinstance(r, Species) for r in reactants): + raise TypeError("Reactants must be Species objects") + if not all(isinstance(p, Species) for p in products): + raise TypeError("Products must be Species objects") if kf < 0 or kr < 0: raise ValueError("Error! Reaction rate cannot be negative") - + if kf == 0 and kr == 0: - logging.warn("Warning: Both forward and reverse rates are set to zero") + logging.warning("Both forward and reverse rates are set to zero") + self.reactants = reactants + self.stoich_r = stoich_r + self.products = products + self.stoich_p = stoich_p self.kf = kf - self.kr = kr + self.kr = kr self.reactant_IDs = [s.ID for s in self.reactants] self.product_IDs = [s.ID for s in self.products] - # todo: assure that the units on the rates are correct + def _expected_unit_from_order(self, stoich): + + order = float(sum(stoich)) + power = 1.0 - order + + molar_factor = 1 if abs(power) < 1e-12 else (unit.molar ** power) + return molar_factor / unit.second + + def _validate_rate(self, rate, expected_unit): + """ + Assure that the units on the rates are correct. + """ + + if isinstance(rate, (int,float)): + q = rate * expected_unit + logging.warning(f"Reaction rate provided without units. Assigning {expected_unit}. ") + else: + q = unit.Quantity(rate) + + if q.magnitude < 0: + raise ValueError(f"Reaction rate cannot be negative!") + + if q.dimensionality != expected_unit.dimensionality: + raise ValueError(f"Reaction rate has wrong units: {q.units}. " + f"Expected unit: {expected_unit}") + + return q.to(expected_unit) + + def __repr__(self): + rxn_list = self.display() + return f"Reaction({rxn_list})" def display(self): - """Returns a print string summarizing the reaction.""" + """ + Returns a string summarizing the reaction. + """ + to_print = "" rate_str = "" @@ -97,6 +160,6 @@ def display(self): else: to_print += "{0} ".format(p.ID) - return(to_print + rate_str) + return (to_print + rate_str) + - diff --git a/src/openrxn/systems/ODESystem.py b/src/openrxn/systems/ODESystem.py index d266da4..92af733 100644 --- a/src/openrxn/systems/ODESystem.py +++ b/src/openrxn/systems/ODESystem.py @@ -20,8 +20,8 @@ class ODESystem(System): def __init__(self, *args, **kwargs): super().__init__(*args,**kwargs) - self.NA = 6.022e23 + self.NA = 6.022e23 self.dqdt = self._build_dqdt() def set_q(self,idxs,Q): @@ -39,6 +39,7 @@ def set_q(self,idxs,Q): mult_volume = False mult_na = False + if hasattr(Q,'units'): if Q.units == unit.mol/unit.L: mult_volume = True @@ -98,12 +99,16 @@ def _build_dqdt(self): state : openrxn.systems.state.State object model : openrxn.model.FlatModel object """ + state = self.state + dqdt = [] - for i in range(self.state.size): + for i in range(state.size): # collect source and sink terms for this species in this compartment c = self.model.compartments[self.state.compartment[i]] - s = self.state.species[i] + s = state.species[i] + vol = (self.NA*c.volume/unit.mol) if c.volume is not None else None + state_idx = state.index[c.ID] sources = [] sources_reservoir = [] @@ -111,6 +116,7 @@ def _build_dqdt(self): # look through the reactions in this compartment for ones that # involve this species + for r in c.reactions: if s in r.reactant_IDs: s_idx = r.reactant_IDs.index(s) @@ -119,10 +125,10 @@ def _build_dqdt(self): q_list = [] n_r = 0 for j,x in enumerate(r.reactants): - q_list += [self.state.index[c.ID][x.ID]]*r.stoich_r[j] + q_list.extend([state_idx[x.ID]]*r.stoich_r[j]) n_r += r.stoich_r[j] - if n_r - 1 > 0 and c.volume is not None: - vol_fac = (self.NA*c.volume/unit.mol)**(n_r-1) + if n_r - 1 > 0 and vol is not None: + vol_fac = vol**(n_r-1) rate = r.kf/vol_fac else: rate = r.kf @@ -132,15 +138,15 @@ def _build_dqdt(self): q_list = [] n_p = 0 for j,x in enumerate(r.products): - q_list += [self.state.index[c.ID][x.ID]]*r.stoich_p[j] + q_list.extend([state_idx[x.ID]]*r.stoich_p[j]) n_p += r.stoich_p[j] - if n_p - 1 > 0 and c.volume is not None: - vol_fac = (self.NA*c.volume/unit.mol)**(n_p-1) + if n_p - 1 > 0 and vol is not None: + vol_fac = vol**(n_p-1) rate = r.kr/vol_fac else: rate = r.kr sources.append((rate, q_list, r.stoich_r[s_idx])) - + if s in r.product_IDs: s_idx = r.product_IDs.index(s) if r.kf > 0: @@ -148,10 +154,10 @@ def _build_dqdt(self): q_list = [] n_r = 0 for j,x in enumerate(r.reactants): - q_list += [self.state.index[c.ID][x.ID]]*r.stoich_r[j] + q_list.extend([state_idx[x.ID]]*r.stoich_r[j]) n_r += r.stoich_r[j] - if n_r - 1 > 0 and c.volume is not None: - vol_fac = (self.NA*c.volume/unit.mol)**(n_r-1) + if n_r - 1 > 0 and vol is not None: + vol_fac = vol**(n_r-1) rate = r.kf/vol_fac else: rate = r.kf @@ -162,10 +168,10 @@ def _build_dqdt(self): q_list = [] n_p = 0 for j,x in enumerate(r.products): - q_list += [self.state.index[c.ID][x.ID]]*r.stoich_p[j] + q_list.extend([state_idx[x.ID]]*r.stoich_p[j]) n_p += r.stoich_p[j] - if n_p - 1 > 0 and c.volume is not None: - vol_fac = (self.NA*c.volume/unit.mol)**(n_p-1) + if n_p - 1 > 0 and vol is not None: + vol_fac = vol**(n_p-1) rate = r.kr/vol_fac else: rate = r.kr @@ -182,18 +188,18 @@ def _build_dqdt(self): sinks.append((conn[1].species_rates[s][0], [i], 1)) # add "in" diffusion process - if isinstance(self.model.compartments[other_lab],Reservoir): + if isinstance(comp[other_lab],Reservoir): sources_reservoir.append((conn[1].species_rates[s][1], - self.model.compartments[other_lab].conc_funcs[s], + comp[other_lab].conc_funcs[s], 1)) else: if isinstance(conn,DivByVConnection): sources.append((conn[1].species_rates[s][1]/conn[0].volume, - [self.state.index[other_lab][s]], + [state.index[other_lab][s]], 1)) else: sources.append((conn[1].species_rates[s][1], - [self.state.index[other_lab][s]], + [state.index[other_lab][s]], 1)) dqdt.append(DerivFuncBuilder(sources, sinks, sources_reservoir))