Source code for sarkas.utilities.io
"""
Module handling the I/O for an MD run.
"""
import csv
import pickle
import re
import sys
import yaml
from copy import copy, deepcopy
from IPython import get_ipython
from numpy import float64
from numpy import load as np_load
from numpy import savetxt, savez, zeros
from numpy.random import randint
from os import listdir, mkdir
from os.path import basename, exists, join
from pyfiglet import Figlet, print_figlet
from warnings import warn
if get_ipython().__class__.__name__ == "ZMQInteractiveShell":
# If you are using Jupyter Notebook
from tqdm import tqdm_notebook as tqdm
else:
# If you are using IPython or Python kernel
from tqdm import tqdm
FONTS = ["speed", "starwars", "graffiti", "chunky", "epic", "larry3d", "ogre"]
# Light Colors.
LIGHT_COLORS = [
"255;255;255",
"13;177;75",
"153;162;162",
"240;133;33",
"144;154;183",
"209;222;63",
"232;217;181",
"200;154;88",
"148;174;74",
"203;90;40",
]
# Dark Colors.
DARK_COLORS = ["24;69;49", "0;129;131", "83;80;84", "110;0;95"]
[docs]class InputOutput:
"""
Class handling the input and output functions of the MD run.
Parameters
----------
process : str
Name of the process class containing MD run info.
"""
electrostatic_equilibration: bool = False
eq_dump_dir: str = "dumps"
equilibration_dir: str = "Equilibration"
input_file: str = None # MD run input file.
job_dir: str = None
job_id: str = None
log_file: str = None
mag_dump_dir: str = "dumps"
magnetization_dir: str = "Magnetization"
magnetized: bool = False
preprocess_file: str = None
preprocessing: bool = False
preprocessing_dir: str = "PreProcessing"
process: str = "preprocessing"
processes_dir: str = None
prod_dump_dir: str = "dumps"
production_dir: str = "Production"
postprocessing_dir: str = "PostProcessing"
simulations_dir: str = "Simulations"
simulation_dir: str = "Simulation"
verbose: bool = False
xyz_dir: str = None
xyz_filename: str = None
def __repr__(self):
sortedDict = dict(sorted(self.__dict__.items(), key=lambda x: x[0].lower()))
disp = "InputOuput( \n"
for key, value in sortedDict.items():
disp += "\t{} : {}\n".format(key, value)
disp += ")"
return disp
def __copy__(self):
"""Make a shallow copy of the object using copy by creating a new instance of the object and copying its __dict__."""
# Create a new object
_copy = type(self)()
# copy the dictionary
_copy.__dict__.update(self.__dict__)
return _copy
[docs] def from_dict(self, input_dict: dict):
"""
Update attributes from input dictionary.
Parameters
----------
input_dict: dict
Dictionary to be copied.
"""
self.__dict__.update(input_dict)
[docs] def setup(self):
"""Create file paths and directories for the simulation."""
self.create_file_paths()
self.make_directories()
self.file_header()
[docs] def from_yaml(self, filename: str):
"""
Parse inputs from YAML file.
Parameters
----------
filename: str
Input YAML file.
Returns
-------
dics : dict
Content of YAML file parsed in a nested dictionary
"""
self.input_file = filename
with open(filename, "r") as stream:
dics = yaml.load(stream, Loader=yaml.FullLoader)
self.__dict__.update(dics["IO"])
if "Parameters" in dics.keys():
keyed = "Parameters"
for key, value in dics[keyed].items():
if key == "verbose":
self.verbose = value
if key == "magnetized":
self.magnetized = value
if key == "load_method":
self.load_method = value
if value[-7:] == "restart":
self.restart = True
else:
self.restart = False
if key == "preprocessing":
self.preprocessing = value
if "Integrator" in dics.keys():
keyed = "Integrator"
for key, value in dics[keyed].items():
if key == "electrostatic_equilibration":
self.electrostatic_equilibration = value
# rdf_nbins can be defined in either Parameters or Postprocessing. However, Postprocessing will always
# supersede Parameters choice.
if "Observables" in dics.keys():
for i in dics["Observables"]:
if "RadialDistributionFunction" in i.keys():
dics["Parameters"]["rdf_nbins"] = i["RadialDistributionFunction"]["no_bins"]
return dics
[docs] def create_file_paths(self):
"""Create all directories', subdirectories', and files' paths."""
if self.job_dir is None:
self.job_dir = basename(self.input_file).split(".")[0]
if self.job_id is None:
self.job_id = self.job_dir
self.job_dir = join(self.simulations_dir, self.job_dir)
# Create Processes directories
self.processes_dir = [
join(self.job_dir, self.preprocessing_dir),
join(self.job_dir, self.simulation_dir),
join(self.job_dir, self.postprocessing_dir),
]
# Redundancy
self.preprocessing_dir = self.processes_dir[0]
self.simulation_dir = self.processes_dir[1]
self.postprocessing_dir = self.processes_dir[2]
# Redirect to the correct process folder
if self.process == "preprocessing":
indx = 0
else:
# Note that Postprocessing needs the link to simulation's folder
# because that is where I look for energy files and pickle files
indx = 1
# Equilibration directory and sub_dir
self.equilibration_dir = join(self.processes_dir[indx], self.equilibration_dir)
self.eq_dump_dir = join(self.equilibration_dir, "dumps")
# Production dir and sub_dir
self.production_dir = join(self.processes_dir[indx], self.production_dir)
self.prod_dump_dir = join(self.production_dir, "dumps")
# Production phase filenames
self.prod_energy_filename = join(self.production_dir, "ProductionEnergy_" + self.job_id + ".csv")
self.prod_ptcls_filename = join(self.prod_dump_dir, "checkpoint_")
# Equilibration phase filenames
self.eq_energy_filename = join(self.equilibration_dir, "EquilibrationEnergy_" + self.job_id + ".csv")
self.eq_ptcls_filename = join(self.eq_dump_dir, "checkpoint_")
# Magnetic dir
if self.electrostatic_equilibration:
self.magnetization_dir = join(self.processes_dir[indx], self.magnetization_dir)
self.mag_dump_dir = join(self.magnetization_dir, "dumps")
# Magnetization phase filenames
self.mag_energy_filename = join(self.magnetization_dir, "MagnetizationEnergy_" + self.job_id + ".csv")
self.mag_ptcls_filename = join(self.mag_dump_dir, "checkpoint_")
if self.process == "postprocessing":
indx = 2 # Redirect to the correct folder
# Log File
if self.log_file is None:
self.log_file = join(self.processes_dir[indx], "log_" + self.job_id + ".out")
else:
self.log_file = join(self.processes_dir[indx], self.log_file)
[docs] def make_directories(self):
"""Create directories where to store MD results."""
# Check if the directories exist
if not exists(self.simulations_dir):
mkdir(self.simulations_dir)
if not exists(self.job_dir):
mkdir(self.job_dir)
# Create Process' directories and their subdir
for i in self.processes_dir:
if not exists(i):
mkdir(i)
# The following automatically create directories in the correct Process
if not exists(self.equilibration_dir):
mkdir(self.equilibration_dir)
if not exists(self.eq_dump_dir):
mkdir(self.eq_dump_dir)
if not exists(self.production_dir):
mkdir(self.production_dir)
if not exists(self.prod_dump_dir):
mkdir(self.prod_dump_dir)
if self.electrostatic_equilibration:
if not exists(self.magnetization_dir):
mkdir(self.magnetization_dir)
if not exists(self.mag_dump_dir):
mkdir(self.mag_dump_dir)
if self.preprocessing:
if not exists(self.preprocessing_dir):
mkdir(self.preprocessing_dir)
if not exists(self.postprocessing_dir):
mkdir(self.postprocessing_dir)
[docs] def file_header(self):
"""Create the log file and print the figlet if not a restart run."""
if not self.restart:
with open(self.log_file, "w+") as f_log:
figlet_obj = Figlet(font="starwars")
print(figlet_obj.renderText("Sarkas"), file=f_log)
print("An open-source pure-Python molecular dynamics suite for non-ideal plasmas.", file=f_log)
# Print figlet to screen if verbose
if self.verbose:
self.screen_figlet()
[docs] def simulation_summary(self, simulation):
"""
Print out to file a summary of simulation's parameters.
If verbose output then it will print twice: the first time to file and second time to screen.
Parameters
----------
simulation : :class:`sarkas.processes.Process`
Simulation's parameters
"""
screen = sys.stdout
f_log = open(self.log_file, "a+")
repeat = 2 if self.verbose else 1
# redirect printing to file
sys.stdout = f_log
# Print to file first then to screen if repeat == 2
while repeat > 0:
if simulation.parameters.load_method in ["production_restart", "prod_restart"]:
print("\n\n--------------------------- Production Restart -------------------------------------")
self.time_info(simulation)
elif simulation.parameters.load_method in ["equilibration_restart", "eq_restart"]:
print("\n\n------------------------ Equilibration Restart ----------------------------------")
self.time_info(simulation)
elif simulation.parameters.load_method in ["magnetization_restart", "mag_restart"]:
print("\n\n------------------------ Magnetization Restart ----------------------------------")
self.time_info(simulation)
elif self.process == "postprocessing":
# Header of process
process_title = "{:^80}".format(self.process.capitalize())
print("\n\n")
print(*["*" for i in range(50)])
print(process_title)
print(*["*" for i in range(50)])
print(f"\nJob ID: {self.job_id}")
print(f"Job directory: {self.job_dir}")
print(f"PostProcessing directory: \n{self.postprocessing_dir}")
print(f"\nEquilibration dumps directory: {self.eq_dump_dir}")
print(f"Production dumps directory: \n{self.prod_dump_dir}")
print(f"\nEquilibration Thermodynamics file: \n{self.eq_energy_filename}")
print(f"Production Thermodynamics file: \n{self.prod_energy_filename}")
else:
# Header of process
process_title = "{:^80}".format(self.process.capitalize())
print("\n\n")
print(*["*" for i in range(50)])
print(process_title)
print(*["*" for i in range(50)])
print(f"\nJob ID: {self.job_id}")
print(f"Job directory: {self.job_dir}")
print(f"\nEquilibration dumps directory: \n", {self.eq_dump_dir})
print(f"Production dumps directory: \n", {self.prod_dump_dir})
print(f"\nEquilibration Thermodynamics file: \n{self.eq_energy_filename}")
print(f"Production Thermodynamics file: \n{self.prod_energy_filename}")
print("\nPARTICLES:")
print(f"Total No. of particles = {simulation.parameters.total_num_ptcls}")
print(f"No. of species = {len(simulation.parameters.species_num)}")
for isp, sp in enumerate(simulation.species):
if sp.name != "electron_background":
print("Species ID: {}".format(isp))
sp.pretty_print(simulation.potential.type, simulation.parameters.units)
# Parameters Info
simulation.parameters.pretty_print()
# Potential Info
simulation.potential.pretty_print()
# Integrator
simulation.integrator.pretty_print()
repeat -= 1
sys.stdout = screen # Restore the original sys.stdout
f_log.close()
[docs] def time_stamp(self, time_stamp, t):
"""
Print out to screen elapsed times. If verbose output, print to file first and then to screen.
Parameters
----------
time_stamp : str
Array of time stamps.
t : float
Elapsed time.
"""
screen = sys.stdout
f_log = open(self.log_file, "a+")
repeat = 2 if self.verbose else 1
t_hrs, t_min, t_sec, t_msec, t_usec, t_nsec = t
# redirect printing to file
sys.stdout = f_log
while repeat > 0:
if "Potential Initialization" in time_stamp:
print("\n\n{:-^70} \n".format("Initialization Times"))
if t_hrs == 0 and t_min == 0 and t_sec <= 2:
print(f"\n{time_stamp} Time: {int(t_sec)} sec {int(t_msec)} msec {int(t_usec)} usec {int(t_nsec)} nsec")
else:
print(f"\n{time_stamp} Time: {int(t_hrs)} hrs {int(t_min)} min {int(t_sec)} sec")
repeat -= 1
sys.stdout = screen
f_log.close()
[docs] def timing_study(self, simulation):
"""
Info specific for timing study.
Parameters
----------
simulation : :class:`sarkas.processes.Process`
Process class containing the info to print.
"""
screen = sys.stdout
f_log = open(self.log_file, "a+")
repeat = 2 if self.verbose else 1
# redirect printing to file
sys.stdout = f_log
# Print to file first then to screen if repeat == 2
while repeat > 0:
print("\n\n------------ Conclusion ------------\n")
print("Suggested Mesh = [ {} , {} , {} ]".format(*simulation.potential.pppm_mesh))
print(
"Suggested Ewald parameter alpha = {:2.4f} / a_ws = {:1.6e} ".format(
simulation.potential.pppm_alpha_ewald * simulation.parameters.a_ws,
simulation.potential.pppm_alpha_ewald,
),
end="",
)
print("[1/cm]" if simulation.parameters.units == "cgs" else "[1/m]")
print(
"Suggested rcut = {:2.4f} a_ws = {:.6e} ".format(
simulation.potential.rc / simulation.parameters.a_ws, simulation.potential.rc
),
end="",
)
print("[cm]" if simulation.parameters.units == "cgs" else "[m]")
self.algorithm_info(simulation)
repeat -= 1
sys.stdout = screen # Restore the original sys.stdout
f_log.close()
[docs] def preprocess_sizing(self, sizes):
"""Print the estimated file sizes."""
screen = sys.stdout
f_log = open(self.log_file, "a+")
repeat = 2 if self.verbose else 1
# redirect printing to file
sys.stdout = f_log
while repeat > 0:
print("\n\n{:=^70} \n".format(" Filesize Estimates "))
size_GB, size_MB, size_KB, rem = convert_bytes(sizes[0, 0])
print("\nEquilibration:\n")
print(
"Checkpoint filesize: {} GB {} MB {} KB {} bytes".format(
int(size_GB), int(size_MB), int(size_KB), int(rem)
)
)
size_GB, size_MB, size_KB, rem = convert_bytes(sizes[0, 1])
print(
"Checkpoint folder size: {} GB {} MB {} KB {} bytes".format(
int(size_GB), int(size_MB), int(size_KB), int(rem)
)
)
if self.magnetized and self.electrostatic_equilibration:
print("\nMagnetization:\n")
size_GB, size_MB, size_KB, rem = convert_bytes(sizes[2, 0])
print(
"Checkpoint filesize: {} GB {} MB {} KB {} bytes".format(
int(size_GB), int(size_MB), int(size_KB), int(rem)
)
)
size_GB, size_MB, size_KB, rem = convert_bytes(sizes[2, 1])
print(
"Checkpoint folder size: {} GB {} MB {} KB {} bytes".format(
int(size_GB), int(size_MB), int(size_KB), int(rem)
)
)
size_GB, size_MB, size_KB, rem = convert_bytes(sizes[1, 0])
print("\nProduction:\n")
print(
"Checkpoint filesize: {} GB {} MB {} KB {} bytes".format(
int(size_GB), int(size_MB), int(size_KB), int(rem)
)
)
size_GB, size_MB, size_KB, rem = convert_bytes(sizes[1, 1])
print(
"Checkpoint folder size: {} GB {} MB {} KB {} bytes".format(
int(size_GB), int(size_MB), int(size_KB), int(rem)
)
)
size_GB, size_MB, size_KB, rem = convert_bytes(sizes[:, 1].sum())
print(
"\nTotal minimum needed space: {} GB {} MB {} KB {} bytes".format(
int(size_GB), int(size_MB), int(size_KB), int(rem)
)
)
repeat -= 1
sys.stdout = screen
f_log.close()
[docs] def preprocess_timing(self, str_id, t, loops):
"""Print times estimates of simulation to file first and then to screen if verbose."""
screen = sys.stdout
f_log = open(self.log_file, "a+")
repeat = 2 if self.verbose else 1
t_hrs, t_min, t_sec, t_msec, t_usec, t_nsec = t
# redirect printing to file
sys.stdout = f_log
while repeat > 0:
if str_id == "header":
print("\n\n{:=^70} \n".format(" Times Estimates "))
elif str_id == "GF":
print(
"Optimal Green's Function Time: \n"
"{} min {} sec {} msec {} usec {} nsec \n".format(
int(t_min), int(t_sec), int(t_msec), int(t_usec), int(t_nsec)
)
)
elif str_id in ["PP", "PM", "FMM"]:
print(f"Time of {str_id} acceleration calculation averaged over {loops - 1} steps:")
print(f"{int(t_min)} min {int(t_sec)} sec {int(t_msec)} msec {int(t_usec)} usec {int(t_nsec)} nsec \n")
elif str_id in ["Equilibration", "Magnetization", "Production"]:
print(f"Time of a single {str_id} step averaged over {loops - 1} steps:")
print(f"{int(t_min)} min {int(t_sec)} sec {int(t_msec)} msec {int(t_usec)} usec {int(t_nsec)} nsec \n")
if str_id == "Production":
print("\n\n{:-^70} \n".format(" Total Estimated Times "))
repeat -= 1
sys.stdout = screen
f_log.close()
[docs] def postprocess_info(self, simulation, write_to_file=False, observable=None):
"""
Print Post-processing info to file and/or screen in a reader-friendly format.
Parameters
----------
simulation : :class:`sarkas.processes.PostProcess`
PostProcess class.
write_to_file : bool
Flag for printing info also to file. Default= False.
observable : str
Observable whose info to print. Default = None.
Choices = ['header','rdf', 'ccf', 'dsf', 'ssf', 'vm']
"""
choices = ["header", "rdf", "ccf", "dsf", "ssf", "vd"]
msg = (
"Observable not defined. \n "
"Please choose an observable from this list \n"
"'rdf' = Radial Distribution Function, \n"
"'ccf' = Current Correlation Function, \n"
"'dsf' = Dynamic Structure Function, \n"
"'ssf' = Static Structure Factor, \n"
"'vd' = Velocity Distribution"
)
if observable is None:
raise ValueError(msg)
if observable not in choices:
raise ValueError(msg)
if write_to_file:
screen = sys.stdout
f_log = open(self.log_file, "a+")
repeat = 2 if self.verbose else 1
# redirect printing to file
sys.stdout = f_log
else:
repeat = 1
while repeat > 0:
if observable == "header":
# Header of process
process_title = "{:^80}".format(self.process.capitalize())
print("\n\n")
print(*["*" for i in range(50)])
print(process_title)
print(*["*" for i in range(50)])
elif observable == "rdf":
simulation.rdf.pretty_print()
elif observable == "ssf":
simulation.ssf.pretty_print()
elif observable == "dsf":
simulation.dsf.pretty_print()
elif observable == "ccf":
simulation.ccf.pretty_print()
elif observable == "vd":
simulation.vm.setup(simulation.parameters)
print("\nVelocity Moments:")
print("Maximum no. of moments = {}".format(simulation.vm.max_no_moment))
print("Maximum velocity moment = {}".format(int(2 * simulation.vm.max_no_moment)))
repeat -= 1
if write_to_file:
sys.stdout = screen
if write_to_file:
f_log.close()
[docs] @staticmethod
def screen_figlet():
"""
Print a colored figlet of Sarkas to screen.
"""
if get_ipython().__class__.__name__ == "ZMQInteractiveShell":
# Assume white background in Jupyter Notebook
clr = DARK_COLORS[randint(0, len(DARK_COLORS))]
else:
# Assume dark background in IPython/Python Kernel
clr = LIGHT_COLORS[randint(0, len(LIGHT_COLORS))]
fnt = FONTS[randint(0, len(FONTS))]
print_figlet("\nSarkas\n", font=fnt, colors=clr)
print("\nAn open-source pure-python molecular dynamics suite for non-ideal plasmas.\n\n")
[docs] @staticmethod
def time_info(simulation):
"""
Print time simulation's parameters.
Parameters
----------
simulation : :class:`sarkas.processes.Process`
Process class containing the timing info and other parameters.
"""
warn(
"Deprecated feature. It will be removed in the v2.0.0 release.\n" "Use Integrator.pretty_print()",
category=DeprecationWarning,
)
simulation.integrator.pretty_print()
[docs] @staticmethod
def algorithm_info(simulation):
"""
Print algorithm information.
Parameters
----------
simulation : :class:`sarkas.processes.Process`
Process class containing the algorithm info and other parameters.
"""
warn(
"Deprecated feature. It will be removed in the v2.0.0 release. Use potential.method_pretty_print()",
category=DeprecationWarning,
)
simulation.potential.method_pretty_print()
[docs] @staticmethod
def potential_info(simulation):
"""
Print potential information.
Parameters
----------
simulation : :class:`sarkas.processes.Process`
Process class containing the potential info and other parameters.
"""
warn(
"Deprecated feature. It will be removed in the v2.0.0 release. Use potential.pot_pretty_print()",
category=DeprecationWarning,
)
simulation.potential.pot_pretty_print(simulation.potential)
[docs] def copy_params(self, params):
"""
Copy necessary parameters.
Parameters
----------
params: :class:`sarkas.core.Parameters`
Simulation's parameters.
"""
self.dt = params.dt
self.a_ws = params.a_ws
self.total_num_ptcls = params.total_num_ptcls
self.total_plasma_frequency = params.total_plasma_frequency
self.species_names = params.species_names.copy()
self.coupling = params.coupling_constant * params.T_desired
[docs] def setup_checkpoint(self, params):
"""
Assign attributes needed for saving dumps.
Parameters
----------
params : :class:`sarkas.core.Parameters`
General simulation parameters.
species : :class:`sarkas.plasma.Species`
List of Species classes.
"""
self.copy_params(params)
# Check whether energy files exist already
if not exists(self.prod_energy_filename):
# Create the Energy file
dkeys = ["Time", "Total Energy", "Total Kinetic Energy", "Potential Energy", "Temperature"]
if len(self.species_names) > 1:
for i, sp_name in enumerate(self.species_names):
dkeys.append("{} Kinetic Energy".format(sp_name))
dkeys.append("{} Potential Energy".format(sp_name))
dkeys.append("{} Temperature".format(sp_name))
data = dict.fromkeys(dkeys)
with open(self.prod_energy_filename, "w+") as f:
w = csv.writer(f)
w.writerow(data.keys())
if not exists(self.eq_energy_filename) and not params.load_method[-7:] == "restart":
# Create the Energy file
dkeys = ["Time", "Total Energy", "Total Kinetic Energy", "Potential Energy", "Temperature"]
if len(self.species_names) > 1:
for i, sp_name in enumerate(self.species_names):
dkeys.append("{} Kinetic Energy".format(sp_name))
dkeys.append("{} Potential Energy".format(sp_name))
dkeys.append("{} Temperature".format(sp_name))
data = dict.fromkeys(dkeys)
with open(self.eq_energy_filename, "w+") as f:
w = csv.writer(f)
w.writerow(data.keys())
if self.electrostatic_equilibration:
if not exists(self.mag_energy_filename) and not params.load_method[-7:] == "restart":
# Create the Energy file
dkeys = ["Time", "Total Energy", "Total Kinetic Energy", "Potential Energy", "Temperature"]
if len(self.species_names) > 1:
for i, sp_name in enumerate(self.species_names):
dkeys.append("{} Kinetic Energy".format(sp_name))
dkeys.append("{} Potential Energy".format(sp_name))
dkeys.append("{} Temperature".format(sp_name))
data = dict.fromkeys(dkeys)
with open(self.mag_energy_filename, "w+") as f:
w = csv.writer(f)
w.writerow(data.keys())
[docs] def save_pickle(self, simulation):
"""
Save all simulations parameters in pickle files.
Parameters
----------
simulation : :class:`sarkas.processes.Process`
Process class containing MD run info to save.
"""
file_list = ["parameters", "integrator", "potential", "species"]
# Redirect to the correct process folder
if self.process == "preprocessing":
indx = 0
else:
# Note that Postprocessing needs the link to simulation's folder
# because that is where I look for energy files and pickle files
indx = 1
for fl in file_list:
filename = join(self.processes_dir[indx], fl + ".pickle")
with open(filename, "wb") as pickle_file:
pickle.dump(simulation.__dict__[fl], pickle_file)
pickle_file.close()
[docs] def read_pickle(self, process):
"""
Read pickle files containing all the simulation information.
Parameters
----------
process : :class:`sarkas.processes.Process`
Process class containing MD run info to save.
"""
file_list = ["parameters", "integrator", "potential"]
# Redirect to the correct process folder
if self.process == "preprocessing":
indx = 0
else:
# Note that Postprocessing needs the link to simulation's folder
# because that is where I look for energy files and pickle files
indx = 1
for fl in file_list:
filename = join(self.processes_dir[indx], fl + ".pickle")
with open(filename, "rb") as handle:
data = pickle.load(handle)
process.__dict__[fl] = copy(data)
# Read species
filename = join(self.processes_dir[indx], "species.pickle")
process.species = []
with open(filename, "rb") as handle:
data = pickle.load(handle)
process.species = copy(data)
[docs] def read_pickle_single(self, class_to_read: str):
"""
Read the desired pickle file.
Parameters
----------
class_to_read : str
Name of the class to read.
Returns
-------
_copy : cls
Copy of desired class.
"""
# Redirect to the correct process folder
if self.process == "preprocessing":
indx = 0
else:
# Note that Postprocessing needs the link to simulation's folder
# because that is where I look for energy files and pickle files
indx = 1
filename = join(self.processes_dir[indx], class_to_read + ".pickle")
with open(filename, "rb") as pickle_file:
data = pickle.load(pickle_file)
_copy = deepcopy(data)
return _copy
[docs] def dump(self, phase, ptcls, it):
"""
Save particles' data to binary file for future restart.
Parameters
----------
phase : str
Simulation phase.
ptcls : :class:`sarkas.particles.Particles`
Particles data.
it : int
Timestep number.
"""
if phase == "production":
ptcls_file = self.prod_ptcls_filename + str(it)
tme = it * self.dt
savez(
ptcls_file,
id=ptcls.id,
names=ptcls.names,
pos=ptcls.pos,
vel=ptcls.vel,
acc=ptcls.acc,
cntr=ptcls.pbc_cntr,
rdf_hist=ptcls.rdf_hist,
virial=ptcls.virial,
time=tme,
)
energy_file = self.prod_energy_filename
elif phase == "equilibration":
ptcls_file = self.eq_ptcls_filename + str(it)
tme = it * self.dt
savez(
ptcls_file,
id=ptcls.id,
names=ptcls.names,
pos=ptcls.pos,
vel=ptcls.vel,
acc=ptcls.acc,
virial=ptcls.virial,
time=tme,
)
energy_file = self.eq_energy_filename
elif phase == "magnetization":
ptcls_file = self.mag_ptcls_filename + str(it)
tme = it * self.dt
savez(
ptcls_file,
id=ptcls.id,
names=ptcls.names,
pos=ptcls.pos,
vel=ptcls.vel,
acc=ptcls.acc,
virial=ptcls.virial,
time=tme,
)
energy_file = self.mag_energy_filename
kinetic_energies, temperatures = ptcls.kinetic_temperature()
potential_energies = ptcls.potential_energies()
# Save Energy data
data = {
"Time": it * self.dt,
"Total Energy": kinetic_energies.sum() + ptcls.potential_energy,
"Total Kinetic Energy": kinetic_energies.sum(),
"Potential Energy": ptcls.potential_energy,
"Total Temperature": ptcls.species_num.transpose() @ temperatures / ptcls.total_num_ptcls,
}
if len(temperatures) > 1:
for sp, kin in enumerate(kinetic_energies):
data[f"{self.species_names[sp]} Kinetic Energy"] = kin
data[f"{self.species_names[sp]} Potential Energy"] = potential_energies[sp]
data[f"{self.species_names[sp]} Temperature"] = temperatures[sp]
with open(energy_file, "a") as f:
w = csv.writer(f)
w.writerow(data.values())
[docs] def dump_xyz(self, phase: str = "production"):
"""
Save the XYZ file by reading Sarkas dumps.
Parameters
----------
phase : str
Phase from which to read dumps. 'equilibration' or 'production'.
dump_skip : int
Interval of dumps to skip. Default = 1
"""
if phase == "equilibration":
self.xyz_filename = join(self.equilibration_dir, "pva_" + self.job_id + ".xyz")
dump_dir = self.eq_dump_dir
else:
self.xyz_filename = join(self.production_dir, "pva_" + self.job_id + ".xyz")
dump_dir = self.prod_dump_dir
f_xyz = open(self.xyz_filename, "w+")
if not hasattr(self, "a_ws"):
params = self.read_pickle_single("parameters")
self.a_ws = params.a_ws
self.total_num_ptcls = params.total_num_ptcls
self.total_plasma_frequency = params.total_plasma_frequency
# Rescale constants. This is needed since OVITO has a small number limit.
pscale = 1.0 / self.a_ws
vscale = 1.0 / (self.a_ws * self.total_plasma_frequency)
ascale = 1.0 / (self.a_ws * self.total_plasma_frequency**2)
# Read the list of dumps and sort them in the correct (natural) order
dumps = listdir(dump_dir)
dumps.sort(key=num_sort)
for dump in tqdm(dumps, disable=not self.verbose):
data = self.read_npz(dump_dir, dump)
data["pos_x"] *= pscale
data["pos_y"] *= pscale
data["pos_z"] *= pscale
data["vel_x"] *= vscale
data["vel_y"] *= vscale
data["vel_z"] *= vscale
data["acc_x"] *= ascale
data["acc_y"] *= ascale
data["acc_z"] *= ascale
f_xyz.writelines("{0:d}\n".format(self.total_num_ptcls))
f_xyz.writelines("name x y z vx vy vz ax ay az\n")
savetxt(f_xyz, data, fmt="%s %.6e %.6e %.6e %.6e %.6e %.6e %.6e %.6e %.6e")
f_xyz.close()
[docs] @staticmethod
def read_npz(fldr: str, filename: str):
"""
Load particles' data from dumps.
Parameters
----------
fldr : str
Folder containing dumps.
filename: str
Name of the dump file to load.
Returns
-------
struct_array : numpy.ndarray
Structured data array.
"""
file_name = join(fldr, filename)
data = np_load(file_name, allow_pickle=True)
# Dev Notes: the old way of saving the xyz file by
# savetxt(f_xyz, np.c_[data["names"],data["pos"] ....]
# , fmt="%10s %.6e %.6e %.6e %.6e %.6e %.6e %.6e %.6e %.6e")
# was not working, because the columns of np.c_[] all have the same data type <U32
# which is in conflict with the desired fmt. i.e. data["names"] was not recognized as a string.
# So I have to create a new structured array and pass this. I could not think of a more Pythonic way.
struct_array = zeros(
data["names"].size,
dtype=[
("names", "U6"),
("pos_x", float64),
("pos_y", float64),
("pos_z", float64),
("vel_x", float64),
("vel_y", float64),
("vel_z", float64),
("acc_x", float64),
("acc_y", float64),
("acc_z", float64),
],
)
struct_array["names"] = data["names"]
struct_array["pos_x"] = data["pos"][:, 0]
struct_array["pos_y"] = data["pos"][:, 1]
struct_array["pos_z"] = data["pos"][:, 2]
struct_array["vel_x"] = data["vel"][:, 0]
struct_array["vel_y"] = data["vel"][:, 1]
struct_array["vel_z"] = data["vel"][:, 2]
struct_array["acc_x"] = data["acc"][:, 0]
struct_array["acc_y"] = data["acc"][:, 1]
struct_array["acc_z"] = data["acc"][:, 2]
return struct_array
[docs]def alpha_to_int(text):
"""Convert strings of numbers into integers.
Parameters
----------
text : str
Text to be converted into an int, if `text` is a number.
Returns
-------
_ : int, str
Integral number otherwise returns a string.
"""
return int(text) if text.isdigit() else text
[docs]def num_sort(text):
"""
Sort strings with numbers inside.
Parameters
----------
text : str
Text to be split into str and int
Returns
-------
: list
List containing text and integers
Notes
-----
Function copied from
https://stackoverflow.com/questions/5967500/how-to-correctly-sort-a-string-with-a-number-inside.
Originally from http://nedbatchelder.com/blog/200712/human_sorting.html
(See Toothy's implementation in the comments)
"""
return [alpha_to_int(c) for c in re.split(r"(\d+)", text)]
[docs]def convert_bytes(tot_bytes):
"""Convert bytes to human-readable GB, MB, KB.
Parameters
----------
tot_bytes : int
Total number of bytes.
Returns
-------
[GB, MB, KB, rem] : list
Bytes divided into Giga, Mega, Kilo bytes.
"""
GB, rem = divmod(tot_bytes, 1024 * 1024 * 1024)
MB, rem = divmod(rem, 1024 * 1024)
KB, rem = divmod(rem, 1024)
return [GB, MB, KB, rem]