Source code for hoomd_flowws.Run

import collections
import itertools
import logging
import os

import numpy as np

from .internal import intfloat, HoomdContext
import hoomd
import hoomd.md
import flowws
from flowws import Argument as Arg

logger = logging.getLogger(__name__)

@flowws.add_stage_arguments
class Run(flowws.Stage):
    """Run for a given number of timesteps using MD"""

    ARGS = [
        Arg('steps', '-s', intfloat, None,
            help='Number of timesteps to run'),
        Arg('timestep_size', None, float, 0.005,
            help='Timestep size'),
        Arg('integrator', None, str, required=True,
            help='Integrator type'),
        Arg('temperature', None, float, 1,
            help='Temperature for isothermal simulations'),
        Arg('ramp_t_to', None, float, None,
            help='Ramp temperature to the given value over time during this stage'),
        Arg('tau_t', None, float, 1,
            help='Thermostat time constant for isothermal simulations'),
        Arg('pressure', None, float, 1,
            help='Pressure for isobaric simulations'),
        Arg('tau_p', None, float, 10,
            help='Barostat time constant for isobaric simulations'),
        Arg('bd_seed', None, int, 12,
            help='Random number seed for Brownian/Langevin thermostats'),
        Arg('zero_momentum', '-z', intfloat, 0,
            help='Period for zeroing the momentum of the system (default: disabled)'),
        Arg('backup_period', '-b', intfloat, 0,
            help='Period for dumping a backup file'),
        Arg('dump_period', '-d', intfloat, 0,
            help='Period for dumping a trajectory file'),
        Arg('trajectory_quantities', None, [str], [],
            help='Additional trajectory quantities to dump'),
        Arg('dump_quantities', '-q', (intfloat, str),
            help='Dump period and comma-separated list of quantities to record '
                 '(i.e. temperature,pressure)'),
        Arg('expand_by', None, float,
            help='Expand each dimension of the box by this ratio during this stage'),
        Arg('compress_to', None, float,
            help='Compress to the given packing fraction during this stage '
                 '(overrides expand_by)'),
    ]

    def setup_integrator(self, scope, storage, context):
        """Create the MD integration method selected by the `integrator` argument."""
        integrator_type = self.arguments['integrator']

        # Use a constant temperature unless a ramp target was given, in which
        # case interpolate linearly over the steps of this stage.
        kT = self.arguments['temperature']
        if self.arguments.get('ramp_t_to', None) is not None:
            times = [0, self.arguments['steps']]
            temperatures = [self.arguments['temperature'],
                            self.arguments['ramp_t_to']]
            kT = hoomd.variant.linear_interp(
                list(zip(times, temperatures)), zero=scope['previous_steps'])

        if integrator_type == 'nve':
            integrator = hoomd.md.integrate.nve(hoomd.group.all())
        elif integrator_type == 'nvt':
            tau = self.arguments['tau_t']
            integrator = hoomd.md.integrate.nvt(
                hoomd.group.all(), kT=kT, tau=tau)
        elif integrator_type == 'langevin':
            seed = self.arguments['bd_seed']
            integrator = hoomd.md.integrate.langevin(
                hoomd.group.all(), kT=kT, seed=seed)
        elif integrator_type == 'brownian':
            seed = self.arguments['bd_seed']
            integrator = hoomd.md.integrate.brownian(
                hoomd.group.all(), kT=kT, seed=seed)
        elif integrator_type == 'npt':
            tau = self.arguments['tau_t']
            pressure = self.arguments['pressure']
            tauP = self.arguments['tau_p']
            integrator = hoomd.md.integrate.npt(
                hoomd.group.all(), kT=kT, tau=tau, P=pressure, tauP=tauP)
        else:
            raise NotImplementedError(
                'Unknown integrator type {}'.format(integrator_type))

        hoomd.md.integrate.mode_standard(dt=self.arguments['timestep_size'])

        if self.arguments['zero_momentum']:
            hoomd.md.update.zero_momentum(self.arguments['zero_momentum'])

        scope['integrator'] = integrator
        return integrator

    def setup_dumps(self, scope, storage, context):
        """Set up backup, quantity-log, and trajectory dumps as requested."""
        if self.arguments['backup_period']:
            backup_filename = context.get_backup_filename()

            backup_file = context.enter_context(
                storage.open(backup_filename, 'wb', on_filesystem=True,
                             noop=scope['mpi_rank']))
            hoomd.dump.getar.simple(
                backup_file.name, self.arguments['backup_period'], '1',
                static=[], dynamic=['all'])

        if self.arguments.get('dump_quantities', None):
            dump_filename = scope.get('dump_filename', 'dump.sqlite')
            dump_filename = '.'.join(
                (os.path.splitext(dump_filename)[0], 'quantities', 'log'))

            (period, quantities) = self.arguments['dump_quantities']
            quantities = [q.strip() for q in quantities.split(',')]

            log_file = context.enter_context(
                storage.open(dump_filename, 'ab', on_filesystem=True,
                             noop=scope['mpi_rank']))
            hoomd.analyze.log(log_file.name, quantities, period)

        if self.arguments['dump_period']:
            dump_filename = scope.get('dump_filename', 'dump.sqlite')

            dump_file = context.enter_context(
                storage.open(dump_filename, 'ab', on_filesystem=True,
                             noop=scope['mpi_rank']))

            # Dump orientation-aware quantities when shape information is available.
            dynamic_quantities = (
                ['viz_aniso_dynamic'] if 'type_shapes' in scope
                else ['viz_dynamic'])
            dynamic_quantities.extend(
                self.arguments.get('trajectory_quantities', []))

            dump = hoomd.dump.getar.simple(
                dump_file.name, self.arguments['dump_period'], 'a',
                static=['viz_static'], dynamic=dynamic_quantities)

            if 'type_shapes' in scope:
                type_shapes = scope['type_shapes']
                dump.writeJSON('type_shapes.json', type_shapes, True)

    def setup_compression(self, scope, storage, context):
        """Resize the box over this stage if expansion or compression was requested."""
        if (not self.arguments.get('compress_to', None) and
                not self.arguments.get('expand_by', None)):
            return

        dimensions = scope.get('dimensions', 3)

        # compress_to overrides expand_by: convert the target packing fraction
        # into an equivalent per-dimension length ratio.
        if self.arguments.get('compress_to', None):
            current_phi = compute_packing_fraction(
                scope, storage, context.snapshot)
            volume_ratio = current_phi/self.arguments['compress_to']
            length_ratio = volume_ratio**(1./dimensions)
            self.arguments['expand_by'] = length_ratio

        box = context.snapshot.box
        factor = self.arguments['expand_by']
        times = [0, self.arguments['steps']]

        Lx = hoomd.variant.linear_interp(
            list(zip(times, [box.Lx, box.Lx*factor])), zero='now')
        Ly = hoomd.variant.linear_interp(
            list(zip(times, [box.Ly, box.Ly*factor])), zero='now')
        if dimensions == 2:
            Lz = box.Lz
        else:
            Lz = hoomd.variant.linear_interp(
                list(zip(times, [box.Lz, box.Lz*factor])), zero='now')

        updater = hoomd.update.box_resize(Lx=Lx, Ly=Ly, Lz=Lz)
        return updater

    def run_steps(self, scope, storage, context):
        hoomd.run_upto(scope['cumulative_steps'])
    def run(self, scope, storage):
        callbacks = scope.setdefault('callbacks', collections.defaultdict(list))

        scope['previous_steps'] = scope.get('cumulative_steps', 0)
        scope['cumulative_steps'] = (scope['previous_steps'] +
                                     self.arguments['steps'])

        with HoomdContext(scope, storage) as ctx:
            for c in callbacks['post_init']:
                c(scope, storage, ctx)

            if ctx.check_timesteps():
                return

            self.setup_integrator(scope, storage, ctx)
            self.setup_dumps(scope, storage, ctx)
            self.setup_compression(scope, storage, ctx)

            for c in callbacks['pre_run']:
                c(scope, storage, ctx)

            self.run_steps(scope, storage, ctx)

            for c in callbacks['post_run']:
                c(scope, storage, ctx)
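# Illustrative sketch (assumed numbers for exposition, not part of the module):
# the temperature ramp in setup_integrator() is built from HOOMD 2.x variants.
# A stage configured with temperature=1.0, ramp_t_to=0.1, and steps=1000000
# would construct roughly
#
#     kT = hoomd.variant.linear_interp([(0, 1.0), (1000000, 0.1)],
#                                      zero=scope['previous_steps'])
#
# so that kT decreases linearly from 1.0 to 0.1 over the course of this stage,
# with the ramp anchored at the timestep where the stage begins.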
def compute_packing_fraction(scope, storage, snapshot):
    dimensions = scope.get('dimensions', 3)

    type_volumes = []
    for shape in scope.get('type_shapes', []):
        volume = np.polyval(shape['rounding_volume_polynomial'],
                            shape['rounding_radius'])
        type_volumes.append(volume)

    # Fall back to unit-diameter disks (2D) or spheres (3D) when no shape
    # information is available.
    if not type_volumes:
        logger.warning('No shape information found, assuming particles are '
                       'spheres with diameter 1')
        if dimensions == 2:
            type_volumes = len(snapshot.particles.types)*[np.pi*0.25]
        else:
            type_volumes = len(snapshot.particles.types)*[4./3*np.pi*0.125]

    type_volumes = np.array(type_volumes, dtype=np.float32)
    particle_volume = np.sum(type_volumes[snapshot.particles.typeid])
    box_volume = snapshot.box.get_volume()
    return particle_volume/box_volume
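# Worked example (hypothetical numbers, for illustration only): with no
# type_shapes in scope and dimensions == 3, each particle is treated as a
# sphere of diameter 1, i.e. volume 4./3*np.pi*0.125 ~= 0.5236. For 1000 such
# particles in a 20 x 20 x 20 box, compute_packing_fraction() returns about
#
#     1000 * 0.5236 / 8000.0  # ~= 0.065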