Source code for microbenthos.model.simulation

"""
Module to handle the simulation of microbenthos models
"""
from __future__ import division

import importlib
import logging
import time
from collections import deque

from fipy import PhysicalField, Variable

from .model import MicroBenthosModel
from ..utils import CreateMixin, snapshot_var


[docs]class Simulation(CreateMixin): """ This class enables the process of repeatedly solving the model's equations for a (small) time step to a certain numerical accuracy, and then incrementing the model clock. During the evolution of the simulation, the state of the model as well as the simulation is yielded repeatedly. Numerically approximating the solution to a set of partial differential equations requires that the solver system has a reasonable target accuracy ("residual") and enough attempts ("sweeps") to reach both a stable and accurate approximation for a time step. This class attempts to abstract out these optimizations for the user, by performing adaptive time-stepping. The user needs to specify a worst-case residual ( :attr:`.max_residual`), maximum number of sweeps per time-step ( :attr:`.max_sweeps`) and the range of time-step values to explore during evolution (:attr:`.simtime_lims`). During the evolution of the simulation, the time-step is penalized if the max residual is overshot or max sweeps reached. If not, the reward is a bump up in the time-step duration, allowing for faster evolution of the simulation. See Also: The scheme of simulation :meth:`.evolution`. The adaptive scheme to :meth:`.update_time_step`. """ schema_key = 'simulation' FIPY_SOLVERS = ('scipy', 'pyAMG', 'trilinos', 'pysparse')
[docs] def __init__(self, simtime_total = 6, simtime_days = None, simtime_lims=(0.1, 120), snapshot_interval = 60, fipy_solver = 'scipy', max_sweeps=100, max_residual=1e-12, ): """ Args: simtime_total (float, PhysicalField): The number of hours for the simulation to run simtime_days (float): The number of days (in terms of the model's irradiance cycle) the simulation should run for. Note that specifying this will override the given :attr:`.simtime_total` when the :attr:`.model` is supplied. simtime_lims (float, PhysicalField): The minimum and maximum limits for the :attr:`simtime_step` for adaptive time-stepping. This should be supplied as a pair of values, which are assumed to be in seconds and cast into PhysicalField internally. (default: 0.01, 240) max_sweeps (int): Maximum number of sweeps to attempt per timestep (default: 50) max_residual (float): Maximum residual value for the solver at a timestep (default: 1e-14) snapshot_interval (int, float, :class:`PhysicalField`): the duration in seconds of the model clock between yielding snapshots of the model state for exporters (default: 60) fipy_solver (str): Name of the fipy solver to use. One of ``('scipy', 'pyAMG', 'trilinos','pysparse')`` (default: "scipy") """ super(Simulation, self).__init__() # the __init__ call is deliberately empty. will implement # cooeperative inheritance only # when necessary self.logger = logging.getLogger(__name__) self._started = False self._solver = None self._fipy_solver = None self.fipy_solver = fipy_solver self._simtime_lims = None self._simtime_total = None self._simtime_step = None #: Numer of days to simulate in terms of the model's irradiance source self.simtime_days = None if simtime_days: simtime_days = float(simtime_days) if simtime_days <= 0: raise ValueError('simtime_days should be >0, not {:.2f}'.format( simtime_days)) self.simtime_days = simtime_days self.simtime_lims = simtime_lims self.simtime_total = simtime_total self.simtime_step = self.simtime_lims[0] self.snapshot_interval = PhysicalField(snapshot_interval, 's') self.max_residual = float(max_residual) if not (0 < self.max_residual < 1e-3): raise ValueError( 'Max residual should be a small positive number, ' 'not {:.3g}'.format( self.max_residual)) self._residualQ = deque([], maxlen=10) self._max_sweeps = None self.max_sweeps = max_sweeps self._sweepsQ = deque([], maxlen=5) self._model = None
@property def started(self): """ Returns: bool: Flag for if the sim evolution has started """ return self._started @property def fipy_solver(self): return self._fipy_solver @fipy_solver.setter def fipy_solver(self, val): if val not in self.FIPY_SOLVERS: raise ValueError( 'Solver {!r} not in {}'.format(val, self.FIPY_SOLVERS)) if self.started: raise RuntimeError('Fipy solver cannot be changed after started') self._fipy_solver = val @property def simtime_total(self): """ The number of hours of the model clock the simulation should be evolved for. The supplied value must be larger than the time-steps allowed. Also, it may be over-ridden by supplying :attr:`.simtime_days`. Returns: PhysicalField: duration in hours """ return self._simtime_total @simtime_total.setter def simtime_total(self, val): try: val = PhysicalField(val, 'h') except TypeError: raise ValueError( 'simtime_total {!r} not compatible with time units'.format(val)) if val <= 0: raise ValueError('simtime_total should be > 0') if self.simtime_step is not None: if val <= self.simtime_lims[0]: raise ValueError( 'simtime_total {} should be > step {}'.format(val, self.simtime_lims[ 0])) self._simtime_total = val @property def simtime_step(self): """ The current time-step duration. While setting, the supplied value will be clipped to within :attr:`simtime_lims`. Returns: PhysicalField: in seconds """ return self._simtime_step @simtime_step.setter def simtime_step(self, val): try: val = PhysicalField(val, 's') except TypeError: raise ValueError( 'simtime_step {!r} not compatible with time units'.format(val)) dtMin, dtMax = self.simtime_lims # val = min(max(val, dtMin), dtMax) val = min(val, dtMax) assert hasattr(val, 'unit') if self.simtime_total is not None: if self.simtime_total <= val: raise ValueError( 'simtime_total {} should be > step {}'.format( self.simtime_total, val)) self._simtime_step = val @property def simtime_lims(self): """ The limits for the time-step duration allowed during evolution. This parameter determines the degree to which the simulation evolution can be speeded up. In phases of the model evolution where the numerical solution is reached within a few sweeps, the clock would run at the max limit, whereas when a large number of sweeps are required, it would be penalized towards the min limit. A high max value enables faster evolution, but can also lead to numerical inaccuracy ( higher residual) or solution breakdown (numerical error) during :meth:`.run_timestep`. A small enough min value allows recovery, but turning back the clock to the previous time step and restarting with the min timestep and allowing subsequent relaxation. Args: vals (float, PhysicalField): the (min, max) durations in seconds Returns: lims (tuple): The (min, max) limits of :attr:`simtime_step` each as a :class:`.PhysicalField` """ return self._simtime_lims @simtime_lims.setter def simtime_lims(self, vals): if vals is None: lmin = PhysicalField(0.1, 's') # lmax = (self.simtime_total / 25.0).inUnitsOf('s').floor() lmax = PhysicalField(120, 's') else: lmin, lmax = [PhysicalField(float(_), 's') for _ in vals] if not (0 < lmin < lmax): raise ValueError( 'simtime_lims ({}, {}) are not positive and in order'.format( lmin, lmax)) self._simtime_lims = (lmin, lmax) self.logger.debug('simtime_lims set: {}'.format(self._simtime_lims)) @property def max_sweeps(self): """ The maximum number of sweeps allowed for a timestep Args: val (int): should be > 0 Returns: int """ return self._max_sweeps @max_sweeps.setter def max_sweeps(self, val): try: val = int(val) assert val > 0 self._max_sweeps = val except: raise ValueError('max_sweeps {} should be > 0'.format(val)) @property def model(self) -> MicroBenthosModel: """ The model to run the simulation on. This is typically an instance of :class:`~microbenthos.MicroBenthosModel` or its subclasses. The interface it must provide is: * a method :meth:`create_full_equation()` * an attribute :attr:`full_eqn` created by above method, which is a :class:`~fipy.terms.binaryTerm._BinaryTerm` that has a :meth:`sweep()` method. * method :meth:`model.update_vars()` which is called before each timestep * method :meth:`model.clock.increment_time(dt)` which is called after each timestep Additionally, if :attr:`.simtime_days` is set, then setting the model will try to find the ``"env.irradiance:`` object and use its :attr:`.hours_total` attribute to set the :attr:`.simtime_total`. Args: model (:class:`~microbenthos.MicroBenthosModel`): model instance Returns: :class:`~microbenthos.MicroBenthosModel` Raises: RuntimeError: if model has already been set ValueError: if modes interface does not match ValueError: if model :attr:`.model.full_eqn` does not get created even after :meth:`.model.create_full_equation` is called. """ return self._model @model.setter def model(self, m): if self.model: raise RuntimeError('Model already set') full_eqn = getattr(m, 'full_eqn', None) if full_eqn is None: if hasattr(m, 'create_full_equation'): m.create_full_equation() full_eqn = getattr(m, 'full_eqn', None) if full_eqn is None: raise ValueError( 'Model {!r} (type={}) does not have a valid equation'.format( m, type(m))) def recursive_hasattr(obj, path, is_callable = False): parts = path.split('.') S = obj FOUND = False for p in parts: if hasattr(S, p): S = getattr(S, p) FOUND = True else: FOUND = False break if not FOUND: return False else: if is_callable: return callable(S) else: return True expected_attrs = ['clock', 'full_eqn'] expected_callables = ['full_eqn.sweep', 'update_vars', 'clock.increment_time'] failed_attrs = tuple( filter(lambda x: not recursive_hasattr(m, x), expected_attrs)) failed_callables = tuple( filter(lambda x: not recursive_hasattr(m, x, is_callable=True), expected_callables)) if failed_attrs: self.logger.error( 'Model is missing required attributes: {}'.format(failed_attrs)) if failed_callables: self.logger.error('Model is missing required callables: {}'.format( failed_callables)) if failed_callables or failed_attrs: raise ValueError( 'Model interface is missing: {}'.format( set(failed_attrs + failed_callables))) self._model = m # if simtime_days is given, override the simtime_total with it if self.simtime_days is not None: I = self.model.get_object('env.irradiance') simtime_total = self.simtime_days * I.hours_total self.logger.info( 'Setting simtime_total={} for {} days of simtime'.format( simtime_total, self.simtime_days )) self.simtime_total = simtime_total
[docs] def _create_solver(self): """ Create the fipy solver to be used """ solver_module = importlib.import_module( 'fipy.solvers.{}'.format(self.fipy_solver)) Solver = getattr(solver_module, 'DefaultSolver') self._solver = Solver() self.logger.debug( 'Created fipy {} solver: {}'.format(self.fipy_solver, self._solver))
[docs] def run_timestep(self): """ Evolve the model through a single timestep """ if not self.started: raise RuntimeError( 'Simulation timestep cannot be run since started=False') if self.model is None: raise RuntimeError('Simulation model is None, cannot run timestep') dt = self.simtime_step self.logger.info( 'Running timestep {} + {}'.format(self.model.clock, dt)) num_sweeps = 0 EQN = self.model.full_eqn retry = True res = 100.0 while (res > self.max_residual) and (num_sweeps < self.max_sweeps) \ and retry: try: res = EQN.sweep( solver=self._solver, dt=float(dt.numericValue) ) num_sweeps += 1 res = float(res) self.logger.debug( 'Sweeps: {} residual: {:.2g}'.format(num_sweeps, res)) except (TypeError, RuntimeError): self.logger.warning(f'Error with simulation timestep dt={dt}') res = self.max_residual*100 break return res, num_sweeps
[docs] def evolution(self): """ Evolves the model clock through the time steps for the simulation, i.e. by calling :meth:`.run_timestep` and :meth:`.model.clock.increment_time` repeatedly while ``model.clock() <= self.simtime_total``. This is a generator that yields the step number, and the state of the evolution after each time step. If :meth:`snapshot_due` is true, then also the model snapshot is included in the state. Yields: `(step, state)` tuple of step number and simulation state Raises: RuntimeError: if :attr:`.started` is already True """ if self.started: raise RuntimeError( 'Simulation already started. Cannot run parallel evolutions!') self.logger.debug( 'simtime_total={o.simtime_total} simtime_step={o.simtime_step}, ' 'max_sweeps={o.max_sweeps} max_residual={o.max_residual}'.format(o=self)) self.logger.debug('Solving: {}'.format(self.model.full_eqn)) self._create_solver() self._started = True self.logger.info('Simulation evolution starting') self.model.update_vars() self._prev_snapshot = Variable(self.model.clock.copy(), name='prev_snapshot') step = 0 self.simtime_step = self.simtime_lims[0] while self.model.clock() <= self.simtime_total: self.logger.debug( 'Running step #{} {}'.format(step, self.model.clock)) dt = self.simtime_step tic = time.time() residual, num_sweeps = self.run_timestep() toc = time.time() self.logger.debug( f'For dt={dt} residual={residual:.4g} with sweeps={num_sweeps}') # if residual == 0: # raise RuntimeError(f'Residual perfect 0 for dt={dt}. Problem in domain!') self._sweepsQ.appendleft(num_sweeps) self._residualQ.appendleft(residual) if residual >= self.max_residual: self.logger.info( f'Ignoring dt={dt}: res={residual:.4g} > {self.max_residual:.4g}') self.update_simtime_step(residual, num_sweeps) self.model.revert_vars() # just go back to while loop start, now that dt has been made smaller continue else: self.model.update_vars() self.model.update_equations(dt) step += 1 self.update_simtime_step(residual, num_sweeps) calc_time = 1000 * (toc - tic) self.logger.debug('Time step {} done in {:.2f} msec'.format( self.simtime_step, calc_time)) if self.snapshot_due(): self.logger.debug('Snapshot in step #{}'.format(step)) state = self.get_state( state=self.model.snapshot(), calc_time=calc_time, residual=residual, num_sweeps=num_sweeps ) yield (step, state) # now set the prev_snapshot so that snapshot_due() will # remain true for processing self._prev_snapshot.setValue(self.model.clock.copy()) self.logger.debug( 'Prev snapshot set: {}'.format(self._prev_snapshot)) else: # create a minimal state # this is the model clock and current residual state = self.get_state( calc_time=calc_time, residual=residual, num_sweeps=num_sweeps ) yield (step, state) self.model.clock.increment_time(dt) self.logger.info('Simulation evolution completed') self._started = False
[docs] def get_state(self, state = None, metrics = None, **kwargs): """ Get the state of the simulation evolution Args: state (None, dict): If state is given (from ``model.snapshot()``), then that is used. If None, then just the time info is created by using :attr:`.model.clock`. metrics (None, dict): a dict to get the simulation metrics from, else from `kwargs` **kwargs: parameters to build metrics dict. Currently the keys `"calc_time"`, `"residual"` and `"num_sweeps"` are used, if available. Returns: dict: the simulation state """ if state is None: state = dict( time=dict(data=snapshot_var(self.model.clock)) ) if metrics is None: metrics = dict( calc_time=dict( data=(kwargs.get('calc_time', 0.0), dict(unit='ms')) ), residual=dict( data=(kwargs.get('residual', 0.0), None)), num_sweeps=dict( data=(kwargs.get('num_sweeps', 0), None)), ) state['metrics'] = metrics return state
[docs] def snapshot_due(self): """ Returns: bool: If the current model clock time has exceeded :attr:`.snapshot_interval` since the last snapshot time """ return self.model.clock() - self._prev_snapshot() >= \ self.snapshot_interval
[docs] def update_simtime_step(self, residual, num_sweeps): """ Update the :attr:`.simtime_step` to be adaptive to the current residual and sweeps. A multiplicative factor for the time-step is determined based on the number of sweeps and residual. If the `residual` is more than :attr:`.max_residual`, then the time-step is quartered. If not, it is boosted by up to double, depending on the `num_sweeps` and :attr:`.max_sweeps`. Once a new timestep is determined, it is limited to the time left in the model simulation. Args: residual (float): the residual from the last equation step num_sweeps (int): the number of sweeps from the last equation step """ self.logger.info( 'Updating step {} after {}/{} sweeps and {:.3g}/{:.3g} ' 'residual'.format( self.simtime_step, num_sweeps, self.max_sweeps, residual, self.max_residual )) old_step = self.simtime_step mult = 1.0 if residual >= self.max_residual: mult = 0.5 else: # if the last N simtime steps have produced residuals lesser than max, then boost the # timestep if all([r < self.max_residual for r in list(self._residualQ)]): mult = 1.25 else: mult = 1.0 new_step = self.simtime_step * max(0.01, mult) self.simtime_step = min(new_step, self.simtime_total - self.model.clock()) self.logger.info(f'Residual={residual} max={self.max_residual}') self.logger.info('Time-step update {} x {:.2g} = {}'.format( old_step, mult, self.simtime_step))