Source code for microbenthos.runners.simulate

import contextlib
import importlib
import logging
import os
import warnings
from collections import OrderedDict

import click

from ..exporters import BaseExporter
from ..model import MicroBenthosModel, Simulation
from ..utils import yaml, find_subclasses_recursive
from ..utils.log import SIMULATION_DEFAULT_FORMATTER, SIMULATION_DEBUG_FORMATTER

DUMP_KWARGS = dict(
    indent=4,
    explicit_start=True,
    explicit_end=True,
    default_flow_style=False
    )


[docs]class SimulationRunner(object): """ Class that handles the pipeline of creating and running simulations of models, and generating outputs from it. This class will: * check that the model & simulation setup are complete * check the output path * create a log file * export the model definition * export the run setup * setup the exporters * run the simulation evolution * clean up """ def __init__(self, output_dir = None, resume = False, confirm = False, overwrite = False, model = None, simulation = None, progress = False, progress_tag = 'evolution', plot = False, video = False, frames = False, budget = False, exporters = None, show_eqns = False, ): self.logger = logging.getLogger(__name__) self.logger.info('Initializing {}'.format(self)) self._model = None self._simulation = None exporters = exporters or [] self.exporters = OrderedDict() self.output_dir = output_dir or '.' self._log_fh = None if resume == 0 or resume is None: self.logger.warning( 'Resume = 0 implies to restart simulation. Setting overwrite=True instead') resume = False overwrite = True if resume is True: resume = -1 # overwrite = False if resume: overwrite = False self.resume = resume self.overwrite = overwrite self.confirm = confirm self.show_eqns = show_eqns # load up exporters from microbenthos.utils import find_subclasses_recursive from microbenthos.exporters import BaseExporter self._exporter_classes = {e._exports_: e for e in find_subclasses_recursive(BaseExporter)} if model: self.model = model if simulation is not None: self.simulation = simulation if progress: exporters.append(dict(exptype='progress', position=int(progress), desc=str(progress_tag))) if plot or video or frames: exporters.append(dict(exptype='graphic', write_video=video, show=plot, track_budget=budget, write_frames=frames)) if self.resume and video: self.logger.warning( 'Video will begin from this simulation run, since resume is set!') if exporters: # add other exporters for expdef in exporters: self.add_exporter(output_dir=self.output_dir, **expdef) def __repr__(self): return 'SimulationRunner' def _check_data_path(self, data_path = None): data_path = data_path EXISTS = os.path.exists(data_path) self.logger.debug('Checking data outpath: {} (exists={})'.format( data_path, EXISTS )) self.logger.debug('resume={} overwrite={} confirm={}'.format( self.resume, self.overwrite, self.confirm )) if EXISTS: if not self.resume and not self.overwrite: if not self.confirm: click.secho( 'Ambiguous case with --no-confirm: file exists and neither --overwrite nor' ' --resume were specified', fg='red') raise click.Abort() if self.resume: self.overwrite = False if self.overwrite: if self.confirm: click.confirm( 'Overwrite existing file: {}?'.format(data_path), abort=True) self.overwrite = True if self.overwrite: click.secho('Deleting output path: {}'.format(data_path), fg='red') os.remove(data_path) assert not os.path.exists(data_path) else: self._create_output_dir() @property def model(self): """ The model to run with the :attr:`simulation`. Typically an instance of :class:`~microbenthos.MicroBenthosModel`. """ return self._model @model.setter def model(self, obj): if self.model is None: m = MicroBenthosModel.create_from(obj) self.logger.debug('Created model {}'.format(m)) self._model = m self.logger.debug('Model set: {}'.format(self.model)) else: raise RuntimeError('Model already set in runner!') @property def simulation(self): """ The :class:`~microbenthos.Simulation` instance that will run with the :attr:`.model` """ return self._simulation @simulation.setter def simulation(self, obj): if self.simulation is None: self._simulation = Simulation.create_from(obj) self.logger.debug('Simulation set: {}'.format(self.simulation)) if not self.simulation.model and self.model: self.simulation.model = self.model else: raise RuntimeError('Simulation already set in runner!') def _load_exporters(self): self._exporter_classes = {c._exports_: c for c in find_subclasses_recursive(BaseExporter)} self.logger.debug("Loaded exporter classes: {}".format(self._exporter_classes.keys()))
[docs] def add_exporter(self, exptype, name = None, **kwargs): """ Add an exporter to the simulation run Args: exptype (str): The type of exporter. This should match the :attr:`_exports_ on the class of the exporter. (See :class:`~microbenthos.exporters.exporter.BaseExporter`) name (str): The name to set for the exporter **kwargs: passed to the init of the exporter class. Returns: The name of the exporter created """ self.logger.debug('Adding exporter for {}'.format(exptype)) if not name: name = exptype if name in self.exporters: raise ValueError('Exporter with name {!r} already exists!'.format(name)) if not self._exporter_classes: self._load_exporters() cls = self._exporter_classes.get(exptype) if cls is None: raise ValueError('No exporter of type {!r} found. Available: {}'.format( exptype, self._exporter_classes.keys())) instance = cls(name=name, **kwargs) self.logger.info('Adding exporter {!r}: {!r}'.format(name, instance)) self.exporters[name] = instance
[docs] def _create_output_dir(self): """ Create the output directory Raises: OSError: if :attr:`.output_dir` is a file and not a dir """ if self.output_dir is None: raise ValueError('output_dir cannot be empty for creation') if not os.path.isdir(self.output_dir): self.logger.debug('Creating output directory') try: os.makedirs(self.output_dir) except OSError: self.logger.error('Error creating output_dir') raise
[docs] def resume_existing_simulation(self, data_outpath = None): if not self.resume: self.logger.info( 'resume={}, so will not resume from existing file'.format(self.resume)) return data_outpath = data_outpath or self.data_outpath if not os.path.exists(data_outpath): self.logger.debug('Outpath does not exist, cannot resume...') return from fipy import PhysicalField import h5py as hdf # open the store and read out the time info with hdf.File(data_outpath, 'r') as store: tds = store['/time/data'] nt = len(tds) target_time = tds[self.resume] latest_time = tds[-1] time_unit = tds.attrs['unit'] target_time = PhysicalField(target_time, time_unit) latest_time = PhysicalField(latest_time, time_unit) click.secho( 'Model resume set: rewind from latest {} ({}) to {} ({})?'.format( latest_time, nt, target_time, self.resume ), fg='red') if self.confirm: click.confirm('Rewinding model clock can lead to data loss! Continue?', default=False, abort=True) try: with hdf.File(data_outpath, 'a') as store: self.model.restore_from(store, time_idx=self.resume) click.secho( 'Model restore successful. Clock = {}'.format(self.model.clock), fg='green') self.simulation.simtime_step = 1 # set a small simtime to start except: click.secho('Simulation could not be restored from given data file!', fg='red') raise # click.Abort()
[docs] def setup_logfile(self, mode = 'a'): """ Setup log file in the output directory """ logfile = os.path.join(self.output_dir, 'simulation.log') logger = logging.getLogger(__name__.split('.')[0]) lvl = 20 fh = self._log_fh = logging.FileHandler(logfile, mode=mode) fh.setLevel(lvl) fmt = SIMULATION_DEBUG_FORMATTER if lvl < 20 else SIMULATION_DEFAULT_FORMATTER fh.setFormatter(fmt) logger.addHandler(fh) self.logger.debug('Created simulation logfile: {}'.format(logfile))
[docs] def teardown_logfile(self): self.logger.debug('Ending simulation logfile!') logger = logging.getLogger(__name__.split('.')[0]) logger.removeHandler(self._log_fh)
[docs] def save_definitions(self): """ Save the model and simulation definition to the output directory """ DEFINITION_FILE = 'definition.yml' self.logger.info('Saving model definition: {}'.format(DEFINITION_FILE)) with open(os.path.join(self.output_dir, DEFINITION_FILE), 'w') as fp: yaml.dump(dict( model=self.model.definition_, simulation=self.simulation.definition_ ), fp, **DUMP_KWARGS)
[docs] def save_run_info(self): """ Save the runner info to output_dir """ self.logger.info('Saving runner info: runner.yml') with open(os.path.join(self.output_dir, 'runner.yml'), 'w') as fp: yaml.dump(self.get_info(), fp, **DUMP_KWARGS)
[docs] def get_info(self): """ Return a dictionary of info about the runtime environment """ runner = dict(cls=self.__class__.__name__) libraries = ['fipy', 'scipy', 'PyTrilinos', 'pysparse', 'numpy', 'cerberus', 'yaml', 'sympy', 'click', 'h5py', 'matplotlib', 'microbenthos'] library_versions = {} for name in libraries: try: lib = importlib.import_module(name) version = lib.__version__ library_versions[name] = version except ImportError: self.logger.debug('Could not import module: {}'.format(name)) exporters = {} for expname in self.exporters: exp = self.exporters[expname] exporters[expname] = exp.get_info() return dict(libraries=library_versions, exporters=exporters, runner=runner)
[docs] def check_simulation(self): self.logger.info('Checking simulation') required = [self.model, self.simulation] if not all([s is not None for s in required]): self.logger.error('Setup incomplete: {}'.format(required)) raise RuntimeError( 'Simulation run cannot begin without model and simulation')
[docs] def prepare_simulation(self): """ Prepare the simulation by setting up the model """ self.logger.info('Preparing simulation') if not self.simulation.model and self.model: self.simulation.model = self.model
[docs] @contextlib.contextmanager def exporters_activated(self): """ A context manager that starts and closes the exporters Returns: """ self.logger.info('Preparing exporters: {}'.format(self.exporters.keys())) state = self.simulation.get_state(state=self.model.snapshot()) for expname, exporter in self.exporters.items(): try: exporter.setup(self, state) except: self.logger.exception('Error in setting up exporter: {}'.format( expname)) raise yield # once context returns self.logger.info('Closing exporters: {}'.format(self.exporters.keys())) for expname, exporter in self.exporters.items(): if exporter.started: try: exporter.close() except: self.logger.error('Error in closing exporter: {}'.format(expname)) raise
[docs] def get_data_exporters(self): return list(filter( lambda e: e._exports_ == 'model_data', self.exporters.values()))
[docs] def run(self): """ Run the simulation with the stored model and simulation setup. This performs a sequence of operations: * shows equations if :attr:`.show_eqns` is set * Announces simulation settings in output * Runs :meth:`.check_simulation` * creates output directory * sets up the logfile * saves definitions to yaml outputs of simulation & model * saves the runtime info (library, exporter versions etc) * prepares the simulation * activates the exporter context (see :meth:`.exporters_activated`) * iterates over the :meth:`.simulation.evolution` and passes returned state to the exporters * after that tears down the logfile Raises: RuntimeError: if no simulation exists """ self.logger.debug('Starting simulation run') if not self.simulation: raise RuntimeError('No simulation defined to run') if not self.exporters: self.logger.error( 'No exporters defined for simulation run. Consider adding "model_data" to export ' 'data.') for dexporter in self.get_data_exporters(): self.logger.info('Checking outpath & resume of {}: {}'.format(dexporter, dexporter.outpath)) self._check_data_path(dexporter.outpath) self.resume_existing_simulation(dexporter.outpath) if self.show_eqns: click.secho('Solving the equation(s):', fg='green') for neqn, eqn in self.model.equations.items(): click.secho(eqn.as_pretty_string(), fg='green') click.secho( 'Simulation setup: solver={0.fipy_solver} ' 'max_sweeps={0.max_sweeps} max_residual={0.max_residual} ' 'timestep_lims=({1})'.format( self.simulation, [str(s) for s in self.simulation.simtime_lims]), fg='yellow') click.echo('Simulation clock at {}. Run till {}'.format( self.model.clock, self.simulation.simtime_total)) if self.confirm: click.confirm('Proceed with simulation run?', default=True, abort=True) click.secho('Starting simulation...', fg='green') self.logger.debug('Preparing to run simulation') self.check_simulation() if not self.exporters: self.logger.warning('No exporters set for simulation run!') self._create_output_dir() self.setup_logfile() self.save_definitions() self.save_run_info() self.prepare_simulation() self.logger.info('Solving equations') for name, eqn in self.model.equations.items(): self.logger.info(eqn.as_pretty_string()) self.logger.info('As fipy equations') for name, eqn in self.model.equations.items(): self.logger.info('Equation {}: {!r}'.format(name, eqn.obj)) warnings.filterwarnings('ignore', category=RuntimeWarning, module='fipy') with self.exporters_activated(): for step in self.simulation.evolution(): try: # time.sleep(1e-5) # step is (num, state) is the model snapshot if step: num, state = step export_due = self.simulation.snapshot_due() or (num == 0) self.logger.info('Step #{}: Exporting model state'.format(num)) for exporter in self.exporters.values(): if export_due: exporter.process(num, state) else: if exporter.is_eager: exporter.process(num, state) self.logger.info('Step #{}: Export done'.format(num)) else: self.logger.warning('Empty model state received!') except KeyboardInterrupt: self.logger.error("Keyboard interrupt on simulation run!") raise SystemExit self.teardown_logfile() warnings.resetwarnings() click.secho('Simulation done.', fg='green')