import logging
from collections.abc import Mapping
import cerberus
import pkg_resources
from fipy import PhysicalField
from sympy import Symbol, SympifyError, sympify
from .yaml_setup import yaml
# TODO: Allow equation with no diffusion term
physical_unit_type = cerberus.TypeDefinition('physical_unit', (PhysicalField,), ())
[docs]class MicroBenthosSchemaValidator(cerberus.Validator):
"""
A :mod:`cereberus` validator for schema.yml in MicroBenthos
"""
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
logger.propagate = False
types_mapping = cerberus.Validator.types_mapping.copy()
types_mapping['physical_unit'] = physical_unit_type
# def __init__(self, *args, **kwargs):
# # self.logger.propagate = False
# super(MicroBenthosSchemaValidator, self).__init__(*args, **kwargs)
[docs] def _check_with_importpath(self, field, value):
"""
Validates if the value is a usable import path for an entity class
Valid examples are:
* pkg1.pkg2.mod1.class
* class_name
Invalid examples:
* .class_name
Args:
value: A string
Returns:
True if valid
"""
self.logger.debug('Validating importpath: {}'.format(value))
parts = [s.isidentifier() for s in value.split('.')]
if not all(parts):
self._error(field, "Must be a python import path")
# def _validate_type_physical_unit(self, value):
# """ Enables validation for `unit` schema attribute.
# :param value: field value.
# """
# self.logger.debug('Validating physical_unit: {}'.format(value))
# if isinstance(value, PhysicalField):
# if value.unit.name() != '1':
# return True
[docs] def _check_with_unit_name(self, field, value):
"""
Checks that the string can be used as units
"""
self.logger.debug('Validating unit_name: {}'.format(value))
try:
PhysicalField(1, value)
except TypeError:
self._error(field, 'Must be str of physical units')
def _validate_like_unit(self, unit, field, value):
"""
Test that the given value has compatible units
Args:
unit: A string useful with :class:`PhysicalField`
field:
value: An instance of a physical unit
Returns:
boolean if validated
The rule's arguments are validated against this schema:
{'type': 'string'}
"""
self.logger.debug('Validating like_unit: {} {} {}'.format(unit, field, value))
if not isinstance(value, PhysicalField):
self._error(field, 'Must be a PhysicalField, not {}'.format(type(value)))
try:
value.inUnitsOf(unit)
except:
self._error(field, 'Must be compatible with units {}'.format(unit))
[docs] def _validate_like_unit(self, unit, field, value):
"""
Test that the given value has compatible units
Args:
unit: A string useful with :class:`PhysicalField`
field:
value: An instance of a physical unit
Returns:
boolean if validated
The rule's arguments are validated against this schema:
{'type': 'string'}
"""
self.logger.debug('Validating like_unit: {} {} {}'.format(unit, field, value))
if not isinstance(value, PhysicalField):
self._error(field, 'Must be a PhysicalField, not {}'.format(type(value)))
try:
value.inUnitsOf(unit)
except:
self._error(field, 'Must be compatible with units {}'.format(unit))
def _check_with_sympify(self, field, value):
self.logger.debug(f'Checking if {value} usable with sympify')
try:
e = sympify(value)
except SympifyError:
self._error(field, "Must be str compatible with sympify")
# def _validate_type_sympifyable(self, value):
# """
# A string that can be run through sympify
# """
# self.logger.debug('Validating sympifyable: {}'.format(value))
# if not isinstance(value, (str, int, float)):
# return False
# try:
# e = sympify(value)
# self.logger.debug('Sympified: {}'.format(e))
# return True
# except:
# return False
[docs] def _check_with_sympy_symbol(self, field, value):
"""
String that can be run through sympify and only has one
variable symbol in it.
"""
self.logger.debug(f'Check if {value} is a sympy symbol')
try:
e = sympify(value)
valid = isinstance(e, Symbol)
except SympifyError:
valid = False
if not valid:
self._error(field, "Must be a single symbol in sympy")
# def _validate_type_symbolable(self, value):
# """
# String that can be run through sympify and only has one variable symbol in it.
# """
# self.logger.debug('Validating symbolable: {}'.format(value))
# try:
# e = sympify(value)
# return isinstance(e, Symbol)
# except:
# return False
[docs] def _check_with_model_path(self, field, value):
"""
Validate that the value of the field is a model store path
Value should be of type:
* domain.oxy
* env.oxy.var
* microbes.cyano.processes.oxyPS
The rule's arguments are validated against this schema:
{'type': 'string'}
"""
if '.' not in value:
self._error(field, 'Model path should be a dotted string')
parts = value.split('.')
if not all([len(p) for p in parts]):
self._error(field, 'Model path must not have empty parts')
ALLOWED_ROOTS = ('env', 'domain', 'microbes')
if parts[0] not in ALLOWED_ROOTS:
self._error(field, f'Model path root should be in {ALLOWED_ROOTS}')
if parts[0] == 'microbes':
MICROBE_SUBPARTS = ('features', 'processes')
if len(parts) < 4:
self._error(field,
'Microbes model path needs atleast 4 path '
'parts')
if parts[2] not in MICROBE_SUBPARTS:
self._error(field,
'Microbes model path should be of type {}'.format(
MICROBE_SUBPARTS))
# def _validate_model_store(self, jnk, field, value):
# """
# Validate that the value of the field is a model store path
#
# Value should be of type:
# * domain.oxy
# * env.oxy.var
# * microbes.cyano.processes.oxyPS
#
# The rule's arguments are validated against this schema:
# {'type': 'string'}
# """
# self.logger.debug('Validating model_store={} for field {!r}: {!r}'.format(
# jnk, field, value
# ))
#
# if '.' not in value:
# self._error(field, 'Model store should be a dotted path, not {}'.format(value))
#
# parts = value.split('.')
#
# if not all([len(p) for p in parts]):
# self._error(field, 'Model store has empty path element: {}'.format(value))
#
# if parts[0] not in ('env', 'domain', 'microbes'):
# self._error(field, 'Model store root should be in (env, domain, microbes)')
#
# if parts[0] in ('domain', 'env'):
# pass
#
# elif parts[0] == 'microbes':
# mtargets = ('features', 'processes')
#
# if len(parts) < 4:
# self._error(field, 'Microbes model store needs atleast 4 path elements')
#
# if parts[2] not in mtargets:
# self._error(field, 'Microbes model store should be of type {}'.format(mtargets))
def _normalize_coerce_float(self, value):
return float(value)
[docs]def validate_yaml(stream, key = None, schema = None, schema_stream = None):
logger = logging.getLogger(__name__)
logger.info('Loading definition with yaml')
inp_dict = yaml.unsafe_load(stream)
if key:
inp_dict = inp_dict[key]
return validate_dict(inp_dict, key=key, schema=schema, schema_stream=schema_stream)
[docs]def validate_dict(inp_dict, key, schema = None, schema_stream = None):
logger = logging.getLogger(__name__)
logger.info('Loading definition from: {}'.format(inp_dict.keys()))
logger.debug('Using schema key {!r} from schema_stream={}'.format(key, schema_stream))
if schema is None:
schema = get_schema(schema_stream=schema_stream)
else:
if not isinstance(schema, Mapping):
raise TypeError('Supplied schema should be a mapping, not {!r}'.format(type(schema)))
if key:
schema = schema[key]
logger.debug('Schema with entries: {}'.format(schema.keys()))
validator = MicroBenthosSchemaValidator()
validated = validator.validated(inp_dict, schema)
if not validated:
logger.propagate = True
logger.error('Input definition not validated for schema {!r}!'.format(key))
from pprint import pformat
logger.warning(pformat(validator.errors))
for path, errmsg in _denest_errors(validator.errors, [], []):
logger.error('Error: {} :: {}'.format(path, errmsg))
raise ValueError('Definition of {!r} invalid!'.format(key))
else:
logger.info('{} definition successfully loaded: {}'.format(key, validated.keys()))
return validated
def _denest_errors(D, paths, all_items):
for k in D:
# print('descending into {}'.format(k))
v = D[k]
paths.append(k)
for item in v:
if isinstance(item, dict):
_denest_errors(item, paths, all_items)
if paths:
paths.pop(-1)
elif isinstance(item, str):
# full_path = '.'.join([str(_) for _ in paths])
# print(f'{full_path}: {item}')
all_items.append(('.'.join(str(_) for _ in paths), item))
if paths:
paths.pop(-1)
return all_items
[docs]def get_schema(schema_stream = None):
"""
Returns the inbuilt model schema
"""
# INBUILT = pkg_resources.resource_stream(__name__, 'schema.yml')
if schema_stream:
schema = yaml.unsafe_load(schema_stream)
else:
with pkg_resources.resource_stream(__name__, 'schema.yml') as INBUILT:
schema = yaml.unsafe_load(INBUILT)
return schema
[docs]def find_subclasses_recursive(baseclass, subclasses = None):
"""
Find subclasses recursively. `subclasses` should be a set into which to add the subclasses
"""
if subclasses is None:
subclasses = set()
if not isinstance(baseclass, type):
raise ValueError('Need a class, but received: {} of type {}'.format(
baseclass, type(baseclass)))
for sclass in baseclass.__subclasses__():
subclasses.add(sclass)
find_subclasses_recursive(sclass, subclasses)
return subclasses