Source code for hoomd_flowws.Run

import collections
import itertools
import logging
import os

import numpy as np

from .internal import intfloat, HoomdContext
import hoomd
import flowws
from flowws import Argument as Arg

logger = logging.getLogger(__name__)

[docs]@flowws.add_stage_arguments class Run(flowws.Stage): """Run for a given number of timesteps using MD""" ARGS = [ Arg('steps', '-s', intfloat, None, help='Number of timesteps to run'), Arg('timestep_size', None, float, 0.005, help='Timestep size'), Arg('integrator', None, str, required=True, help='Integrator type'), Arg('temperature', None, float, 1, help='Temperature for isothermal simulations'), Arg('ramp_t_to', None, float, None, help='Ramp temperature to the given value over time during this stage'), Arg('tau_t', None, float, 1, help='Thermostat time constant for isothermal simulations'), Arg('pressure', None, float, 1, help='Pressure for isobaric simulations'), Arg('tau_p', None, float, 10, help='Barostat time constant for isobaric simulations'), Arg('bd_seed', None, int, 12, help='Random number seed for Brownian/Langevin thermostats'), Arg('zero_momentum', '-z', intfloat, 0, help='Period for zeroing the momentum of the system (default: disabled)'), Arg('backup_period', '-b', intfloat, 0, help='Period for dumping a backup file'), Arg('dump_period', '-d', intfloat, 0, help='Period for dumping a trajectory file'), Arg('trajectory_quantities', None, [str], [], help='Additional trajectory quantities to dump'), Arg('dump_quantities', '-q', (intfloat, str), help='Dump period and comma-separated list of quantities to record (i.e. temperature,pressure)'), Arg('expand_by', None, float, help='Expand each dimension of the box by this ratio during this stage'), Arg('compress_to', None, float, help='Compress to the given packing fraction during this stage (overrides expand_by)'), ] def setup_integrator(self, scope, storage, context): integrator_type = self.arguments['integrator'] kT = self.arguments['temperature'] if self.arguments.get('ramp_t_to', None) is not None: times = [0, self.arguments['steps']] temperatures = [self.arguments['temperature'], self.arguments['ramp_t_to']] kT = hoomd.variant.linear_interp( list(zip(times, temperatures)), zero=scope['previous_steps']) if integrator_type == 'nve': integrator = elif integrator_type == 'nvt': tau = self.arguments['tau_t'] integrator =, kT=kT, tau=tau) elif integrator_type == 'langevin': seed = self.arguments['bd_seed'] integrator =, kT=kT, seed=seed) elif integrator_type == 'brownian': seed = self.arguments['bd_seed'] integrator =, kT=kT, seed=seed) elif integrator_type == 'npt': tau = self.arguments['tau_t'] pressure = self.arguments['pressure'] tauP = self.arguments['tau_p'] integrator =, kT=kT, tau=tau, P=pressure, tauP=tauP) else: raise NotImplementedError( 'Unknown integrator type {}'.format(integrator_type))['timestep_size']) if self.arguments['zero_momentum']:['zero_momentum']) return integrator def setup_dumps(self, scope, storage, context): if self.arguments['backup_period']: backup_filename = context.get_backup_filename() backup_file = context.enter_context(, 'wb', on_filesystem=True, noop=scope['mpi_rank'])) hoomd.dump.getar.simple(, self.arguments['backup_period'], '1', static=[], dynamic=['all']) if self.arguments.get('dump_quantities', None): dump_filename = scope.get('dump_filename', 'dump.sqlite') dump_filename = '.'.join( (os.path.splitext(dump_filename)[0], 'quantities', 'log')) (period, quantities) = self.arguments['dump_quantities'] quantities = [q.strip() for q in quantities.split(',')] log_file = context.enter_context(, 'ab', on_filesystem=True, noop=scope['mpi_rank'])) hoomd.analyze.log(, quantities, period) if self.arguments['dump_period']: dump_filename = scope.get('dump_filename', 'dump.sqlite') dump_file = context.enter_context(, 'ab', on_filesystem=True, noop=scope['mpi_rank'])) dynamic_quantities = ( ['viz_aniso_dynamic'] if 'type_shapes' in scope else ['viz_dynamic']) dynamic_quantities.extend(self.arguments.get('trajectory_quantities', [])) dump = hoomd.dump.getar.simple(, self.arguments['dump_period'], 'a', static=['viz_static'], dynamic=dynamic_quantities) if 'type_shapes' in scope: type_shapes = scope['type_shapes'] dump.writeJSON('type_shapes.json', type_shapes, True) def setup_compression(self, scope, storage, context): if not self.arguments.get('compress_to', None) and not self.arguments.get('expand_by', None): return dimensions = scope.get('dimensions', 3) if self.arguments.get('compress_to', None): current_phi = compute_packing_fraction(scope, storage, context.snapshot) volume_ratio = current_phi/self.arguments['compress_to'] length_ratio = volume_ratio**(1./dimensions) self.arguments['expand_by'] = length_ratio box = factor = self.arguments['expand_by'] times = [0, self.arguments['steps']] Lx = hoomd.variant.linear_interp( list(zip(times, [box.Lx, box.Lx*factor])), zero='now') Ly = hoomd.variant.linear_interp( list(zip(times, [box.Ly, box.Ly*factor])), zero='now') if dimensions == 2: Lz = box.Lz else: Lz = hoomd.variant.linear_interp( list(zip(times, [box.Lz, box.Lz*factor])), zero='now') updater = hoomd.update.box_resize(Lx=Lx, Ly=Ly, Lz=Lz) return updater def run_steps(self, scope, storage, context): hoomd.run_upto(scope['cumulative_steps'])
[docs] def run(self, scope, storage): callbacks = scope.setdefault('callbacks', collections.defaultdict(list)) scope['previous_steps'] = scope.get('cumulative_steps', 0) scope['cumulative_steps'] = (scope['previous_steps'] + self.arguments['steps']) with HoomdContext(scope, storage) as ctx: if ctx.check_timesteps(): return self.setup_integrator(scope, storage, ctx) self.setup_dumps(scope, storage, ctx) self.setup_compression(scope, storage, ctx) for c in callbacks['pre_run']: c(scope, storage, ctx) self.run_steps(scope, storage, ctx) for c in callbacks['post_run']: c(scope, storage, ctx)
def compute_packing_fraction(scope, storage, snapshot): dimensions = scope.get('dimensions', 3) type_volumes = [] for shape in scope.get('type_shapes', []): volume = np.polyval(shape['rounding_volume_polynomial'], shape['rounding_radius']) type_volumes.append(volume) if not type_volumes: logger.warning('No shape information found, assuming particles are spheres with diameter 1') if dimensions == 2: type_volumes = len(snapshot.particles.types)*[np.pi*0.25] else: type_volumes = len(snapshot.particles.types)*[4./3*np.pi*0.125] type_volumes = np.array(type_volumes, dtype=np.float32) particle_volume = np.sum(type_volumes[snapshot.particles.typeid]) box_volume = return particle_volume/box_volume