import logging
from fipy import CellVariable, PhysicalField
from fipy.tools import numerix
from .entity import DomainEntity
from ..utils.snapshotters import restore_var, snapshot_var
[docs]class ModelVariable(DomainEntity):
"""
A class to represent a variable on the model domain.
This class serves as means for defining features such as environmental variables, chemical
analytes or microbiological features (biomass). The class allows defining the variable but
deferring the initialization of the associated :class:`CellVariable` until the domain is
available.
The interface defined allows to:
* set boundary condition to :meth:`.constrain` the values
* :meth:`.seed` the variable values
* create a :meth:`.snapshot` of the state and :meth:`restore_from` it
* set :attr:`.clip_min` and :attr:`.clip_max` limits on the values
"""
[docs] def __init__(self, create,
constraints = None,
seed = None,
clip_min = None,
clip_max = None,
**kwargs):
"""
Configure the creation of a variable and its boundary conditions
Args:
name (str): The name of the variable
create (dict): parameters for variable creation (see :meth:`.create`)
constraints (dict): Mapping of `location` to value of boundary condition
through :meth:`.constrain`
seed (dict): parameters to seed initial value of variable
"""
self.logger = kwargs.get('logger') or logging.getLogger(__name__)
kwargs['logger'] = self.logger
super(ModelVariable, self).__init__(**kwargs)
self.logger.debug('Init in {} {!r}'.format(self.__class__.__name__, self.name))
self.var = None
""":type : :class:`fipy.CellVariable`
Object created on domain.mesh"""
self.create_params = self.check_create(**create)
""":type : dict
Validated dict of params for creation of variable"""
self.logger.debug('{} saving create params {}'.format(self, self.create_params))
#: mapping of domain location to values for boundary conditions (see :meth:`constrain`)
self.constraints = constraints or dict()
self.check_constraints(self.constraints)
self.seed_params = seed or dict()
#: the min value to which the variable array is clipped
self.clip_min = clip_min
#: the max value to which the variable array is clipped
self.clip_max = clip_max
def __repr__(self):
return 'Var({})'.format(self.name)
[docs] @staticmethod
def check_create(**params):
"""
Check that the given `params` are valid for creating the variable once the domain is
available
Args:
name (str): a required identifier string
unit (str): a string like ("kg/m**3") that defines the physical units of the variable
value (float, :class:`~numpy.ndarray`, :class:`~fipy.PhysicalField`): the value to set
for the variable. If a :class:`PhysicalField` is supplied, then its (base) unit is
used as `unit` and overrides any supplied `unit`.
Returns:
dict: the params dictionary to be used
Raises:
ValueError: if no `name` is supplied
ValueError: if `unit` is not a valid input for :class:`PhysicalField`
Note:
Due to limitation in :mod:`fipy` (v3.1.3) that meshes do not accept arrays
:class:`PhysicalField` as inputs, the variables defined here are cast into base units
since the domain mesh is created in meters.
"""
name = params.get('name')
if name:
raise ValueError('Create params should not contain name. Will be set from init name.')
from fipy import PhysicalField
value = params.get('value', 0.0)
if hasattr(value, 'unit'):
unit = value.unit
else:
unit = params.get('unit')
try:
p = PhysicalField(value, unit)
except:
raise ValueError('{!r} is not a valid unit!'.format(unit))
pbase = p.inBaseUnits()
params['unit'] = pbase.unit.name()
params['value'] = pbase.value
return params
[docs] @staticmethod
def check_constraints(constraints):
"""
Check that constraints is a mapping of location to value of boundary conditions.
Recognized location values are: `"top", "bottom"`.
Raises:
TypeError if `constraints` is not a mapping
ValueError if `loc` is not valid
ValueError if `value` is not single-valued
"""
logger = logging.getLogger(__name__)
locs = ('top', 'bottom', 'dbl', 'sediment')
try:
for loc, val in dict(constraints).items():
pass
except (ValueError, TypeError):
logger.error('Constraints not a mapping of pairs: {}'.format(constraints))
raise ValueError('Constraints should be mapping of (location, value) pairs!')
for loc, val in dict(constraints).items():
loc_, grad_type = ModelVariable._parse_constraint_loc(loc)
if loc_ not in locs:
raise ValueError('Constraint loc={!r} unknown. Should be in {}'.format(
loc_, locs
))
try:
if isinstance(val, PhysicalField):
v = float(val.value)
else:
v = float(val)
except:
raise ValueError('Constraint should be single-valued, not {!r}'.format(val))
[docs] def setup(self, **kwargs):
"""
Once a domain is available, :meth:`.create` the variable with the requested parameters and
apply any constraints.
Constraints are specified as:
* ``"top"`` : ``domain.mesh.facesLeft``
* ``"bottom"`` : ``domain.mesh.facesRight``
* ``"dbl"`` : ``slice(0, domain.idx_surface)``
* ``"sediment:`` : ``slice(domain.idx_surface, None)``
Note:
The constraints "dbl" and "sediment" are not yet tested to work with fipy
equations. It is likely that this formulation may not work correctly.
Specifying both "top" and "dbl" or "bottom" and "sediment" does not currently raise
an error, but instead warning messages are logged.
"""
self.check_domain()
self.create(**self.create_params)
if self.seed_params:
self.seed(profile=self.seed_params['profile'], **self.seed_params.get('params', {}))
if self.create_params.get('hasOld'):
self.var.updateOld()
self._LOCs = {
'top': self.domain.mesh.facesLeft,
'bottom': self.domain.mesh.facesRight,
'dbl': slice(0, self.domain.idx_surface),
'sediment': slice(self.domain.idx_surface, None)
}
invalid_pairs = [('top', 'dbl'), ('bottom', 'sediment')]
for pair in invalid_pairs:
if all(p in self.constraints for p in pair):
self.logger.warning('Constraints specified with invalid pair: {}'.format(pair))
for loc, value in dict(self.constraints).items():
self.constrain(loc, value)
@staticmethod
def _parse_constraint_loc(loc):
locparts = loc.split('.')
loc_ = locparts[0]
grad_type = None
if len(locparts) == 2:
grad_type = locparts[1]
elif len(locparts) > 2:
raise ValueError(f'Constraint loc should be either loc or '
f'loc.grad_type, but got: {loc}')
if grad_type and not hasattr(CellVariable, grad_type):
raise ValueError(f'Constraint {loc} has grad_type={grad_type} '
f'which is not found on CellVariable')
return (loc_, grad_type)
[docs] def create(self, value, unit = None, hasOld = False, **kwargs):
"""
Create a :class:`~fipy.CellVariable` by calling :meth:`.domain.create_var`.
Args:
value (int, float, array, PhysicalField): the value for the array
unit (str): physical units string for the variable. Is overridden if `value` is a
`PhysicalField`.
hasOld (bool): flag to indicate that the variable should store the older value
separately (see :class:`fipy.CellVariable`).
Returns:
instance of the variable created
Raises:
RuntimeError: if a domain variable with `name` already exists, or no mesh exists on
the domain.
ValueError: if value.shape is not 1 or the domain shape
"""
self.logger.debug('Creating variable {!r} with unit {}'.format(self.name, unit))
self.var = self.domain.create_var(name=self.name, value=value,
unit=unit, hasOld=hasOld,
**kwargs)
return self.var
[docs] def constrain(self, loc, value):
"""
Constrain the variable at the given location to the given value
Args:
loc (str): One of ``("top", "bottom", "dbl", "sediment")``
value (float, :class:`PhysicalField`): a numeric value for the constraint
Returns:
None
Raises:
TypeError: if the units for `value` are incompatible with :attr:`.var` units
ValueError: for improper values for `loc`
ValueError: if value is not a 0-dimension array
RuntimeError: if variable doesn't exist
"""
if self.var is None:
raise RuntimeError('Variable {} does not exist!'.format(self.name))
self.logger.debug("Setting constraint for {!r}: {} = {}".format(self.var, loc, value))
loc, grad_type = self._parse_constraint_loc(loc)
if loc in ('top', 'bottom'):
mask = self._LOCs[loc]
else:
mask = numerix.zeros(self.var.shape, dtype=bool)
try:
L = self._LOCs[loc]
self.logger.debug('Constraint mask loc: {}'.format(L))
mask[L] = 1
except KeyError:
raise ValueError('loc={} not in {}'.format(loc, tuple(self._LOCs.keys())))
if isinstance(value, PhysicalField):
value = value.inUnitsOf(self.var.unit)
else:
value = PhysicalField(value, self.var.unit)
self.logger.info('Constraining {} (grad={}) at {} = {}'.format(
self.var, grad_type, loc, value))
if grad_type:
var_entity = getattr(self.var, grad_type)
else:
var_entity = self.var
var_entity.constrain(value, mask)
[docs] def seed(self, profile, **kwargs):
"""
Seed the value of the variable based on the profile and parameters
Available profiles are:
* "normal"
* normal distribution of values from :data:`scipy.stats.norm`
* `loc` and `scale` in units compatible with domain mesh
* `coeff` to multiply the distribution with, in units compatible with that of
:attr:`.var`
* the normal distribution is created to have unit height for different `loc` and
`scale` values
* "linear"
* uses :func:`~numpy.linspace` to fill the first dimension of :attr:`.var`
* `start`: the start value given, else taken from constraint "top"
* `stop`: the stop value given, else taken from constraint "bottom"
* "lognormal"
* lognormal distributuion from :data:`scipy.stats.lognorm`
* `loc` and `scale` should be in units compatible with domain mesh
* `shape` should be a float > 0
* the distribution is normalized to have max value = 1
Args:
profile (str): The type of profile to use
**kwargs: Parmeters for the profile
Returns:
None
Raises:
ValueError: if incompatible units encountered
"""
PROFILES = ('linear', 'normal', 'lognormal')
if profile not in PROFILES:
raise ValueError('Unknown profile {!r} not in {}'.format(profile, PROFILES))
if profile == 'normal':
from scipy.stats import norm
loc = kwargs['loc']
scale = kwargs['scale']
coeff = kwargs['coeff']
C = 1.0 / numerix.sqrt(2 * numerix.pi)
# loc and scale should be in units of the domain mesh
if hasattr(loc, 'unit'):
loc_ = loc.inUnitsOf(self.domain.depths.unit).value
else:
loc_ = loc
if hasattr(scale, 'unit'):
scale_ = scale.inUnitsOf(self.domain.depths.unit).value
else:
scale_ = scale
if hasattr(coeff, 'unit'):
# check if compatible with variable unit
try:
c = coeff.inUnitsOf(self.var.unit)
except TypeError:
self.logger.error(
'Coeff {!r} not compatible with variable unit {!r}'.format(
coeff,
self.var.unit.name()))
raise ValueError('Incompatible unit of coefficient')
self.logger.info(
'Seeding with profile normal loc: {} scale: {} coeff: {}'.format(
loc_, scale_,
coeff))
normrv = norm(loc=loc_, scale=scale_)
rvpdf = normrv.pdf(self.domain.depths)
rvpdf /= rvpdf.max()
val = coeff * rvpdf
self.var.value = val
elif profile == 'lognormal':
from scipy.stats import lognorm
loc = kwargs['loc']
scale = kwargs['scale']
coeff = kwargs['coeff']
lognorm_shape = kwargs.get('shape', 1.25)
# loc and scale should be in units of the domain mesh
if hasattr(loc, 'unit'):
loc_ = loc.inUnitsOf(self.domain.depths.unit).value
else:
loc_ = loc
if hasattr(scale, 'unit'):
scale_ = scale.inUnitsOf(self.domain.depths.unit).value
else:
scale_ = scale
if hasattr(coeff, 'unit'):
# check if compatible with variable unit
try:
c = coeff.inUnitsOf(self.var.unit)
except TypeError:
self.logger.error(
'Coeff {!r} not compatible with variable unit {!r}'.format(
coeff, self.var.unit.name()))
raise ValueError('Incompatible unit of coefficient')
self.logger.info(
'Seeding with profile lognormal loc: {} scale: {} shape: {} '
'coeff: {}'.format(loc_, scale_, lognorm_shape, coeff))
rv = lognorm(lognorm_shape, loc=loc_, scale=scale_)
rvpdf = rv.pdf(self.domain.depths)
rvpdf = rvpdf / rvpdf.max()
val = coeff * rvpdf
self.var.value = val
elif profile == 'linear':
start = kwargs.get('start')
stop = kwargs.get('stop')
if start is None:
start = self.constraints.get('top')
if start is None:
raise ValueError('Seed linear has no "start" or "top" constraint')
else:
start = PhysicalField(start, self.var.unit)
self.logger.info('Linear seed using start as top value: '
'{}'.format(start))
if stop is None:
stop = self.constraints.get('bottom')
if stop is None:
raise ValueError('Seed linear has no "stop" or "bottom" constraint')
else:
stop = PhysicalField(stop, self.var.unit)
self.logger.info('Linear seed using stop as bottom value: '
'{}'.format(stop))
N = self.var.shape[0]
if hasattr(start, 'unit'):
start_ = start.inUnitsOf(self.var.unit).value
else:
start_ = start
if hasattr(stop, 'unit'):
stop_ = stop.inUnitsOf(self.var.unit).value
else:
stop_ = stop
self.logger.info(
'Seeding with profile linear: start: {} stop: {}'.format(start_, stop_))
val = numerix.linspace(start_, stop_, N)
self.var.value = val
self.logger.debug('Seeded {!r} with {} profile'.format(self, profile))
[docs] def snapshot(self, base = False):
"""
Returns a snapshot of the variable's state, with the following structure:
* data:
* (:attr:`.var`, `dict(unit=:meth:`.var.unit.name()`)
* metadata
* constraint_<name>: constraint_value (in :attr:`.constraints`)
Args:
base (bool): Convert to base units?
Returns:
dict: the variable state
"""
self.logger.debug('Snapshot: {}'.format(self))
self.check_domain()
state = dict()
state['data'] = snapshot_var(self.var, base=base)
meta = state['metadata'] = {}
for cloc, cval in self.constraints.items():
key = 'constraint_{}'.format(cloc)
meta[key] = str(cval)
return state
[docs] def restore_from(self, state, tidx):
"""
Restore the variable from a saved state
It sets the value of the :attr:`.var` to that stored in the `state`.
Args:
state (dict): the saved state
tidx (int, None): passed to :func:`.restore_var`
The `state` dictionary must be of the structure:
* `data`: (array, dict(unit=str))
Raises:
ValueError: if the state restore does not succeed
"""
self.logger.debug('Restoring {} from state: {}'.format(self, tuple(state)))
self.check_domain()
try:
self.var.setValue(restore_var(state, tidx))
self.logger.debug('{} restored state'.format(self))
except:
self.logger.exception('Data restore failed')
raise ValueError('{}: restore of "data" failed!'.format(self))