Source code for microbenthos.model.equation

import logging
from collections import namedtuple

import sympy as sp
from fipy import CellVariable, DiffusionTerm, ImplicitSourceTerm, \
    PhysicalField, \
    TransientTerm, Variable
from fipy.tools import numerix as np

from microbenthos import ModelVariable, Process, restore_var, snapshot_var


[docs]class ModelEquation(object): """ Class that handles the creation of partial differential equations for a transient variable of the model """
[docs] def __init__(self, model, varpath, coeff = 1, track_budget = False): """ Initialize the model equation for a given variable Args: model (:class:`~.model.MicroBenthosModel`): model this belongs to varpath (str): Model path of the equation variable (example: "env.oxygen") coeff (int, float): the coefficient for the transient term """ self.logger = logging.getLogger(__name__) self.logger.debug('Initializing ModelEqn for: {!r}'.format(varpath)) getter = getattr(model, 'get_object', None) if not callable(getter): self.logger.error('Supplied model ({}) has no "get_object" method'.format(type(model))) raise ValueError('Invalid model type supplied: {}'.format(type(model))) #: the model instance this equation belongs to self.model = model var = self.model.get_object(varpath) if isinstance(var, ModelVariable): var = var.var if not isinstance(var, CellVariable): raise ValueError('Var {!r} is {}, not CellVariable'.format(varpath, type(var))) #: the path (str) to the equation variable self.varpath = varpath #: The equation variable object (:class:`~microbenthos.entity.Variable`) self.var = var #: the name of :attr:`.var` self.varname = var.name self.logger.debug('Found variable: {!r}'.format(self.var)) self._term_transient = None self._term_diffusion = None #: container (dict) of the formulae of the sources (sympy expressions) self.source_formulae = {} #: container (dict) of the expressions of the sources (fipy variable binOp) self.source_exprs = {} #: container (dict) of the fipy equation terms (explicit & implicit) of the sources self.source_terms = {} #: the coefficients in the equation for the sources self.source_coeffs = {} #: The additive sum of the values in :attr:`source_exprs` self.sources_total = None self.diffusion_def = () """:type : (str, float) The model path to the diffussion coeff and a numeric coefficient to multiply in the equation """ #: the fipy term that is used in the model full_eqn self.obj = None #: flag to indicate if the equation has been finalized self.finalized = False term = TransientTerm(var=self.var, coeff=coeff) self.logger.debug('Created transient term with coeff: {}'.format(coeff)) #: The transient term of the equation: dv/dt self.term_transient = term #: namedtuple definition for tracked fields: #: ('time_step', 'var_expected', 'var_actual', 'sources_change', 'transport_change') self.Tracked = namedtuple('tracked_budget', ('time_step', 'var_expected', 'var_actual', 'sources_change', 'transport_change') ) #: a tuple of tracked values according to :attr:`.Tracked` self.tracked = self.Tracked( PhysicalField(0.0, 's'), 0.0, 0.0, 0.0, 0.0) self._track_budget = None self.track_budget = track_budget
def __repr__(self): return 'TransientEqn({})'.format(self.varname)
[docs] def finalize(self): """ Call this to setup the equation. Once this is called, no more terms can be added. This does:: self.obj = self.term_transient == sum(self.RHS_terms) Raises: RuntimeError: if no :attr:`.term_transient` defined or no RHS terms defined """ if self.finalized: self.logger.warning('Equation already finalized') return if self.term_transient is None: raise RuntimeError('Cannot finalize equation without transient term!') RHS_terms = self.RHS_terms if not RHS_terms: raise RuntimeError('Cannot finalize equation without right-hand side terms') if self.source_exprs: self.sources_total = sum(self.source_exprs.values()) #: the additive sum of all the sources else: self.sources_total = PhysicalField(np.zeros_like(self.var), self.var.unit.name() + '/s') self.obj = self.term_transient == sum(self.RHS_terms) self.update_tracked_budget(PhysicalField(0.0, 's')) self.finalized = True self.logger.info('Final equation: {}'.format(self.obj))
@property def term_transient(self): """ The transient term for the equation Returns: Instance of :class:`fipy.TransientTerm` """ return self._term_transient @term_transient.setter def term_transient(self, term): if self.term_transient is not None: raise RuntimeError('Transient term has already been set!') self._term_transient = term self.logger.info('Transient term set: {}'.format(term))
[docs] def _get_term_obj(self, path): """ Get the model object at the path and return a usable fipy type Args: path (str): dotted path in the model store Returns: Variable | :class:`fipy.terms.binaryTerm._BinaryTerm` """ obj = self.model.get_object(path) if isinstance(obj, ModelVariable): expr = obj.var elif isinstance(obj, Variable): expr = obj elif isinstance(obj, Process): expr = obj return expr
[docs] def _add_diffusion_term(self, coeff): """ Add a linear diffusion term to the equation Args: coeff (int, float, term): Coefficient for diffusion term """ if self.finalized: raise RuntimeError('Equation already finalized, cannot add terms') term = DiffusionTerm(var=self.var, coeff=coeff) self.logger.debug('Created implicit diffusion term with coeff: {!r}'.format(coeff)) self.term_diffusion = term
[docs] def add_diffusion_term_from(self, path, coeff): """ Add diffusion term from the object path Args: path (str): Path to model store coeff (int, float): Multiplier coefficient for object Returns: Object stored on model store Raises: ValueError if object not found at path """ self.logger.debug('Adding diffusion term from {!r}'.format(path)) obj = self._get_term_obj(path) term = obj.as_term() self._add_diffusion_term(coeff=term * coeff) self.diffusion_def = (path, coeff)
@property def term_diffusion(self): """ The diffusion term for the equation Returns: Instance of :class:`fipy.DiffusionTerm` """ return self._term_diffusion @term_diffusion.setter def term_diffusion(self, term): if self.term_diffusion is not None: raise RuntimeError('Diffusion term has already been set!') self._term_diffusion = term self.logger.info('Diffusion term set: {}'.format(term))
[docs] def add_source_term_from(self, path, coeff = 1): """ Add a source term from the model path Args: path (str): Path to model store coeff (int, float): coeff for source term Raises: ValueError if path does not point to an object """ if self.finalized: raise RuntimeError('Equation already finalized, cannot add terms') self.logger.info('{} Adding source term from {!r} (coeff={}'.format( self, path, coeff)) if not isinstance(coeff, (int, float)): raise ValueError('Source coeff should be int or float, not {}'.format(type(coeff))) if path in self.source_exprs: raise RuntimeError('Source term path already exists: {!r}'.format(path)) obj = self.model.get_object(path) """:type: Process""" # expr = obj.evaluate() # self.logger.debug('Created source expr: {!r}'.format(expr)) self.source_coeffs[path] = coeff full_expr = obj.as_term() self.source_exprs[path] = coeff * full_expr self.source_formulae[path] = coeff * obj.expr() var, S0, S1 = obj.as_source_for(self.varname) assert var is self.var, 'Got var: {!r} and self.var: {!r}'.format( var, self.var) if S1 is not 0: # do not use != 0 as it fails for array type vars S1 = ImplicitSourceTerm(coeff=S1, var=self.var) term = S0 + S1 else: term = S0 self.source_terms[path] = term * coeff self.logger.debug('Created source {!r}: {!r}'.format(path, term))
[docs] def as_symbolic(self): """ Return a symbolic version (sympy) of the equation """ var = sp.var(self.varname) t, z = sp.var('t z') Dcoeff = sp.sympify(self.diffusion_def[1]) D = sp.symbols('D{}'.format(self.varname)) transient = sp.Derivative(var, t) diffusive = D * Dcoeff * sp.Derivative(var, z, 2) sources = sum(self.source_formulae.values()) return sp.Eq(transient, diffusive + sources)
[docs] def as_latex_string(self): """ Return a latex string of the equation through sympy """ return sp.latex(self.as_symbolic())
[docs] def as_pretty_string(self): """ Return a pretty (unicode) string of the equation through sympy """ return sp.pretty(self.as_symbolic())
@property def RHS_terms(self): """ The right hand side terms of the equation Returns: A list of terms, with the first one being :attr:`.term_diffusion`, followed by the values in :attr:`.source_terms`. """ terms = [] if self.term_diffusion: terms.append(self.term_diffusion) terms.extend(self.source_terms.values()) return terms
[docs] def snapshot(self, base = False): """ Return a state dictionary of the equation, with the structure * "sources" * "metadata": source paths and coefficients * "data": the net rate of the combined sources * "diffusion" * metadata: dict(:attr:`.diffusion_def`) * "transient" * "metadata": * :attr:`.varpath`: transient term coeff * "tracked_budget" (if :attr:`.track_budget` is `True`) * var_expected: data: integrated density of variable from tracked changes * var_actual: data: integrated density of variable * time_step: data: the time step duration * sources_change: data: integrated rate of combined sources over the time step * transport_change: data: change in variable quantity due to mass transport "metadata" * "variable": :attr:`.varpath` Returns: dict: A dictionary of the equation state Raises: RuntimeError: if the equation is not yet finalized """ self.logger.debug('Snapshot of {!r}'.format(self)) if not self.finalized: raise RuntimeError('{} not finalized. Cannot snapshot!'.format(self)) if self.diffusion_def: diff_def = dict([self.diffusion_def]) else: diff_def = dict() state = dict( diffusion=dict(metadata=diff_def), transient=dict(metadata={self.varpath: self.term_transient.coeff}), metadata=dict( variable=self.varpath, ), sources=dict( metadata=self.source_coeffs, data=snapshot_var(self.sources_total, base=base) ), ) if self.track_budget: tracked_state = {k: dict(data=snapshot_var(v, base=base)) for \ (k, v) in self.tracked._asdict().items() } # tracked_state['var_actual'] = dict(data=snapshot_var(self.var_quantity(), base=base)) state['tracked_budget'] = tracked_state return state
[docs] def restore_from(self, state, tidx): """ If "tracked_budget" is in the `state`, then set the values on the instance """ self.logger.debug('Restoring {} from state: {}'.format(self, tuple(state))) # update the tracked budget info # self.Tracked = namedtuple('tracked_budget', # ('time_step', 'var_expected', 'var_actual', 'sources_change', # 'transport_change') if not 'tracked_budget' in state: return tracked_state = state['tracked_budget'] values = [] for fld in self.tracked._fields: self.logger.debug('Reading tracked field {!r}'.format(fld)) values.append(restore_var(tracked_state[fld], tidx)) self.tracked = self.Tracked(*values) self.logger.debug('Restored {} budget: {}'.format(self, self.tracked))
[docs] def sources_rate(self): """ Estimate the rate of change of the variable quantity caused by source terms. Returns: PhysicalField: The integrated quantity of the sources """ self.logger.debug('Estimating rate from {} sources '.format(len(self.source_terms))) # the total sources contribution if self.sources_total is not None: depths = self.model.domain.depths sources_rate = np.trapz(self.sources_total.numericValue, depths.numericValue) # the trapz function removes all units, so figure out the unit source_total_unit = (self.sources_total[0] * depths[0]).inBaseUnits().unit sources_rate = PhysicalField(sources_rate, source_total_unit) self.logger.debug('Calculated source rate: {}'.format(sources_rate)) else: sources_rate = 0.0 return sources_rate
[docs] def transport_rate(self): """ Estimate the rate of change of the variable quantity caused by transport at the domain boundaries Returns: PhysicalField: The integrated quantity of the transport rate """ self.logger.debug('Estimating transport rate at boundaries') # the total transport at the boundaries # from Fick's law: J = -D dC/dx if self.term_diffusion: depths = self.model.domain.depths D = self.term_diffusion.coeff[0] top = -D[1] * (self.var[0] - self.var[1]) / (depths[1] - depths[0]) bottom = -D[-2] * (self.var[-1] - self.var[-2]) / (depths[-1] - depths[-2]) transport_rate = (top + bottom).inBaseUnits() # self.logger.debug('Calculated transport rate: {}'.format(transport_rate)) else: transport_rate = 0.0 return transport_rate
[docs] def var_quantity(self): """ Calculate the integral quantity of the variable in the domain Returns: PhysicalField: depth integrated amount """ # self.logger.debug('Calculating actual var quantity') q = np.trapz(self.var.numericValue, self.model.domain.depths.numericValue) unit = (self.var[0] * self.model.domain.depths[0]).inBaseUnits().unit return PhysicalField(q, unit)
[docs] def update_tracked_budget(self, dt): """ Update the tracked quantities for the variable, sources and transport Args: dt (PhysicalField): the time step Note: This is not a very accurate way to measure it, because the boundaries conditions keep the value at the domain boundaries constant, and so is not a true measure of the equation change in the time step. However, it should provide an order of magnitude metric for the accuracy of the numerical approximation in solving the equation. """ if not self.track_budget: return self.logger.debug("{}: Updating tracked budget".format(self)) # the change in the domain for this time step is then: # (source - transport) * dt sources_change = dt * self.sources_rate() transport_change = dt * self.transport_rate() self.logger.debug('source change: {} transport_change: {}'.format( sources_change, transport_change )) net_change = sources_change + transport_change var_expected = self.tracked.var_expected + net_change self.tracked = self.tracked._replace( time_step=dt, var_expected=var_expected, var_actual=self.var_quantity(), sources_change=sources_change, transport_change=transport_change ) self.logger.debug('{}: Updated tracked: {}'.format(self, self.tracked))
@property def track_budget(self): """ Flag to indicate if the variable quantity should be tracked. When this is set, the variable quantity in the domain is :attr:`.tracked` through :meth:`.update_tracked_budget`. Returns: bool: Flag state """ return self._track_budget @track_budget.setter def track_budget(self, b): b = bool(b) if b and self.track_budget: self.logger.debug('track_budget already set. Doing nothing.') return elif b and not self.track_budget: self.logger.debug("track_budget being set. Estimating quantities.") self.tracked = self.Tracked( time_step=0.0, var_expected=self.var_quantity(), var_actual=self.var_quantity(), sources_change=self.sources_rate(), transport_change=self.transport_rate() ) self.logger.debug('Started tracking budget: {}'.format(self.tracked)) self._track_budget = b elif not b: self.logger.debug('Resetting track budget') self.tracked = self.Tracked(0.0, 0.0, 0.0, 0.0, 0.0) self._track_budget = b