"""
Module handling stages of an MD run: PreProcessing, Simulation, PostProcessing.
"""
from IPython import get_ipython
from threading import Thread
if get_ipython().__class__.__name__ == "ZMQInteractiveShell":
from tqdm import tqdm_notebook as tqdm
else:
from tqdm import tqdm
import matplotlib.pyplot as plt
from matplotlib.cm import get_cmap, ScalarMappable
from matplotlib.colors import LogNorm
from numpy import (
arange,
array,
int64,
linspace,
log10,
logspace,
meshgrid,
rint,
sqrt,
zeros,
)
from os import listdir, mkdir
from os import remove as os_remove
from os import stat as os_stat
from os.path import exists, join
from pandas import DataFrame, read_csv
from seaborn import scatterplot
from warnings import warn
from .core import Parameters
from .particles import Particles
from .plasma import Species
from .potentials.core import Potential
from .time_evolution.integrators import Integrator
from .tools.observables import (
CurrentCorrelationFunction,
DiffusionFlux,
DynamicStructureFactor,
ElectricCurrent,
PressureTensor,
RadialDistributionFunction,
StaticStructureFactor,
Thermodynamics,
VelocityAutoCorrelationFunction,
VelocityDistribution,
)
# Sarkas modules
from .utilities.io import InputOutput
from .utilities.maths import force_error_analytic_pp, force_error_approx_pppm
from .utilities.timing import SarkasTimer
[docs]class Process:
"""Stage of a Molecular Dynamics simulation. \n
This is the parent class for PreProcess, Simulation, and PostProcess.
Parameters
----------
input_file : str
Path to the YAML input file.
Attributes
----------
potential : sarkas.potential.base.Potential
Class handling the interaction between particles.
integrator : :class:`sarkas.time_evolution.integrators.Integrator`
Class handling the integrator.
thermostat : :class:`sarkas.time_evolution.thermostats.Thermostat`
Class handling the equilibration thermostat.
particles: :class:`sarkas.core.Particles`
Class handling particles properties.
parameters : :class:`sarkas.core.Parameters`
Class handling simulation's parameters.
species : list
List of :class:`sarkas.core.Species` classes.
input_file : str
Path to YAML input file.
timer : :class:`sarkas.utilities.timing.SarkasTimer`
Class handling the timing of processes.
io : :class:`sarkas.utilities.io.InputOutput`
Class handling the IO in Sarkas.
"""
[docs] def __init__(self, input_file: str = None):
self.potential = Potential()
self.integrator = Integrator()
# self.thermostat = Thermostat()
self.parameters = Parameters()
self.particles = Particles()
self.species = []
self.threads_ls = []
self.observables_list = []
self.input_file = input_file
self.timer = SarkasTimer()
self.io = InputOutput(process=self.__name__)
[docs] def common_parser(self, filename: str = None) -> None:
"""
Parse simulation parameters from YAML file.
Parameters
----------
filename: str
Input YAML file
"""
if filename:
self.input_file = filename
dics = self.io.from_yaml(self.input_file)
for lkey in dics:
if lkey == "Particles":
for species in dics["Particles"]:
spec = Species(species["Species"])
self.species.append(spec)
if lkey == "Potential":
self.potential.from_dict(dics[lkey])
if lkey == "Integrator":
self.integrator.from_dict(dics[lkey])
if lkey == "Parameters":
self.parameters.from_dict(dics[lkey])
# electron properties has been moved to the Parameters class. Therefore I need to put this here.
if hasattr(self.potential, "electron_temperature"):
self.parameters.electron_temperature = self.potential.electron_temperature
elif hasattr(self.potential, "electron_temperature_eV"):
self.parameters.electron_temperature_eV = self.potential.electron_temperature_eV
if self.__name__ != "simulation":
self.observables_list = []
for observable in dics["Observables"]:
for key, sub_dict in observable.items():
if key == "RadialDistributionFunction":
self.observables_list.append("rdf")
self.rdf = RadialDistributionFunction()
if sub_dict:
self.rdf.from_dict(sub_dict)
elif key == "Thermodynamics":
self.therm = Thermodynamics()
self.therm.from_dict(sub_dict)
self.observables_list.append("therm")
elif key == "DynamicStructureFactor":
self.observables_list.append("dsf")
self.dsf = DynamicStructureFactor()
if sub_dict:
self.dsf.from_dict(sub_dict)
elif key == "CurrentCorrelationFunction":
self.observables_list.append("ccf")
self.ccf = CurrentCorrelationFunction()
if sub_dict:
self.ccf.from_dict(sub_dict)
elif key == "StaticStructureFactor":
self.observables_list.append("ssf")
self.ssf = StaticStructureFactor()
if sub_dict:
self.ssf.from_dict(sub_dict)
elif key == "VelocityAutoCorrelationFunction":
self.observables_list.append("vacf")
self.vacf = VelocityAutoCorrelationFunction()
if sub_dict:
self.vacf.from_dict(sub_dict)
elif key == "VelocityDistribution":
self.observables_list.append("vd")
self.vm = VelocityDistribution()
if sub_dict:
self.vm.from_dict(sub_dict)
elif key == "ElectricCurrent":
self.observables_list.append("ec")
self.ec = ElectricCurrent()
if sub_dict:
self.ec.from_dict(sub_dict)
elif key == "DiffusionFlux":
self.observables_list.append("diff_flux")
self.diff_flux = DiffusionFlux()
if sub_dict:
self.diff_flux.from_dict(sub_dict)
elif key == "PressureTensor":
self.observables_list.append("p_tensor")
self.p_tensor = PressureTensor()
if sub_dict:
self.p_tensor.from_dict(sub_dict)
if "TransportCoefficients" in dics.keys():
self.transport_dict = dics["TransportCoefficients"].copy()
[docs] def evolve_loop(self, phase, thermalization, it_start, it_end, dump_step):
for it in tqdm(range(it_start, it_end), disable=not self.parameters.verbose):
# Calculate the Potential energy and update particles' data
self.integrator.update(self.particles)
if (it + 1) % dump_step == 0:
th = Thread(
target=self.io.dump,
name=f"Sarkas_{phase.capitalize()}_Thread - {it+1}",
args=(
phase,
self.particles.__deepcopy__(),
it + 1,
),
)
self.threads_ls.append(th)
th.start()
if thermalization and (it + 1 >= self.integrator.thermalization_timestep):
self.integrator.thermostate(self.particles)
# Wait for all the threads to finish
for x in self.threads_ls:
x.join()
self.threads_ls.clear()
[docs] def initialization(self) -> None:
"""Initialize all classes."""
# initialize the directories and filenames
self.io.setup()
# Copy relevant subsclasses attributes into parameters class. This is needed for post-processing.
# Update parameters' dictionary with filenames and directories
self.parameters.from_dict(self.io.__dict__)
self.parameters.potential_type = self.potential.type.lower()
self.parameters.setup(self.species)
t0 = self.timer.current()
self.potential.setup(self.parameters, self.species)
time_pot = self.timer.current()
self.parameters.cutoff_radius = self.potential.rc
self.integrator.setup(self.parameters, self.potential)
self.particles.setup(self.parameters, self.species)
time_ptcls = self.timer.current()
# Copy needed parameters for pretty print
self.parameters.dt = self.integrator.dt
self.parameters.equilibration_integrator = self.integrator.equilibration_type
self.parameters.production_integrator = self.integrator.production_type
if self.parameters.magnetized:
self.parameters.magnetization_integrator = self.integrator.magnetization_type
# For restart and backups.
self.io.setup_checkpoint(self.parameters)
self.io.save_pickle(self)
# Print Process summary to file and screen
self.io.simulation_summary(self)
time_end = self.timer.current()
# Print timing
self.io.time_stamp("Potential Initialization", self.timer.time_division(time_end - t0))
self.io.time_stamp("Particles Initialization", self.timer.time_division(time_ptcls - time_pot))
self.io.time_stamp("Total Simulation Initialization", self.timer.time_division(time_end - t0))
[docs] def setup(self, read_yaml: bool = False, other_inputs: dict = None):
"""Setup simulations' parameters and io subclasses.
Parameters
----------
read_yaml: bool
Flag for reading YAML input file. Default = False.
other_inputs: dict (optional)
Dictionary with additional simulations options.
"""
if read_yaml:
self.common_parser()
if other_inputs:
if not isinstance(other_inputs, dict):
raise TypeError("Wrong input type. " "other_inputs should be a nested dictionary")
for class_name, class_attr in other_inputs.items():
if class_name not in ["Particles", "Observables"]:
self.__dict__[class_name.lower()].__dict__.update(class_attr)
elif class_name == "Particles":
# Remember Particles should be a list of dict
# example:
# args = {"Particles" : [ { "Species" : { "name": "O" } } ] }
# Check if you already have a non-empty list of species
if isinstance(self.species, list):
# If so do you want to replace or update?
# Update species attributes
for sp, species in enumerate(other_inputs["Particles"]):
spec = Species(species["Species"])
if hasattr(spec, "replace"):
self.species[sp].__dict__.update(spec.__dict__)
else:
self.species.append(spec)
else:
# Append new species
for sp, species in enumerate(other_inputs["Particles"]):
spec = Species(species["Species"])
self.species.append(spec)
if class_name == "Observables":
for observable in class_attr:
for key, sub_dict in observable.items():
if key == "RadialDistributionFunction":
self.rdf = RadialDistributionFunction()
self.rdf.from_dict(sub_dict)
if key == "Thermodynamics":
self.therm = Thermodynamics()
self.therm.from_dict(sub_dict)
if key == "DynamicStructureFactor":
self.dsf = DynamicStructureFactor()
if sub_dict:
self.dsf.from_dict(sub_dict)
if key == "CurrentCorrelationFunction":
self.ccf = CurrentCorrelationFunction()
if sub_dict:
self.ccf.from_dict(sub_dict)
if key == "StaticStructureFactor":
self.ssf = StaticStructureFactor()
if sub_dict:
self.ssf.from_dict(sub_dict)
if key == "VelocityAutoCorrelationFunction":
self.vacf = VelocityAutoCorrelationFunction()
if sub_dict:
self.vacf.from_dict(sub_dict)
if key == "VelocityDistribution":
self.vm = VelocityDistribution()
if sub_dict:
self.vm.from_dict(sub_dict)
if key == "ElectricCurrent":
self.ec = ElectricCurrent()
if sub_dict:
self.ec.from_dict(sub_dict)
if self.__name__ == "postprocessing":
# Create the file paths without creating directories and redefining io attributes
self.io.create_file_paths()
# Read previously stored files
self.io.read_pickle(self)
# Print parameters to log file
if not exists(self.io.log_file):
self.io.simulation_summary(self)
# Initialize the observable classes
# for obs in self.observables_list:
# if obs in self.__dict__.keys():
# self.__dict__[obs].setup(self.parameters)
else:
self.initialization()
if self.parameters.plot_style:
plt.style.use(self.parameters.plot_style)
[docs]class PostProcess(Process):
"""
Class handling the post-processing stage of a simulation.
Parameters
----------
input_file : str
Path to the YAML input file.
"""
[docs] def __init__(self, input_file: str = None):
self.__name__ = "postprocessing"
super().__init__(input_file)
[docs] def setup_from_simulation(self, simulation):
"""
Setup postprocess' subclasses by (shallow) copying them from simulation object.
Parameters
----------
simulation: sarkas.core.processes.Simulation
Simulation object
"""
self.parameters = simulation.parameters.__copy__()
self.integrator = simulation.integrator.__copy__()
self.potential = simulation.potential.__copy__()
self.species = simulation.species.copy()
self.io = simulation.io.__copy__()
self.io.process = "postprocess"
[docs] def run(self):
"""Calculate all the observables from the YAML input file."""
if len(self.observables_list) == 0:
# Make Temperature and Energy plots
self.therm = Thermodynamics()
self.therm.setup(self.parameters)
if self.parameters.equilibration_steps > 0:
self.therm.temp_energy_plot(self, phase="equilibration")
self.therm.temp_energy_plot(self, phase="production")
# Calculate the RDF.
self.rdf = RadialDistributionFunction()
self.rdf.setup(self.parameters)
self.rdf.parse()
else:
for obs in self.observables_list:
# Check that the observable is actually there
if obs in self.__dict__.keys():
self.__dict__[obs].setup(self.parameters)
if obs == "therm":
self.therm.temp_energy_plot(self)
else:
self.io.postprocess_info(self, write_to_file=True, observable=obs)
self.__dict__[obs].compute()
# Calculate transport coefficients
if hasattr(self, "transport_dict"):
from .tools.transport import TransportCoefficients
tc = TransportCoefficients(self.parameters)
for coeff in self.transport_dict:
for key, coeff_kwargs in coeff.items():
if key.lower() == "diffusion":
# Calculate if not already
if not self.vacf:
self.vacf = VelocityAutoCorrelationFunction()
self.vacf.setup(self.parameters)
# Use parse in case you calculated it already
self.vacf.parse()
tc.diffusion(observable=self.vacf)
elif key.lower() == "interdiffusion":
if not self.diff_flux:
self.diff_flux = DiffusionFlux()
self.diff_flux.setup(self.parameters)
self.diff_flux.parse()
tc.interdiffusion(self.diff_flux)
elif key.lower() == "viscosity":
if not self.p_tensor:
self.p_tensor = PressureTensor()
self.p_tensor.setup(self.parameters)
self.p_tensor.parse()
tc.viscosity(self.p_tensor)
elif key.lower() == "electricalconductivity":
if not self.ec:
self.ec = ElectricCurrent()
self.ec.setup(self.parameters)
self.ec.parse()
tc.electrical_conductivity(self.ec)
[docs]class PreProcess(Process):
"""
Wrapper class handling the estimation of time and best parameters of a simulation.
Parameters
----------
input_file : str
Path to the YAML input file.
Attributes
----------
loops: int
Number of timesteps to run for time and size estimates. Default = 10
estimate: bool
Run an estimate for the best PPPM parameters in the simulation. Default=False.
pm_meshes: numpy.ndarray
Array of mesh sizes used in the PPPM parameters estimation.
pp_cells: numpy.ndarray
Array of simulations box cells used in the PPPM parameters estimation.
kappa: float
Screening parameter. Calculated from :meth:`sarkas.potentials.core.Potential.matrix`.
"""
[docs] def __init__(self, input_file: str = None):
self.__name__ = "preprocessing"
self.estimate = False
self.pm_meshes = logspace(3, 7, 12, base=2, dtype=int64)
# array([16, 24, 32, 48, 56, 64, 72, 88, 96, 112, 128], dtype=int64)
self.pm_caos = arange(1, 8)
self.pp_cells = arange(3, 16, dtype=int64)
self.kappa = None
super().__init__(input_file)
[docs] def analytical_approx_pppm(self):
"""Calculate the total force error as given in :cite:`Dharuman2017`."""
a_min = self.potential.pppm_alpha_ewald * 0.5
a_max = self.potential.pppm_alpha_ewald * 1.5
r_min = self.potential.rc * 0.5
r_max = self.potential.rc * 1.5
alphas = linspace(a_min, a_max, 101)
rcuts = linspace(r_min, r_max, 101)
pm_force_error = zeros(len(alphas))
pp_force_error = zeros((len(alphas), len(rcuts)))
total_force_error = zeros((len(alphas), len(rcuts)))
potential_copy = self.potential.__deepcopy__()
for ia, alpha in enumerate(alphas):
for ir, rc in enumerate(rcuts):
potential_copy.rc = rc
potential_copy.pppm_alpha_ewald = alpha
tot_err, pm_err, pp_err = force_error_approx_pppm(potential_copy)
total_force_error[ia, ir] = tot_err
pm_force_error[ia] = pm_err
pp_force_error[ia, ir] = pp_err
# for ia, alpha in enumerate(alphas):
# somma = 0.0
# for m in arange(p):
# expp = 2 * (m + p)
# somma += Cmp[m] * (2.0 / (1 + expp)) * betamp(m, p, alpha, kappa) * (h / 2.0) ** expp
# # eq.(36) in Dharuman J Chem Phys 146 024112 (2017)
# pm_force_error[ia] = sqrt(3.0 * somma) / (2.0 * pi)
# # eq.(35)
# pm_force_error *= sqrt(self.parameters.total_num_ptcls * self.parameters.a_ws ** 3 / self.parameters.box_volume)
# # Calculate the analytic PP error and the total force error
# if self.potential.type == "qsp":
# for (ir, rc) in enumerate(rcuts):
# pp_force_error[:, ir] = sqrt(2.0 * pi * kappa) * exp(-rc * kappa)
# pp_force_error[:, ir] *=
# for (ia, alfa) in enumerate(alphas):
# # eq.(42) from Dharuman J Chem Phys 146 024112 (2017)
# total_force_error[ia, ir] = sqrt(pm_force_error[ia] ** 2 + pp_force_error[ia, ir] ** 2)
# else:
# for (ir, rc) in enumerate(rcuts):
# for (ia, alfa) in enumerate(alphas):
# # eq.(30) from Dharuman J Chem Phys 146 024112 (2017)
# pp_force_error[ia, ir] = 2.0 * exp(-((0.5 * kappa / alfa) ** 2) - alfa ** 2 * rc ** 2) / sqrt(rc)
# pp_force_error[ia, ir] *= sqrt(
# self.parameters.total_num_ptcls * self.parameters.a_ws ** 3 / self.parameters.box_volume
# )
# # eq.(42) from Dharuman J Chem Phys 146 024112 (2017)
# total_force_error[ia, ir] = sqrt(pm_force_error[ia] ** 2 + pp_force_error[ia, ir] ** 2)
return (
total_force_error,
pp_force_error,
pm_force_error,
rcuts / potential_copy.a_ws,
alphas * potential_copy.a_ws,
)
[docs] def green_function_timer(self):
"""Time Potential setup."""
self.timer.start()
self.potential.pppm_setup()
return self.timer.stop()
[docs] def make_color_map(self, rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error):
"""
Plot a color map of the total force error approximation.
Parameters
----------
rcuts: numpy.ndarray
Cut off distances.
alphas: numpy.ndarray
Ewald parameters.
chosen_alpha: float
Chosen Ewald parameter.
chosen_rcut: float
Chosen cut off radius.
total_force_error: numpy.ndarray
Force error matrix.
Raises
------
DeprecationWarning
"""
warn(
f"The function has been renamed make_pppm_color_map. make_color_map will be removed in v2.0.0.",
DeprecationWarning,
)
# Line Plot
self.make_pppm_color_map(rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error)
[docs] @staticmethod
def make_fit_plot(pp_xdata, pm_xdata, pp_times, pm_times, pp_opt, pm_opt, pp_xlabels, pm_xlabels, fig_path):
"""
Make a dual plot of the fitted functions.
"""
fig, ax = plt.subplots(1, 2, figsize=(12, 7))
ax[0].plot(pm_xdata, pm_times.mean(axis=-1), "o", label="Measured times")
# ax[0].plot(pm_xdata, quadratic(pm_xdata, *pm_opt), '--r', label="Fit $f(x) = a + b x + c x^2$")
ax[1].plot(pp_xdata, pp_times.mean(axis=-1), "o", label="Measured times")
# ax[1].plot(pp_xdata, linear(pp_xdata, *pp_opt), '--r', label="Fit $f(x) = a x$")
ax[0].set_xscale("log")
ax[0].set_yscale("log")
ax[1].set_xscale("log")
ax[1].set_yscale("log")
ax[0].legend()
ax[1].legend()
ax[0].set_xticks(pm_xdata)
ax[0].set_xticklabels(pm_xlabels)
# Rotate the tick labels and set their alignment.
plt.setp(ax[0].get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
ax[1].set_xticks(pp_xdata[0:-1:3])
ax[1].set_xticklabels(pp_xlabels)
# Rotate the tick labels and set their alignment.
plt.setp(ax[1].get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
ax[0].set_title("PM calculation")
ax[1].set_title("PP calculation")
ax[0].set_xlabel("Mesh sizes")
ax[1].set_xlabel(r"$r_c / a_{ws}$")
fig.tight_layout()
fig.savefig(join(fig_path, "Timing_Fit.png"))
[docs] def make_line_plot(self, rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error):
"""
Plot selected values of the total force error approximation.
Parameters
----------
rcuts: numpy.ndarray
Cut off distances.
alphas: numpy.ndarray
Ewald parameters.
chosen_alpha: float
Chosen Ewald parameter.
chosen_rcut: float
Chosen cut off radius.
total_force_error: numpy.ndarray
Force error matrix.
Raises
------
: DeprecationWarning
"""
warn(
f"The function has been renamed make_pppm_line_plot. make_line_plot will be removed in v2.0.0.",
DeprecationWarning,
)
# Line Plot
self.make_pppm_line_plot(rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error)
[docs] def make_lagrangian_plot(self):
"TODO: complete this."
c_mesh, m_mesh = meshgrid(self.pp_cells, self.pm_meshes)
fig = plt.figure()
ax = fig.add_subplot(111) # projection='3d')
# CS = ax.plot_surface(m_mesh, c_mesh, self.lagrangian, rstride=1, cstride=1, cmap='viridis', edgecolor='none')
CS = ax.contourf(
m_mesh, c_mesh, self.lagrangian, norm=LogNorm(vmin=self.lagrangian.min(), vmax=self.lagrangian.max())
)
CS2 = ax.contour(CS, colors="w")
ax.clabel(CS2, fmt="%1.0e", colors="w")
fig.colorbar(CS)
ax.scatter(self.best_mesh, self.best_cells, s=200, c="k")
ax.set_xlabel("Mesh size")
ax.set_ylabel(r"Cells = $L/r_c$")
ax.set_title("2D Lagrangian")
fig.savefig(join(self.io.preprocessing_dir, "2D_Lagrangian.png"))
# ax[1].set_xticks([8, 16, 32, 64, 128])
# ax[1].set_xticklabels([8, 16, 32, 64, 128])
[docs] def make_pppm_line_plot(self, rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error):
"""
Plot selected values of the total force error approximation.
Parameters
----------
rcuts: numpy.ndarray
Cut off distances.
alphas: numpy.ndarray
Ewald parameters.
chosen_alpha: float
Chosen Ewald parameter.
chosen_rcut: float
Chosen cut off radius.
total_force_error: numpy.ndarray
Force error matrix.
"""
# Plot the results
fig_path = self.pppm_plots_dir
fig, ax = plt.subplots(1, 2, constrained_layout=True, figsize=(12, 7))
linestyles = [(0, (5, 10)), "dashed", "solid", "dashdot", (0, (3, 10, 1, 10))]
indexes = [30, 40, 50, 60, 70]
for lns, i in zip(linestyles, indexes):
ax[0].plot(rcuts, total_force_error[i, :], ls=lns, label=r"$\alpha a_{ws} = " + "{:.2f}$".format(alphas[i]))
ax[1].plot(alphas, total_force_error[:, i], ls=lns, label=r"$r_c = {:.2f}".format(rcuts[i]) + " a_{ws}$")
ax[0].set(ylabel=r"$\Delta F^{approx}_{tot}$", xlabel=r"$r_c/a_{ws}$", yscale="log")
ax[1].set(xlabel=r"$\alpha \; a_{ws}$", yscale="log")
ax[0].axvline(chosen_rcut, ls="--", c="k")
ax[0].axhline(self.potential.force_error, ls="--", c="k")
ax[1].axhline(self.potential.force_error, ls="--", c="k")
ax[1].axvline(chosen_alpha, ls="--", c="k")
if rcuts[-1] * self.parameters.a_ws > 0.5 * self.parameters.box_lengths.min():
ax[0].axvline(0.5 * self.parameters.box_lengths.min() / self.parameters.a_ws, c="r", label=r"$L/2$")
for a in ax:
a.grid(True, alpha=0.3)
a.legend(loc="best")
fig.suptitle(
r"Parameters $N = {}, \quad M = {}, \quad p = {}, \quad \kappa = {:.2f}$".format(
self.parameters.total_num_ptcls,
self.potential.pppm_mesh[0],
self.potential.pppm_cao[0],
self.parameters.a_ws / self.potential.screening_length,
)
)
fig.savefig(join(fig_path, "LinePlot_ForceError_" + self.io.job_id + ".png"))
[docs] def make_pppm_color_map(self, rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error):
"""
Plot a color map of the total force error approximation.
Parameters
----------
rcuts: numpy.ndarray
Cut off distances.
alphas: numpy.ndarray
Ewald parameters.
chosen_alpha: float
Chosen Ewald parameter.
chosen_rcut: float
Chosen cut off radius.
total_force_error: numpy.ndarray
Force error matrix.
"""
# Plot the results
fig_path = self.pppm_plots_dir
r_mesh, a_mesh = meshgrid(rcuts, alphas)
fig, ax = plt.subplots(1, 1, figsize=(10, 7))
# if total_force_error.min() == 0.0:
# minv = 1e-120
# else:
# minv = total_force_error.min()
# total_force_error[total_force_error == 0.0] = minv
CS = ax.contourf(a_mesh, r_mesh, total_force_error, norm=LogNorm())
CS2 = ax.contour(CS, colors="w")
ax.clabel(CS2, fmt="%1.0e", colors="w")
ax.scatter(chosen_alpha, chosen_rcut, s=200, c="k")
if rcuts[-1] * self.parameters.a_ws > 0.5 * self.parameters.box_lengths.min():
ax.axhline(0.5 * self.parameters.box_lengths.min() / self.parameters.a_ws, c="r", label=r"$L/2$")
# ax.tick_parameters(labelsize=fsz)
ax.set_xlabel(r"$\alpha \;a_{ws}$")
ax.set_ylabel(r"$r_c/a_{ws}$")
ax.set_title(
r"Parameters $N = {}, \quad M = {}, \quad p = {}, \quad \kappa = {:.2f}$".format(
self.parameters.total_num_ptcls,
self.potential.pppm_mesh[0],
self.potential.pppm_cao[0],
self.parameters.a_ws / self.potential.screening_length,
)
)
clb = fig.colorbar(CS)
clb.set_label(r"$\Delta F^{approx}_{tot}(r_c,\alpha)$", va="bottom", rotation=270)
fig.tight_layout()
fig.savefig(join(fig_path, "ClrMap_ForceError_" + self.io.job_id + ".png"))
[docs] def make_timing_plots(self, data_df: DataFrame = None):
"""
Makes a figure with three subplots of the CPU times vs PPPM parameters.\n
The first plot is PP acc time vs the number of cells at different Mesh sizes.\n
The second plot is the PM acc time vs the mesh size at different charge assignment orders.\n
The third plot is the time for the calculation of the optimal green's function
for different charge asssignment orders.
Parameters
----------
data_df : pandas.DataFrame, Optional
Timing study data. If `None` it will look for previously saved data, otherwise it will run
:meth:`sarkas.processes.PreProcess.timing_study_calculation` to calculate the data. Default is `None`.
"""
if not data_df:
try:
data_df = read_csv(
join(self.io.preprocessing_dir, f"TimingStudy_data_{self.io.job_id}.csv"), index_col=False
)
except FileNotFoundError:
print(f"I could not find the data from the timing study. Running the timing study now.")
self.timing_study_calculation()
else:
data_df = self.dataframe.copy(deep=True)
fig, ax = plt.subplots(1, 3, figsize=(21, 7))
scatterplot(data=data_df, x="pp_cells", y="pp_acc_time [ns]", hue="M_x", s=100, palette="viridis", ax=ax[0])
scatterplot(data=data_df, x="M_x", y="pm_acc_time [ns]", hue="pppm_cao_x", s=150, palette="viridis", ax=ax[1])
scatterplot(data=data_df, x="M_x", y="G_k time [ns]", hue="pppm_cao_x", s=150, palette="viridis", ax=ax[2])
# ax[0].legend(ncol = 2)
ax[0].set(yscale="log", xlabel="LCL Cells", ylabel="PP Time [ns]")
ax[1].set(yscale="log", xlabel="Mesh", ylabel="PM Time [ns]")
ax[2].set(yscale="log", xlabel="Mesh", ylabel="Green Function Time [ns]")
ax[1].set_xscale("log", base=2)
ax[2].set_xscale("log", base=2)
fig_path = self.pppm_plots_dir
fig.savefig(join(fig_path, f"PPPM_Times_{self.io.job_id}.png"))
[docs] def make_force_v_timing_plot(self, data_df: DataFrame = None):
"""Make contour maps of the force error and total acc time as functions of LCL cells and PM meshes for each
charge assignment order sequence.
Parameters
----------
data_df : pandas.DataFrame, Optional
Timing study data. If `None` it will look for previously saved data, otherwise it will run
:meth:`sarkas.processes.PreProcess.timing_study_calculation` to calculate the data. Default is `None`.
"""
from scipy.interpolate import griddata
fig_path = self.pppm_plots_dir
if not data_df:
try:
data_df = read_csv(
join(self.io.preprocessing_dir, f"TimingStudy_data_{self.io.job_id}.csv"), index_col=False
)
except FileNotFoundError:
print(f"I could not find the data from the timing study. Running the timing study now.")
self.timing_study_calculation()
else:
data_df = self.dataframe.copy(deep=True)
# Plot the results
for _, cao in enumerate(self.pm_caos):
mask = self.dataframe["pppm_cao_x"] == cao
df = data_df[mask][
[
"M_x",
"pp_cells",
"force error [measured]",
"pp_acc_time [ns]",
"pm_acc_time [ns]",
"tot_acc_time [ns]",
]
]
# 2D-arrays from DataFrame
x1 = linspace(df["M_x"].min(), df["M_x"].max(), len(df["M_x"].unique()))
y1 = linspace(df["pp_cells"].min(), df["pp_cells"].max(), len(df["pp_cells"].unique()))
m_mesh, c_mesh = meshgrid(x1, y1)
# Interpolate unstructured D-dimensional data.
tot_time_map = griddata((df["M_x"], df["pp_cells"]), df["tot_acc_time [ns]"], (m_mesh, c_mesh))
force_error_map = griddata((df["M_x"], df["pp_cells"]), df["force error [measured]"], (m_mesh, c_mesh))
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 9))
if force_error_map.min() == 0.0:
minv = 1e-120
else:
minv = force_error_map.min()
maxt = force_error_map.max()
nlvl = 12
lvls = logspace(log10(minv), log10(maxt), nlvl)
luxmap = get_cmap("viridis", nlvl)
luxnorm = LogNorm(vmin=minv, vmax=maxt)
CS = ax1.contourf(m_mesh, c_mesh, force_error_map, levels=lvls, cmap=luxmap, norm=luxnorm)
clb = fig.colorbar(ScalarMappable(norm=luxnorm, cmap=luxmap), ax=ax1)
clb.set_label(r"Force Error [$Q^2/ a_{\rm ws}^2$] " + f"@ cao = {cao}", rotation=270, va="bottom")
CS2 = ax1.contour(CS, colors="w")
ax1.clabel(CS2, fmt="%1.0e", colors="w")
if cao == self.potential.pppm_cao[0]:
input_Nc = int(self.potential.box_lengths[0] / self.potential.rc)
ax1.scatter(self.potential.pppm_mesh[0], input_Nc, s=200, c="k")
ax1.set_xlabel("Mesh size")
ax1.set_ylabel(r"LCL Cells")
ax1.set_title(f"Force Error Map @ cao = {cao}")
# Timing Plot
maxt = tot_time_map.max()
mint = tot_time_map.min()
# nlvl = 13
lvls = logspace(log10(mint), log10(maxt), nlvl)
luxmap = get_cmap("viridis", nlvl)
luxnorm = LogNorm(vmin=minv, vmax=maxt)
CS = ax2.contourf(m_mesh, c_mesh, tot_time_map, levels=lvls, cmap=luxmap)
CS2 = ax2.contour(CS, colors="w", levels=lvls)
ax2.clabel(CS2, fmt="%.2e", colors="w")
# fig.colorbar(, ax = ax2)
clb = fig.colorbar(ScalarMappable(norm=luxnorm, cmap=luxmap), ax=ax2)
clb.set_label("CPU Time [ns]", rotation=270, va="bottom")
if cao == self.potential.pppm_cao[0]:
input_Nc = int(self.potential.box_lengths[0] / self.potential.rc)
ax2.scatter(self.potential.pppm_mesh[0], input_Nc, s=200, c="k")
ax2.set_xlabel("Mesh size")
ax2.set_title(f"Timing Map @ cao = {cao}")
fig.savefig(join(fig_path, f"ForceErrorMap_v_Timing_cao_{cao}_{self.io.job_id}.png"))
[docs] def postproc_estimates(self):
# POST- PROCESSING
self.io.postprocess_info(self, write_to_file=True, observable="header")
if hasattr(self, "rdf"):
self.rdf.setup(self.parameters)
self.io.postprocess_info(self, write_to_file=True, observable="rdf")
if hasattr(self, "ssf"):
self.ssf.setup(self.parameters)
self.io.postprocess_info(self, write_to_file=True, observable="ssf")
if hasattr(self, "dsf"):
self.dsf.setup(self.parameters)
self.io.postprocess_info(self, write_to_file=True, observable="dsf")
if hasattr(self, "ccf"):
self.ccf.setup(self.parameters)
self.io.postprocess_info(self, write_to_file=True, observable="ccf")
if hasattr(self, "vm"):
self.ccf.setup(self.parameters)
self.io.postprocess_info(self, write_to_file=True, observable="vm")
[docs] def pppm_approximation(self):
"""
Calculate the force error for a PPPM simulation using analytical approximations.\n
Plot the force error in the parameter space.
"""
self.pppm_plots_dir = join(self.io.preprocessing_dir, "PPPM_Plots")
if not exists(self.pppm_plots_dir):
mkdir(self.pppm_plots_dir)
# Calculate Force error from analytic approximation given in Dharuman et al. J Chem Phys 2017
total_force_error, pp_force_error, pm_force_error, rcuts, alphas = self.analytical_approx_pppm()
chosen_alpha = self.potential.pppm_alpha_ewald * self.parameters.a_ws
chosen_rcut = self.potential.rc / self.parameters.a_ws
# mesh_dir = join(self.pppm_plots_dir, 'Mesh_{}'.format(self.potential.pppm_mesh[0]))
# if not exists(mesh_dir):
# mkdir(mesh_dir)
#
# cell_num = int(self.parameters.box_lengths.min() / self.potential.rc)
# cell_dir = join(mesh_dir, 'Cells_{}'.format(cell_num))
# if not exists(cell_dir):
# mkdir(cell_dir)
#
# self.pppm_plots_dir = cell_dir
# Color Map
self.make_pppm_color_map(rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error)
# Line Plot
self.make_pppm_line_plot(rcuts, alphas, chosen_alpha, chosen_rcut, total_force_error)
print(f"\nFigures can be found in {self.pppm_plots_dir}")
[docs] def remove_preproc_dumps(self):
# Delete the energy files created during the estimation runs
os_remove(self.io.eq_energy_filename)
os_remove(self.io.prod_energy_filename)
# Delete dumps created during the estimation runs
for npz in listdir(self.io.eq_dump_dir):
os_remove(join(self.io.eq_dump_dir, npz))
for npz in listdir(self.io.prod_dump_dir):
os_remove(join(self.io.prod_dump_dir, npz))
if self.parameters.magnetized and self.parameters.electrostatic_equilibration:
os_remove(self.io.mag_energy_filename)
# Remove dumps
for npz in listdir(self.io.mag_dump_dir):
os_remove(join(self.io.mag_dump_dir, npz))
[docs] def run(
self,
loops: int = 10,
timing: bool = True,
timing_study: bool = False,
pppm_estimate: bool = False,
postprocessing: bool = False,
remove: bool = False,
):
"""
Estimate the time of the simulation and best parameters if wanted.
Parameters
----------
loops : int
Number of loops over which to average the acceleration calculation. Default = 10.
timing : bool
Flag for estimating simulation times. Default =True.
timing_study : bool
Flag for estimating time for simulation parameters.
pppm_estimate : bool
Flag for showing the force error plots in case of pppm algorithm.
postprocessing : bool
Flag for calculating Post processing parameters.
remove : bool
Flag for removing energy files and dumps created during times estimation. Default = False.
"""
# Clean everything
plt.close("all")
# Set the screening parameter
self.kappa = self.potential.matrix[1, 0, 0] if self.potential.type == "yukawa" else 0.0
if timing:
self.time_n_space_estimates(loops=loops)
if remove:
self.remove_preproc_dumps()
if pppm_estimate:
self.pppm_approximation()
if timing_study:
self.timing_study_calculation()
self.make_timing_plots()
self.make_force_v_timing_plot()
print(f"\nFigures can be found in {self.pppm_plots_dir}")
if postprocessing:
self.postproc_estimates()
[docs] def time_acceleration(self, loops: int = 11):
"""
Run loops number of acceleration calculations for timing estimate.
Parameters
----------
loops: int
Number of simulation steps to run. Default = 11.
"""
if self.potential.linked_list_on:
self.pp_acc_time = zeros(loops)
for i in range(loops):
self.timer.start()
self.potential.update_linked_list(self.particles)
self.pp_acc_time[i] = self.timer.stop()
# Calculate the mean excluding the first value because that time include numba compilation time
pp_mean_time = self.timer.time_division(self.pp_acc_time[1:].mean())
self.io.preprocess_timing("PP", pp_mean_time, loops)
# PM acceleration
if self.potential.pppm_on:
self.pm_acc_time = zeros(loops)
for i in range(loops):
self.timer.start()
self.potential.update_pm(self.particles)
self.pm_acc_time[i] = self.timer.stop()
pm_mean_time = self.timer.time_division(self.pm_acc_time[1:].mean())
self.io.preprocess_timing("PM", pm_mean_time, loops)
if self.potential.method == "fmm":
self.fmm_acc_time = zeros(loops)
for i in range(loops):
self.timer.start()
self.integrator.update_accelerations(self.particles)
self.fmm_acc_time[i] = self.timer.stop()
fmm_mean_time = self.timer.time_division(self.fmm_acc_time[:].mean())
self.io.preprocess_timing("FMM", fmm_mean_time, loops)
[docs] def time_evolution_loop(self, loops: int = 11):
"""Run several loops of the equilibration and production phase to estimate the total time of the simulation.
Parameters
----------
loops: int
Number of simulation steps to run. Default = 11.
"""
if self.io.verbose:
print(f"\nRunning {loops} equilibration and production steps to estimate simulation times\n")
# Run few equilibration steps to estimate the equilibration time
if self.parameters.equilibration_phase and self.parameters.electrostatic_equilibration:
self.integrator.update = self.integrator.type_setup(self.integrator.equilibration_type)
self.timer.start()
self.evolve_loop("equilibration", self.integrator.thermalization, 0, loops, self.parameters.eq_dump_step)
self.eq_mean_time = self.timer.stop() / loops
# Print the average equilibration & production times
self.io.preprocess_timing("Equilibration", self.timer.time_division(self.eq_mean_time), loops)
if self.parameters.magnetized and self.parameters.electrostatic_equilibration:
self.integrator.update = self.integrator.type_setup(self.integrator.magnetization_type)
self.timer.start()
self.evolve_loop("magnetization", self.integrator.thermalization, 0, loops, self.parameters.mag_dump_step)
self.mag_mean_time = self.timer.stop() / loops
# Print the average equilibration & production times
self.io.preprocess_timing("Magnetization", self.timer.time_division(self.mag_mean_time), loops)
# Run few production steps to estimate the equilibration time
self.integrator.update = self.integrator.type_setup(self.integrator.production_type)
self.timer.start()
self.evolve_loop("production", False, 0, loops, self.parameters.prod_dump_step)
self.prod_mean_time = self.timer.stop() / loops
self.io.preprocess_timing("Production", self.timer.time_division(self.prod_mean_time), loops)
if self.parameters.equilibration_phase and self.parameters.electrostatic_equilibration:
# Print the estimate for the full run
eq_prediction = self.eq_mean_time * self.parameters.equilibration_steps
self.io.time_stamp("Equilibration", self.timer.time_division(eq_prediction))
else:
eq_prediction = 0.0
if self.parameters.magnetized and self.parameters.electrostatic_equilibration:
mag_prediction = self.mag_mean_time * self.parameters.magnetization_steps
self.io.time_stamp("Magnetization", self.timer.time_division(mag_prediction))
eq_prediction += mag_prediction
prod_prediction = self.prod_mean_time * self.parameters.production_steps
self.io.time_stamp("Production", self.timer.time_division(prod_prediction))
tot_time = eq_prediction + prod_prediction
self.io.time_stamp("Total Run", self.timer.time_division(tot_time))
[docs] def time_n_space_estimates(self, loops: int = 10):
"""Estimate simulation times and space
Parameters
----------
loops: int
Number of simulation steps to run. Default = 10.
"""
if loops:
loops += 1
self.io.preprocess_timing("header", [0, 0, 0, 0, 0, 0], 0)
if self.potential.pppm_on:
green_time = self.timer.time_division(self.green_function_timer())
self.io.preprocess_timing("GF", green_time, 0)
self.time_acceleration(loops)
self.time_evolution_loop(loops)
# Estimate size of dump folder
# Grab one file from the dump directory and get the size of it.
if not listdir(self.io.eq_dump_dir):
raise FileNotFoundError(
"Could not estimate the size of the equilibration phase dumps"
" because there are no dumps in the equilibration directory."
"Re-run .time_n_space_estimate(loops) with loops > eq_dump_step"
)
if not listdir(self.io.prod_dump_dir):
raise FileNotFoundError(
"Could not estimate the size of the production phase dumps because"
" there are no dumps in the production directory."
"Re-run .time_n_space_estimate(loops) with loops > prod_dump_step"
)
eq_dump_size = os_stat(join(self.io.eq_dump_dir, listdir(self.io.eq_dump_dir)[0])).st_size
eq_dump_fldr_size = eq_dump_size * (self.parameters.equilibration_steps / self.parameters.eq_dump_step)
# Grab one file from the dump directory and get the size of it.
prod_dump_size = os_stat(join(self.io.eq_dump_dir, listdir(self.io.eq_dump_dir)[0])).st_size
prod_dump_fldr_size = prod_dump_size * (self.parameters.production_steps / self.parameters.prod_dump_step)
# Prepare arguments to pass for print out
sizes = array([[eq_dump_size, eq_dump_fldr_size], [prod_dump_size, prod_dump_fldr_size]])
# Check for electrostatic equilibration
if self.parameters.magnetized and self.parameters.electrostatic_equilibration:
if not listdir(self.io.mag_dump_dir):
raise FileNotFoundError(
"Could not estimate the size of the magnetization phase dumps because"
" there are no dumps in the production directory."
"Re-run .time_n_space_estimate(loops) with loops > mag_dump_step"
)
dump = self.parameters.mag_dump_step
mag_dump_size = os_stat(join(self.io.mag_dump_dir, "checkpoint_" + str(dump) + ".npz")).st_size
mag_dump_fldr_size = mag_dump_size * (self.parameters.magnetization_steps / self.parameters.mag_dump_step)
sizes = array(
[
[eq_dump_size, eq_dump_fldr_size],
[prod_dump_size, prod_dump_fldr_size],
[mag_dump_size, mag_dump_fldr_size],
]
)
self.io.preprocess_sizing(sizes)
[docs] def timing_study_calculation(self):
"""Estimate the best number of mesh points and cutoff radius."""
self.pppm_plots_dir = join(self.io.preprocessing_dir, "PPPM_Plots")
if not exists(self.pppm_plots_dir):
mkdir(self.pppm_plots_dir)
print("\n\n{:=^70} \n".format(" Timing Study "))
self.input_rc = self.potential.rc
self.input_mesh = self.potential.pppm_mesh.copy()
self.input_alpha = self.potential.pppm_alpha_ewald
self.input_cao = self.potential.pppm_cao.copy()
data = DataFrame()
# Rescaling constant to calculate the PP force error
rescaling_constant = (
sqrt(self.potential.total_num_ptcls) * self.potential.a_ws**2 / sqrt(self.potential.pbox_volume)
)
# Set the maximum number of cells to be L / (2 * a_ws). 2* a_ws is the closest two particles can be.
max_cells = int(0.5 * self.parameters.box_lengths.min() / self.parameters.a_ws)
if max_cells != self.pp_cells[-1]:
self.pp_cells = arange(3, max_cells, dtype=int)
# Start the loop for averaging PM and PP acceleration times
for _, m in enumerate(
tqdm(self.pm_meshes, desc="Looping over the PM meshes", disable=not self.parameters.verbose)
):
# Setup PM params
self.potential.pppm_mesh = m * array([1, 1, 1], dtype=int)
self.potential.pppm_alpha_ewald = 0.3 * m / self.potential.box_lengths.min()
self.potential.pppm_h_array = self.potential.box_lengths / self.potential.pppm_mesh
print(f"\n Mesh = {m, m, m}:\n\t cao = ", end="")
for _, cao in enumerate(self.pm_caos):
print(f"{cao}, ", end="")
self.potential.pppm_cao = cao * array([1, 1, 1], dtype=int)
# Update the potential matrix since alpha has changed
self.potential.pot_update_params(self.potential)
# The Green's function depends on alpha, Mesh and cao. It also updates the pppm_pm_err
green_time = self.green_function_timer()
# Calculate the PM acceleration timing 3x and average
pm_acc_time = 0.0
for it in range(3):
self.timer.start()
self.potential.update_pm(self.particles)
pm_acc_time += self.timer.stop() / 3.0
# Loop over the number of cells
for _, cell in enumerate(self.pp_cells):
# Cutoff radius is the side of the cells.
self.potential.rc = self.potential.box_lengths.min() / cell
# Update the potential pp error
self.potential.pppm_pp_err = force_error_analytic_pp(
self.potential.type,
self.potential.rc,
self.potential.screening_length,
self.potential.pppm_alpha_ewald,
rescaling_constant,
)
# Note: the PM error does not depend on rc. Only on alpha and it is given by G_k
self.potential.force_error = sqrt(self.potential.pppm_pp_err**2 + self.potential.pppm_pm_err**2)
# The PP acceleration does not depend on cao.
# However, it still needs to be in its loop for updating the dataframe.
pp_acc_time = 0.0
for it in range(3):
self.timer.start()
self.potential.update_linked_list(self.particles)
pp_acc_time += self.timer.stop() / 3.0
# tot_pppm_err, pppm_pm_err, pppm_pp_err = force_error_approx_pppm(
# self.potential.matrix[1, 0, 0],
# self.potential.rc,
# self.potential.pppm_cao[0],
# self.potential.pppm_h_array[0],
# self.potential.pppm_alpha_ewald,
# )
data = data.append(
{
"pp_cells": cell,
"r_cut": self.potential.rc,
"pppm_alpha_ewald": self.potential.pppm_alpha_ewald,
"pppm_cao_x": self.potential.pppm_cao[0],
"pppm_cao_y": self.potential.pppm_cao[1],
"pppm_cao_z": self.potential.pppm_cao[2],
"M_x": self.potential.pppm_mesh[0],
"M_y": self.potential.pppm_mesh[1],
"M_z": self.potential.pppm_mesh[2],
"Mesh volume": self.potential.pppm_mesh.prod(),
"Mesh": f"{self.potential.pppm_mesh[0], self.potential.pppm_mesh[1], self.potential.pppm_mesh[2]}",
"h_x": self.potential.pppm_h_array[0],
"h_y": self.potential.pppm_h_array[1],
"h_z": self.potential.pppm_h_array[2],
"h_M volume": self.potential.pppm_h_array.prod(),
"h_x alpha": self.potential.pppm_h_array[0] * self.potential.pppm_alpha_ewald,
"h_y alpha": self.potential.pppm_h_array[1] * self.potential.pppm_alpha_ewald,
"h_z alpha": self.potential.pppm_h_array[2] * self.potential.pppm_alpha_ewald,
"h_M a_ws^3": self.potential.pppm_h_array.prod() * self.potential.pppm_alpha_ewald**3,
"G_k time [ns]": green_time * 1e-9,
"pp_acc_time [ns]": pp_acc_time * 1e-9,
"pm_acc_time [ns]": pm_acc_time * 1e-9,
"tot_acc_time [ns]": (pp_acc_time + pm_acc_time) * 1e-9,
"pppm_pp_error [measured]": self.potential.pppm_pp_err,
"pppm_pm_error [measured]": self.potential.pppm_pm_err,
"force error [measured]": self.potential.force_error
# "pppm_pp_error [measured]": pppm_pp_err,
# "pppm_pm_error [measured]": pppm_pm_err,
# "force error [measured]": tot_pppm_err,
},
ignore_index=True,
)
self.dataframe = data
self.dataframe.to_csv(join(self.io.preprocessing_dir, f"TimingStudy_data_{self.io.job_id}.csv"), index=False)
# Reset the original values.
self.potential.rc = self.input_rc
self.potential.pppm_mesh = self.input_mesh.copy()
self.potential.pppm_alpha_ewald = self.input_alpha
self.potential.pppm_cao = self.input_cao.copy()
self.potential.setup(self.parameters, self.species)
# pm_popt = zeros((len(self.pm_caos), 2))
# for ic, cao in enumerate(self.pm_caos):
# # Fit the PM times
# pm_popt[ic, :], _ = curve_fit(
# lambda x, a, b: a + 5 * b * x ** 3 * log2(x ** 3), self.pm_meshes, pm_times[:, ic]
# )
# fit_str = (
# r"Fit = $a_2 + 5 a_3 M^3 \log_2(M^3)$ [s]"
# + "\n"
# + r"$a_2 = ${:.4e}, $a_3 = ${:.4e} ".format(pm_popt[ic, 0], pm_popt[ic, 1])
# )
# print(f"\nPM Time for cao {cao}: " + fit_str)
#
# # Fit the PP Times
# pp_popt, _ = curve_fit(
# lambda x, a, b: a + b / x ** 3,
# self.pp_cells,
# pp_times.mean(axis=0),
# p0=[pp_times.mean(axis=0)[0], self.parameters.total_num_ptcls],
# bounds=(0, [pp_times.mean(axis=0)[0], 1e9]),
# )
# fit_pp_str = (
# r"Fit = $a_0 + a_1 / N_c^3$ [s]"
# + "\n"
# + "$a_0 = ${:.4e}, $a_1 = ${:.4e}".format(pp_popt[0], pp_popt[1])
# )
# print(f"\nPP Time cao {cao}:" + fit_pp_str)
#
# # Start the plot
# fig, (ax_pp, ax_pm) = plt.subplots(1, 2, sharey=True, figsize=(12, 7))
# ax_pm.plot(self.pm_meshes, pm_times[:, ic], "o", label=f"Measured cao: {cao}")
# ax_pm.plot(
# self.pm_meshes,
# pm_popt[ic, 0] + 5 * pm_popt[ic, 1] * self.pm_meshes ** 3 * log2(self.pm_meshes ** 3),
# ls="--",
# label="Fit",
# )
# ax_pm.annotate(
# text=fit_str,
# xy=(self.pm_meshes[-1], pm_times[-1, ic]),
# xytext=(self.pm_meshes[0], pm_times[-1, ic]),
# bbox=dict(boxstyle="round4", fc="white", ec="k", lw=2),
# )
#
# ax_pm.set(title=f"PM calculation time and estimate @ cao = {cao}", yscale="log", xlabel="Mesh size")
# ax_pm.set_xscale("log", base=2)
# ax_pm.legend(ncol=2)
#
# # Scatter Plot the PP Times
# self.tot_time_map = zeros(pp_times.shape)
# for j, mesh_points in enumerate(self.pm_meshes):
# self.tot_time_map[j, :] = pm_times[j, ic] + pp_times[j, :]
# ax_pp.plot(self.pp_cells, pp_times[j], "o", label=r"@ Mesh {}$^3$".format(mesh_points))
#
# # Plot the Fit PP times
# ax_pp.plot(self.pp_cells, pp_popt[0] + pp_popt[1] / self.pp_cells ** 3, ls="--", label="Fit")
# ax_pp.legend(ncol=2)
# ax_pp.annotate(
# text=fit_pp_str,
# xy=(self.pp_cells[0], pp_times[0, 0]),
# xytext=(self.pp_cells[0], pp_times[-1, -1]),
# bbox=dict(boxstyle="round4", fc="white", ec="k", lw=2),
# )
# ax_pp.set(
# title=f"PP calculation time and estimate @ cao = {cao}",
# yscale="log",
# ylabel="CPU Times [s]",
# xlabel=r"$N_c $ = Cells",
# )
# fig.tight_layout()
# fig.savefig(join(self.pppm_plots_dir, f"Times_cao_{cao}_" + self.io.job_id + ".png"))
#
# self.make_force_v_timing_plot(ic)
# self.lagrangian = np.empty((len(self.pm_meshes), len(self.pp_cells)))
# self.tot_times = np.empty((len(self.pm_meshes), len(self.pp_cells)))
# self.pp_times = pp_times.copy()
# self.pm_times = pm_times.copy()
# for i in range(len(self.pm_meshes)):
# self.tot_times[i, :] = pp_times[i] + pm_times[i]
# self.lagrangian[i, :] = self.force_error_map[i, :]
#
# best = np.unravel_index(self.lagrangian.argmin(), self.lagrangian.shape)
# self.best_mesh = self.pm_meshes[best[0]]
# self.best_cells = self.pp_cells[best[1]]
# self.make_lagrangian_plot()
#
# # set the best parameter
# self.potential.pppm_mesh = self.best_mesh * np.ones(3, dtype=int)
# self.potential.rc = self.parameters.box_lengths.min() / self.best_cells
# self.potential.pppm_alpha_ewald = 0.3 * self.best_mesh / self.parameters.box_lengths.min()
# self.potential.pppm_setup(self.parameters)
#
# # print report
# self.io.timing_study(self)
# # time prediction
# self.predicted_times = pp_times[best] + pm_times[best[0]]
# # Print estimate of run times
# self.io.time_stamp('Equilibration',
# self.timer.time_division(self.predicted_times * self.parameters.equilibration_steps))
# self.io.time_stamp('Production',
# self.timer.time_division(self.predicted_times * self.parameters.production_steps))
# self.io.time_stamp('Total Run',
# self.timer.time_division(self.predicted_times * (self.parameters.equilibration_steps
# + self.parameters.production_steps)))
[docs]class Simulation(Process):
"""
Sarkas simulation wrapper. This class manages the entire simulation and its small moving parts.
Parameters
----------
input_file : str
Path to the YAML input file.
"""
[docs] def __init__(self, input_file: str = None):
self.__name__ = "simulation"
super().__init__(input_file)
[docs] def check_restart(self, phase):
"""
Check if the simulation is a restart.
Parameters
----------
phase: str
Simulation phase, e.g. equilibration, Magnetization, production.
Returns
-------
it_start: int
Restart step.
"""
if self.parameters.verbose:
print(f"\n\n{phase.capitalize():-^70} \n")
# Check if this is restart
if self.parameters.load_method[:2] == phase[:2] and self.parameters.load_method[-7:] == "restart":
it_start = self.parameters.restart_step
else:
it_start = 0
self.io.dump(phase, self.particles, 0)
return it_start
[docs] def equilibrate(self) -> None:
"""
Run the time integrator with the thermostat to evolve the system to its thermodynamics equilibrium state.
"""
it_start = self.check_restart(phase="equilibration")
self.integrator.update = self.integrator.type_setup(self.integrator.equilibration_type)
# Start timer, equilibrate, and print run time.
self.timer.start()
self.evolve_loop(
"equilibration",
self.integrator.thermalization,
it_start,
self.parameters.equilibration_steps,
self.parameters.eq_dump_step,
)
time_eq = self.timer.stop()
self.io.time_stamp("Equilibration", self.timer.time_division(time_eq))
# def evolve_loop(self, phase, it_start, it_end, dump_step):
#
# for it in tqdm(range(it_start, it_end), disable=not self.parameters.verbose):
# # Calculate the Potential energy and update particles' data
#
# self.integrator.update(self.particles)
#
# if (it + 1) % dump_step == 0:
# th = Thread(
# target=self.io.dump,
# name=f"Sarkas_{phase.capitalize()}_Thread - {it + 1}",
# args=(phase, self.particles.__deepcopy__(), it + 1,),
# )
#
# self.threads_ls.append(th)
#
# th.start()
#
# # Wait for all the threads to finish
# for x in self.threads_ls:
# x.join()
#
# self.threads_ls.clear()
[docs] def magnetize(self) -> None:
# Check for magnetization phase
it_start = self.check_restart(phase="magnetization")
# Update integrator
self.integrator.update = self.integrator.type_setup(self.integrator.magnetization_type)
# Start timer, magnetize, and print run time.
self.timer.start()
self.evolve_loop(
"magnetization",
self.integrator.thermalization,
it_start,
self.parameters.magnetization_steps,
self.parameters.mag_dump_step,
)
time_eq = self.timer.stop()
self.io.time_stamp("Magnetization", self.timer.time_division(time_eq))
[docs] def produce(self) -> None:
it_start = self.check_restart(phase="production")
self.integrator.update = self.integrator.type_setup(self.integrator.production_type)
# Update measurement flag for rdf.
self.potential.measure = True
self.timer.start()
self.evolve_loop("production", False, it_start, self.parameters.production_steps, self.parameters.prod_dump_step)
time_eq = self.timer.stop()
self.io.time_stamp("Production", self.timer.time_division(time_eq))
[docs] def run(self) -> None:
"""Run the simulation."""
time0 = self.timer.current()
if self.parameters.equilibration_phase and self.parameters.electrostatic_equilibration:
self.equilibrate()
if self.parameters.magnetization_phase:
self.magnetize()
if self.parameters.production_phase:
self.produce()
time_tot = self.timer.current()
self.io.time_stamp("Total", self.timer.time_division(time_tot - time0))