"""
Module for calculating physical quantities from Sarkas checkpoints.
"""
from IPython import get_ipython
if get_ipython().__class__.__name__ == "ZMQInteractiveShell":
from tqdm import tqdm_notebook as tqdm
else:
from tqdm import tqdm
import matplotlib.pyplot as plt
import pandas as pd
import scipy.stats as scp_stats
from matplotlib.gridspec import GridSpec
from numba import njit
from numpy import append as np_append
from numpy import (
argsort,
array,
complex128,
concatenate,
exp,
format_float_scientific,
histogram,
isfinite,
load,
ndarray,
pi,
real,
repeat,
savez,
sqrt,
trapz,
unique,
zeros,
)
from numpy.polynomial import hermite_e
from os import listdir, mkdir
from os import remove as os_remove
from os.path import exists as os_path_exists
from os.path import join as os_path_join
from pandas import concat, DataFrame, HDFStore, MultiIndex, read_csv, read_hdf, Series
from pickle import dump
from pickle import load as pickle_load
from scipy.fft import fft, fftfreq, fftshift
from scipy.linalg import norm
from scipy.special import factorial
from seaborn import histplot as sns_histplot
from ..utilities.maths import correlationfunction
from ..utilities.timing import SarkasTimer
UNITS = [
# MKS Units
{
"Energy": "J",
"Time": "s",
"Length": "m",
"Charge": "C",
"Temperature": "K",
"ElectronVolt": "eV",
"Mass": "kg",
"Magnetic Field": "T",
"Current": "A",
"Power": "erg/s",
"Pressure": "Pa",
"Electrical Conductivity": "S/m",
"Diffusion": r"m$^2$/s",
"Bulk Viscosity": r"kg/m s",
"Shear Viscosity": r"kg/m s",
"none": "",
},
# CGS Units
{
"Energy": "erg",
"Time": "s",
"Length": "m",
"Charge": "esu",
"Temperature": "K",
"ElectronVolt": "eV",
"Mass": "g",
"Magnetic Field": "G",
"Current": "esu/s",
"Power": "erg/s",
"Pressure": "Ba",
"Electrical Conductivity": "mho/m",
"Diffusion": r"m$^2$/s",
"Bulk Viscosity": r"g/ cm s",
"Shear Viscosity": r"g/ cm s",
"none": "",
},
]
PREFIXES = {
"y": 1.0e-24, # yocto
"z": 1.0e-21, # zepto
"a": 1.0e-18, # atto
"f": 1.0e-15, # femto
"p": 1.0e-12, # pico
"n": 1.0e-9, # nano
r"$\mu$": 1.0e-6, # micro
"m": 1.0e-3, # milli
"c": 1.0e-2, # centi
"": 1.0,
"k": 1e3, # kilo
"M": 1e6, # mega
"G": 1e9, # giga
"T": 1e12, # tera
"P": 1e15, # peta
"E": 1e18, # exa
"Z": 1e21, # zetta
"Y": 1e24, # yotta
}
[docs]def compute_doc(func):
func.__doc__ = """
Calculate the observable (and its autocorrelation function). See class doc for exact quantities. \n
The data of each slice is saved in hierarchical dataframes,
:py:attr:`~.dataframe_slices` (:py:attr:`~.dataframe_acf_slices`). \n
The sliced averaged data is saved in other hierarchical dataframes,
:py:attr:`~.dataframe` (:py:attr:`~.dataframe_acf_slices`).
"""
return func
[docs]def setup_doc(func):
func.__doc__ = """
Assign attributes from simulation's parameters.
Parameters
----------
params : :class:`sarkas.core.Parameters`
Simulation's parameters.
phase : str, optional
Phase to compute. Default = 'production'.
no_slices : int, optional
Number of independent runs inside a long simulation. Default = 1.
**kwargs :
These will overwrite any :class:`sarkas.core.Parameters`
or default :class:`sarkas.tools.observables.Observable`
attributes and/or add new ones.
"""
return func
[docs]def arg_update_doc(func):
func.__doc__ = """Update observable specific attributes and call :meth:`~.update_finish` to save info."""
return func
[docs]class Observable:
"""
Parent class of all the observables.
Attributes
----------
dataframe : pandas.DataFrame
Dataframe containing the observable's data averaged over the number of slices.
dataframe_acf : pandas.DataFrame
Dataframe containing the observable's autocorrelation function data averaged over the number of slices.
dataframe_acf_slices : pandas.DataFrame
Dataframe containing the observable's autocorrelation data for each slice.
dataframe_slices : pandas.DataFrame
Dataframe containing the observable's data for each slice.
max_k_harmonics : list
Maximum number of :math:`\\mathbf{k}` harmonics to calculate along each dimension.
phase : str
Phase to analyze.
prod_no_dumps : int
Number of production phase checkpoints. Calculated from the number of files in the Production directory.
eq_no_dumps : int
Number of equilibration phase checkpoints. Calculated from the number of files in the Equilibration directory.
no_dumps : int
Number of simulation's checkpoints. Calculated from the number of files in the phase folder.
dump_dir : str
Path to correct dump directory.
dump_step : int
Correct step interval.
It is either :py:attr:`sarkas.core.Parameters.prod_dump_step` or :py:attr:`sarkas.core.Parameters.eq_dump_step`.
no_obs : int
Number of independent binary observable quantities.
It is calculated as :math:`N_s (N_s + 1) / 2` where :math:`N_s` is the number of species.
k_file : str
Path to the npz file storing the :math:`k` vector values.
nkt_hdf_file : str
Path to the npy file containing the Fourier transform of density fluctuations. :math:`n(\\mathbf k, t)`.
vkt_file : str
Path to the npz file containing the Fourier transform of velocity fluctuations. :math:`\\mathbf v(\\mathbf k, t)`.
k_space_dir : str
Directory where :math:`\\mathbf {k}` data is stored.
saving_dir : str
Path to the directory where computed data is stored.
slice_steps : int
Number of steps per slice.
dimensional_average: bool
Flag for averaging over all dimensions. Default = False.
runs: int
Number of independent MD runs. Default = 1.
multi_run_average: bool
Flag for averaging over multiple runs. Default = False.
If True, `runs` needs be specified. It will collect data from all runs and stored them in a large ndarray to
be averaged over.
"""
[docs] def __init__(self):
self.postprocessing_dir = None
self.mag_no_dumps = None
self.eq_no_dumps = None
self.prod_no_dumps = None
self.no_obs = None
self.filename_hdf_acf = None
self.species_index_start = None
self.filename_hdf_acf_slices = None
self.filename_hdf_slices = None
self.filename_hdf = None
self.__long_name__ = None
self.__name__ = None
self.saving_dir = None
self.phase = "production"
self.multi_run_average = False
self.dimensional_average = False
self.runs = 1
self.no_slices = 1
self.slice_steps = None
self.screen_output = True
self.timer = SarkasTimer()
# k observable attributes
self.k_observable = False
self.max_aa_harmonics = None
self.angle_averaging = "principal_axis"
self.max_k_harmonics = None
self.max_aa_ka_value = None
self.kw_observable = False
self.dim_labels = ["X", "Y", "Z"]
self.acf_observable = False
self.dataframe = None
self.dataframe_slices = None
self.dataframe_acf = None
self.dataframe_acf_slices = None
def __repr__(self):
sortedDict = dict(sorted(self.__dict__.items(), key=lambda x: x[0].lower()))
disp = "Observable( " + self.__class__.__name__ + "\n"
exclude_list = ["dataframe", "dataframe_slices", "dataframe_acf", "dataframe_acf_slices"]
for key, value in sortedDict.items():
if not key in exclude_list:
disp += "\t{} : {}\n".format(key, value)
disp += ")"
return disp
def __getstate__(self):
"""Copy the object's state from self.__dict__ which contains all our instance attributes.
Always use the dict.copy() method to avoid modifying the original state.
Reference: https://docs.python.org/3/library/pickle.html#handling-stateful-objects
"""
state = self.__dict__.copy()
# Remove the data that is stored already
del state["dataframe"]
del state["dataframe_slices"]
del state["dataframe_acf"]
del state["dataframe_acf_slices"]
return state
def __setstate__(self, state):
# Restore instance attributes.
self.__dict__.update(state)
# Restore the previously deleted dataframes.
self.parse()
[docs] def from_dict(self, input_dict: dict):
"""
Update attributes from input dictionary.
Parameters
----------
input_dict: dict
Dictionary to be copied.
"""
self.__dict__.update(input_dict)
[docs] def setup_multirun_dirs(self):
"""Set the attributes postprocessing_dir and dump_dirs_list.
The attribute postprocessing_dir refers to the location where to store postprocessing results.
If the attribute :py:attr:`sarkas.tools.observables.Observable.multi_run_average` is set to `True` then the
postprocessing data will be saved in the :py:attr:`sarkas.core.Parameters.simulations_dir` directory, i.e. where
all the runs are.
Otherwise :py:attr:`sarkas.tools.observables.Observable.postprocessing_dir` will be
in the :py:attr:`sarkas.core.Parameters.job_dir`.
The attribute :py:attr:`sarkas.tools.observables.Observable.dump_dirs_list` is a list of the refers to the locations
of the production (or other phases) dumps. If :py:attr:`sarkas.tools.observables.Observable.multi_run_average` is
`False` then the list will contain only one path, namely :py:attr:`sarkas.tools.observables.Observable.dump_dir`.
"""
self.dump_dirs_list = []
if self.multi_run_average:
for r in range(self.runs):
# Direct to the correct dumps directory
dump_dir = os_path_join(
f"run{r}", os_path_join("Simulation", os_path_join(self.phase.capitalize(), "dumps"))
)
dump_dir = os_path_join(self.simulations_dir, dump_dir)
self.dump_dirs_list.append(dump_dir)
# Re-path the saving directory.
# Data is saved in Simulations/PostProcessing/Observable/Phase/
self.postprocessing_dir = os_path_join(self.simulations_dir, "PostProcessing")
if not os_path_exists(self.postprocessing_dir):
mkdir(self.postprocessing_dir)
else:
self.dump_dirs_list = [self.dump_dir]
[docs] def setup_init(
self, params, phase: str = None, no_slices: int = None, multi_run_average: bool = None, runs: int = None
):
"""
Assign Observables attributes and copy the simulation's parameters.
Parameters
----------
params : sarkas.core.Parameters
Simulation's parameters.
phase : str, optional
Phase to compute. Default = 'production'.
no_slices : int, optional
Number of independent runs inside a long simulation. Default = 1.
"""
if phase:
self.phase = phase.lower()
if no_slices:
self.no_slices = no_slices
if multi_run_average:
self.multi_run_average = multi_run_average
if runs:
self.runs = 1
# The dict update could overwrite the names
name = self.__name__
long_name = self.__long_name__
self.__dict__.update(params.__dict__)
# Restore the correct names
self.__name__ = name
self.__long_name__ = long_name
if self.k_observable:
# Check for k space information.
if self.angle_averaging in ["full", "principal_axis"]:
if self.max_k_harmonics is None and self.max_ka_value is None:
raise AttributeError("max_ka_value and max_k_harmonics not defined.")
elif self.angle_averaging == "custom":
# if custom i expect that there is a portion that must be angle averaged.
# Therefore check for those values.
if self.max_aa_ka_value is None and self.max_aa_harmonics is None:
raise AttributeError("max_aa_harmonics and max_aa_ka_value not defined.")
# More checks on k attributes and initialization of k vectors
if self.max_k_harmonics is not None:
# Update angle averaged attributes depending on the choice of angle_averaging
# Convert max_k_harmonics to a numpy array. YAML reads in a list.
if not isinstance(self.max_k_harmonics, ndarray):
self.max_k_harmonics = array(self.max_k_harmonics)
# Calculate max_aa_harmonics based on the choice of angle averaging and inputs
if self.angle_averaging == "full":
self.max_aa_harmonics = self.max_k_harmonics.copy()
elif self.angle_averaging == "custom":
# Check if the user has defined the max_aa_harmonics
if self.max_aa_ka_value:
nx = int(self.max_aa_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws * sqrt(3.0)))
self.max_aa_harmonics = array([nx, nx, nx])
# else max_aa_harmonics is user defined
elif self.angle_averaging == "principal_axis":
self.max_aa_harmonics = array([0, 0, 0])
# max_ka_value is still None
elif self.max_ka_value is not None:
# Update max_k_harmonics and angle average attributes based on the choice of angle_averaging
# Calculate max_k_harmonics from max_ka_value
# Check for angle_averaging choice
if self.angle_averaging == "full":
# The maximum value is calculated assuming that max nx = max ny = max nz
# ka_max = 2pi a/L sqrt( nx^2 + ny^2 + nz^2) = 2pi a/L nx sqrt(3)
nx = int(self.max_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws * sqrt(3.0)))
self.max_k_harmonics = array([nx, nx, nx])
self.max_aa_harmonics = array([nx, nx, nx])
elif self.angle_averaging == "custom":
# ka_max = 2pi a/L sqrt( nx^2 + 0 + 0) = 2pi a/L nx
nx = int(self.max_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws))
self.max_k_harmonics = array([nx, nx, nx])
# Check if the user has defined the max_aa_harmonics
if self.max_aa_ka_value:
nx = int(self.max_aa_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws * sqrt(3.0)))
self.max_aa_harmonics = array([nx, nx, nx])
# else max_aa_harmonics is user defined
elif self.angle_averaging == "principal_axis":
# ka_max = 2pi a/L sqrt( nx^2 + 0 + 0) = 2pi a/L nx
nx = int(self.max_ka_value * self.box_lengths[0] / (2.0 * pi * self.a_ws))
self.max_k_harmonics = array([nx, nx, nx])
self.max_aa_harmonics = array([0, 0, 0])
# Calculate the maximum ka value based on user's choice of angle_averaging
# Dev notes: Make sure max_ka_value, max_aa_ka_value are defined when this if is done
if self.angle_averaging == "full":
self.max_ka_value = 2.0 * pi * self.a_ws * norm(self.max_k_harmonics / self.box_lengths)
self.max_aa_ka_value = 2.0 * pi * self.a_ws * norm(self.max_k_harmonics / self.box_lengths)
elif self.angle_averaging == "principal_axis":
self.max_ka_value = 2.0 * pi * self.a_ws * self.max_k_harmonics[0] / self.box_lengths[0]
self.max_aa_ka_value = 0.0
elif self.angle_averaging == "custom":
self.max_aa_ka_value = 2.0 * pi * self.a_ws * norm(self.max_aa_harmonics / self.box_lengths)
self.max_ka_value = 2.0 * pi * self.a_ws * self.max_k_harmonics[0] / self.box_lengths[0]
# Create paths for files
self.k_space_dir = os_path_join(self.postprocessing_dir, "k_space_data")
self.k_file = os_path_join(self.k_space_dir, "k_arrays.npz")
self.nkt_hdf_file = os_path_join(self.k_space_dir, "nkt.h5")
self.vkt_hdf_file = os_path_join(self.k_space_dir, "vkt.h5")
# Get the number of independent observables if multi-species
self.no_obs = int(self.num_species * (self.num_species + 1) / 2)
# Get the total number of dumps by looking at the files in the directory
self.prod_no_dumps = len(listdir(self.prod_dump_dir))
self.eq_no_dumps = len(listdir(self.eq_dump_dir))
# Check for magnetized plasma options
if self.magnetized and self.electrostatic_equilibration:
self.mag_no_dumps = len(listdir(self.mag_dump_dir))
# Assign dumps variables based on the choice of phase
if self.phase == "equilibration":
self.no_dumps = self.eq_no_dumps
self.dump_step = self.eq_dump_step
self.no_steps = self.equilibration_steps
self.dump_dir = self.eq_dump_dir
elif self.phase == "production":
self.no_dumps = self.prod_no_dumps
self.dump_step = self.prod_dump_step
self.no_steps = self.production_steps
self.dump_dir = self.prod_dump_dir
elif self.phase == "magnetization":
self.no_dumps = self.mag_no_dumps
self.dump_step = self.mag_dump_step
self.no_steps = self.magnetization_steps
self.dump_dir = self.mag_dump_dir
# Needed for preprocessing pretty print
self.slice_steps = (
int(self.no_steps / self.dump_step / self.no_slices)
if self.no_dumps < self.no_slices
else int(self.no_dumps / self.no_slices)
)
# Array containing the start index of each species.
self.species_index_start = array([0, *self.species_num.cumsum()], dtype=int)
[docs] def create_dirs_filenames(self):
# Saving Directory
self.setup_multirun_dirs()
# If multi_run_average self.postprocessing_dir is Simulations/Postprocessing else Job_dir/Postprocessing
saving_dir = os_path_join(self.postprocessing_dir, self.__long_name__.replace(" ", ""))
if not os_path_exists(saving_dir):
mkdir(saving_dir)
self.saving_dir = os_path_join(saving_dir, self.phase.capitalize())
if not os_path_exists(self.saving_dir):
mkdir(self.saving_dir)
# Filenames and strings
self.filename_hdf = os_path_join(self.saving_dir, self.__long_name__.replace(" ", "") + "_" + self.job_id + ".h5")
self.filename_hdf_slices = os_path_join(
self.saving_dir, self.__long_name__.replace(" ", "") + "_slices_" + self.job_id + ".h5"
)
if self.acf_observable:
self.filename_hdf_acf = os_path_join(
self.saving_dir, self.__long_name__.replace(" ", "") + "ACF_" + self.job_id + ".h5"
)
self.filename_hdf_acf_slices = os_path_join(
self.saving_dir, self.__long_name__.replace(" ", "") + "ACF_slices_" + self.job_id + ".h5"
)
[docs] def grab_sim_data(self, pva: str = None):
"""Read in particles data"""
# Velocity array for storing simulation data
data_all = zeros((self.no_dumps, self.dim, self.runs * self.inv_dim * self.total_num_ptcls))
time = zeros(self.no_dumps)
print("\nCollecting data from snapshots ...")
if self.dimensional_average:
# Loop over the runs
for r, dump_dir_r in enumerate(tqdm(self.dump_dirs_list, disable=(not self.verbose), desc="Runs Loop")):
# Loop over the timesteps
for it in tqdm(range(self.no_dumps), disable=(not self.verbose), desc="Timestep Loop"):
# Read data from file
dump = int(it * self.dump_step)
datap = load_from_restart(dump_dir_r, dump)
# Loop over the particles' species
for sp_indx, (sp_name, sp_num) in enumerate(zip(self.species_names, self.species_num)):
# Calculate the correct start and end index for storage
start_indx = self.species_index_start[sp_indx] + self.inv_dim * sp_num * r
end_indx = self.species_index_start[sp_indx] + self.inv_dim * sp_num * (r + 1)
# Use a mask to grab only the selected species and flatten along the first axis
# data = ( v1_x, v1_y, v1_z,
# v2_x, v2_y, v2_z,
# v3_x, v3_y, v3_z,
# ...)
# The flatten array would like this
# flattened = ( v1_x, v2_x, v3_x, ..., v1_y, v2_y, v3_y, ..., v1_z, v2_z, v3_z, ...)
mask = datap["names"] == sp_name
data_all[it, 0, start_indx:end_indx] = datap[pva][mask].flatten("F")
time[it] = datap["time"]
else: # Dimensional Average = False
# Loop over the runs
for r, dump_dir_r in enumerate(tqdm(self.dump_dirs_list, disable=(not self.verbose), desc="Runs Loop")):
# Loop over the timesteps
for it in tqdm(range(self.no_dumps), disable=(not self.verbose), desc="Timestep Loop"):
# Read data from file
dump = int(it * self.dump_step)
datap = load_from_restart(dump_dir_r, dump)
# Loop over the particles' species
for sp_indx, (sp_name, sp_num) in enumerate(zip(self.species_names, self.species_num)):
# Calculate the correct start and end index for storage
start_indx = self.species_index_start[sp_indx] + self.inv_dim * sp_num * r
end_indx = self.species_index_start[sp_indx] + self.inv_dim * sp_num * (r + 1)
# Use a mask to grab only the selected species and transpose the array to put dimensions first
for d in range(self.dimensions):
mask = datap["names"] == sp_name
data_all[it, d, start_indx:end_indx] = datap[pva][mask][:, d]
time[it] = datap["time"]
return time, data_all
[docs] def parse(self):
"""
Grab the pandas dataframe from the saved csv file. If file does not exist call ``compute``.
"""
if self.k_observable:
try:
self.dataframe = read_hdf(self.filename_hdf, mode="r", index_col=False)
k_data = load(self.k_file)
self.k_list = k_data["k_list"]
self.k_counts = k_data["k_counts"]
self.ka_values = k_data["ka_values"]
except FileNotFoundError:
print("\nFile {} not found!".format(self.filename_hdf))
print("\nComputing Observable now ...")
self.compute()
else:
try:
if hasattr(self, "filename_csv"):
self.dataframe = read_csv(self.filename_csv, index_col=False)
else:
self.dataframe = read_hdf(self.filename_hdf, mode="r", index_col=False)
except FileNotFoundError:
if hasattr(self, "filename_csv"):
data_file = self.filename_csv
else:
data_file = self.filename_hdf
print("\nData file not found! \n {}".format(data_file))
print("\nComputing Observable now ...")
self.compute()
if hasattr(self, "dataframe_slices"):
self.dataframe_slices = read_hdf(self.filename_hdf_slices, mode="r", index_col=False)
if self.acf_observable:
self.dataframe_acf = read_hdf(self.filename_hdf_acf, mode="r", index_col=False)
self.dataframe_acf_slices = read_hdf(self.filename_hdf_acf_slices, mode="r", index_col=False)
[docs] def parse_k_data(self):
"""Read in the precomputed Fourier space data. Recalculate if not correct."""
try:
k_data = load(self.k_file)
# Check for the correct number of k values
if self.angle_averaging == k_data["angle_averaging"]:
# Check for the correct max harmonics
comp = self.max_k_harmonics == k_data["max_k_harmonics"]
if comp.all():
self.k_list = k_data["k_list"]
self.k_harmonics = k_data["k_harmonics"]
self.k_counts = k_data["k_counts"]
self.k_values = k_data["k_values"]
self.ka_values = k_data["ka_values"]
self.no_ka_values = len(self.ka_values)
else:
self.calc_k_data()
else:
self.calc_k_data()
except FileNotFoundError:
self.calc_k_data()
[docs] def calc_k_data(self):
"""Calculate and save Fourier space data."""
# Do some checks
if not isinstance(self.angle_averaging, str):
raise TypeError("angle_averaging not a string. " "Choose from ['full', 'custom', 'principal_axis']")
elif self.angle_averaging not in ["full", "custom", "principal_axis"]:
raise ValueError(
"Option not available. " "Choose from ['full', 'custom', 'principal_axis']" "Note case sensitivity."
)
assert self.max_k_harmonics.all(), "max_k_harmonics not defined."
# Calculate the k arrays
self.k_list, self.k_counts, k_unique, self.k_harmonics = kspace_setup(
self.box_lengths, self.angle_averaging, self.max_k_harmonics, self.max_aa_harmonics
)
# Save the ka values
self.ka_values = 2.0 * pi * k_unique * self.a_ws
self.k_values = 2.0 * pi * k_unique
self.no_ka_values = len(self.ka_values)
# Check if the writing folder exist
if not (os_path_exists(self.k_space_dir)):
mkdir(self.k_space_dir)
# Write the npz file
savez(
self.k_file,
k_list=self.k_list,
k_harmonics=self.k_harmonics,
k_counts=self.k_counts,
k_values=self.k_values,
ka_values=self.ka_values,
angle_averaging=self.angle_averaging,
max_k_harmonics=self.max_k_harmonics,
max_aa_harmonics=self.max_aa_harmonics,
)
[docs] def parse_kt_data(self, nkt_flag: bool = False, vkt_flag: bool = False):
"""
Read in the precomputed time dependent Fourier space data. Recalculate if not.
Parameters
----------
nkt_flag : bool
Flag for reading microscopic density Fourier components :math:`n(\\mathbf k, t)`. \n
Default = False.
vkt_flag : bool
Flag for reading microscopic velocity Fourier components, :math:`v(\\mathbf k, t)`. \n
Default = False.
"""
if nkt_flag:
try:
# Check that what was already calculated is correct
with HDFStore(self.nkt_hdf_file, mode="r") as nkt_hfile:
metadata = nkt_hfile.get_storer("nkt").attrs.metadata
if metadata["no_slices"] == self.no_slices:
# Check for the correct number of k values
if metadata["angle_averaging"] == self.angle_averaging:
# Check for the correct max harmonics
comp = self.max_k_harmonics == metadata["max_k_harmonics"]
if not comp.all():
self.calc_kt_data(nkt_flag=True)
else:
self.calc_kt_data(nkt_flag=True)
else:
self.calc_kt_data(nkt_flag=True)
# elif metadata['max_k_harmonics']
#
# if self.angle_averaging == nkt_data["angle_averaging"]:
#
# comp = self.max_k_harmonics == nkt_data["max_harmonics"]
# if not comp.all():
# self.calc_kt_data(nkt_flag=True)
# else:
# self.calc_kt_data(nkt_flag=True)
except OSError:
self.calc_kt_data(nkt_flag=True)
if vkt_flag:
try:
# Check that what was already calculated is correct
with HDFStore(self.vkt_hdf_file, mode="r") as vkt_hfile:
metadata = vkt_hfile.get_storer("vkt").attrs.metadata
if metadata["no_slices"] == self.no_slices:
# Check for the correct number of k values
if metadata["angle_averaging"] == self.angle_averaging:
# Check for the correct max harmonics
comp = self.max_k_harmonics == metadata["max_k_harmonics"]
if not comp.all():
self.calc_kt_data(vkt_flag=True)
else:
self.calc_kt_data(vkt_flag=True)
else:
self.calc_kt_data(vkt_flag=True)
except OSError:
self.calc_kt_data(vkt_flag=True)
[docs] def calc_kt_data(self, nkt_flag: bool = False, vkt_flag: bool = False):
"""Calculate Time dependent Fourier space quantities.
Parameters
----------
nkt_flag : bool
Flag for calculating microscopic density Fourier components :math:`n(\\mathbf k, t)`. \n
Default = False.
vkt_flag : bool
Flag for calculating microscopic velocity Fourier components, :math:`v(\\mathbf k, t)`. \n
Default = False.
"""
start_slice = 0
end_slice = self.slice_steps * self.dump_step
if nkt_flag:
nkt_dataframe = DataFrame()
tinit = self.timer.current()
for isl in range(self.no_slices):
print("\nCalculating n(k,t) for slice {}/{}.".format(isl + 1, self.no_slices))
nkt = calc_nkt(
self.dump_dir,
(start_slice, end_slice, self.slice_steps),
self.dump_step,
self.species_num,
self.k_list,
self.verbose,
)
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
# n(k,t).shape = [no_species, time, k vectors]
slc_column = "slice {}".format(isl + 1)
for isp, sp_name in enumerate(self.species_names):
df_columns = [
slc_column + "_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
# df_columns = [time_column, *k_columns]
nkt_dataframe = concat([nkt_dataframe, DataFrame(nkt[isp, :, :], columns=df_columns)], axis=1)
tuples = [tuple(c.split("_")) for c in nkt_dataframe.columns]
nkt_dataframe.columns = MultiIndex.from_tuples(tuples, names=["slices", "species", "harmonics"])
# Sample nkt_dataframe
# slices slice 1
# species H
# harmonics k = [0, 0, 1] | k = [0, 1, 0] | ...
# Save the data and append metadata
if os_path_exists(self.nkt_hdf_file):
os_remove(self.nkt_hdf_file)
hfile = HDFStore(self.nkt_hdf_file, mode="w")
hfile.put("nkt", nkt_dataframe)
# This metadata is needed to check if I need to recalculate
metadata = {
"no_slices": self.no_slices,
"max_k_harmonics": self.max_k_harmonics,
"angle_averaging": self.angle_averaging,
}
hfile.get_storer("nkt").attrs.metadata = metadata
hfile.close()
tend = self.timer.current()
self.time_stamp("n(k,t) Calculation", self.timer.time_division(tend - tinit))
if vkt_flag:
vkt_dataframe = DataFrame()
tinit = self.timer.current()
for isl in range(self.no_slices):
print(
"\nCalculating longitudinal and transverse "
"velocity fluctuations v(k,t) for slice {}/{}.".format(isl + 1, self.no_slices)
)
vkt, vkt_i, vkt_j, vkt_k = calc_vkt(
self.dump_dir,
(start_slice, end_slice, self.slice_steps),
self.dump_step,
self.species_num,
self.k_list,
self.verbose,
)
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
slc_column = "slice {}".format(isl + 1)
for isp, sp_name in enumerate(self.species_names):
df_columns = [
slc_column
+ "_Longitudinal_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
# df_columns = [time_column, *k_columns]
vkt_dataframe = concat([vkt_dataframe, DataFrame(vkt[isp, :, :], columns=df_columns)], axis=1)
df_columns = [
slc_column
+ "_Transverse i_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
vkt_dataframe = concat([vkt_dataframe, DataFrame(vkt_i[isp, :, :], columns=df_columns)], axis=1)
df_columns = [
slc_column
+ "_Transverse j_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
vkt_dataframe = concat([vkt_dataframe, DataFrame(vkt_j[isp, :, :], columns=df_columns)], axis=1)
df_columns = [
slc_column
+ "_Transverse k_{}_k = [{}, {}, {}]".format(sp_name, *self.k_harmonics[ik, :-2].astype(int))
for ik in range(len(self.k_harmonics))
]
vkt_dataframe = concat([vkt_dataframe, DataFrame(vkt_k[isp, :, :], columns=df_columns)], axis=1)
# Full string: slice 1_Longitudinal_He_k = [1, 0, 0]
tuples = [tuple(c.split("_")) for c in vkt_dataframe.columns]
vkt_dataframe.columns = MultiIndex.from_tuples(tuples, names=["slices", "species", "direction", "harmonics"])
# Sample nkt_dataframe
# slices slice 1
# species H
# direction Longitudinal/Transverse
# harmonics k = [0, 0, 1] | k = [0, 1, 0] | ...
if os_path_exists(self.vkt_hdf_file):
os_remove(self.vkt_hdf_file)
# Save the data and append metadata
hfile = HDFStore(self.vkt_hdf_file, mode="w")
hfile.put("vkt", vkt_dataframe)
# This metadata is needed to check if I need to recalculate
metadata = {
"no_slices": self.no_slices,
"max_k_harmonics": self.max_k_harmonics,
"angle_averaging": self.angle_averaging,
}
hfile.get_storer("vkt").attrs.metadata = metadata
hfile.close()
tend = self.timer.current()
self.time_stamp("v(k,t) Calculation", self.timer.time_division(tend - tinit))
# savez(self.vkt_file + '_slice_' + str(isl) + '.npz',
# longitudinal=vkt,
# transverse_i=vkt_i,
# transverse_j=vkt_j,
# transverse_k=vkt_k,
# max_harmonics=self.max_k_harmonics,
# angle_averaging=self.angle_averaging)
[docs] def plot(self, scaling: tuple = None, acf: bool = False, figname: str = None, show: bool = False, **kwargs):
"""
Plot the observable by calling the pandas.DataFrame.plot() function and save the figure.
Parameters
----------
scaling : float, tuple
Factor by which to rescale the x and y axis.
acf : bool
Flag for renormalizing the autocorrelation functions. Default= False
figname : str
Name with which to save the file. It automatically saves it in the correct directory.
show : bool
Flag for prompting the plot to screen. Default=False
**kwargs :
Options to pass to matplotlib plotting method.
Returns
-------
axes_handle : matplotlib.axes.Axes
Axes. See `pandas` documentation for more info
"""
# Grab the data
# self.parse()
# Make a copy of the dataframe for plotting
plot_dataframe = self.dataframe.copy()
if scaling:
if isinstance(scaling, tuple):
plot_dataframe.iloc[:, 0] /= scaling[0]
plot_dataframe[kwargs["y"]] /= scaling[1]
else:
plot_dataframe.iloc[:, 0] /= scaling
# Autocorrelation function renormalization
if acf:
plot_dataframe = self.dataframe_acf.copy()
for i, col in enumerate(plot_dataframe.columns[1:], 1):
plot_dataframe[col] /= plot_dataframe[col].iloc[0]
kwargs["logx"] = True
# kwargs['xlabel'] = 'Time difference'
# if self.__class__.__name__ == 'StaticStructureFactor':
# errorbars = plot_dataframe.copy()
# for i, col in enumerate(self.dataframe.columns):
# if col[-8:] == 'Errorbar':
# errorbars.drop(col, axis=1, inplace=True)
# errorbars.rename({col: col[:-9]}, axis=1, inplace=True)
# plot_dataframe.drop(col, axis=1, inplace=True)
# kwargs['yerr'] = errorbars
#
axes_handle = plot_dataframe.plot(x=plot_dataframe.columns[0], **kwargs)
fig = axes_handle.figure
fig.tight_layout()
# Saving
if figname:
fig.savefig(os_path_join(self.saving_dir, figname + "_" + self.job_id + ".png"))
else:
fig.savefig(os_path_join(self.saving_dir, "Plot_" + self.__name__ + "_" + self.job_id + ".png"))
if show:
fig.show()
return axes_handle
[docs] def time_stamp(self, message: str, timing: tuple):
"""Print to screen the elapsed time of the calculation.
Parameters
----------
message : str
Message to print.
timing : tuple
Time in hrs, min, sec, msec, usec, nsec.
"""
t_hrs, t_min, t_sec, t_msec, t_usec, t_nsec = timing
if t_hrs == 0 and t_min == 0 and t_sec <= 2:
print_message = "\n{} Time: {} sec {} msec {} usec {} nsec".format(
message, int(t_sec), int(t_msec), int(t_usec), int(t_nsec)
)
else:
print_message = "\n{} Time: {} hrs {} min {} sec".format(message, int(t_hrs), int(t_min), int(t_sec))
# logging.info(print_message)
print(print_message)
[docs] def save_hdf(self):
# Create the columns for the HDF df
if not self.k_observable:
if not isinstance(self.dataframe_slices.columns, MultiIndex):
self.dataframe_slices.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_slices.columns]
)
if not isinstance(self.dataframe.columns, MultiIndex):
self.dataframe.columns = MultiIndex.from_tuples([tuple(c.split("_")) for c in self.dataframe.columns])
# Sort the index for speed
# see https://stackoverflow.com/questions/54307300/what-causes-indexing-past-lexsort-depth-warning-in-pandas
self.dataframe = self.dataframe.sort_index()
self.dataframe_slices = self.dataframe_slices.sort_index()
# TODO: Fix this hack. We should be able to add data to HDF instead of removing it and rewriting it.
# Save the data.
if os_path_exists(self.filename_hdf_slices):
os_remove(self.filename_hdf_slices)
self.dataframe_slices.to_hdf(self.filename_hdf_slices, mode="w", key=self.__name__)
if os_path_exists(self.filename_hdf):
os_remove(self.filename_hdf)
self.dataframe.to_hdf(self.filename_hdf, mode="w", key=self.__name__)
if self.acf_observable:
if not isinstance(self.dataframe_acf.columns, pd.MultiIndex):
self.dataframe_acf.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_acf.columns]
)
if not isinstance(self.dataframe_acf_slices.columns, pd.MultiIndex):
self.dataframe_acf_slices.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.dataframe_acf_slices.columns]
)
self.dataframe_acf = self.dataframe_acf.sort_index()
self.dataframe_acf_slices = self.dataframe_acf_slices.sort_index()
if os_path_exists(self.filename_hdf_acf):
os_remove(self.filename_hdf_acf)
self.dataframe_acf.to_hdf(self.filename_hdf_acf, mode="w", key=self.__name__)
if os_path_exists(self.filename_hdf_acf_slices):
os_remove(self.filename_hdf_acf_slices)
self.dataframe_acf_slices.to_hdf(self.filename_hdf_acf_slices, mode="w", key=self.__name__)
[docs] def save_pickle(self):
"""Save the observable's info into a pickle file."""
self.filename_pickle = os_path_join(self.saving_dir, self.__long_name__.replace(" ", "") + ".pickle")
with open(self.filename_pickle, "wb") as pickle_file:
dump(self, pickle_file)
pickle_file.close()
[docs] def read_pickle(self):
"""Read the observable's info from the pickle file."""
self.filename_pickle = os_path_join(self.saving_dir, self.__long_name__.replace(" ", "") + ".pickle")
with open(self.filename_pickle, "rb") as pkl_data:
data = pickle_load()
self.from_dict(data.__dict__)
[docs] def update_finish(self):
"""Update the :py:attr:`~.slice_steps`, CCF's and DSF's attributes, and save pickle file with observable's info.
Notes
-----
The information is saved without the dataframe(s).
"""
# Needed if no_slices has been passed
self.slice_steps = (
int(self.no_steps / self.dump_step / self.no_slices)
if self.no_dumps < self.no_slices
else int(self.no_dumps / self.no_slices)
)
if self.k_observable:
self.parse_k_data()
if self.kw_observable:
# These calculation are needed for the io.postprocess_info().
# This is a hack and we need to find a faster way to do it
dt_r = self.dt * self.dump_step
self.w_min = 2.0 * pi / (self.slice_steps * self.dt * self.dump_step)
self.w_max = pi / dt_r # Half because fft calculates negative and positive frequencies
self.frequencies = 2.0 * pi * fftfreq(self.slice_steps, self.dt * self.dump_step)
self.frequencies = fftshift(self.frequencies)
self.create_dirs_filenames()
self.save_pickle()
# This re-initialization of the dataframe is needed to avoid len mismatch conflicts when re-calculating
self.dataframe = DataFrame()
self.dataframe_slices = DataFrame()
if self.acf_observable:
self.dataframe_acf = DataFrame()
self.dataframe_acf_slices = DataFrame()
[docs]class CurrentCorrelationFunction(Observable):
"""
Current Correlation Functions. \n
The species dependent longitudinal ccf :math:`L_{AB}(\\mathbf k, \\omega)` is defined as
.. math::
L_{AB}(\\mathbf k,\\omega) = \\int_0^\\infty dt \\,
\\left \\langle \\left [\\mathbf k \\cdot \\mathbf v_{A} ( \\mathbf k, t) \\right ]
\\left [ - \\mathbf k \\cdot \\mathbf v_{B} ( -\\mathbf k, t) \\right \\rangle \\right ]
e^{i \\omega t},
while the transverse are
.. math::
T_{AB}(\\mathbf k,\\omega) = \\int_0^\\infty dt \\,
\\left \\langle \\left [ \\mathbf k \\times \\mathbf v_{A} ( \\mathbf k, t) \\right ] \\cdot
\\left [ -\\mathbf k \\times \\mathbf v_{A} ( -\\mathbf k, t) \\right \\rangle \\right ]
e^{i \\omega t},
where the microscopic velocity of species :math:`A` with number of particles :math:`N_{A}` is given by
.. math::
\\mathbf v_{A}(\\mathbf k,t) = \\sum^{N_{A}}_{j} \\mathbf v_j(t) e^{-i \\mathbf k \\cdot \\mathbf r_j(t)} .
Attributes
----------
k_list : list
List of all possible :math:`k` vectors with their corresponding magnitudes and indexes.
k_counts : numpy.ndarray
Number of occurrences of each :math:`k` magnitude.
ka_values : numpy.ndarray
Magnitude of each allowed :math:`ka` vector.
no_ka_values: int
Length of :py:attr:`~.ka_values` array.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "ccf"
self.__long_name__ = "Current Correlation Function"
self.k_observable = True
self.kw_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase=phase, no_slices=no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs] @compute_doc
def compute(self):
self.parse_kt_data(nkt_flag=False, vkt_flag=True)
tinit = self.timer.current()
vkt_df = read_hdf(self.vkt_hdf_file, mode="r", key="vkt")
# Containers
vkt = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
vkt_i = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
vkt_j = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
vkt_k = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
for isl in range(self.no_slices):
# Put data in the container to pass
for sp, sp_name in enumerate(self.species_names):
vkt[sp] = array(vkt_df["slice {}".format(isl + 1)]["Longitudinal"][sp_name])
vkt_i[sp] = array(vkt_df["slice {}".format(isl + 1)]["Transverse i"][sp_name])
vkt_j[sp] = array(vkt_df["slice {}".format(isl + 1)]["Transverse j"][sp_name])
vkt_k[sp] = array(vkt_df["slice {}".format(isl + 1)]["Transverse k"][sp_name])
# Calculate Lkw and Tkw
Lkw = calc_Skw(vkt, self.k_list, self.species_num, self.slice_steps, self.dt, self.dump_step)
Tkw_i = calc_Skw(vkt_i, self.k_list, self.species_num, self.slice_steps, self.dt, self.dump_step)
Tkw_j = calc_Skw(vkt_j, self.k_list, self.species_num, self.slice_steps, self.dt, self.dump_step)
Tkw_k = calc_Skw(vkt_k, self.k_list, self.species_num, self.slice_steps, self.dt, self.dump_step)
Tkw = (Tkw_i + Tkw_j + Tkw_k) / 3.0
# Lkw_tot += Lkw / self.no_slices
# Tkw_tot += Tkw / self.no_slices
# Create the dataframe's column names
slc_column = "slice {}".format(isl + 1)
ka_columns = ["ka = {:.6f}".format(ka) for ik, ka in enumerate(self.ka_values)]
# Save the full Lkw into a Dataframe
sp_indx = 0
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:]):
columns = [
"Longitudinal_{}-{}_".format(sp1, sp2)
+ slc_column
+ "_{}_k = [{}, {}, {}]".format(
ka_columns[int(self.k_harmonics[ik, -1])], *self.k_harmonics[ik, :-2].astype(int)
)
# Final string : Longitudinal_H-He_slice 1_ka = 0.123456_k = [0, 0, 1]
for ik in range(len(self.k_harmonics))
]
self.dataframe_slices = concat(
[self.dataframe_slices, DataFrame(Lkw[sp_indx, :, :].T, columns=columns)], axis=1
)
columns = [
"Transverse_{}-{}_".format(sp1, sp2)
+ slc_column
+ "_{}_k = [{}, {}, {}]".format(
ka_columns[int(self.k_harmonics[ik, -1])], *self.k_harmonics[ik, :-2].astype(int)
)
# Final string : Transverse_H-He_slice 1_ka = 0.123456_k = [0, 0, 1]
for ik in range(len(self.k_harmonics))
]
self.dataframe_slices = concat(
[self.dataframe_slices, DataFrame(Tkw[sp_indx, :, :].T, columns=columns)], axis=1
)
sp_indx += 1
# Create the MultiIndex
tuples = [tuple(c.split("_")) for c in self.dataframe_slices.columns]
self.dataframe_slices.columns = MultiIndex.from_tuples(
tuples, names=["direction", "species", "slices", "ka_value", "k_harmonics"]
)
# Now the actual dataframe
self.dataframe[" _ _ _ _Frequencies"] = self.frequencies
# Take the mean and std and store them into the dataframe to return
for sp1, sp1_name in enumerate(self.species_names):
for sp2, sp2_name in enumerate(self.species_names[sp1:], sp1):
comp_name = "{}-{}".format(sp1_name, sp2_name)
### LONGITUDINAL
# Rename the columns with values of ka
ka_columns = [
"Longitudinal_" + comp_name + "_Mean_ka{} = {:.4f}".format(ik + 1, ka)
for ik, ka in enumerate(self.ka_values)
]
# Mean: level = 1 corresponds to averaging all the k harmonics with the same magnitude
df_mean = self.dataframe_slices["Longitudinal"][comp_name].mean(level=1, axis="columns")
df_mean = df_mean.rename(col_mapper(df_mean.columns, ka_columns), axis=1)
# Std
ka_columns = [
"Longitudinal_" + comp_name + "_Std_ka{} = {:.4f}".format(ik + 1, ka)
for ik, ka in enumerate(self.ka_values)
]
df_std = self.dataframe_slices["Longitudinal"][comp_name].std(level=1, axis="columns")
df_std = df_std.rename(col_mapper(df_std.columns, ka_columns), axis=1)
self.dataframe = concat([self.dataframe, df_mean, df_std], axis=1)
### Transverse
# Rename the columns with values of ka
ka_columns = [
"Transverse_" + comp_name + "_Mean_ka{} = {:.4f}".format(ik + 1, ka)
for ik, ka in enumerate(self.ka_values)
]
# Mean: level = 1 corresponds to averaging all the k harmonics with the same magnitude
tdf_mean = self.dataframe_slices["Transverse"][comp_name].mean(level=1, axis="columns")
tdf_mean = tdf_mean.rename(col_mapper(tdf_mean.columns, ka_columns), axis=1)
# Std
ka_columns = [
"Transverse_" + comp_name + "_Std_ka{} = {:.4f}".format(ik + 1, ka)
for ik, ka in enumerate(self.ka_values)
]
tdf_std = self.dataframe_slices["Transverse"][comp_name].std(level=1, axis="columns")
tdf_std = tdf_std.rename(col_mapper(tdf_std.columns, ka_columns), axis=1)
self.dataframe = concat([self.dataframe, tdf_mean, tdf_std], axis=1)
# Create the MultiIndex columns:
# Longitudinal_H-He_Mean_Frequencies | ka = 0.123456
# Longitudinal_H-He_Std_ka = 0.123456
tend = self.timer.current()
self.time_stamp(self.__long_name__ + " Calculation", self.timer.time_division(tend - tinit))
self.save_hdf()
[docs] def pretty_print(self):
"""Print current correlation function calculation parameters for help in choice of simulation parameters."""
print("\n\n{:=^70} \n".format(" " + self.__long_name__ + " "))
print("k wavevector information saved in: \n", self.k_file)
print("v(k,t) data saved in: \n", self.vkt_hdf_file)
print("Data saved in: \n{}".format(self.filename_hdf))
print("Data accessible at: self.k_list, self.k_counts, self.ka_values, self.frequencies," " \n\t self.dataframe")
print("\nFrequency Space Parameters:")
print("\tNo. of slices = {}".format(self.no_slices))
print("\tNo. dumps per slice = {}".format(self.slice_steps))
print("\tFrequency step dw = 2 pi (no_slices * prod_dump_step)/(production_steps * dt)")
print("\tdw = {:1.4f} w_p = {:1.4e} [rad/s]".format(self.w_min / self.total_plasma_frequency, self.w_min))
print("\tMaximum Frequency w_max = 2 pi /(prod_dump_step * dt)")
print("\tw_max = {:1.4f} w_p = {:1.4e} [rad/s]".format(self.w_max / self.total_plasma_frequency, self.w_max))
print("\n\nWavevector parameters:")
print("Smallest wavevector k_min = 2 pi / L = 3.9 / N^(1/3)")
print("k_min = {:.4f} / a_ws = {:.4e} ".format(self.ka_values[0], self.ka_values[0] / self.a_ws), end="")
print("[1/cm]" if self.units == "cgs" else "[1/m]")
print("\nAngle averaging choice: {}".format(self.angle_averaging))
if self.angle_averaging == "full":
print("\tMaximum angle averaged k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_aa_harmonics))
print("\tLargest angle averaged k_max = k_min * sqrt( n_x^2 + n_y^2 + n_z^2)")
print(
"\tk_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_aa_ka_value, self.max_aa_ka_value / self.a_ws),
end="",
)
print("[1/cm]" if self.units == "cgs" else "[1/m]")
elif self.angle_averaging == "custom":
print("\tMaximum angle averaged k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_aa_harmonics))
print("\tLargest angle averaged k_max = k_min * sqrt( n_x^2 + n_y^2 + n_z^2)")
print(
"\tAA k_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_aa_ka_value, self.max_aa_ka_value / self.a_ws),
end="",
)
print("[1/cm]" if self.units == "cgs" else "[1/m]")
print("\tMaximum k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_k_harmonics))
print("\tLargest wavector k_max = k_min * n_x")
print("\tk_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_ka_value, self.max_ka_value / self.a_ws), end="")
print("[1/cm]" if self.units == "cgs" else "[1/m]")
elif self.angle_averaging == "principal_axis":
print("\tMaximum k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_k_harmonics))
print("\tLargest wavector k_max = k_min * n_x")
print("\tk_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_ka_value, self.max_ka_value / self.a_ws), end="")
print("[1/cm]" if self.units == "cgs" else "[1/m]")
print("\nTotal number of k values to calculate = {}".format(len(self.k_list)))
print("No. of unique ka values to calculate = {}".format(len(self.ka_values)))
[docs]class DiffusionFlux(Observable):
"""Diffusion Fluxes and their Auto-correlation functions.\n
The :math:`\\alpha` diffusion flux :math:`\\mathbf J_{\\alpha}(t)` is calculated from eq.~(3.5) in :cite:`Zhou1996` which
reads
.. math::
\\mathbf J_{\\alpha}(t) = \\frac{m_{\\alpha}}{m_{\\rm tot} }
\\sum_{\\beta = 1}^{M} \\left ( m_{\\rm tot} \\delta_{\\alpha\\beta} - x_{\\alpha} m_{\\beta} \\right )
\\mathbf j_{\\beta} (t)
where :math:`M` is the total number of species, :math:`m_{\\rm tot} = \\sum_{i}^M m_{i}` is the total mass of the
system, :math:`x_{\\alpha} = N_{\\alpha} / N_{\\rm tot}` is the concentration of species :math:`\\alpha`, and
.. math::
\\mathbf j_{\\alpha}(t) = \\sum_{i = 1}^{N_{\\alpha}} \\mathbf v_{i}(t)
is the microscopic velocity field of species :math:`\\alpha`.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "diff_flux"
self.__long_name__ = "Diffusion Flux"
self.acf_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase, no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.no_fluxes = self.num_species - 1
self.no_fluxes_acf = int(self.no_fluxes * self.no_fluxes)
self.update_finish()
[docs] @compute_doc
def compute(self):
start_slice = 0
end_slice = self.slice_steps * self.dump_step
time = zeros(self.slice_steps)
# Initialize timer
t0 = self.timer.current()
df_str = "Diffusion Flux"
df_acf_str = "Diffusion Flux ACF"
for isl in range(self.no_slices):
print("\nCalculating diffusion flux and its acf for slice {}/{}.".format(isl + 1, self.no_slices))
# Parse the particles from the dump files
vel = zeros((self.dimensions, self.slice_steps, self.total_num_ptcls))
#
# print("\nParsing particles' velocities.")
for it, dump in enumerate(
tqdm(range(start_slice, end_slice, self.dump_step), desc="Read in data", disable=not self.verbose)
):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
vel[0, it, :] = datap["vel"][:, 0]
vel[1, it, :] = datap["vel"][:, 1]
vel[2, it, :] = datap["vel"][:, 2]
#
if isl == 0:
self.dataframe["Time"] = time
self.dataframe_slices["Time"] = time
self.dataframe_acf["Time"] = time
self.dataframe_acf_slices["Time"] = time
# This returns two arrays
# diff_fluxes = array of shape (no_fluxes, no_dim, no_dumps_per_slice)
# df_acf = array of shape (no_fluxes_acf, no_dim + 1, no_dumps_per_slice)
diff_fluxes, df_acf = calc_diff_flux_acf(
vel, self.species_num, self.species_concentrations, self.species_masses
)
# # Store the data
for i, flux in enumerate(diff_fluxes):
self.dataframe_slices[df_str + " {}_X_slice {}".format(i, isl)] = flux[0, :]
self.dataframe_slices[df_str + " {}_Y_slice {}".format(i, isl)] = flux[1, :]
self.dataframe_slices[df_str + " {}_Z_slice {}".format(i, isl)] = flux[2, :]
for i, flux_acf in enumerate(df_acf):
self.dataframe_acf_slices[df_acf_str + " {}_X_slice {}".format(i, isl)] = flux_acf[0, :]
self.dataframe_acf_slices[df_acf_str + " {}_Y_slice {}".format(i, isl)] = flux_acf[1, :]
self.dataframe_acf_slices[df_acf_str + " {}_Z_slice {}".format(i, isl)] = flux_acf[2, :]
self.dataframe_acf_slices[df_acf_str + " {}_Total_slice {}".format(i, isl)] = flux_acf[3, :]
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
# Average and std over the slices
for i in range(self.no_fluxes):
xcol_str = [df_str + " {}_X_slice {}".format(i, isl) for isl in range(self.no_slices)]
ycol_str = [df_str + " {}_Y_slice {}".format(i, isl) for isl in range(self.no_slices)]
zcol_str = [df_str + " {}_Z_slice {}".format(i, isl) for isl in range(self.no_slices)]
self.dataframe[df_str + " {}_X_Mean".format(i)] = self.dataframe_slices[xcol_str].mean(axis=1)
self.dataframe[df_str + " {}_X_Std".format(i)] = self.dataframe_slices[xcol_str].std(axis=1)
self.dataframe[df_str + " {}_Y_Mean".format(i)] = self.dataframe_slices[ycol_str].mean(axis=1)
self.dataframe[df_str + " {}_Y_Std".format(i)] = self.dataframe_slices[ycol_str].std(axis=1)
self.dataframe[df_str + " {}_Z_Mean".format(i)] = self.dataframe_slices[zcol_str].mean(axis=1)
self.dataframe[df_str + " {}_Z_Std".format(i)] = self.dataframe_slices[zcol_str].std(axis=1)
# Average and std over the slices
for i in range(self.no_fluxes_acf):
xcol_str = [df_acf_str + " {}_X_slice {}".format(i, isl) for isl in range(self.no_slices)]
ycol_str = [df_acf_str + " {}_Y_slice {}".format(i, isl) for isl in range(self.no_slices)]
zcol_str = [df_acf_str + " {}_Z_slice {}".format(i, isl) for isl in range(self.no_slices)]
tot_col_str = [df_acf_str + " {}_Total_slice {}".format(i, isl) for isl in range(self.no_slices)]
self.dataframe_acf[df_acf_str + " {}_X_Mean".format(i)] = self.dataframe_acf_slices[xcol_str].mean(axis=1)
self.dataframe_acf[df_acf_str + " {}_X_Std".format(i)] = self.dataframe_acf_slices[xcol_str].std(axis=1)
self.dataframe_acf[df_acf_str + " {}_Y_Mean".format(i)] = self.dataframe_acf_slices[ycol_str].mean(axis=1)
self.dataframe_acf[df_acf_str + " {}_Y_Std".format(i)] = self.dataframe_acf_slices[ycol_str].std(axis=1)
self.dataframe_acf[df_acf_str + " {}_Z_Mean".format(i)] = self.dataframe_acf_slices[zcol_str].mean(axis=1)
self.dataframe_acf[df_acf_str + " {}_Z_Std".format(i)] = self.dataframe_acf_slices[zcol_str].std(axis=1)
self.dataframe_acf[df_acf_str + " {}_Total_Mean".format(i)] = self.dataframe_acf_slices[tot_col_str].mean(
axis=1
)
self.dataframe_acf[df_acf_str + " {}_Total_Std".format(i)] = self.dataframe_acf_slices[tot_col_str].std(
axis=1
)
self.save_hdf()
tend = self.timer.current()
self.time_stamp("Diffusion Flux and its ACF Calculation", self.timer.time_division(tend - t0))
[docs] def pretty_print(self):
"""Print observable parameters for help in choice of simulation parameters."""
print("\n\n{:=^70} \n".format(" " + self.__long_name__ + " "))
print("Data saved in: \n", self.filename_hdf)
print("Data accessible at: self.dataframe")
print("\nNo. of slices = {}".format(self.no_slices))
print("No. dumps per slice = {}".format(int(self.slice_steps / self.dump_step)))
print(
"Time interval of autocorrelation function = {:.4e} [s] ~ {} w_p T".format(
self.dt * self.slice_steps, int(self.dt * self.slice_steps * self.total_plasma_frequency)
)
)
[docs]class DynamicStructureFactor(Observable):
"""Dynamic Structure factor.
The species dependent DSF :math:`S_{AB}(k,\\omega)` is calculated from
.. math::
S_{AB }(k,\\omega) = \\int_0^\\infty dt \\,
\\left \\langle | n_{A}( \\mathbf k, t)n_{B}( -\\mathbf k, t) \\right \\rangle e^{i \\omega t},
where the microscopic density of species :math:`A` with number of particles :math:`N_{A}` is given by
.. math::
n_{A}(\\mathbf k,t) = \\sum^{N_{A}}_{j} e^{-i \\mathbf k \\cdot \\mathbf r_j(t)} .
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "dsf"
self.__long_name__ = "Dynamic Structure Factor"
self.kw_observable = True
self.k_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = 1, **kwargs):
super().setup_init(params, phase, no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs] @compute_doc
def compute(self):
# Parse nkt otherwise calculate it
self.parse_kt_data(nkt_flag=True)
tinit = self.timer.current()
nkt_df = read_hdf(self.nkt_hdf_file, mode="r", key="nkt")
for isl in range(0, self.no_slices):
# Initialize container
nkt = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
for sp, sp_name in enumerate(self.species_names):
nkt[sp] = array(nkt_df["slice {}".format(isl + 1)][sp_name])
# Calculate Skw
Skw_all = calc_Skw(nkt, self.k_list, self.species_num, self.slice_steps, self.dt, self.dump_step)
# Create the dataframe's column names
slc_column = "slice {}".format(isl + 1)
ka_columns = ["ka = {:.8f}".format(ka) for ik, ka in enumerate(self.ka_values)]
# Save the full Skw into a Dataframe
sp_indx = 0
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:]):
columns = [
"{}-{}_".format(sp1, sp2)
+ slc_column
+ "_{}_k = [{}, {}, {}]".format(
ka_columns[int(self.k_harmonics[ik, -1])], *self.k_harmonics[ik, :-2].astype(int)
)
for ik in range(len(self.k_harmonics))
]
self.dataframe_slices = concat(
[self.dataframe_slices, DataFrame(Skw_all[sp_indx, :, :].T, columns=columns)], axis=1
)
sp_indx += 1
# Create the MultiIndex
tuples = [tuple(c.split("_")) for c in self.dataframe_slices.columns]
self.dataframe_slices.columns = MultiIndex.from_tuples(
tuples, names=["species", "slices", "k_index", "k_harmonics"]
)
# Now for the actual df
self.dataframe[" _ _Frequencies"] = self.frequencies
# Take the mean and std and store them into the dataframe to return
for sp1, sp1_name in enumerate(self.species_names):
for sp2, sp2_name in enumerate(self.species_names[sp1:], sp1):
skw_name = "{}-{}".format(sp1_name, sp2_name)
# Rename the columns with values of ka
ka_columns = [skw_name + "_Mean_ka{} = {:.4f}".format(ik + 1, ka) for ik, ka in enumerate(self.ka_values)]
# Mean: level = 1 corresponds to averaging all the k harmonics with the same magnitude
df_mean = self.dataframe_slices[skw_name].mean(level=1, axis="columns")
df_mean = df_mean.rename(col_mapper(df_mean.columns, ka_columns), axis=1)
# Std
ka_columns = [skw_name + "_Std_ka{} = {:.4f}".format(ik + 1, ka) for ik, ka in enumerate(self.ka_values)]
df_std = self.dataframe_slices[skw_name].std(level=1, axis="columns")
df_std = df_std.rename(col_mapper(df_std.columns, ka_columns), axis=1)
self.dataframe = concat([self.dataframe, df_mean, df_std], axis=1)
tend = self.timer.current()
self.time_stamp(self.__long_name__ + " Calculation", self.timer.time_division(tend - tinit))
self.save_hdf()
[docs] def pretty_print(self):
"""Print dynamic structure factor calculation parameters for help in choice of simulation parameters."""
print("\n\n{:=^70} \n".format(" " + self.__long_name__ + " "))
print("k wavevector information saved in: \n", self.k_file)
print("n(k,t) data saved in: \n", self.nkt_hdf_file)
print("Data saved in: \n", self.filename_hdf)
print("Data accessible at: self.k_list, self.k_counts, self.ka_values, self.frequencies, self.dataframe")
print("\nFrequency Space Parameters:")
print("\tNo. of slices = {}".format(self.no_slices))
print("\tNo. dumps per slice = {}".format(self.slice_steps))
print("\tFrequency step dw = 2 pi (no_slices * prod_dump_step)/(production_steps * dt)")
print("\tdw = {:1.4f} w_p = {:1.4e} [rad/s]".format(self.w_min / self.total_plasma_frequency, self.w_min))
print("\tMaximum Frequency w_max = 2 pi /(prod_dump_step * dt)")
print("\tw_max = {:1.4f} w_p = {:1.4e} [rad/s]".format(self.w_max / self.total_plasma_frequency, self.w_max))
print("\n\nWavevector parameters:")
print("Smallest wavevector k_min = 2 pi / L = 3.9 / N^(1/3)")
print("k_min = {:.4f} / a_ws = {:.4e} ".format(self.ka_values[0], self.ka_values[0] / self.a_ws), end="")
print("[1/cm]" if self.units == "cgs" else "[1/m]")
print("\nAngle averaging choice: {}".format(self.angle_averaging))
if self.angle_averaging == "full":
print("\tMaximum angle averaged k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_aa_harmonics))
print("\tLargest angle averaged k_max = k_min * sqrt( n_x^2 + n_y^2 + n_z^2)")
print(
"\tk_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_aa_ka_value, self.max_aa_ka_value / self.a_ws),
end="",
)
print("[1/cm]" if self.units == "cgs" else "[1/m]")
elif self.angle_averaging == "custom":
print("\tMaximum angle averaged k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_aa_harmonics))
print("\tLargest angle averaged k_max = k_min * sqrt( n_x^2 + n_y^2 + n_z^2)")
print(
"\tAA k_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_aa_ka_value, self.max_aa_ka_value / self.a_ws),
end="",
)
print("[1/cm]" if self.units == "cgs" else "[1/m]")
print("\tMaximum k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_k_harmonics))
print("\tLargest wavector k_max = k_min * n_x")
print("\tk_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_ka_value, self.max_ka_value / self.a_ws), end="")
print("[1/cm]" if self.units == "cgs" else "[1/m]")
elif self.angle_averaging == "principal_axis":
print("\tMaximum k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_k_harmonics))
print("\tLargest wavector k_max = k_min * n_x")
print("\tk_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_ka_value, self.max_ka_value / self.a_ws), end="")
print("[1/cm]" if self.units == "cgs" else "[1/m]")
print("\nTotal number of k values to calculate = {}".format(len(self.k_list)))
print("No. of unique ka values to calculate = {}".format(len(self.ka_values)))
[docs]class ElectricCurrent(Observable):
"""Electric Current Auto-correlation function."""
[docs] def __init__(self):
super().__init__()
self.__name__ = "ec"
self.__long_name__ = "Electric Current"
self.acf_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase, no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments. e.g time_averaging and timesteps_to_skip
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs] @compute_doc
def compute(self):
# Initialize timer
t0 = self.timer.current()
self.calculate_run_slices_data()
tend = self.timer.current()
self.time_stamp("Electric Current Calculation", self.timer.time_division(tend - t0))
[docs] def calculate_run_slices_data(self):
start_slice = 0
end_slice = self.slice_steps * self.dump_step
time = zeros(self.slice_steps)
# Loop over the slices of each run
for isl in range(self.no_slices):
print(f"\nCalculating electric current and its acf for slice {isl + 1}/{self.no_slices}.")
# Parse the particles from the dump files
vel = zeros((self.dimensions, self.slice_steps, self.total_num_ptcls))
#
# print("\nParsing particles' velocities.")
for it, dump in enumerate(
tqdm(range(start_slice, end_slice, self.dump_step), desc="Read in data", disable=not self.verbose)
):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
for d in range(self.dimensions):
vel[d, it, :] = datap["vel"][:, d]
#
if isl == 0:
self.dataframe["Time"] = time.copy()
self.dataframe_acf["Time"] = time.copy()
self.dataframe_slices["Time"] = time.copy()
self.dataframe_acf_slices["Time"] = time.copy()
species_current, total_current = calc_elec_current(vel, self.species_charges, self.species_num)
# Store species data
for i, sp_name in enumerate(self.species_names):
sp_col_str = f"{sp_name} " + self.__long_name__
sp_col_str_acf = f"{sp_name} " + self.__long_name__ + " ACF"
sp_tot_acf = zeros(len(species_current))
for d in range(self.dimensions):
dl = self.dim_labels[d]
self.dataframe_slices[sp_col_str + f"_{dl}_slice {isl}"] = species_current[i, d, :]
# Calculate ACF
sp_acf_dim = correlationfunction(species_current[i, d, :], species_current[i, d, :])
self.dataframe_acf_slices[sp_col_str_acf + f"_{dl}_slice {isl}"] = sp_acf_dim
sp_tot_acf += sp_acf_dim
# Store ACF
self.dataframe_acf_slices[sp_col_str_acf + f"_Total_slice {isl}"] = sp_tot_acf
# Total current and its ACF
tot_acf = zeros(len(total_current))
for d in range(self.dimensions):
dl = self.dim_labels[d]
col_str = self.__long_name__ + f"_{dl}_slice {isl}"
col_str_acf = self.__long_name__ + f" ACF_{dl}_slice {isl}"
self.dataframe_slices[col_str] = total_current[d, :]
# Calculate ACF
tot_acf_dim = correlationfunction(total_current[d, :], total_current[d, :])
tot_acf += tot_acf_dim
self.dataframe_acf_slices[col_str_acf] = tot_acf_dim
self.dataframe_acf_slices[self.__long_name__ + f"_Total_slice {isl}"] = tot_acf
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
self.average_slice_data()
self.save_hdf()
[docs] def average_slices_data(self):
"""Average and std over the slices."""
# Species data
for i, sp_name in enumerate(self.species_names):
col_str = f"{sp_name} " + self.__long_name__
col_str_acf = f"{sp_name} " + self.__long_name__ + " ACF"
for d in range(self.dimensions):
dl = self.dim_labels[d]
dim_col_str = [col_str + f"_{dl}_slice {isl}" for isl in range(self.no_slices)]
self.dataframe[col_str + f"_{dl}_Mean"] = self.dataframe_slices[dim_col_str].mean(axis=1)
self.dataframe[col_str + f"_{dl}_Std"] = self.dataframe_slices[dim_col_str].std(axis=1)
# ACF averages
dim_col_str_acf = [col_str_acf + f"_{dl}_slice {isl}" for isl in range(self.no_slices)]
self.dataframe_acf[col_str_acf + f"_{dl}_Mean"] = self.dataframe_acf_slices[dim_col_str_acf].mean(axis=1)
self.dataframe_acf[col_str_acf + f"_{dl}_Std"] = self.dataframe_acf_slices[dim_col_str_acf].std(axis=1)
tot_col_str = [col_str + f"_Total_slice {isl}" for isl in range(self.no_slices)]
self.dataframe_acf[col_str + "_Total_Mean"] = self.dataframe_acf_slices[tot_col_str].mean(axis=1)
self.dataframe_acf[col_str + "_Total_Std"] = self.dataframe_acf_slices[tot_col_str].std(axis=1)
# Average and std over the slices
# Total Current data
for d in range(self.dimensions):
dl = self.dim_labels[d]
dim_col_str = [self.__long_name__ + f"_{dl}_slice {isl}" for isl in range(self.no_slices)]
dim_col_str_acf = [self.__long_name__ + f" ACF_{dl}_slice {isl}" for isl in range(self.no_slices)]
self.dataframe[dim_col_str + f"_{dl}_Mean"] = self.dataframe_slices[dim_col_str].mean(axis=1)
self.dataframe[dim_col_str + f"_{dl}_Std"] = self.dataframe_slices[dim_col_str].std(axis=1)
# ACF
self.dataframe_acf[dim_col_str_acf + f"_{dl}_Mean"] = self.dataframe_acf_slices[dim_col_str_acf].mean(axis=1)
self.dataframe_acf[dim_col_str_acf + f"_{dl}_Std"] = self.dataframe_acf_slices[dim_col_str_acf].std(axis=1)
# Total ACF
tot_col_str = [self.__long_name__ + f" ACF_Total_slice {isl}" for isl in range(self.no_slices)]
self.dataframe_acf[self.__long_name__ + f" ACF_Total_Mean"] = self.dataframe_acf_slices[tot_col_str].mean(axis=1)
self.dataframe_acf[self.__long_name__ + f" ACF_Total_Std"] = self.dataframe_acf_slices[tot_col_str].std(axis=1)
[docs]class PressureTensor(Observable):
"""Pressure Tensor."""
[docs] def __init__(self):
super().__init__()
self.__name__ = "pressure_tensor"
self.__long_name__ = "Pressure Tensor"
self.acf_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase, no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(**kwargs)
self.update_finish()
[docs] def df_column_names(self):
# Dataframes' columns names
pt_str_kin = "Pressure Tensor Kinetic"
pt_str_pot = "Pressure Tensor Potential"
pt_str = "Pressure Tensor"
pt_acf_str_kin = "Pressure Tensor Kinetic ACF"
pt_acf_str_pot = "Pressure Tensor Potential ACF"
pt_acf_str_kinpot = "Pressure Tensor Kin-Pot ACF"
pt_acf_str_potkin = "Pressure Tensor Pot-Kin ACF"
pt_acf_str = "Pressure Tensor ACF"
# Pre compute the number of columns in the dataframe and make the list of names
if self.dimensions == 3:
dim_lbl = ["x", "y", "z"]
elif self.dimensions == 2:
dim_lbl = ["x", "y"]
# Slice Dataframe
slice_df_column_names = ["Time"]
for isl in range(self.no_slices):
slice_df_column_names.append(f"Pressure_slice {isl}")
slice_df_column_names.append(f"Delta Pressure_slice {isl}")
for i, ax1 in enumerate(dim_lbl):
for j, ax2 in enumerate(dim_lbl):
slice_df_column_names.append(pt_str_kin + f" {ax1}{ax2}_slice {isl}")
slice_df_column_names.append(pt_str_pot + f" {ax1}{ax2}_slice {isl}")
slice_df_column_names.append(pt_str + f" {ax1}{ax2}_slice {isl}")
# ACF Slice Dataframe
acf_slice_df_column_names = ["Time"]
for isl in range(self.no_slices):
acf_slice_df_column_names.append(f"Pressure ACF_slice {isl}")
acf_slice_df_column_names.append(f"Delta Pressure ACF_slice {isl}")
for ax1 in dim_lbl:
for ax2 in dim_lbl:
for ax3 in dim_lbl:
for ax4 in dim_lbl:
acf_slice_df_column_names.append(pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
acf_slice_df_column_names.append(pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
acf_slice_df_column_names.append(pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
acf_slice_df_column_names.append(pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
acf_slice_df_column_names.append(pt_acf_str + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}")
# Mean and std Dataframe
df_column_names = ["Time", "Pressure_Mean", "Pressure_Std", "Delta Pressure_Mean", "Delta Pressure_Std"]
for i, ax1 in enumerate(dim_lbl):
for j, ax2 in enumerate(dim_lbl):
df_column_names.append(pt_str_kin + f" {ax1}{ax2}_Mean")
df_column_names.append(pt_str_kin + f" {ax1}{ax2}_Std")
df_column_names.append(pt_str_pot + f" {ax1}{ax2}_Mean")
df_column_names.append(pt_str_pot + f" {ax1}{ax2}_Std")
df_column_names.append(pt_str + f" {ax1}{ax2}_Mean")
df_column_names.append(pt_str + f" {ax1}{ax2}_Std")
# Mean and std Dataframe
acf_df_column_names = [
"Time",
"Pressure ACF_Mean",
"Pressure ACF_Std",
"Delta Pressure ACF_Mean",
"Delta Pressure ACF_Std",
]
for ax1 in dim_lbl:
for ax2 in dim_lbl:
for ax3 in dim_lbl:
for ax4 in dim_lbl:
acf_df_column_names.append(pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_Std")
acf_df_column_names.append(pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_Std")
acf_df_column_names.append(pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_Std")
acf_df_column_names.append(pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_Std")
acf_df_column_names.append(pt_acf_str + f" {ax1}{ax2}{ax3}{ax4}_Mean")
acf_df_column_names.append(pt_acf_str + f" {ax1}{ax2}{ax3}{ax4}_Std")
self.dataframe = pd.DataFrame(columns=df_column_names)
self.dataframe_slices = pd.DataFrame(columns=slice_df_column_names)
self.dataframe_acf = pd.DataFrame(columns=acf_df_column_names)
self.dataframe_acf_slices = pd.DataFrame(columns=acf_slice_df_column_names)
[docs] @compute_doc
def compute(self):
start_slice = 0
end_slice = self.slice_steps * self.dump_step
time = zeros(self.slice_steps)
# Initialize timer
t0 = self.timer.current()
# Dataframes' columns names
pt_str_kin = "Pressure Tensor Kinetic"
pt_str_pot = "Pressure Tensor Potential"
pt_str = "Pressure Tensor"
pt_acf_str_kin = "Pressure Tensor Kinetic ACF"
pt_acf_str_pot = "Pressure Tensor Potential ACF"
pt_acf_str_kinpot = "Pressure Tensor Kin-Pot ACF"
pt_acf_str_potkin = "Pressure Tensor Pot-Kin ACF"
pt_acf_str = "Pressure Tensor ACF"
if self.dimensions == 3:
dim_lbl = ["x", "y", "z"]
elif self.dimensions == 2:
dim_lbl = ["x", "y"]
# Pre compute the number of columns in the dataframe and make the list of names
self.df_column_names()
# Initialize timer
t0 = self.timer.current()
# Let's compute
for isl in range(self.no_slices):
print("\nCalculating stress tensor and the acfs for slice {}/{}.".format(isl + 1, self.no_slices))
# Parse the particles from the dump files
pressure = zeros(self.slice_steps)
pt_kin_temp = zeros((self.dimensions, self.dimensions, self.slice_steps))
pt_pot_temp = zeros((self.dimensions, self.dimensions, self.slice_steps))
pt_temp = zeros((self.dimensions, self.dimensions, self.slice_steps))
for it, dump in enumerate(
tqdm(
range(start_slice, end_slice, self.dump_step),
desc="Calculating Pressure Tensor",
disable=not self.verbose,
)
):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
pressure[it], pt_kin_temp[:, :, it], pt_pot_temp[:, :, it], pt_temp[:, :, it] = calc_pressure_tensor(
datap["vel"], datap["virial"], self.species_masses, self.species_num, self.box_volume, self.dimensions
)
if isl == 0:
self.dataframe["Time"] = time.copy()
self.dataframe_acf["Time"] = time.copy()
self.dataframe_slices["Time"] = time.copy()
self.dataframe_acf_slices["Time"] = time.copy()
self.dataframe_slices["Pressure_slice {}".format(isl)] = pressure
self.dataframe_acf_slices["Pressure ACF_slice {}".format(isl)] = correlationfunction(pressure, pressure)
# This is needed for the bulk viscosity
delta_pressure = pressure - pressure.mean()
self.dataframe_slices["Delta Pressure_slice {}".format(isl)] = delta_pressure
self.dataframe_acf_slices["Delta Pressure ACF_slice {}".format(isl)] = correlationfunction(
delta_pressure, delta_pressure
)
for i, ax1 in enumerate(dim_lbl):
pt_temp[i, i, :] -= pressure.mean()
# The reason for dividing these two loops is that I want a specific order in the dataframe.
# Pressure, Stress Tensor, All
for i, ax1 in enumerate(dim_lbl):
for j, ax2 in enumerate(dim_lbl):
self.dataframe_slices[pt_str_kin + " {}{}_slice {}".format(ax1, ax2, isl)] = pt_kin_temp[i, j, :]
self.dataframe_slices[pt_str_pot + " {}{}_slice {}".format(ax1, ax2, isl)] = pt_pot_temp[i, j, :]
self.dataframe_slices[pt_str + " {}{}_slice {}".format(ax1, ax2, isl)] = pt_temp[i, j, :]
# Calculate the ACF of the thermal fluctuations of the pressure tensor elements
# Note: C_{abcd} = < sigma_{ab} sigma_{cd} >
for i, ax1 in enumerate(dim_lbl):
for j, ax2 in enumerate(dim_lbl):
for k, ax3 in enumerate(dim_lbl):
for l, ax4 in enumerate(dim_lbl):
C_ijkl_kin = correlationfunction(pt_kin_temp[i, j, :], pt_kin_temp[k, l, :])
C_ijkl_pot = correlationfunction(pt_pot_temp[i, j, :], pt_pot_temp[k, l, :])
C_ijkl_kinpot = correlationfunction(pt_kin_temp[i, j, :], pt_pot_temp[k, l, :])
C_ijkl_potkin = correlationfunction(pt_pot_temp[i, j, :], pt_kin_temp[k, l, :])
C_ijkl = correlationfunction(pt_temp[i, j, :], pt_temp[k, l, :])
# Kinetic
self.dataframe_acf_slices[
pt_acf_str_kin + " {}{}{}{}_slice {}".format(ax1, ax2, ax3, ax4, isl)
] = C_ijkl_kin
# Potential
self.dataframe_acf_slices[
pt_acf_str_pot + " {}{}{}{}_slice {}".format(ax1, ax2, ax3, ax4, isl)
] = C_ijkl_pot
# Kin-Pot
self.dataframe_acf_slices[
pt_acf_str_kinpot + " {}{}{}{}_slice {}".format(ax1, ax2, ax3, ax4, isl)
] = C_ijkl_kinpot
# Pot-Kin
self.dataframe_acf_slices[
pt_acf_str_potkin + " {}{}{}{}_slice {}".format(ax1, ax2, ax3, ax4, isl)
] = C_ijkl_potkin
# Total
self.dataframe_acf_slices[
pt_acf_str + " {}{}{}{}_slice {}".format(ax1, ax2, ax3, ax4, isl)
] = C_ijkl
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
# end of slice loop
# Average and std over the slices
col_str = ["Pressure_slice {}".format(isl) for isl in range(self.no_slices)]
self.dataframe["Pressure_Mean"] = self.dataframe_slices[col_str].mean(axis=1)
self.dataframe["Pressure_Std"] = self.dataframe_slices[col_str].std(axis=1)
col_str = ["Pressure ACF_slice {}".format(isl) for isl in range(self.no_slices)]
self.dataframe_acf["Pressure ACF_Mean"] = self.dataframe_acf_slices[col_str].mean(axis=1)
self.dataframe_acf["Pressure ACF_Std"] = self.dataframe_acf_slices[col_str].std(axis=1)
col_str = ["Delta Pressure_slice {}".format(isl) for isl in range(self.no_slices)]
self.dataframe["Delta Pressure_Mean"] = self.dataframe_slices[col_str].mean(axis=1)
self.dataframe["Delta Pressure_Std"] = self.dataframe_slices[col_str].std(axis=1)
col_str = ["Delta Pressure ACF_slice {}".format(isl) for isl in range(self.no_slices)]
self.dataframe_acf["Delta Pressure ACF_Mean"] = self.dataframe_acf_slices[col_str].mean(axis=1)
self.dataframe_acf["Delta Pressure ACF_Std"] = self.dataframe_acf_slices[col_str].std(axis=1)
for i, ax1 in enumerate(dim_lbl):
for j, ax2 in enumerate(dim_lbl):
# Kinetic Terms
ij_col_str = [pt_str_kin + f" {ax1}{ax2}_slice {isl}" for isl in range(self.no_slices)]
self.dataframe[pt_str_kin + f" {ax1}{ax2}_Mean"] = self.dataframe_slices[ij_col_str].mean(axis=1)
self.dataframe[pt_str_kin + f" {ax1}{ax2}_Std"] = self.dataframe_slices[ij_col_str].std(axis=1)
# Potential Terms
ij_col_str = [pt_str_pot + f" {ax1}{ax2}_slice {isl}" for isl in range(self.no_slices)]
self.dataframe[pt_str_pot + f" {ax1}{ax2}_Mean"] = self.dataframe_slices[ij_col_str].mean(axis=1)
self.dataframe[pt_str_pot + f" {ax1}{ax2}_Std"] = self.dataframe_slices[ij_col_str].std(axis=1)
# Full
ij_col_str = [pt_str + f" {ax1}{ax2}_slice {isl}" for isl in range(self.no_slices)]
self.dataframe[pt_str + f" {ax1}{ax2}_Mean"] = self.dataframe_slices[ij_col_str].mean(axis=1)
self.dataframe[pt_str + f" {ax1}{ax2}_Std"] = self.dataframe_slices[ij_col_str].std(axis=1)
for i, ax1 in enumerate(dim_lbl):
for j, ax2 in enumerate(dim_lbl):
for k, ax3 in enumerate(dim_lbl):
for l, ax4 in enumerate(dim_lbl):
# Kinetic Terms
ij_col_acf_str = [
pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
mean_column = pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_Mean"
std_column = pt_acf_str_kin + f" {ax1}{ax2}{ax3}{ax4}_Std"
self.dataframe_acf[mean_column] = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1)
self.dataframe_acf[std_column] = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1)
# Potential Terms
ij_col_acf_str = [
pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
mean_column = pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_Mean"
std_column = pt_acf_str_pot + f" {ax1}{ax2}{ax3}{ax4}_Std"
self.dataframe_acf[mean_column] = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1)
self.dataframe_acf[std_column] = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1)
# Kinetic-Potential Terms
ij_col_acf_str = [
pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
mean_column = pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_Mean"
std_column = pt_acf_str_kinpot + f" {ax1}{ax2}{ax3}{ax4}_Std"
self.dataframe_acf[mean_column] = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1)
self.dataframe_acf[std_column] = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1)
# Potential-Kinetic Terms
ij_col_acf_str = [
pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
mean_column = pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_Mean"
std_column = pt_acf_str_potkin + f" {ax1}{ax2}{ax3}{ax4}_Std"
self.dataframe_acf[mean_column] = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1)
self.dataframe_acf[std_column] = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1)
# Full
ij_col_acf_str = [
pt_acf_str + f" {ax1}{ax2}{ax3}{ax4}_slice {isl}" for isl in range(self.no_slices)
]
mean_column = pt_acf_str + f" {ax1}{ax2}{ax3}{ax4}_Mean"
std_column = pt_acf_str + f" {ax1}{ax2}{ax3}{ax4}_Std"
self.dataframe_acf[mean_column] = self.dataframe_acf_slices[ij_col_acf_str].mean(axis=1)
self.dataframe_acf[std_column] = self.dataframe_acf_slices[ij_col_acf_str].std(axis=1)
self.save_hdf()
tend = self.timer.current()
self.time_stamp("Stress Tensor and ACF Calculation", self.timer.time_division(tend - t0))
[docs] def sum_rule(self, beta, rdf, potential):
r"""
Calculate the sum rule integrals from the rdf.
.. math::
:nowrap:
\begin{eqnarray}
\sigma_{zzzz} & = & \frac{n}{\beta^2} \left [ 3 +
\frac{2\beta}{15} I^{(1)} + \frac{\beta}{5} I^{(2)} \right ] , \\
\sigma_{zzxx} & =& \frac{n}{\beta^2} \left [ 1
- \frac{2\beta}{5} I^{(1)} + \frac {\beta}{15} I^{(2)} \right ] , \\
\sigma_{xyxy} & = & \frac{n}{\beta^2} \left [ 1 +
\frac{4\beta}{15} I^{(2)} + \frac {\beta}{15} I^{(2)} \right ] ,
\end{eqnarray}
where :math:`I^{(k)} = \sum_{A} \sum_{B \geq A}I_{AB}^{(\rm {Hartree}, k)} + I_{AB}^{(\rm {Corr}, k)}`
calculated from
:meth:`sarkas.tools.observables.RadialDistributionFunction.compute_sum_rule_integrals`.
Parameters
----------
beta: float
Inverse temperature factor. Grab it from :py:attr:`sarkas.tools.observables.Thermodynamics.beta`.
rdf: sarkas.tools.observables.RadialDistributionFunction
Radial Distribution function object.
potential : :class:`sarkas.potentials.core.Potential`
Potential object.
Returns
-------
sigma_zzzz : float
See above integral.
sigma_zzxx : float
See above integral.
sigma_xyxy : float
See above integral.
"""
hartrees, corrs = rdf.compute_sum_rule_integrals(potential)
# Eq. 2.3.43 -44 in Boon and Yip
I_1 = hartrees[:, 1].sum() + corrs[:, 1].sum()
I_2 = hartrees[:, 2].sum() + corrs[:, 2].sum()
nkT = self.total_num_density / beta
sigma_zzzz = 3.0 * nkT + 2.0 / 15.0 * I_1 + I_2 / 5.0
sigma_zzxx = nkT - 2.0 / 5.0 * I_1 + I_2 / 15.0
sigma_xyxy = nkT + 4.0 / 15.0 * I_1 + I_2 / 15.0
return sigma_zzzz, sigma_zzxx, sigma_xyxy
[docs] def pretty_print(self):
"""Print observable parameters for help in choice of simulation parameters."""
print("\n\n{:=^70} \n".format(" " + self.__long_name__ + " "))
print("Data saved in: \n", self.filename_hdf)
print("Data accessible at: self.dataframe")
print("\nNo. of slices = {}".format(self.no_slices))
print("No. dumps per slice = {}".format(int(self.slice_steps / self.dump_step)))
print(
"Time interval of autocorrelation function = {:.4e} [s] ~ {} w_p T".format(
self.dt * self.slice_steps, int(self.dt * self.slice_steps * self.total_plasma_frequency)
)
)
[docs]class RadialDistributionFunction(Observable):
"""
Radial Distribution Function.
Attributes
----------
no_bins : int
Number of bins.
dr_rdf : float
Size of each bin.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "rdf"
self.__long_name__ = "Radial Distribution Function"
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase=phase, no_slices=no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
# These definitions are needed for the print out.
self.rc = self.cutoff_radius
self.no_bins = self.rdf_nbins
self.dr_rdf = self.rc / self.no_bins
self.update_finish()
[docs] @compute_doc
def compute(self):
# initialize temporary arrays
r_values = zeros(self.no_bins)
bin_vol = zeros(self.no_bins)
pair_density = zeros((self.num_species, self.num_species))
# This is needed to be certain the number of bins is the same.
# if not isinstance(rdf_hist, ndarray):
# # Find the last dump by looking for the largest number in the checkpoints filenames
# dumps_list = listdir(self.dump_dir)
# dumps_list.sort(key=num_sort)
# name, ext = os.path.splitext(dumps_list[-1])
# _, number = name.split('_')
datap = load_from_restart(self.dump_dir, 0)
# Make sure you are getting the right number of bins and redefine dr_rdf.
self.no_bins = datap["rdf_hist"].shape[0]
self.dr_rdf = self.rc / self.no_bins
t0 = self.timer.current()
# No. of pairs per volume
for i, sp1 in enumerate(self.species_num):
pair_density[i, i] = sp1 * (sp1 - 1) / self.box_volume
if self.num_species > 1:
for j, sp2 in enumerate(self.species_num[i + 1 :], i + 1):
pair_density[i, j] = sp1 * sp2 / self.box_volume
# Calculate the volume of each bin
# The formula for the N-dimensional sphere is
# pi^{N/2}/( factorial( N/2) )
# from https://en.wikipedia.org/wiki/N-sphere#:~:text=In%20general%2C%20the-,volume,-%2C%20in%20n%2Ddimensional
sphere_shell_const = (pi ** (self.dimensions / 2.0)) / factorial(self.dimensions / 2.0)
bin_vol[0] = sphere_shell_const * self.dr_rdf**self.dimensions
for ir in range(1, self.no_bins):
r1 = ir * self.dr_rdf
r2 = (ir + 1) * self.dr_rdf
bin_vol[ir] = sphere_shell_const * (r2**self.dimensions - r1**self.dimensions)
r_values[ir] = (ir + 0.5) * self.dr_rdf
# Save the ra values for simplicity
self.ra_values = r_values / self.a_ws
self.dataframe["Distance"] = r_values
self.dataframe_slices["Distance"] = r_values
for isl in tqdm(range(self.no_slices), disable=not self.verbose):
# Grab the data from the dumps. The -1 is for '0'-indexing
dump_no = (isl + 1) * (self.slice_steps - 1) * self.dump_step
datap = load_from_restart(self.dump_dir, int(dump_no))
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:], i):
denom_const = pair_density[i, j] * self.slice_steps * self.dump_step
col_str = "{}-{} RDF_slice {}".format(sp1, sp2, isl)
self.dataframe_slices[col_str] = (
(datap["rdf_hist"][:, i, j] + datap["rdf_hist"][:, j, i]) / denom_const / bin_vol
)
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:], i):
col_str = ["{}-{} RDF_slice {}".format(sp1, sp2, isl) for isl in range(self.no_slices)]
self.dataframe["{}-{} RDF_Mean".format(sp1, sp2)] = self.dataframe_slices[col_str].mean(axis=1)
self.dataframe["{}-{} RDF_Std".format(sp1, sp2)] = self.dataframe_slices[col_str].std(axis=1)
self.save_hdf()
self.save_pickle()
tend = self.timer.current()
self.time_stamp(self.__long_name__ + " Calculation", self.timer.time_division(tend - t0))
[docs] def compute_sum_rule_integrals(self, potential):
"""
Compute integrals of the RDF used in sum rules. \n
The species dependent integrals are
.. math::
I_{AB}^{\\rm (Hartree, k)} = 2^{D - 2} \\pi n_{A} n_{B} \\int_0^{\\infty} dr \\,
r^{D - 1 + k} \\frac{d^k}{dr^k} \\phi_{AB}(r),
.. math::
I_{AB}^{\\rm (Corr, k)} = 2^{D - 2} \\pi n_{A} n_{B} \\int_0^{\\infty} dr \\,
r^{D - 1 + k} h_{AB} (r) \\frac{d^k}{dr^k} \\phi_{AB}(r),
where :math:`D` is the number of dimensions, :math:`k = {0, 1, 2}`,
and :math:`\\phi_{AB}(r)` is the potential between species :math:`A` and :math:`B`. \n
Only Coulomb and Yukawa potentials are supported at the moment.
Parameters
----------
potential : :class:`sarkas.potentials.core.Potential`
Sarkas Potential object. Needed for all its attributes.
Returns
-------
hartrees : numpy.ndarray
Hartree integrals with :math:`k = {0, 1, 2}`. \n
Shape = ( :py:attr:`sarkas.tools.observables.Observable.no_obs`, 3).
corrs : numpy.ndarray
Correlational integrals with :math:`k = {0, 1, 2}`. \n
Shape = ( :py:attr:`sarkas.tools.observables.Observable.no_obs`, 3).
"""
r = self.dataframe["Distance"].iloc[:, 0].to_numpy().copy()
dims = self.dimensions
dim_const = 2.0 ** (dims - 2) * pi
if r[0] == 0.0:
r[0] = r[1]
r2 = r * r
r3 = r2 * r
corrs = zeros((self.no_obs, 3))
hartrees = zeros((self.no_obs, 3))
obs_indx = 0
for sp1, sp1_name in enumerate(self.species_names):
for sp2, sp2_name in enumerate(self.species_names[sp1:], sp1):
h_r = self.dataframe[(f"{sp1_name}-{sp2_name} RDF", "Mean")].to_numpy() - 1.0
if potential.type == "coulomb":
u_r = potential.matrix[0, sp1, sp2] / r
dv_dr = -potential.matrix[0, sp1, sp2] / r2
d2v_dr2 = 2.0 * potential.matrix[0, sp1, sp2] / r3
# Check for finiteness of first element when r[0] = 0.0
if not isfinite(dv_dr[0]):
dv_dr[0] = dv_dr[1]
d2v_dr2[0] = d2v_dr2[1]
elif potential.type == "yukawa":
kappa = potential.matrix[1, sp1, sp2]
u_r = potential.matrix[0, sp1, sp2] * exp(-kappa * r) / r
dv_dr = -(1.0 + kappa * r) * u_r / r
d2v_dr2 = -u_r / r - (1.0 + kappa * r) ** 2 * u_r / r2 + (1.0 + kappa * r) * u_r / r2
# Check for finiteness of first element when r[0] = 0.0
if not isfinite(dv_dr[0]):
dv_dr[0] = dv_dr[1]
d2v_dr2[0] = d2v_dr2[1]
elif potential.type == "lj":
epsilon = potential.matrix[0, sp1, sp2]
sigma = potential.matrix[1, sp1, sp2]
s_over_r = sigma / r
s_over_r_high = s_over_r ** potential.matrix[2, sp1, sp2]
s_over_r_low = s_over_r ** potential.matrix[3, sp1, sp2]
u_r = epsilon * (s_over_r_high - s_over_r_low)
dv_dr = (
-epsilon
* (potential.matrix[2, sp1, sp2] * s_over_r_high - potential.matrix[3, sp1, sp2] * s_over_r_low)
/ r
)
d2v_dr2 = (
epsilon
* (
potential.matrix[2, sp1, sp2] * (potential.matrix[2, sp1, sp2] + 1) * s_over_r_high
- potential.matrix[3, sp1, sp2] * (potential.matrix[3, sp1, sp2] + 1) * s_over_r_low
)
/ r2
)
else:
raise ValueError("Unknown potential")
densities = self.species_num_dens[sp1] * self.species_num_dens[sp2]
hartrees[obs_indx, 0] = dim_const * densities * trapz(u_r * r ** (dims - 1), x=r)
corrs[obs_indx, 0] = dim_const * densities * trapz(u_r * h_r * r ** (dims - 1), x=r)
hartrees[obs_indx, 1] = dim_const * densities * trapz(dv_dr * r**dims, x=r)
corrs[obs_indx, 1] = dim_const * densities * trapz(dv_dr * h_r * r**dims, x=r)
hartrees[obs_indx, 2] = dim_const * densities * trapz(d2v_dr2 * r ** (dims + 1), x=r)
corrs[obs_indx, 2] = dim_const * densities * trapz(d2v_dr2 * h_r * r ** (dims + 1), x=r)
obs_indx += 1
return hartrees, corrs
[docs] def pretty_print(self):
"""Print radial distribution function calculation parameters for help in choice of simulation parameters."""
print("\n\n{:=^70} \n".format(" " + self.__long_name__ + " "))
print("Data saved in: \n", self.filename_hdf)
print("Data accessible at: self.ra_values, self.dataframe")
print("\nNo. bins = {}".format(self.no_bins))
print("dr = {:1.4f} a_ws = {:1.4e} ".format(self.dr_rdf / self.a_ws, self.dr_rdf), end="")
print("[cm]" if self.units == "cgs" else "[m]")
print(
"Maximum Distance (i.e. potential.rc)= {:1.4f} a_ws = {:1.4e} ".format(self.rc / self.a_ws, self.rc), end=""
)
print("[cm]" if self.units == "cgs" else "[m]")
[docs]class StaticStructureFactor(Observable):
"""
Static Structure Factors.
The species dependent SSF :math:`S_{AB}(\\mathbf k)` is calculated from
.. math::
S_{AB }(\\mathbf k) = \\int_0^\\infty dt \\,
\\left \\langle | n_{A}( \\mathbf k, t)n_{B}( -\\mathbf k, t) \\right \\rangle,
where the microscopic density of species :math:`A` with number of particles :math:`N_{A}` is given by
.. math::
n_{A}(\\mathbf k,t) = \\sum^{N_{A}}_{j} e^{-i \\mathbf k \\cdot \\mathbf r_j(t)} .
Attributes
----------
k_list : list
List of all possible :math:`k` vectors with their corresponding magnitudes and indexes.
k_counts : numpy.ndarray
Number of occurrences of each :math:`k` magnitude.
ka_values : numpy.ndarray
Magnitude of each allowed :math:`ka` vector.
no_ka_values: int
Length of ``ka_values`` array.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "ssf"
self.__long_name__ = "Static Structure Function"
self.k_observable = True
self.kw_observable = False
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase=phase, no_slices=no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs] @compute_doc
def compute(self):
self.parse_kt_data(nkt_flag=True)
no_dumps_calculated = self.slice_steps * self.no_slices
Sk_avg = zeros((self.no_obs, len(self.k_counts), no_dumps_calculated))
print("\nCalculating S(k) ...")
k_column = "Inverse Wavelength"
self.dataframe_slices[k_column] = self.k_values
self.dataframe[k_column] = self.k_values
tinit = self.timer.current()
nkt_df = read_hdf(self.nkt_hdf_file, mode="r", key="nkt")
for isl in tqdm(range(self.no_slices)):
# Initialize container
nkt = zeros((self.num_species, self.slice_steps, len(self.k_list)), dtype=complex128)
for sp, sp_name in enumerate(self.species_names):
nkt[sp] = array(nkt_df["slice {}".format(isl + 1)][sp_name])
init = isl * self.slice_steps
fin = (isl + 1) * self.slice_steps
Sk_avg[:, :, init:fin] = calc_Sk(nkt, self.k_list, self.k_counts, self.species_num, self.slice_steps)
sp_indx = 0
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:]):
column = "{}-{} SSF_slice {}".format(sp1, sp2, isl)
self.dataframe_slices[column] = Sk_avg[sp_indx, :, init:fin].mean(axis=-1)
sp_indx += 1
for i, sp1 in enumerate(self.species_names):
for j, sp2 in enumerate(self.species_names[i:]):
column = ["{}-{} SSF_slice {}".format(sp1, sp2, isl) for isl in range(self.no_slices)]
self.dataframe["{}-{} SSF_Mean".format(sp1, sp2)] = self.dataframe_slices[column].mean(axis=1)
self.dataframe["{}-{} SSF_Std".format(sp1, sp2)] = self.dataframe_slices[column].std(axis=1)
self.save_hdf()
tend = self.timer.current()
self.time_stamp(self.__long_name__ + " Calculation", self.timer.time_division(tend - tinit))
[docs] def pretty_print(self):
"""Print static structure factor calculation parameters for help in choice of simulation parameters."""
print("\n\n{:=^70} \n".format(" " + self.__long_name__ + " "))
print("k wavevector information saved in: \n", self.k_file)
print("n(k,t) Data saved in: \n", self.nkt_hdf_file)
print("Data saved in: \n", self.filename_hdf)
print("Data accessible at: self.k_list, self.k_counts, self.ka_values, self.dataframe")
print("\nSmallest wavevector k_min = 2 pi / L = 3.9 / N^(1/3)")
print("k_min = {:.4f} / a_ws = {:.4e} ".format(self.ka_values[0], self.ka_values[0] / self.a_ws), end="")
print("[1/cm]" if self.units == "cgs" else "[1/m]")
print("\nAngle averaging choice: {}".format(self.angle_averaging))
if self.angle_averaging == "full":
print("\tMaximum angle averaged k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_aa_harmonics))
print("\tLargest angle averaged k_max = k_min * sqrt( n_x^2 + n_y^2 + n_z^2)")
print(
"\tk_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_aa_ka_value, self.max_aa_ka_value / self.a_ws),
end="",
)
print("[1/cm]" if self.units == "cgs" else "[1/m]")
elif self.angle_averaging == "custom":
print("\tMaximum angle averaged k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_aa_harmonics))
print("\tLargest angle averaged k_max = k_min * sqrt( n_x^2 + n_y^2 + n_z^2)")
print(
"\tAA k_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_aa_ka_value, self.max_aa_ka_value / self.a_ws),
end="",
)
print("[1/cm]" if self.units == "cgs" else "[1/m]")
print("\tMaximum k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_k_harmonics))
print("\tLargest wavector k_max = k_min * n_x")
print("\tk_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_ka_value, self.max_ka_value / self.a_ws), end="")
print("[1/cm]" if self.units == "cgs" else "[1/m]")
elif self.angle_averaging == "principal_axis":
print("\tMaximum k harmonics = n_x, n_y, n_z = {}, {}, {}".format(*self.max_k_harmonics))
print("\tLargest wavector k_max = k_min * n_x")
print("\tk_max = {:.4f} / a_ws = {:1.4e} ".format(self.max_ka_value, self.max_ka_value / self.a_ws), end="")
print("[1/cm]" if self.units == "cgs" else "[1/m]")
print("\nTotal number of k values to calculate = {}".format(len(self.k_list)))
print("No. of unique ka values to calculate = {}".format(len(self.ka_values)))
[docs]class Thermodynamics(Observable):
"""
Thermodynamic functions.
"""
[docs] def __init__(self):
super().__init__()
self.__name__ = "therm"
self.__long_name__ = "Thermodynamics"
[docs] def setup(self, params, phase: str = None, **kwargs):
"""
Assign attributes from simulation's parameters.
Parameters
----------
params : sarkas.core.Parameters
Simulation's parameters.
phase : str, optional
Phase to compute. Default = 'production'.
**kwargs :
These will overwrite any :class:`sarkas.core.Parameters`
or default :class:`sarkas.tools.observables.Observable`
attributes and/or add new ones.
"""
super().setup_init(params, phase)
if params.load_method == "restart":
self.restart_sim = True
else:
self.restart_sim = False
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
if self.phase.lower() == "production":
self.saving_dir = self.production_dir
elif self.phase.lower() == "equilibration":
self.saving_dir = self.equilibration_dir
elif self.phase.lower() == "magnetization":
self.saving_dir = self.magnetization_dir
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
[docs] def compute_from_rdf(self, rdf, potential):
"""
Calculate the correlational energy and correlation pressure using
:meth:`sarkas.tools.observables.RadialDistributionFunction.compute_sum_rule_integrals` method.
The Hartree and correlational terms between species :math:`A` and :math:`B` are
.. math::
U_{AB}^{\\rm hartree} = 2 \\pi \\frac{N_iN_j}{V} \\int_0^\\infty dr \\, \\phi_{AB}(r) r^2 dr,
.. math::
U_{AB}^{\\rm corr} = 2 \\pi \\frac{N_iN_j}{V} \\int_0^\\infty dr \\, \\phi_{AB}(r) h(r) r^2 dr,
.. math::
P_{AB}^{\\rm hartree} = - \\frac{2 \\pi}{3} \\frac{N_iN_j}{V^2} \\int_0^\\infty dr \\, \\frac{d\\phi_{AB}(r)}{dr} r^3 dr,
.. math::
P_{AB}^{\\rm corr} = - \\frac{2 \\pi}{3} \\frac{N_iN_j}{V^2} \\int_0^\\infty dr \\, \\frac{d\\phi_{AB}(r)}{dr} h(r) r^3 dr,
Parameters
----------
rdf: sarkas.tools.observables.RadialDistributionFunction
Radial Distribution Function object.
potential : :class:`sarkas.potentials.core.Potential`
Potential object.
Returns
-------
nkT : float
Ideal term of the pressure :math:`nk_BT`. Where :math:`n` is the total density.
u_hartree : numpy.ndarray
Hartree energy calculated from the above formula for each :math:`g_{ab}(r)`.
u_corr : numpy.ndarray
Correlational energy calculated from the above formula for each :math:`g_{ab}(r)`.
p_hartree : numpy.ndarray
Hartree pressure calculated from the above formula for each :math:`g_{ab}(r)`.
p_corr : numpy.ndarray
Correlational pressure calculated from the above formula for each :math:`g_{ab}(r)`.
"""
hartrees, corrs = rdf.compute_sum_rule_integrals(potential)
u_hartree = self.box_volume * hartrees[:, 0]
u_corr = self.box_volume * corrs[:, 0]
p_hartree = -hartrees[:, 1] / 3.0
p_corr = -corrs[:, 1] / 3.0
nkT = self.total_num_density / self.beta
return nkT, u_hartree, u_corr, p_hartree, p_corr
[docs] def parse(self, phase=None):
"""
Grab the pandas dataframe from the saved csv file.
"""
if phase:
self.phase = phase.lower()
if self.phase == "equilibration":
self.dataframe = read_csv(self.eq_energy_filename, index_col=False)
self.fldr = self.equilibration_dir
elif self.phase == "production":
self.dataframe = read_csv(self.prod_energy_filename, index_col=False)
self.fldr = self.production_dir
elif self.phase == "magnetization":
self.dataframe = read_csv(self.mag_energy_filename, index_col=False)
self.fldr = self.magnetization_dir
self.beta = 1.0 / (self.dataframe["Temperature"].mean() * self.kB)
[docs] def temp_energy_plot(
self,
process,
info_list: list = None,
phase: str = None,
show: bool = False,
publication: bool = False,
figname: str = None,
):
"""
Plot Temperature and Energy as a function of time with their cumulative sum and average.
Parameters
----------
process : sarkas.processes.Process
Sarkas Process.
info_list: list, optional
List of strings to print next to the plots.
phase: str, optional
Phase to plot. "equilibration" or "production".
show: bool, optional
Flag for displaying the figure.
publication: bool, optional
Flag for publication style plotting.
figname: str, optional
Name with which to save the plot.
"""
if phase:
phase = phase.lower()
self.phase = phase
if self.phase == "equilibration":
self.no_dumps = self.eq_no_dumps
self.dump_dir = self.eq_dump_dir
self.dump_step = self.eq_dump_step
self.saving_dir = self.equilibration_dir
self.no_steps = self.equilibration_steps
self.parse(self.phase)
self.dataframe = self.dataframe.iloc[1:, :]
elif self.phase == "production":
self.no_dumps = self.prod_no_dumps
self.dump_dir = self.prod_dump_dir
self.dump_step = self.prod_dump_step
self.saving_dir = self.production_dir
self.no_steps = self.production_steps
self.parse(self.phase)
elif self.phase == "magnetization":
self.no_dumps = self.mag_no_dumps
self.dump_dir = self.mag_dump_dir
self.dump_step = self.mag_dump_step
self.saving_dir = self.magnetization_dir
self.no_steps = self.magnetization_steps
self.parse(self.phase)
else:
self.parse()
completed_steps = self.dump_step * (self.no_dumps - 1)
fig = plt.figure(figsize=(20, 8))
fsz = 16
if publication:
plt.style.use("PUBstyle")
gs = GridSpec(3, 7)
# Temperature plots
T_delta_plot = fig.add_subplot(gs[0, 0:2])
T_main_plot = fig.add_subplot(gs[1:3, 0:2])
T_hist_plot = fig.add_subplot(gs[1:3, 2])
# Energy plots
E_delta_plot = fig.add_subplot(gs[0, 4:6])
E_main_plot = fig.add_subplot(gs[1:3, 4:6])
E_hist_plot = fig.add_subplot(gs[1:3, 6])
else:
gs = GridSpec(4, 8)
Info_plot = fig.add_subplot(gs[0:4, 0:2])
# Temperature plots
T_delta_plot = fig.add_subplot(gs[0, 2:4])
T_main_plot = fig.add_subplot(gs[1:4, 2:4])
T_hist_plot = fig.add_subplot(gs[1:4, 4])
# Energy plots
E_delta_plot = fig.add_subplot(gs[0, 5:7])
E_main_plot = fig.add_subplot(gs[1:4, 5:7])
E_hist_plot = fig.add_subplot(gs[1:4, 7])
# Grab the current rcParams so that I can restore it later
current_rcParams = plt.rcParams.copy()
# Update rcParams with the necessary values
plt.rc("font", size=fsz) # controls default text sizes
plt.rc("axes", titlesize=fsz) # fontsize of the axes title
plt.rc("axes", labelsize=fsz) # fontsize of the x and y labels
plt.rc("xtick", labelsize=fsz - 2) # fontsize of the tick labels
plt.rc("ytick", labelsize=fsz - 2) # fontsize of the tick labels
# Grab the color line list from the plt cycler. I will used this in the hist plots
color_from_cycler = plt.rcParams["axes.prop_cycle"].by_key()["color"]
# ------------------------------------------- Temperature -------------------------------------------#
# Calculate Temperature plot's labels and multipliers
time_mul, temp_mul, time_prefix, temp_prefix, time_lbl, temp_lbl = plot_labels(
self.dataframe["Time"], self.dataframe["Temperature"], "Time", "Temperature", self.units
)
# Rescale quantities
time = time_mul * self.dataframe["Time"]
Temperature = temp_mul * self.dataframe["Temperature"]
T_desired = temp_mul * self.T_desired
# Temperature moving average
T_cumavg = Temperature.expanding().mean()
# Temperature deviation and its moving average
Delta_T = (Temperature - T_desired) * 100 / T_desired
Delta_T_cum_avg = Delta_T.expanding().mean()
# Temperature Main plot
T_main_plot.plot(time, Temperature, alpha=0.7)
T_main_plot.plot(time, T_cumavg, label="Moving Average")
T_main_plot.axhline(T_desired, ls="--", c="r", alpha=0.7, label="Desired T")
T_main_plot.legend(loc="best")
T_main_plot.set(ylabel="Temperature" + temp_lbl, xlabel="Time" + time_lbl)
# Temperature Deviation plot
T_delta_plot.plot(time, Delta_T, alpha=0.5)
T_delta_plot.plot(time, Delta_T_cum_avg, alpha=0.8)
T_delta_plot.set(xticks=[], ylabel=r"Deviation [%]")
# This was a failed attempt to calculate the theoretical Temperature distribution.
# Gaussian
T_dist = scp_stats.norm(loc=T_desired, scale=Temperature.std())
# Histogram plot
sns_histplot(y=Temperature, bins="auto", stat="density", alpha=0.75, legend="False", ax=T_hist_plot)
T_hist_plot.set(ylabel=None, xlabel=None, xticks=[], yticks=[])
T_hist_plot.plot(T_dist.pdf(Temperature.sort_values()), Temperature.sort_values(), color=color_from_cycler[1])
# ------------------------------------------- Total Energy -------------------------------------------#
# Calculate Energy plot's labels and multipliers
time_mul, energy_mul, _, _, time_lbl, energy_lbl = plot_labels(
self.dataframe["Time"], self.dataframe["Total Energy"], "Time", "Energy", self.units
)
Energy = energy_mul * self.dataframe["Total Energy"]
# Total Energy moving average
E_cumavg = Energy.expanding().mean()
# Total Energy Deviation and its moving average
Delta_E = (Energy - Energy.iloc[0]) * 100 / Energy.iloc[0]
Delta_E_cum_avg = Delta_E.expanding().mean()
# Energy main plot
E_main_plot.plot(time, Energy, alpha=0.7)
E_main_plot.plot(time, E_cumavg, label="Moving Average")
E_main_plot.axhline(Energy.mean(), ls="--", c="r", alpha=0.7, label="Avg")
E_main_plot.legend(loc="best")
E_main_plot.set(ylabel="Total Energy" + energy_lbl, xlabel="Time" + time_lbl)
# Deviation Plot
E_delta_plot.plot(time, Delta_E, alpha=0.5)
E_delta_plot.plot(time, Delta_E_cum_avg, alpha=0.8)
E_delta_plot.set(xticks=[], ylabel=r"Deviation [%]")
# (Failed) Attempt to calculate the theoretical Energy distribution
# In an NVT ensemble Energy fluctuation are given by sigma(E) = sqrt( k_B T^2 C_v)
# where C_v is the isothermal heat capacity
# Since this requires a lot of prior calculation I skip it and just make a Gaussian
E_dist = scp_stats.norm(loc=Energy.mean(), scale=Energy.std())
# Histogram plot
sns_histplot(y=Energy, bins="auto", stat="density", alpha=0.75, legend="False", ax=E_hist_plot)
# Grab the second color since the first is used for histplot
E_hist_plot.plot(E_dist.pdf(Energy.sort_values()), Energy.sort_values(), color=color_from_cycler[1])
E_hist_plot.set(ylabel=None, xlabel=None, xticks=[], yticks=[])
if not publication:
dt_mul, _, _, _, dt_lbl, _ = plot_labels(
process.integrator.dt, self.dataframe["Total Energy"], "Time", "Energy", self.units
)
# Information section
Info_plot.axis([0, 10, 0, 10])
Info_plot.grid(False)
Info_plot.text(0.0, 10, "Job ID: {}".format(self.job_id))
Info_plot.text(0.0, 9.5, "Phase: {}".format(self.phase.capitalize()))
Info_plot.text(0.0, 9.0, "No. of species = {}".format(len(self.species_num)))
y_coord = 8.5
for isp, sp in enumerate(process.species):
if sp.name != "electron_background":
Info_plot.text(0.0, y_coord, "Species {} : {}".format(isp + 1, sp.name))
Info_plot.text(0.0, y_coord - 0.5, " No. of particles = {} ".format(sp.num))
Info_plot.text(
0.0, y_coord - 1.0, " Temperature = {:.2f} {}".format(temp_mul * sp.temperature, temp_lbl)
)
y_coord -= 1.5
y_coord -= 0.25
delta_t = dt_mul * process.integrator.dt
if info_list is None:
integrator_type = {
"equilibration": process.integrator.equilibration_type,
"magnetization": process.integrator.magnetization_type,
"production": process.integrator.production_type,
}
info_list = [f"Total $N$ = {process.parameters.total_num_ptcls}"]
if process.integrator.thermalization:
info_list.append(f"Thermostat: {process.integrator.thermostat_type}")
info_list.append(f" Berendsen rate = {process.integrator.thermalization_rate:.2f}")
eq_cycles = int(
process.parameters.equilibration_steps
* process.integrator.dt
* process.parameters.total_plasma_frequency
)
to_append = [
f"Equilibration cycles = {eq_cycles}",
f"Potential: {process.potential.type}",
f" Coupling Const = {process.parameters.coupling_constant:.2e}",
f" Tot Force Error = {process.potential.force_error:.2e}",
f"Integrator: {integrator_type[self.phase]}",
]
[info_list.append(info) for info in to_append]
if integrator_type[self.phase] == "langevin":
info_list.append(f"langevin gamma = {process.integrator.langevin_gamma:.4e}")
prod_cycles = int(
process.parameters.production_steps
* process.integrator.dt
* process.parameters.total_plasma_frequency
)
to_append = [
f" $\Delta t$ = {delta_t:.2f} {dt_lbl}",
# "Step interval = {}".format(self.dump_step),
# "Step interval time = {:.2f} {}".format(self.dump_step * delta_t, dt_lbl),
f"Completed steps = {completed_steps}",
f"Total steps = {self.no_steps}",
f"{100 * completed_steps / self.no_steps:.2f} % Completed",
# "Completed time = {:.2f} {}".format(completed_steps * delta_t / dt_mul * time_mul, time_lbl),
f"Production time = {self.no_steps * delta_t / dt_mul * time_mul:.2f} {time_lbl}",
f"Production cycles = {prod_cycles}",
]
[info_list.append(info) for info in to_append]
for itext, text_str in enumerate(info_list):
Info_plot.text(0.0, y_coord, text_str)
y_coord -= 0.5
Info_plot.axis("off")
if not publication:
fig.tight_layout()
# Saving
if figname:
fig.savefig(os_path_join(self.saving_dir, figname + "_" + self.job_id + ".png"))
else:
fig.savefig(os_path_join(self.saving_dir, "Plot_EnsembleCheck_" + self.job_id + ".png"))
if show:
fig.show()
# Restore the previous rcParams
plt.rcParams = current_rcParams
[docs]class VelocityAutoCorrelationFunction(Observable):
"""Velocity Auto-correlation function."""
[docs] def __init__(self):
super(VelocityAutoCorrelationFunction, self).__init__()
self.__name__ = "vacf"
self.__long_name__ = "Velocity AutoCorrelation Function"
self.acf_observable = True
[docs] @setup_doc
def setup(self, params, phase: str = None, no_slices: int = None, **kwargs):
super().setup_init(params, phase=phase, no_slices=no_slices)
self.update_args(**kwargs)
[docs] @arg_update_doc
def update_args(self, **kwargs):
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
self.update_finish()
[docs] @compute_doc
def compute(self):
t0 = self.timer.current()
self.calc_slices_data()
self.average_slices_data()
self.save_hdf()
tend = self.timer.current()
self.time_stamp("VACF Calculation", self.timer.time_division(tend - t0))
[docs] def calc_slices_data(self):
"""Calculate the vacf for each slice and add the data to the acf_slices dataframe."""
start_slice = 0
end_slice = self.slice_steps * self.dump_step
time = zeros(self.slice_steps)
for isl in range(self.no_slices):
print(f"\nCalculating vacf for slice {isl + 1}/{self.no_slices}.")
# Parse the particles from the dump files
vel = zeros((self.dimensions, self.total_num_ptcls, self.slice_steps))
#
for it, dump in enumerate(
tqdm(range(start_slice, end_slice, self.dump_step), desc="Read in data", disable=not self.verbose)
):
datap = load_from_restart(self.dump_dir, dump)
time[it] = datap["time"]
for d in range(self.dimensions):
vel[d, :, it] = datap["vel"][:, d]
#
if isl == 0:
self.dataframe["Time"] = time
self.dataframe_slices["Time"] = time
self.dataframe_acf["Time"] = time
self.dataframe_acf_slices["Time"] = time
# Return an array of shape( num_species, dim + 1, slice_steps)
vacf = calc_vacf(vel, self.species_num)
for i, sp1 in enumerate(self.species_names):
sp_vacf_str = f"{sp1} " + self.__name__.swapcase()
for d in range(self.dimensions):
self.dataframe_acf_slices[sp_vacf_str + f"_{self.dim_labels[d]}_slice {isl}"] = vacf[i, d, :]
self.dataframe_acf_slices[sp_vacf_str + f"_Total_slice {isl}"] = vacf[i, -1, :]
start_slice += self.slice_steps * self.dump_step
end_slice += self.slice_steps * self.dump_step
[docs] def average_slices_data(self):
"""Average the data from all the slices and add it to the dataframe."""
# Average the stuff
for i, sp1 in enumerate(self.species_names):
sp_vacf_str = f"{sp1} " + self.__name__.swapcase()
for d in range(self.dimensions):
dl = self.dim_labels[d]
dcol_str = [sp_vacf_str + f"_{dl}_slice {isl}" for isl in range(self.no_slices)]
self.dataframe_acf[sp_vacf_str + f"_{dl}_Mean"] = self.dataframe_acf_slices[dcol_str].mean(axis=1)
self.dataframe_acf[sp_vacf_str + f"_{dl}_Std"] = self.dataframe_acf_slices[dcol_str].std(axis=1)
tot_col_str = [sp_vacf_str + f"_Total_slice {isl}" for isl in range(self.no_slices)]
self.dataframe_acf[sp_vacf_str + "_Total_Mean"] = self.dataframe_acf_slices[tot_col_str].mean(axis=1)
self.dataframe_acf[sp_vacf_str + "_Total_Std"] = self.dataframe_acf_slices[tot_col_str].std(axis=1)
[docs] def pretty_print(self):
"""Print observable parameters for help in choice of simulation parameters."""
print("\n\n{:=^70} \n".format(" " + self.__long_name__ + " "))
print("Data saved in: \n", self.filename_acf_hdf)
print("Data accessible at: self.dataframe_acf")
print("\nNo. of slices = {}".format(self.no_slices))
print("No. dumps per slice = {}".format(int(self.slice_steps / self.dump_step)))
print(
"Time interval of autocorrelation function = {:.4e} [s] ~ {} w_p T".format(
self.dt * self.slice_steps, int(self.dt * self.slice_steps * self.total_plasma_frequency)
)
)
[docs]class VelocityDistribution(Observable):
"""
Moments of the velocity distributions defined as
.. math::
\\langle v^{\\alpha} \\rangle = \\int_{-\\infty}^{\\infty} d v \\, f(v) v^{2 \\alpha}.
Attributes
----------
no_bins: int
Number of bins used to calculate the velocity distribution.
plots_dir: str
Directory in which to store Hermite coefficients plots.
species_plots_dirs : list, str
Directory for each species where to save Hermite coefficients plots.
max_no_moment: int
Maximum number of moments = :math:`\\alpha`. Default = 6.
"""
[docs] def __init__(self):
super(VelocityDistribution, self).__init__()
self.max_no_moment = None
self.__name__ = "vd"
self.__long_name__ = "Velocity Distribution"
[docs] def setup(
self,
params,
phase: str = None,
no_slices: int = None,
hist_kwargs: dict = None,
max_no_moment: int = None,
multi_run_average: bool = None,
dimensional_average: bool = None,
runs: int = 1,
curve_fit_kwargs: dict = None,
**kwargs,
):
"""
Assign attributes from simulation's parameters.
Parameters
----------
params : sarkas.core.Parameters
Simulation's parameters.
phase : str, optional
Phase to compute. Default = 'production'.
no_slices : int, optional
max_no_moment : int, optional
Maximum number of moments to calculate. Default = 6.
hist_kwargs : dict, optional
Dictionary of keyword arguments to pass to ``histogram`` for the calculation of the distributions.
curve_fit_kwargs: dict, optional
Dictionary of keyword arguments to pass to ``scipy.curve_fit`` for fitting of Hermite coefficients.
**kwargs :
These will overwrite any :class:`sarkas.core.Parameters`
or default :class:`sarkas.tools.observables.Observable`
attributes and/or add new ones.
"""
super().setup_init(params, phase=phase, multi_run_average=multi_run_average, runs=runs, no_slices=no_slices)
self.update_args(hist_kwargs, max_no_moment, curve_fit_kwargs, **kwargs)
[docs] @arg_update_doc
def update_args(self, hist_kwargs: dict = None, max_no_moment: int = None, curve_fit_kwargs: dict = None, **kwargs):
if curve_fit_kwargs:
self.curve_fit_kwargs = curve_fit_kwargs
# Check on hist_kwargs
if hist_kwargs:
# Is it a dictionary ?
if not isinstance(hist_kwargs, dict):
raise TypeError("hist_kwargs not a dictionary. Please pass a dictionary.")
# Did you pass a single dictionary for multispecies?
for key, value in hist_kwargs.items():
# The elements of hist_kwargs should be lists
if not isinstance(hist_kwargs[key], list):
hist_kwargs[key] = [value for i in range(self.num_species)]
self.hist_kwargs = hist_kwargs
# Default number of moments to calculate
if max_no_moment:
self.max_no_moment = max_no_moment
else:
self.max_no_moment = 6
# Update the attribute with the passed arguments
self.__dict__.update(kwargs.copy())
# Create the directory where to store the computed data
# First the name of the observable
saving_dir = os_path_join(self.postprocessing_dir, "VelocityDistribution")
if not os_path_exists(saving_dir):
mkdir(saving_dir)
# then the phase
self.saving_dir = os_path_join(saving_dir, self.phase.capitalize())
if not os_path_exists(self.saving_dir):
mkdir(self.saving_dir)
# Directories in which to store plots
self.plots_dir = os_path_join(self.saving_dir, "Plots")
if not os_path_exists(self.plots_dir):
mkdir(self.plots_dir)
# Paths where to store the dataframes
if self.multi_run_average:
self.filename_csv = os_path_join(self.saving_dir, "VelocityDistribution.csv")
self.filename_hdf = os_path_join(self.saving_dir, "VelocityDistribution.h5")
else:
self.filename_csv = os_path_join(self.saving_dir, "VelocityDistribution_" + self.job_id + ".csv")
self.filename_hdf = os_path_join(self.saving_dir, "VelocityDistribution_" + self.job_id + ".h5")
if hasattr(self, "max_no_moment"):
self.moments_dataframe = None
self.mom_df_filename_csv = os_path_join(self.saving_dir, "Moments_" + self.job_id + ".csv")
if hasattr(self, "max_hermite_order"):
self.hermite_dataframe = None
self.herm_df_filename_csv = os_path_join(self.saving_dir, "HermiteCoefficients_" + self.job_id + ".csv")
# some checks
if not hasattr(self, "hermite_rms_tol"):
self.hermite_rms_tol = 0.05
self.species_plots_dirs = None
# Need this for pretty print
# Calculate the dimension of the velocity container
# 2nd Dimension of the raw velocity array
self.dim = 1 if self.dimensional_average else self.dimensions
# range(inv_dim) for the loop over dimension
self.inv_dim = self.dimensions if self.dimensional_average else 1
self.prepare_histogram_args()
self.save_pickle()
[docs] def compute(self, compute_moments: bool = False, compute_Grad_expansion: bool = False):
"""
Calculate the moments of the velocity distributions and save them to a pandas dataframes and csv.
Parameters
----------
hist_kwargs : dict, optional
Dictionary with arguments to pass to ``numpy.histogram``.
**kwargs :
These will overwrite any :class:`sarkas.core.Parameters`
or default :class:`sarkas.tools.observables.Observable`
attributes and/or add new ones.
"""
# Print info to screen
self.pretty_print()
# Grab simulation data
time, vel_raw = self.grab_sim_data()
# Normality test
self.normality_tests(time=time, vel_data=vel_raw)
# Make the velocity distribution
self.create_distribution(vel_raw, time)
# Calculate velocity moments
if compute_moments:
self.compute_moments(parse_data=False, vel_raw=vel_raw, time=time)
#
if compute_Grad_expansion:
self.compute_hermite_expansion(compute_moments=False)
[docs] def normality_tests(self, time, vel_data):
"""
Calculate the Shapiro-Wilks test for each timestep from the raw velocity data.
Parameters
----------
time:
vel_data
Returns
-------
"""
no_dim = vel_data.shape[1]
stats_df_columns = (
"Time",
*[
"{}_{}_{}".format(sp, d, st)
for sp in self.species_names
for _, d in zip(range(no_dim), ["X", "Y", "Z"])
for st in ["s", "p"]
],
)
stats_mat = zeros((len(time), len(self.species_num) * no_dim * 2 + 1))
for it, tme in enumerate(time):
stats_mat[it, 0] = tme
for d, ds in zip(range(no_dim), ["X", "Y", "Z"]):
for sp, sp_start in enumerate(self.species_index_start[:-1]):
# Calculate the correct start and end index for storage
sp_end = self.species_index_start[sp + 1]
statcs, p_value = scp_stats.shapiro(vel_data[it, d, sp_start:sp_end])
stats_mat[it, 1 + 6 * sp + 2 * d] = statcs
stats_mat[it, 1 + 6 * sp + 2 * d + 1] = p_value
self.norm_test_df = DataFrame(stats_mat, columns=stats_df_columns)
self.norm_test_df.columns = MultiIndex.from_tuples([tuple(c.split("_")) for c in stats_df_columns])
[docs] def prepare_histogram_args(self):
# Initialize histograms arguments
if not hasattr(self, "hist_kwargs"):
self.hist_kwargs = {"density": [], "bins": [], "range": []}
# Default values
bin_width = 0.05
# Range of the histogram = (-wid * vth, wid * vth)
wid = 5
# The number of bins is calculated from default values of bin_width and wid
no_bins = int(2.0 * wid / bin_width)
# Calculate thermal speed from energy/temperature data.
try:
energy_fle = self.prod_energy_filename if self.phase == "production" else self.eq_energy_filename
energy_df = read_csv(energy_fle, index_col=False, encoding="utf-8")
if self.num_species > 1:
vth = zeros(self.num_species)
for sp, (sp_mass, sp_name) in enumerate(zip(self.species_masses, self.species_names)):
vth[sp] = sqrt(energy_df["{} Temperature".format(sp_name)].mean() * self.kB / sp_mass)
else:
vth = sqrt(energy_df["Temperature"].mean() * self.kB / self.species_masses)
except FileNotFoundError:
# In case you are using this in PreProcessing stage
vth = sqrt(self.kB * self.T_desired / self.species_masses)
self.vth = vth.copy()
# Create the default dictionary of histogram args
default_hist_kwargs = {"density": [], "bins": [], "range": []}
if self.num_species > 1:
for sp in range(self.num_species):
default_hist_kwargs["density"].append(True)
default_hist_kwargs["bins"].append(no_bins)
default_hist_kwargs["range"].append((-wid * vth[sp], wid * vth[sp]))
else:
default_hist_kwargs["density"].append(True)
default_hist_kwargs["bins"].append(no_bins)
default_hist_kwargs["range"].append((-wid * vth[0], wid * vth[0]))
# Now do some checks.
# Check for numpy.histogram args in kwargs
must_have_keys = ["bins", "range", "density"]
for k, key in enumerate(must_have_keys):
try:
# Is it empty?
if len(self.hist_kwargs[key]) == 0:
self.hist_kwargs[key] = default_hist_kwargs[key]
except KeyError:
self.hist_kwargs[key] = default_hist_kwargs[key]
# Ok at this point I have a dictionary whose elements are list.
# I want the inverse a list whose elements are dicts
self.list_hist_kwargs = []
for indx in range(self.num_species):
another_dict = {}
# Loop over the keys and grab the species value
for key, values in self.hist_kwargs.items():
another_dict[key] = values[indx]
self.list_hist_kwargs.append(another_dict)
[docs] def create_distribution(self, vel_raw: ndarray = None, time: ndarray = None):
"""
Calculate the velocity distribution of each species and save the corresponding dataframes.
Parameters
----------
vel_raw: ndarray, optional
Container of particles velocity at each time step.
time: ndarray, optional
Time array.
"""
print("\nCreating velocity distributions ...")
tinit = self.timer.current()
no_dumps = vel_raw.shape[0]
no_dim = vel_raw.shape[1]
# I want a Hierarchical dataframe
# Example:
# Ca Yb Ca Yb Ca Yb
# X X Y Y Z Z
# 1.54e-03 -2.54e+22 3.54e+00 1 2 1.54e-03 -2.54e+22 3.54e+00 1 2 1.54e-03 -2.54e+22 3.54e+00 1 2
# This has 3 rows of columns. The first identifies the species, The second the axis,
# and the third the value of the bin_edge
# This can be easily obtained from pandas dataframe MultiIndex. But first I will create a dataframe
# from a huge matrix. The columns of this dataframe will be
# Ca_X_1.54e-03 Ca_X_-2.54e+22 Ca_X_3.54e+00 Yb_X_1 Yb_X_2 Ca_Y_1.54e-03 Ca_Y_-2.54e+22 Ca_Y_3.54e+00 Yb_Y_1 ...
# using MultiIndex.from_tuples([tuple(c.split("_")) for c in df.columns]) I can create a hierarchical df.
# This is because I can access the data as
# df['Ca']['X']
# >>> 1.54e-03 -2.54e+22 3.54e+00
# >>> Time
# >>> 0 -1.694058 1.217008 -0.260678
# with the index = to my time array.
# see https://pandas.pydata.org/pandas-docs/stable/user_guide/advanced.html#reconstructing-the-level-labels
# Columns
full_df_columns = []
# At the first time step I will create the columns list.
dist_matrix = zeros((len(time), self.dim * (self.hist_kwargs["bins"].sum() + self.num_species)))
# The +1 at the end is because I decided to add a column containing the timestep
# For convenience save the bin edges somewhere else. The columns of the dataframe are string. This will give
# problems when plotting.
# Use a dictionary since the arrays could be different lengths
self.species_bin_edges = {}
for k in self.species_names:
# Initialize the sub-dictionary
self.species_bin_edges[k] = {}
for it in range(no_dumps):
for d, ds in zip(range(no_dim), ["X", "Y", "Z"]):
indx_0 = 0
for indx, (sp_name, sp_start) in enumerate(zip(self.species_names, self.species_index_start)):
# Calculate the correct start and end index for storage
sp_end = self.species_index_start[indx + 1]
bin_count, bin_edges = histogram(vel_raw[it, d, sp_start:sp_end], **self.list_hist_kwargs[indx])
# Executive decision: Center the bins
bin_loc = 0.5 * (bin_edges[:-1] + bin_edges[1:])
# Create the second_column_row the dataframe
if it == 0:
# Create the column array
full_df_columns.append(["{}_{}_Time".format(sp_name, ds)])
full_df_columns.append(["{}_{}_{:6e}".format(sp_name, ds, be) for be in bin_loc])
self.species_bin_edges[sp_name][ds] = bin_edges
# Ok. Now I have created the bins columns' names
# Time to insert in the huge matrix.
indx_1 = indx_0 + 1 + len(bin_count)
dist_matrix[it, indx_0] = time[it]
dist_matrix[it, indx_0 + 1 : indx_1] = bin_count
indx_0 = indx_1
# Alright. The matrix is filled now onto the dataframe
# First let's flatten the columns array. This is because I have something like this
# Example: Binary Mixture with 3 H bins per axis and 2 He bins per axis
# columns =[['H_X', 'H_X', 'H_X'], ['He_X', 'He_X'], ['H_Y', 'H_Y', 'H_Y'], ['He_Y', 'He_Y'] ... Z-axis]
# Flatten with list(concatenate(columns).flat)
# Now has become
# first_column_row=['H_X', 'H_X', 'H_X', 'He_X', 'He_X', 'H_Y', 'H_Y', 'H_Y', 'He_Y', 'He_Y' ... Z-axis]
# I think this is easier to understand than using nested list comprehension
# see https://stackabuse.com/python-how-to-flatten-list-of-lists/
full_df_columns = list(concatenate(full_df_columns).flat)
self.dataframe = DataFrame(dist_matrix, columns=full_df_columns)
# Save it
self.dataframe.to_csv(self.filename_csv, encoding="utf-8", index=False)
# Hierarchical DataFrame
self.hierarchical_dataframe = self.dataframe.copy()
self.hierarchical_dataframe.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.hierarchical_dataframe.columns]
)
self.hierarchical_dataframe.to_hdf(self.filename_hdf, key=self.__name__, encoding="utf-8")
tend = self.timer.current()
self.time_stamp("Velocity distribution calculation", self.timer.time_division(tend - tinit))
[docs] def compute_moments(self, parse_data: bool = False, vel_raw: ndarray = None, time: ndarray = None):
"""Calculate and save moments of the distribution.
Parameters
----------
parse_data: bool
Flag for reading data. Default = False. If False, must pass ``vel_raw`` and ``time``.
If True it will parse data from simulations dumps.
vel_raw: ndarray, optional
Container of particles velocity at each time step.
time: ndarray, optional
Time array.
"""
self.moments_dataframe = DataFrame()
self.moments_hdf_dataframe = DataFrame()
if parse_data:
time, vel_raw = self.grab_sim_data()
self.moments_dataframe["Time"] = time
print("\nCalculating velocity moments ...")
tinit = self.timer.current()
moments, ratios = calc_moments(vel_raw, self.max_no_moment, self.species_index_start)
tend = self.timer.current()
self.time_stamp("Velocity moments calculation", self.timer.time_division(tend - tinit))
# Save the dataframe
if self.dimensional_average:
for i, sp in enumerate(self.species_names):
self.moments_hdf_dataframe["{}_X_Time".format(sp)] = time
for m in range(self.max_no_moment):
self.moments_dataframe["{} {} moment".format(sp, m + 1)] = moments[i, :, :, m][:, 0]
self.moments_hdf_dataframe["{}_X_{} moment".format(sp, m + 1)] = moments[i, :, :, m][:, 0]
for m in range(self.max_no_moment):
self.moments_dataframe["{} {} moment ratio".format(sp, m + 1)] = ratios[i, :, :, m][:, 0]
self.moments_hdf_dataframe["{}_X_{}-2 ratio".format(sp, m + 1)] = ratios[i, :, :, m][:, 0]
else:
for i, sp in enumerate(self.species_names):
for d, ds in zip(range(self.dim), ["X", "Y", "Z"]):
self.moments_hdf_dataframe["{}_{}_Time".format(sp, ds)] = time
for m in range(self.max_no_moment):
self.moments_dataframe["{} {} moment axis {}".format(sp, m + 1, ds)] = moments[i, :, d, m][:, 0]
self.moments_hdf_dataframe["{}_{}_{} moment".format(sp, ds, m + 1)] = moments[i, :, :, m][:, 0]
for d, ds in zip(range(self.dim), ["X", "Y", "Z"]):
self.moments_hdf_dataframe["{}_{}_Time".format(sp, ds)] = time
for m in range(self.max_no_moment):
self.moments_dataframe["{} {} moment ratio axis {}".format(sp, m + 1, ds)] = ratios[i, :, d, m][
:, 0
]
self.moments_hdf_dataframe["{}_{}_{}-2 ratio ".format(sp, ds, m + 1)] = ratios[i, :, d, m][:, 0]
self.moments_dataframe.to_csv(self.filename_csv, index=False, encoding="utf-8")
# Hierarchical DF Save
# Make the columns
self.moments_hdf_dataframe.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.moments_hdf_dataframe.columns]
)
# Save the df in the hierarchical df with a new key/group
self.moments_hdf_dataframe.to_hdf(self.filename_hdf, mode="a", key="velocity_moments", encoding="utf-8")
[docs] def compute_hermite_expansion(self, compute_moments: bool = False):
"""
Calculate and save Hermite coefficients of the Grad expansion.
Parameters
----------
compute_moments: bool
Flag for calculating velocity moments. These are needed for the hermite calculation.
Default = True.
Notes
-----
This is still in the development stage. Do not trust the results. It requires more study in non-equilibrium
statistical mechanics.
"""
from scipy.optimize import curve_fit
self.hermite_dataframe = DataFrame()
self.hermite_hdf_dataframe = DataFrame()
if compute_moments:
self.compute_moments(parse_data=True)
if not hasattr(self, "hermite_rms_tol"):
self.hermite_rms_tol = 0.05
self.hermite_dataframe["Time"] = self.moments_dataframe["Time"].copy()
self.hermite_sigmas = zeros((self.num_species, self.dim, len(self.hermite_dataframe["Time"])))
self.hermite_epochs = zeros((self.num_species, self.dim, len(self.hermite_dataframe["Time"])))
hermite_coeff = zeros(
(self.num_species, self.dim, self.max_hermite_order + 1, len(self.hermite_dataframe["Time"]))
)
print("\nCalculating Hermite coefficients ...")
tinit = self.timer.current()
for sp, sp_name in enumerate(tqdm(self.species_names, desc="Species")):
for it, t in enumerate(tqdm(self.hermite_dataframe["Time"], desc="Time")):
# Grab the thermal speed from the moments
vrms = self.moments_dataframe["{} 2 moment".format(sp_name)].iloc[it]
# Loop over dimensions. No tensor availability yet
for d, ds in zip(range(self.dim), ["X", "Y", "Z"]):
# Grab the distribution from the hierarchical df
dist = self.hierarchical_dataframe[sp_name][ds].iloc[it, 1:]
# Grab and center the bins
v_bins = 0.5 * (self.species_bin_edges[sp_name][ds][1:] + self.species_bin_edges[sp_name][ds][:-1])
cntrl = True
j = 0
# This is a routine to calculate the hermite coefficient in the case of non-equilibrium simulations.
# In non-equilibrium we cannot define a temperature. This is because the temperature is defined from
# the rms width of a Gaussian distribution. In non-equilibrium we don't have a Gaussian, thus the
# first thing to do is to find the underlying Gaussian in our distribution. This is what this
# iterative procedure is for.
while cntrl:
# Normalize
norm = trapz(dist, x=v_bins / vrms)
# Calculate the hermite coeff
h_coeff = calculate_herm_coeff(v_bins / vrms, dist / norm, self.max_hermite_order)
# Fit the rms only to the Grad expansion. This finds the underlying Gaussian
res, _ = curve_fit(
# the lambda func is because i need to fit only rms not the h_coeff
lambda x, rms: grad_expansion(x, rms, h_coeff),
v_bins / vrms,
dist / norm,
self.curve_fit_kwargs,
)
vrms *= res[0]
if abs(1.0 - res[0]) < self.hermite_rms_tol:
cntrl = False
self.hermite_sigmas[sp, d, it] = vrms
self.hermite_epochs[sp, d, it] = j
hermite_coeff[sp, d, :, it] = h_coeff
j += 1
tend = self.timer.current()
for sp, sp_name in enumerate(self.species_names):
for d, ds in zip(range(self.dim), ["X", "Y", "Z"]):
for h in range(self.max_hermite_order):
self.hermite_dataframe["{} {} {} Hermite coeff".format(sp_name, ds, h)] = hermite_coeff[sp, d, h, :]
if h == 0:
self.hermite_hdf_dataframe["{}_{}_Time".format(sp_name, ds, h)] = hermite_coeff[sp, d, h, :]
self.hermite_hdf_dataframe["{}_{}_RMS".format(sp_name, ds, h)] = self.hermite_sigmas[sp, d, :]
self.hermite_hdf_dataframe["{}_{}_epoch".format(sp_name, ds, h)] = self.hermite_epochs[sp, d, :]
else:
self.hermite_hdf_dataframe["{}_{}_{} coeff".format(sp_name, ds, h)] = hermite_coeff[sp, d, h, :]
# Save the CSV
self.hermite_dataframe.to_csv(self.herm_df_filename_csv, index=False, encoding="utf-8")
# Make the columns
self.hermite_hdf_dataframe.columns = MultiIndex.from_tuples(
[tuple(c.split("_")) for c in self.hermite_hdf_dataframe.columns]
)
# Save the df in the hierarchical df with a new key/group
self.hermite_hdf_dataframe.to_hdf(self.filename_hdf, mode="a", key="hermite_coefficients", encoding="utf-8")
self.time_stamp("Hermite expansion calculation", self.timer.time_division(tend - tinit))
[docs] def pretty_print(self):
"""Print information in a user-friendly way."""
print("\n\n{:=^70} \n".format(" " + self.__long_name__ + " "))
print("CSV dataframe saved in:\n\t ", self.filename_csv)
print("HDF5 dataframe saved in:\n\t ", self.filename_hdf)
print("Data accessible at: self.dataframe, self.hierarchical_dataframe, self.species_bin_edges")
print("\nMulti run average: ", self.multi_run_average)
print("No. of runs: ", self.runs)
print(
"Size of the parsed velocity array: {} x {} x {}".format(
self.no_dumps, self.dim, self.runs * self.inv_dim * self.total_num_ptcls
)
)
print("\nHistograms Information:")
for sp, (sp_name, dics) in enumerate(zip(self.species_names, self.list_hist_kwargs)):
if sp == 0:
print("Species: {}".format(sp_name))
else:
print("\nSpecies: {}".format(sp_name))
print("No. of samples = {}".format(self.species_num[sp] * self.inv_dim * self.runs))
print("Thermal speed: v_th = {:.6e} ".format(self.vth[sp]), end="")
print("[cm/s]" if self.units == "cgs" else "[m/s]")
for key, values in dics.items():
if key == "range":
print(
"{} : ( {:.2f}, {:.2f} ) v_th,"
"\n\t( {:.4e}, {:.4e} ) ".format(key, *values / self.vth[sp], *values),
end="",
)
print("[cm/s]" if self.units == "cgs" else "[m/s]")
else:
print("{}: {}".format(key, values))
bin_width = abs(dics["range"][1] - dics["range"][0]) / (self.vth[sp] * dics["bins"])
print("Bin Width = {:.4f}".format(bin_width))
if hasattr(self, "max_no_moment"):
print("\nMoments Information:")
print("CSV dataframe saved in:\n\t ", self.mom_df_filename_csv)
print("Data accessible at: self.moments_dataframe, self.moments_hdf_dataframe")
print("Highest moment to calculate: {}".format(self.max_no_moment))
if hasattr(self, "max_hermite_order"):
print("\nGrad Expansion Information:")
print("CSV dataframe saved in:\n\t ", self.herm_df_filename_csv)
print("Data accessible at: self.hermite_dataframe, self.hermite_hdf_dataframe")
print("Highest order to calculate: {}".format(self.max_hermite_order))
print("RMS Tolerance: {:.3f}".format(self.hermite_rms_tol))
@njit
def calc_Sk(nkt, k_list, k_counts, species_np, no_dumps):
"""
Calculate :math:`S_{ij}(k)` at each saved timestep.
Parameters
----------
nkt : numpy.ndarray, complex
Density fluctuations of all species. Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
k_list :
List of :math:`k` indices in each direction with corresponding magnitude and index of ``k_counts``.
Shape=(``no_ka_values``, 5)
k_counts : numpy.ndarray
Number of times each :math:`k` magnitude appears.
species_np : numpy.ndarray
Array with number of particles of each species.
no_dumps : int
Number of dumps.
Returns
-------
Sk_all : numpy.ndarray
Array containing :math:`S_{ij}(k)`. Shape=(``no_Sk``, ``no_ka_values``, ``no_dumps``)
"""
no_sk = int(len(species_np) * (len(species_np) + 1) / 2)
Sk_raw = zeros((no_sk, len(k_counts), no_dumps))
pair_indx = 0
for ip, si in enumerate(species_np):
for jp in range(ip, len(species_np)):
sj = species_np[jp]
dens_const = 1.0 / sqrt(si * sj)
for it in range(no_dumps):
for ik, ka in enumerate(k_list):
indx = int(ka[-1])
nk_i = nkt[ip, it, ik]
nk_j = nkt[jp, it, ik]
Sk_raw[pair_indx, indx, it] += real(nk_i.conjugate() * nk_j) * dens_const / k_counts[indx]
pair_indx += 1
return Sk_raw
[docs]def calc_Skw(nkt, ka_list, species_np, no_dumps, dt, dump_step):
"""
Calculate the Fourier transform of the correlation function of ``nkt``.
Parameters
----------
nkt : complex, numpy.ndarray
Particles' density or velocity fluctuations.
Shape = ( ``no_species``, ``no_dumps``, ``no_k_list``)
ka_list : list
List of :math:`k` indices in each direction with corresponding magnitude and index of ``ka_counts``.
Shape=(``no_ka_values``, 5)
species_np : numpy.ndarray
Array with one element giving number of particles.
no_dumps : int
Number of dumps.
dt : float
Time interval.
dump_step : int
Snapshot interval.
Returns
-------
Skw_all : numpy.ndarray
DSF/CCF of each species and pair of species.
Shape = (``no_skw``, ``no_ka_values``, ``no_dumps``)
"""
# Fourier transform normalization: norm = dt / Total time
norm = dt / sqrt(no_dumps * dt * dump_step)
# number of independent observables
no_skw = int(len(species_np) * (len(species_np) + 1) / 2)
# DSF
# Skw = zeros((no_skw, len(ka_counts), no_dumps))
Skw_all = zeros((no_skw, len(ka_list), no_dumps))
pair_indx = 0
for ip, si in enumerate(species_np):
for jp in range(ip, len(species_np)):
sj = species_np[jp]
dens_const = 1.0 / sqrt(si * sj)
for ik, ka in enumerate(ka_list):
# indx = int(ka[-1])
nkw_i = fft(nkt[ip, :, ik]) * norm
nkw_j = fft(nkt[jp, :, ik]) * norm
Skw_all[pair_indx, ik, :] = fftshift(real(nkw_i.conjugate() * nkw_j) * dens_const)
# Skw[pair_indx, indx, :] += Skw_all[pair_indx, ik, :] / ka_counts[indx]
pair_indx += 1
return Skw_all
@njit
def calc_elec_current(vel, sp_charge, sp_num):
"""
Calculate the total electric current and electric current of each species.
Parameters
----------
vel: numpy.ndarray
Particles' velocities.
sp_charge: numpy.ndarray
Charge of each species.
sp_num: numpy.ndarray
Number of particles of each species.
Returns
-------
Js : numpy.ndarray
Electric current of each species. Shape = (``no_species``, ``no_dim``, ``no_dumps``)
Jtot : numpy.ndarray
Total electric current. Shape = (``no_dim``, ``no_dumps``)
"""
no_dumps = vel.shape[1]
no_dim = vel.shape[0]
Js = zeros((sp_num.shape[0], no_dim, no_dumps))
Jtot = zeros((no_dim, no_dumps))
sp_start = 0
sp_end = 0
for s, (q_sp, n_sp) in enumerate(zip(sp_charge, sp_num)):
# Find the index of the last particle of species s
sp_end += n_sp
# Calculate the current of each species
Js[s, :, :] = q_sp * vel[:, :, sp_start:sp_end].sum(axis=-1)
# Add to the total current
Jtot[:, :] += Js[s, :, :]
sp_start += n_sp
return Js, Jtot
[docs]def calc_moments(dist, max_moment, species_index_start):
"""
Calculate the moments of the (velocity) distribution.
Parameters
----------
dist: numpy.ndarray
Distribution of each time step. Shape = (``no_dumps``, ``dim``, ``runs * inv_dim * total_num_ptlcs``)
max_moment: int
Maximum moment to calculate
species_index_start: numpy.ndarray
Array containing the start index of each species. The last value is equivalent to dist.shape[-1]
Returns
-------
moments: numpy.ndarray
Moments of the distribution.
Shape=( ``no_species``, ``no_dumps``, ``dim``, ``max_moment`` )
ratios: numpy.ndarray
Ratios of each moments with respect to the expected Maxwellian value.
Shape=( ``no_species``, ``no_dumps``, ``no_dim``, ``max_moment - 1``)
Notes
-----
See these `equations <https://en.wikipedia.org/wiki/Normal_distribution#Moments:~:text=distribution.-,Moments>`_
"""
# from scipy.stats import moment as scp_moment
from scipy.special import gamma as scp_gamma
no_species = len(species_index_start)
no_dumps = dist.shape[0]
dim = dist.shape[1]
moments = zeros((no_species, no_dumps, dim, max_moment))
ratios = zeros((no_species, no_dumps, dim, max_moment))
for indx, sp_start in enumerate(species_index_start[:-1]):
# Calculate the correct start and end index for storage
sp_end = species_index_start[indx + 1]
for mom in range(max_moment):
moments[indx, :, :, mom] = scp_stats.moment(dist[:, :, sp_start:sp_end], moment=mom + 1, axis=2)
# sqrt( <v^2> ) = standard deviation = moments[:, :, :, 1] ** (1/2)
for mom in range(max_moment):
pwr = mom + 1
const = 2.0 ** (pwr / 2) * scp_gamma((pwr + 1) / 2) / sqrt(pi)
ratios[:, :, :, mom] = moments[:, :, :, mom] / (const * moments[:, :, :, 1] ** (pwr / 2.0))
return moments, ratios
@njit
def calc_nk(pos_data, k_list):
"""
Calculate the instantaneous microscopic density :math:`n(k)` defined as
.. math::
n_{A} ( k ) = \\sum_i^{N_A} \\exp [ -i \\mathbf k \\cdot \\mathbf r_{i} ]
Parameters
----------
pos_data : numpy.ndarray
Particles' position scaled by the box lengths.
Shape = ( ``no_dumps``, ``no_dim``, ``tot_no_ptcls``)
k_list : list
List of :math:`k` indices in each direction with corresponding magnitude and index of ``ka_counts``.
Shape=(``no_ka_values``, 5)
Returns
-------
nk : numpy.ndarray
Array containing :math:`n(k)`.
"""
nk = zeros(len(k_list), dtype=complex128)
for ik, k_vec in enumerate(k_list):
kr_i = 2.0 * pi * (k_vec[0] * pos_data[:, 0] + k_vec[1] * pos_data[:, 1] + k_vec[2] * pos_data[:, 2])
nk[ik] = (exp(-1j * kr_i)).sum()
return nk
[docs]def calc_nkt(fldr, slices, dump_step, species_np, k_list, verbose):
"""
Calculate density fluctuations :math:`n(k,t)` of all species.
.. math::
n_{A} ( k, t ) = \\sum_i^{N_A} \\exp [ -i \\mathbf k \\cdot \\mathbf r_{i}(t) ]
where :math:`N_A` is the number of particles of species :math:`A`.
Parameters
----------
fldr : str
Name of folder containing particles data.
slices : tuple, int
Initial, final step number of the slice, total number of slice steps.
dump_step : int
Snapshot interval.
species_np : numpy.ndarray
Number of particles of each species.
k_list : list
List of :math: `k` vectors.
Return
------
nkt : numpy.ndarray, complex
Density fluctuations. Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
"""
# Read particles' position for times in the slice
nkt = zeros((len(species_np), slices[2], len(k_list)), dtype=complex128)
for it, dump in enumerate(tqdm(range(slices[0], slices[1], dump_step), disable=not verbose)):
data = load_from_restart(fldr, dump)
pos = data["pos"]
sp_start = 0
sp_end = 0
for i, sp in enumerate(species_np):
sp_end += sp
nkt[i, it, :] = calc_nk(pos[sp_start:sp_end, :], k_list)
sp_start += sp
return nkt
[docs]def calc_pressure_tensor(vel, virial, species_mass, species_np, box_volume, dimensions):
"""
Calculate the pressure tensor.
Parameters
----------
pos : numpy.ndarray
Particles' positions.
vel : numpy.ndarray
Particles' velocities.
acc : numpy.ndarray
Particles' accelerations.
species_mass : numpy.ndarray
Mass of each species.
species_np : numpy.ndarray
Number of particles of each species.
box_volume : float
Volume of simulation's box.
Returns
-------
pressure : float
Scalar Pressure i.e. trace of the pressure tensor
pressure_tensor : numpy.ndarray
Pressure tensor. Shape(``no_dim``,``no_dim``)
"""
sp_start = 0
sp_end = 0
# Rescale vel of each particle by their individual mass
for sp, num in enumerate(species_np):
sp_end += num
vel[sp_start:sp_end, :] *= sqrt(species_mass[sp])
sp_start += num
pressure_kin = (vel.transpose() @ vel) / box_volume
pressure_pot = virial.sum(axis=-1) / box_volume
pressure_tensor = pressure_kin + pressure_pot
# .trace is not supported by numba. hence no njit
pressure = pressure_tensor.trace() / dimensions
return pressure, pressure_kin, pressure_pot, pressure_tensor
@njit
def calc_statistical_efficiency(observable, run_avg, run_std, max_no_divisions, no_dumps):
"""
Todo:
Parameters
----------
observable
run_avg
run_std
max_no_divisions
no_dumps
Returns
-------
"""
tau_blk = zeros(max_no_divisions)
sigma2_blk = zeros(max_no_divisions)
statistical_efficiency = zeros(max_no_divisions)
for i in range(2, max_no_divisions):
tau_blk[i] = int(no_dumps / i)
for j in range(i):
t_start = int(j * tau_blk[i])
t_end = int((j + 1) * tau_blk[i])
blk_avg = observable[t_start:t_end].mean()
sigma2_blk[i] += (blk_avg - run_avg) ** 2
sigma2_blk[i] /= i - 1
statistical_efficiency[i] = tau_blk[i] * sigma2_blk[i] / run_std**2
return tau_blk, sigma2_blk, statistical_efficiency
# @jit Numba doesn't like scipy.signal
[docs]def calc_diff_flux_acf(vel, sp_num, sp_conc, sp_mass):
"""
Calculate the diffusion fluxes and their autocorrelations functions in each direction.
Parameters
----------
vel : numpy.ndarray
Particles' velocities. Shape = (``dimensions``, ``no_dumps``, ``total_num_ptcls``)
sp_num: numpy.ndarray
Number of particles of each species.
sp_conc: numpy.ndarray
Concentration of each species.
sp_mass: numpy.ndarray
Particle's mass of each species.
Returns
-------
J_flux: numpy.ndarray
Diffusion fluxes.
Shape = ( (``num_species - 1``), ``dimensions`` , ``no_dumps``)
jr_acf: numpy.ndarray
Relative Diffusion flux autocorrelation function.
Shape = ( (``num_species - 1``) x (``num_species - 1``), ``no_dim + 1``, ``no_dumps``)
"""
no_dim = vel.shape[0]
no_dumps = vel.shape[1]
no_species = len(sp_num)
# number of independent fluxes = no_species - 1,
# number of acf of ind fluxes = no_species - 1 ^2
no_jc_acf = int((no_species - 1) * (no_species - 1))
# Current of each species in each direction and at each timestep
tot_vel = zeros((no_species, no_dim, no_dumps))
sp_start = 0
sp_end = 0
# Calculate the total center of mass velocity (tot_com_vel)
# and the center of mass velocity of each species (com_vel)
for i, ns in enumerate(sp_num):
sp_end += ns
tot_vel[i, :, :] = vel[:, :, sp_start:sp_end].sum(axis=-1)
# tot_com_vel += mass_densities[i] * com_vel[i, :, :] / tot_mass_dens
sp_start += ns
# Diffusion Fluxes
J_flux = zeros((no_species - 1, no_dim, no_dumps))
# Relative Diffusion fluxes for ACF and Transport calc
jr_flux = zeros((no_species - 1, no_dim, no_dumps))
# Relative diff flux acf
jr_acf = zeros((no_jc_acf, no_dim + 1, no_dumps))
m_bar = sp_mass @ sp_conc
# the diffusion fluxes from eq.(3.5) in Zhou J Phs Chem
for i, m_alpha in enumerate(sp_mass[:-1]):
# Flux
for j, m_beta in enumerate(sp_mass):
delta_ab = 1 * (m_beta == m_alpha)
J_flux[i, :, :] += (m_bar * delta_ab - sp_conc[i] * m_beta) * tot_vel[j, :, :]
jr_flux[i, :, :] += (delta_ab - sp_conc[i]) * tot_vel[j, :, :]
J_flux[i, :, :] *= m_alpha / m_bar
indx = 0
# Remember to change this for 3+ species
# binary_const = ( m_bar/sp_mass.prod() )**2
# Calculate the correlation function in each direction
for i, sp1_flux in enumerate(jr_flux):
for j, sp2_flux in enumerate(jr_flux):
for d in range(no_dim):
# Calculate the correlation function and add it to the array
jr_acf[indx, d, :] = correlationfunction(sp1_flux[d, :], sp2_flux[d, :])
# Calculate the total correlation function by summing the three directions
jr_acf[indx, -1, :] += jr_acf[indx, d, :]
indx += 1
return J_flux, jr_acf
# @jit Numba doesn't like Scipy
[docs]def calc_vacf(vel, sp_num):
"""
Calculate the velocity autocorrelation function of each species and in each direction.
Parameters
----------
vel : numpy.ndarray
Particles' velocities stored in a 3D array with shape = (D x Np x Nt).
D = cartesian dimensions, Np = Number of particles, Nt = number of dumps.
sp_num: numpy.ndarray
Number of particles of each species.
Returns
-------
vacf: numpy.ndarray
Velocity autocorrelation functions. Shape= (No_species, D + 1, Nt)
"""
no_dim = vel.shape[0]
no_dumps = vel.shape[2]
vacf = zeros((len(sp_num), no_dim + 1, no_dumps))
# Calculate the vacf of each species in each dimension
for d in range(no_dim):
sp_start = 0
sp_end = 0
for sp, n_sp in enumerate(sp_num):
sp_end += n_sp
# Temporary species vacf
sp_vacf = zeros(no_dumps)
# Calculate the vacf for each particle of species sp
for ptcl in range(sp_start, sp_end):
# Calculate the correlation function and add it to the array
ptcl_vacf = correlationfunction(vel[d, ptcl, :], vel[d, ptcl, :])
# Add this particle vacf to the species vacf and normalize by the time origins
sp_vacf += ptcl_vacf
# Save the species vacf for dimension i
vacf[sp, d, :] = sp_vacf / n_sp
# Save the total vacf
vacf[sp, -1, :] += sp_vacf / n_sp
# Move to the next species first particle position
sp_start += n_sp
return vacf
@njit
def calc_vk(pos_data, vel_data, k_list):
"""
Calculate the instantaneous longitudinal and transverse velocity fluctuations.
Parameters
----------
pos_data : numpy.ndarray
Particles' position. Shape = ( ``no_dumps``, 3, ``tot_no_ptcls``)
vel_data : numpy.ndarray
Particles' velocities. Shape = ( ``no_dumps``, 3, ``tot_no_ptcls``)
k_list : list
List of :math:`k` indices in each direction with corresponding magnitude and index of ``ka_counts``.
Shape=(``no_ka_values``, 5)
Returns
-------
vkt : numpy.ndarray
Array containing longitudinal velocity fluctuations.
vkt_i : numpy.ndarray
Array containing transverse velocity fluctuations in the :math:`x` direction.
vkt_j : numpy.ndarray
Array containing transverse velocity fluctuations in the :math:`y` direction.
vkt_k : numpy.ndarray
Array containing transverse velocity fluctuations in the :math:`z` direction.
"""
# Longitudinal
vk = zeros(len(k_list), dtype=complex128)
# Transverse
vk_i = zeros(len(k_list), dtype=complex128)
vk_j = zeros(len(k_list), dtype=complex128)
vk_k = zeros(len(k_list), dtype=complex128)
for ik, k_vec in enumerate(k_list):
# Calculate the dot product and cross product between k, r, and v
kr_i = 2.0 * pi * (k_vec[0] * pos_data[:, 0] + k_vec[1] * pos_data[:, 1] + k_vec[2] * pos_data[:, 2])
k_dot_v = 2.0 * pi * (k_vec[0] * vel_data[:, 0] + k_vec[1] * vel_data[:, 1] + k_vec[2] * vel_data[:, 2])
k_cross_v_i = 2.0 * pi * (k_vec[1] * vel_data[:, 2] - k_vec[2] * vel_data[:, 1])
k_cross_v_j = -2.0 * pi * (k_vec[0] * vel_data[:, 2] - k_vec[2] * vel_data[:, 0])
k_cross_v_k = 2.0 * pi * (k_vec[0] * vel_data[:, 1] - k_vec[1] * vel_data[:, 0])
# Microscopic longitudinal current
vk[ik] = (k_dot_v * exp(-1j * kr_i)).sum()
# Microscopic transverse current
vk_i[ik] = (k_cross_v_i * exp(-1j * kr_i)).sum()
vk_j[ik] = (k_cross_v_j * exp(-1j * kr_i)).sum()
vk_k[ik] = (k_cross_v_k * exp(-1j * kr_i)).sum()
return vk, vk_i, vk_j, vk_k
[docs]def calc_vkt(fldr, slices, dump_step, species_np, k_list, verbose):
r"""
Calculate the longitudinal and transverse velocities fluctuations of all species.
Longitudinal
.. math::
\lambda_A(\mathbf{k}, t) = \sum_i^{N_A} \mathbf{k} \cdot \mathbf{v}_{i}(t) \exp [ - i \mathbf{k} \cdot \mathbf{r}_{i}(t) ]
Transverse
.. math::
\\tau_A(\mathbf{k}, t) = \sum_i^{N_A} \mathbf{k} \\times \mathbf{v}_{i}(t) \exp [ - i \mathbf{k} \cdot \mathbf{r}_{i}(t) ]
where :math:`N_A` is the number of particles of species :math:`A`.
Parameters
----------
fldr : str
Name of folder containing particles data.
slices : tuple, int
Initial, final step number of the slice, number of steps per slice.
dump_step : int
Timestep interval saving.
species_np : numpy.ndarray
Number of particles of each species.
k_list : list
List of :math: `k` vectors.
Returns
-------
vkt : numpy.ndarray, complex
Longitudinal velocity fluctuations.
Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
vkt_perp_i : numpy.ndarray, complex
Transverse velocity fluctuations along the :math:`x` axis.
Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
vkt_perp_j : numpy.ndarray, complex
Transverse velocity fluctuations along the :math:`y` axis.
Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
vkt_perp_k : numpy.ndarray, complex
Transverse velocity fluctuations along the :math:`z` axis.
Shape = ( ``no_species``, ``no_dumps``, ``no_ka_values``)
"""
# Read particles' position for all times
no_dumps = slices[2]
vkt_par = zeros((len(species_np), no_dumps, len(k_list)), dtype=complex128)
vkt_perp_i = zeros((len(species_np), no_dumps, len(k_list)), dtype=complex128)
vkt_perp_j = zeros((len(species_np), no_dumps, len(k_list)), dtype=complex128)
vkt_perp_k = zeros((len(species_np), no_dumps, len(k_list)), dtype=complex128)
for it, dump in enumerate(tqdm(range(slices[0], slices[1], dump_step), disable=not verbose)):
data = load_from_restart(fldr, dump)
pos = data["pos"]
vel = data["vel"]
sp_start = 0
sp_end = 0
for i, sp in enumerate(species_np):
sp_end += sp
vkt_par[i, it, :], vkt_perp_i[i, it, :], vkt_perp_j[i, it, :], vkt_perp_k[i, it, :] = calc_vk(
pos[sp_start:sp_end, :], vel[sp_start:sp_end], k_list
)
sp_start += sp
return vkt_par, vkt_perp_i, vkt_perp_j, vkt_perp_k
[docs]def grad_expansion(x, rms, h_coeff):
"""
Calculate the Grad expansion as given by eq.(5.97) in Liboff
Parameters
----------
x : numpy.ndarray
Array of the scaled velocities
rms : float
RMS width of the Gaussian.
h_coeff: numpy.ndarray
Hermite coefficients without the division by factorial.
Returns
-------
_ : numpy.ndarray
Grad expansion.
"""
gaussian = exp(-0.5 * (x / rms) ** 2) / (sqrt(2.0 * pi * rms**2))
herm_coef = h_coeff / [factorial(i) for i in range(len(h_coeff))]
hermite_series = hermite_e.hermeval(x, herm_coef)
return gaussian * hermite_series
[docs]def calculate_herm_coeff(v, distribution, maxpower):
r"""
Calculate Hermite coefficients by integrating the velocity distribution function. That is
.. math::
a_i = \int_{-\\infty}^{\infty} dv \, He_i(v)f(v)
Parameters
----------
v : numpy.ndarray
Range of velocities.
distribution: numpy.ndarray
Velocity histogram.
maxpower: int
Hermite order
Returns
-------
coeff: numpy.ndarray
Coefficients :math:`a_i`
"""
coeff = zeros(maxpower + 1)
for i in range(maxpower + 1):
hc = zeros(1 + i)
hc[-1] = 1.0
Hp = hermite_e.hermeval(v, hc)
coeff[i] = trapz(distribution * Hp, x=v)
return coeff
[docs]def kspace_setup(box_lengths, angle_averaging, max_k_harmonics, max_aa_harmonics):
"""
Calculate all allowed :math:`k` vectors.
Parameters
----------
max_k_harmonics : numpy.ndarray
Number of harmonics in each direction.
box_lengths : numpy.ndarray
Length of each box's side.
angle_averaging : str
Flag for calculating all the possible `k` vector directions and magnitudes. Default = 'principal_axis'
max_aa_harmonics : numpy.ndarray
Maximum `k` harmonics in each direction for angle average.
Returns
-------
k_arr : list
List of all possible combination of :math:`(n_x, n_y, n_z)` with their corresponding magnitudes and indexes.
k_counts : numpy.ndarray
Number of occurrences of each triplet :math:`(n_x, n_y, n_z)` magnitude.
k_unique : numpy.ndarray
Magnitude of the unique allowed triplet :math:`(n_x, n_y, n_z)`.
"""
if angle_averaging == "full":
# The first value of k_arr = [0, 0, 0]
first_non_zero = 1
# Obtain all possible permutations of the wave number arrays
k_arr = [
array([i / box_lengths[0], j / box_lengths[1], k / box_lengths[2]])
for i in range(max_k_harmonics[0] + 1)
for j in range(max_k_harmonics[1] + 1)
for k in range(max_k_harmonics[2] + 1)
]
harmonics = [
array([i, j, k], dtype=int)
for i in range(max_k_harmonics[0] + 1)
for j in range(max_k_harmonics[1] + 1)
for k in range(max_k_harmonics[2] + 1)
]
elif angle_averaging == "principal_axis":
# The first value of k_arr = [1, 0, 0]
first_non_zero = 0
# Calculate the k vectors along the principal axis only
k_arr = [array([i / box_lengths[0], 0, 0]) for i in range(1, max_k_harmonics[0] + 1)]
harmonics = [array([i, 0, 0], dtype=int) for i in range(1, max_k_harmonics[0] + 1)]
k_arr = np_append(k_arr, [array([0, i / box_lengths[1], 0]) for i in range(1, max_k_harmonics[1] + 1)], axis=0)
harmonics = np_append(harmonics, [array([0, i, 0], dtype=int) for i in range(1, max_k_harmonics[1] + 1)], axis=0)
k_arr = np_append(k_arr, [array([0, 0, i / box_lengths[2]]) for i in range(1, max_k_harmonics[2] + 1)], axis=0)
harmonics = np_append(harmonics, [array([0, 0, i], dtype=int) for i in range(1, max_k_harmonics[2] + 1)], axis=0)
elif angle_averaging == "custom":
# The first value of k_arr = [0, 0, 0]
first_non_zero = 1
# Obtain all possible permutations of the wave number arrays up to max_aa_harmonics included
k_arr = [
array([i / box_lengths[0], j / box_lengths[1], k / box_lengths[2]])
for i in range(max_aa_harmonics[0] + 1)
for j in range(max_aa_harmonics[1] + 1)
for k in range(max_aa_harmonics[2] + 1)
]
harmonics = [
array([i, j, k], dtype=int)
for i in range(max_aa_harmonics[0] + 1)
for j in range(max_aa_harmonics[1] + 1)
for k in range(max_aa_harmonics[2] + 1)
]
# Append the rest of k values calculated from principal axis
k_arr = np_append(
k_arr,
[array([i / box_lengths[0], 0, 0]) for i in range(max_aa_harmonics[0] + 1, max_k_harmonics[0] + 1)],
axis=0,
)
harmonics = np_append(
harmonics,
[array([i, 0, 0], dtype=int) for i in range(max_aa_harmonics[0] + 1, max_k_harmonics[0] + 1)],
axis=0,
)
k_arr = np_append(
k_arr,
[array([0, i / box_lengths[1], 0]) for i in range(max_aa_harmonics[1] + 1, max_k_harmonics[1] + 1)],
axis=0,
)
harmonics = np_append(
harmonics,
[array([0, i, 0], dtype=int) for i in range(max_aa_harmonics[1] + 1, max_k_harmonics[1] + 1)],
axis=0,
)
k_arr = np_append(
k_arr,
[array([0, 0, i / box_lengths[2]]) for i in range(max_aa_harmonics[2] + 1, max_k_harmonics[2] + 1)],
axis=0,
)
harmonics = np_append(
harmonics,
[array([0, 0, i], dtype=int) for i in range(max_aa_harmonics[2] + 1, max_k_harmonics[2] + 1)],
axis=0,
)
# Compute wave number magnitude - don't use |k| (skipping first entry in k_arr)
# The round off is needed to avoid ka value different beyond a certain significant digit. It will break other parts
# of the code otherwise.
k_mag = sqrt((array(k_arr) ** 2).sum(axis=1)[..., None])
harm_mag = sqrt((array(harmonics) ** 2).sum(axis=1)[..., None])
for i, k in enumerate(k_mag[:-1]):
if abs(k - k_mag[i + 1]) < 2.0e-5:
k_mag[i + 1] = k
# Add magnitude to wave number array
k_arr = concatenate((k_arr, k_mag), 1)
# Add magnitude to wave number array
harmonics = concatenate((harmonics, harm_mag), 1)
# Sort from lowest to highest magnitude
ind = argsort(k_arr[:, -1])
k_arr = k_arr[ind]
harmonics = harmonics[ind]
# Count how many times a |k| value appears
k_unique, k_counts = unique(k_arr[first_non_zero:, -1], return_counts=True)
# Generate a 1D array containing index to be used in S array
k_index = repeat(range(len(k_counts)), k_counts)[..., None]
# Add index to k_array
k_arr = concatenate((k_arr[int(first_non_zero) :, :], k_index), 1)
harmonics = concatenate((harmonics[int(first_non_zero) :, :], k_index), 1)
return k_arr, k_counts, k_unique, harmonics
[docs]def load_from_restart(fldr, it):
"""
Load particles' data from dumps.
Parameters
----------
fldr : str
Folder containing dumps.
it : int
Timestep to load.
Returns
-------
data : dict
Particles' data.
"""
file_name = os_path_join(fldr, "checkpoint_" + str(it) + ".npz")
data = load(file_name, allow_pickle=True)
return data
[docs]def plot_labels(xdata, ydata, xlbl, ylbl, units):
"""
Create plot labels with correct units and prefixes.
Parameters
----------
xdata: numpy.ndarray
X values.
ydata: numpy.ndarray
Y values.
xlbl: str
String of the X quantity.
ylbl: str
String of the Y quantity.
units: str
'cgs' or 'mks'.
Returns
-------
xmultiplier: float
Scaling factor for X data.
ymultiplier: float
Scaling factor for Y data.
xprefix: str
Prefix for X units label
yprefix: str
Prefix for Y units label.
xlabel: str
X label units.
ylabel: str
Y label units.
"""
if isinstance(xdata, (ndarray, Series)):
xmax = xdata.max()
else:
xmax = xdata
if isinstance(ydata, (ndarray, Series)):
ymax = ydata.max()
else:
ymax = ydata
# Find the correct Units
units_dict = UNITS[1] if units == "cgs" else UNITS[0]
if units == "cgs" and xlbl == "Length":
xmax *= 1e2
if units == "cgs" and ylbl == "Length":
ymax *= 1e2
# Use scientific notation. This returns a string
x_str = format_float_scientific(xmax)
y_str = format_float_scientific(ymax)
# Grab the exponent
x_exp = 10.0 ** (float(x_str[x_str.find("e") + 1 :]))
y_exp = 10.0 ** (float(y_str[y_str.find("e") + 1 :]))
# Find the units' prefix by looping over the possible values
xprefix = "none"
xmul = -1.5
# This is a 10 multiplier
i = 1.0
while xmul < 0:
for key, value in PREFIXES.items():
ratio = i * x_exp / value
if abs(ratio - 1) < 1.0e-6:
xprefix = key
xmul = 1 / value
i /= 10
# find the prefix
yprefix = "none"
ymul = -1.5
i = 1.0
while ymul < 0:
for key, value in PREFIXES.items():
ratio = i * y_exp / value
if abs(ratio - 1) < 1.0e-6:
yprefix = key
ymul = 1 / value
i /= 10.0
if "Energy" in ylbl:
yname = "Energy"
else:
yname = ylbl
if "Pressure" in ylbl:
yname = "Pressure"
else:
yname = ylbl
if yname in units_dict:
ylabel = " [" + yprefix + units_dict[yname] + "]"
else:
ylabel = ""
if "Energy" in xlbl:
xname = "Energy"
else:
xname = xlbl
if "Pressure" in xlbl:
xname = "Pressure"
else:
xname = xlbl
if xname in units_dict:
xlabel = " [" + xprefix + units_dict[xname] + "]"
else:
xlabel = ""
return xmul, ymul, xprefix, yprefix, xlabel, ylabel
[docs]def col_mapper(keys, vals):
return dict(zip(keys, vals))