Source code for pyro

#!/usr/bin/env python3

from __future__ import print_function

import argparse
import importlib
import os

import matplotlib.pyplot as plt

import compare
from util import msg, profile, runparams, io

valid_solvers = ["advection",
                 "advection_nonuniform",
                 "advection_rk",
                 "advection_fv4",
                 "advection_weno",
                 "compressible",
                 "compressible_rk",
                 "compressible_fv4",
                 "compressible_sdc",
                 "compressible_react",
                 "diffusion",
                 "incompressible",
                 "lm_atm",
                 "swe"]


[docs]class Pyro(object): """ The main driver to run pyro. """ def __init__(self, solver_name): """ Constructor Parameters ---------- solver_name : str Name of solver to use """ msg.bold('pyro ...') if solver_name not in valid_solvers: msg.fail("ERROR: %s is not a valid solver" % solver_name) self.pyro_home = os.path.dirname(os.path.realpath(__file__)) + '/' # import desired solver under "solver" namespace self.solver = importlib.import_module(solver_name) self.solver_name = solver_name # ------------------------------------------------------------------------- # runtime parameters # ------------------------------------------------------------------------- # parameter defaults self.rp = runparams.RuntimeParameters() self.rp.load_params(self.pyro_home + "_defaults") self.rp.load_params(self.pyro_home + solver_name + "/_defaults") self.tc = profile.TimerCollection() self.is_initialized = False
[docs] def initialize_problem(self, problem_name, inputs_file=None, inputs_dict=None, other_commands=None): """ Initialize the specific problem Parameters ---------- problem_name : str Name of the problem inputs_file : str Filename containing problem's runtime parameters inputs_dict : dict Dictionary containing extra runtime parameters other_commands : str Other command line parameter options """ problem_defaults_file = self.pyro_home + self.solver_name + \ "/problems/_" + problem_name + ".defaults" # problem-specific runtime parameters if os.path.isfile(problem_defaults_file): self.rp.load_params(problem_defaults_file) # now read in the inputs file if inputs_file is not None: if not os.path.isfile(inputs_file): # check if the param file lives in the solver's problems directory inputs_file = self.pyro_home + self.solver_name + "/problems/" + inputs_file if not os.path.isfile(inputs_file): msg.fail("ERROR: inputs file does not exist") self.rp.load_params(inputs_file, no_new=1) if inputs_dict is not None: for k, v in inputs_dict.items(): self.rp.params[k] = v # and any commandline overrides if other_commands is not None: self.rp.command_line_params(other_commands) # write out the inputs.auto self.rp.print_paramfile() self.verbose = self.rp.get_param("driver.verbose") self.dovis = self.rp.get_param("vis.dovis") # ------------------------------------------------------------------------- # initialization # ------------------------------------------------------------------------- # initialize the Simulation object -- this will hold the grid and # data and know about the runtime parameters and which problem we # are running self.sim = self.solver.Simulation( self.solver_name, problem_name, self.rp, timers=self.tc) self.sim.initialize() self.sim.preevolve() plt.ion() self.sim.cc_data.t = 0.0 self.is_initialized = True
[docs] def run_sim(self): """ Evolve entire simulation """ if not self.is_initialized: msg.fail("ERROR: problem has not been initialized") tm_main = self.tc.timer("main") tm_main.begin() # output the 0th data basename = self.rp.get_param("io.basename") self.sim.write("{}{:04d}".format(basename, self.sim.n)) if self.dovis: plt.figure(num=1, figsize=(8, 6), dpi=100, facecolor='w') self.sim.dovis() while not self.sim.finished(): self.single_step() # final output if self.verbose > 0: msg.warning("outputting...") basename = self.rp.get_param("io.basename") self.sim.write("{}{:04d}".format(basename, self.sim.n)) tm_main.end() # ------------------------------------------------------------------------- # final reports # ------------------------------------------------------------------------- if self.verbose > 0: self.rp.print_unused_params() self.tc.report() self.sim.finalize() return self.sim
[docs] def single_step(self): """ Do a single step """ if not self.is_initialized: msg.fail("ERROR: problem has not been initialized") # fill boundary conditions self.sim.cc_data.fill_BC_all() # get the timestep self.sim.compute_timestep() # evolve for a single timestep self.sim.evolve() if self.verbose > 0: print("%5d %10.5f %10.5f" % (self.sim.n, self.sim.cc_data.t, self.sim.dt)) # output if self.sim.do_output(): if self.verbose > 0: msg.warning("outputting...") basename = self.rp.get_param("io.basename") self.sim.write("{}{:04d}".format(basename, self.sim.n)) # visualization if self.dovis: tm_vis = self.tc.timer("vis") tm_vis.begin() self.sim.dovis() store = self.rp.get_param("vis.store_images") if store == 1: basename = self.rp.get_param("io.basename") plt.savefig("{}{:04d}.png".format(basename, self.sim.n)) tm_vis.end()
def __repr__(self): """ Return a representation of the Pyro object """ s = "Solver = {}\n".format(self.solver_name) if self.is_initialized: s += "Problem = {}\n".format(self.sim.problem_name) s += "Simulation time = {}\n".format(self.sim.cc_data.t) s += "Simulation step number = {}\n".format(self.sim.n) s += "\nRuntime Parameters" s += "\n------------------\n" s += str(self.rp) return s
[docs] def get_var(self, v): """ Alias for cc_data's get_var routine, returns the cell-centered data given the variable name v. """ if not self.is_initialized: msg.fail("ERROR: problem has not been initialized") return self.sim.cc_data.get_var(v)
[docs]class PyroBenchmark(Pyro): """ A subclass of Pyro for benchmarking. Inherits everything from pyro, but adds benchmarking routines. """ def __init__(self, solver_name, comp_bench=False, reset_bench_on_fail=False, make_bench=False): """ Constructor Parameters ---------- solver_name : str Name of solver to use comp_bench : bool Are we comparing to a benchmark? reset_bench_on_fail : bool Do we reset the benchmark on fail? make_bench : bool Are we storing a benchmark? """ super().__init__(solver_name) self.comp_bench = comp_bench self.reset_bench_on_fail = reset_bench_on_fail self.make_bench = make_bench
[docs] def run_sim(self, rtol): """ Evolve entire simulation and compare to benchmark at the end. """ super().run_sim() result = 0 if self.comp_bench: result = self.compare_to_benchmark(rtol) if self.make_bench or (result != 0 and self.reset_bench_on_fail): self.store_as_benchmark() if self.comp_bench: return result else: return self.sim
[docs] def compare_to_benchmark(self, rtol): """ Are we comparing to a benchmark? """ basename = self.rp.get_param("io.basename") compare_file = "{}/tests/{}{:04d}".format( self.solver_name, basename, self.sim.n) msg.warning("comparing to: {} ".format(compare_file)) try: sim_bench = io.read(compare_file) except IOError: msg.warning("ERROR opening compare file") return "ERROR opening compare file" result = compare.compare(self.sim.cc_data, sim_bench.cc_data, rtol) if result == 0: msg.success("results match benchmark to within relative tolerance of {}\n".format(rtol)) else: msg.warning("ERROR: " + compare.errors[result] + "\n") return result
[docs] def store_as_benchmark(self): """ Are we storing a benchmark? """ if not os.path.isdir(self.solver_name + "/tests/"): try: os.mkdir(self.solver_name + "/tests/") except (FileNotFoundError, PermissionError): msg.fail( "ERROR: unable to create the solver's tests/ directory") basename = self.rp.get_param("io.basename") bench_file = self.pyro_home + self.solver_name + "/tests/" + \ basename + "%4.4d" % (self.sim.n) msg.warning("storing new benchmark: {}\n".format(bench_file)) self.sim.write(bench_file)
[docs]def parse_args(): """Parse the runtime parameters""" p = argparse.ArgumentParser() p.add_argument("--make_benchmark", help="create a new benchmark file for regression testing", action="store_true") p.add_argument("--compare_benchmark", help="compare the end result to the stored benchmark", action="store_true") p.add_argument("solver", metavar="solver-name", type=str, nargs=1, help="name of the solver to use", choices=valid_solvers) p.add_argument("problem", metavar="problem-name", type=str, nargs=1, help="name of the problem to run") p.add_argument("param", metavar="inputs-file", type=str, nargs=1, help="name of the inputs file") p.add_argument("other", metavar="runtime-parameters", type=str, nargs="*", help="additional runtime parameters that override the inputs file " "in the format section.option=value") return p.parse_args()
if __name__ == "__main__": args = parse_args() if args.compare_benchmark or args.make_benchmark: pyro = PyroBenchmark(args.solver[0], comp_bench=args.compare_benchmark, make_bench=args.make_benchmark) else: pyro = Pyro(args.solver[0]) pyro.initialize_problem(problem_name=args.problem[0], inputs_file=args.param[0], other_commands=args.other) pyro.run_sim()