import collections
import contextlib
import itertools
import logging
import numpy as np
from .internal import intfloat, HoomdContext
import flowws
from flowws import Argument as Arg
import hoomd
import hoomd.hpmc
from . import Run
logger = logging.getLogger(__name__)
BOX_MOVE_NAMES = ['aspect', 'length', 'shear', 'volume', 'ln_volume']
[docs]@flowws.add_stage_arguments
class RunHPMC(Run):
"""Run for some number of steps using HPMC"""
ARGS = [
Arg('steps', '-s', intfloat, None, required=True,
help='Number of timesteps to run'),
Arg('integrator', '-i', str, None, required=True,
help='Integrator type'),
Arg('pressure', None, float, 1,
help='Pressure for isobaric simulations'),
Arg('backup_period', '-b', intfloat, 0,
help='Period for dumping a backup file'),
Arg('dump_period', '-d', intfloat, 0,
help='Period for dumping a trajectory file'),
Arg('trajectory_quantities', None, [str], [],
help='Additional trajectory quantities to dump'),
Arg('dump_quantities', '-q', (intfloat, str),
help='Dump period and comma-separated list of quantities to record (i.e. temperature,pressure)'),
Arg('expand_by', None, float,
help='Expand each dimension of the box by this ratio during this stage'),
Arg('compress_to', None, float,
help='Compress to the given packing fraction during this stage (overrides expand_by)'),
Arg('integrator_seed', None, int, 14,
help='Random number seed for integration method'),
Arg('tune', '-t', (float, int, intfloat),
metavar=('acceptance_ratio', 'epochs', 'steps'),
help='Tune move distances to achieve the target acceptance ratio '
'after updating a given number of epochs and running the given '
'number of steps at each epoch'),
Arg('box_move_aspect', None, (float, float), metavar=('distance', 'weight'),
help='Move distance and weight for box aspect ratio moves'),
Arg('box_move_length', None, (float, float), metavar=('distance', 'weight'),
help='Move distance and weight for box length moves'),
Arg('box_move_ln_volume', None, (float, float), metavar=('distance', 'weight'),
help='Move distance and weight for box log-volume moves'),
Arg('box_move_shear', None, (float, float), metavar=('distance', 'weight'),
help='Move distance and weight for box shear moves'),
Arg('box_move_volume', None, (float, float), metavar=('distance', 'weight'),
help='Move distance and weight for box volume moves'),
]
def setup_integrator(self, scope, storage, context):
system = scope['system']
particle_types = system.particles.types
integrator_type = self.arguments['integrator']
type_shapes = scope.get('type_shapes', dict(type='Sphere'))
shape_types = [shape['type'] for shape in type_shapes]
rounding_radii = [shape.get('rounding_radius', 0) for shape in type_shapes]
assert len(set(shape_types)) == 1, 'HPMC only supports systems with a single shape type (for example, all convex polyhedra)'
shape_type = shape_types[0]
if shape_type == 'Sphere':
integrator = hoomd.hpmc.integrate.sphere(self.arguments['integrator_seed'])
elif shape_type == 'Polygon':
if max(rounding_radii) > 0:
assert all(not is_concave(shape) for shape in type_shapes), 'Rounded polygons must be convex'
integrator = hoomd.hpmc.integrate.convex_spheropolygon(
self.arguments['integrator_seed'])
else:
if any(is_concave(shape) for shape in type_shapes):
integrator = hoomd.hpmc.integrate.simple_polygon(
self.arguments['integrator_seed'])
else:
integrator = hoomd.hpmc.integrate.convex_polygon(
self.arguments['integrator_seed'])
for (typ, shape) in zip(particle_types, type_shapes):
integrator.shape_param.set(
typ, vertices=shape['vertices'],
sweep_radius=shape.get('rounding_radius', 0))
elif shape_type == 'ConvexPolyhedron':
if max(rounding_radii) > 0:
integrator = hoomd.hpmc.integrate.convex_spheropolyhedron(
self.arguments['integrator_seed'])
for (typ, shape) in zip(particle_types, type_shapes):
integrator.shape_param.set(
typ, vertices=shape['vertices'],
sweep_radius=shape.get('rounding_radius', 0))
else:
integrator = hoomd.hpmc.integrate.convex_polyhedron(
self.arguments['integrator_seed'])
for (typ, shape) in zip(particle_types, type_shapes):
integrator.shape_param.set(
typ, vertices=shape['vertices'])
if integrator_type == 'nvt':
if any('box_move_{}'.format(name) in self.arguments
for name in ['length', 'volume', 'ln_volume']):
raise ArgumentError('NVT integration mode specified, but box moves were also enabled')
elif integrator_type == 'npt':
if all('box_move_{}'.format(name) not in self.arguments
for name in ['length', 'volume', 'ln_volume']):
raise ArgumentError('Must enable box moves for NPT simulations')
else:
raise NotImplementedError(integrator_type)
updater = self.setup_box_updater(scope, storage, context, integrator)
frame, _ = self.load_move_distance(scope, storage, context, integrator, updater)
# we should only create a tuner object if we haven't already
# done tuning for this stage
can_tune = frame < scope['cumulative_steps'] - self.arguments['steps']
if self.arguments.get('tune', None) and can_tune:
import gtar # throw ImportErrors before running any steps
(acceptance_ratio, epochs, period) = self.arguments['tune']
child_arguments = dict(self.arguments)
child_arguments['steps'] = epochs*period
del child_arguments['tune']
child_arguments['tune_acceptance_ratio'] = acceptance_ratio
child_arguments['tune_epochs'] = epochs
child_scope = dict(scope)
child_scope['cumulative_steps'] += epochs*period - self.arguments['steps']
TuneHPMC(**child_arguments).run(child_scope, storage)
# now the tuned move distances have been saved, so we can
# retrieve them for this stage
self.load_move_distance(scope, storage, context, integrator, updater)
scope['integrator'] = integrator
return integrator
def setup_box_updater(self, scope, storage, context, integrator):
should_create = any(self.arguments.get('box_move_{}'.format(name), False)
for name in BOX_MOVE_NAMES)
should_create |= self.arguments['integrator'] == 'npt'
updater = None
if should_create:
updater = hoomd.hpmc.update.boxmc(
integrator, self.arguments['pressure'],
self.arguments['integrator_seed'] + 20)
for name in BOX_MOVE_NAMES:
if 'box_move_{}'.format(name) not in self.arguments:
continue
(distance, weight) = self.arguments['box_move_{}'.format(name)]
getattr(updater, name)(delta=distance, weight=weight)
scope['box_updater'] = updater
return updater
def load_move_distance(self, scope, storage, context, integrator, updater):
import gtar
# per-type move distance arrays for translation/rotation/box
distances = {}
frame = -1
dump_filename = scope.get('dump_filename', 'dump.sqlite')
local_context = contextlib.ExitStack()
with local_context:
try:
dump_file = local_context.enter_context(storage.open(
dump_filename, 'rb', on_filesystem=True, noop=scope['mpi_rank']))
traj = local_context.enter_context(gtar.GTAR(dump_file.name, 'r'))
except (FileNotFoundError, RuntimeError):
return -1, {}
# grab per-type distance arrays
for (frame, trans) in traj.recordsNamed('type_translation_distance'):
if int(frame) <= scope['cumulative_steps']:
distances['translation'] = trans
frame = int(frame)
else:
break
for (frame, rot) in traj.recordsNamed('type_rotation_distance'):
if int(frame) <= scope['cumulative_steps']:
distances['rotation'] = rot
frame = int(frame)
else:
break
for name in BOX_MOVE_NAMES:
box_name = 'box_{}'.format(name)
distance_name = '{}_distance'.format(box_name)
for (frame, rot) in traj.recordsNamed(distance_name):
if int(frame) <= scope['cumulative_steps']:
distances[box_name] = rot
frame = int(frame)
else:
break
if distances:
types = context.snapshot.particles.types
kwargs = dict()
if 'translation' in distances:
kwargs['d'] = dict(zip(types, distances['translation']))
if 'rotation' in distances:
kwargs['a'] = dict(zip(types, distances['rotation']))
integrator.set_params(**kwargs)
for (key, distance) in distances.items():
if key.startswith('box_') and updater is not None:
name = key[4:]
getattr(updater, name)(delta=distance)
msg = 'Loaded tuned move distances from {} for frame {}: {}'.format(
dump_filename, frame, distances)
logger.debug(msg)
return frame, distances
class TuneHPMC(RunHPMC):
"""Helper class to be inserted as a stage immediately before a RunHPMC stage.
Sets up a move size tuner, runs a certain number of steps while
updating, and then discards the system state while saving the move
size state.
"""
ARGS = RunHPMC.ARGS + [
Arg('tune_acceptance_ratio', None, float),
Arg('tune_epochs', None, int),
]
def setup_dumps(self, scope, storage, context):
# disable dumps for this pseudo-stage
pass
def run_steps(self, scope, storage, context):
context.cancel_saving()
epochs = self.arguments['tune_epochs']
acceptance_ratio = self.arguments['tune_acceptance_ratio']
steps_per_epoch = self.arguments['steps']/epochs
tunables = ['d'] if 'type_shapes' not in scope else ['d', 'a']
integrator = scope['integrator']
tuner = hoomd.hpmc.util.tune(
integrator, tunables, target=acceptance_ratio)
box_tuner = None
updater = scope.get('box_updater', None)
if updater is not None:
tunable_map = dict(
length=['dLx', 'dLy', 'dLz'],
volume=['dV'],
ln_volume=['dlnV'],
shear=['dxy', 'dxz', 'dyz'],
aspect=[] # aspect tuning is not builtin
)
tunables = sum(
(tunable_map[name] for name in BOX_MOVE_NAMES if
'box_move_{}'.format(name) in self.arguments), [])
box_tuner = hoomd.hpmc.util.tune_npt(
updater, tunables, target=acceptance_ratio)
for epoch in range(epochs):
hoomd.run(steps_per_epoch)
tuner.update()
if box_tuner is not None:
box_tuner.update()
self.save_tune_results(scope, storage, context)
def save_tune_results(self, scope, storage, context):
import gtar
# step count before this stage
beginning_steps = scope['cumulative_steps'] - self.arguments['steps']
types = context.snapshot.particles.types
if scope['mpi_rank']:
return
integrator = scope['integrator']
translation, rotation = [], []
for t in types:
translation.append(integrator.get_d(t))
rotation.append(integrator.get_a(t))
updater = scope.get('box_updater', None)
box_distances = {}
if updater is not None:
for name in BOX_MOVE_NAMES:
box_distances[name] = getattr(updater, name)()['delta']
dump_filename = scope.get('dump_filename', 'dump.sqlite')
msg = 'Dumping tuned move distances to {}: translation {}, rotation {}'.format(
dump_filename, translation, rotation)
logger.debug(msg)
local_context = contextlib.ExitStack()
with local_context:
dump_file = local_context.enter_context(storage.open(
dump_filename, 'ab', on_filesystem=True, noop=scope['mpi_rank']))
getar_file = local_context.enter_context(gtar.GTAR(dump_file.name, 'a'))
path = 'hpmc/frames/{}/type_translation_distance.f32.uni'.format(beginning_steps)
getar_file.writePath(path, translation)
path = 'hpmc/frames/{}/type_rotation_distance.f32.uni'.format(beginning_steps)
getar_file.writePath(path, rotation)
for (key, value) in box_distances.items():
path = 'hpmc/frames/{}/box_{}_distance.f32.uni'.format(
beginning_steps, key)
getar_file.writePath(path, value)
def is_concave(polygon):
vertices = np.array(polygon['vertices'], dtype=np.float32)
delta = np.roll(vertices, -1, axis=0) - vertices
return np.any(np.cross(delta, np.roll(delta, -1, axis=0)) < 0)