Source code for sarkas.tools.transport

"""
Transport Module.
"""
from IPython import get_ipython

if get_ipython().__class__.__name__ == "ZMQInteractiveShell":
    from tqdm import tqdm_notebook as tqdm
else:
    from tqdm import tqdm

from matplotlib.pyplot import subplots
from numpy import array, column_stack
from os import mkdir as os_mkdir
from os.path import exists as os_path_exists
from os.path import join as os_path_join
from pandas import DataFrame, MultiIndex, read_hdf

from ..utilities.maths import fast_integral_loop

# Sarkas Modules
from .observables import plot_labels, Thermodynamics


class TransportCoefficients:
    """
    Transport Coefficients class.

    Parameters
    ----------
    params : :class:`sarkas.core.Parameters`
        Simulation parameters.

    phase : str, optional
        Simulation's phase. `"equilibration"` or `"production"`. Default = `"production"`.

    no_slices : int, optional
        Number of slices in which the phase has been divided. Default = 1.

    """

    def __init__(self, params, phase: str = "production", no_slices: int = 1):
        self.__name__ = "TC"
        self.__long_name__ = "Transport Coefficients"
        self.tc_names = ["electricalconductivity", "diffusion", "interdiffusion", "viscosities"]
        # Parameters copies
        self.postprocessing_dir = params.postprocessing_dir
        self.units = params.units
        self.job_id = params.job_id
        self.verbose = params.verbose
        self.dt = params.dt
        self.total_plasma_frequency = params.total_plasma_frequency
        self.dimensions = params.dimensions
        self.box_volume = params.box_volume
        self.pbox_volume = params.pbox_volume
        #
        self.saving_dir = None
        self.phase = phase
        self.no_slices = no_slices
        # Calculate the average temperature from the csv data
        self.kB = params.kB
        energy = Thermodynamics()
        energy.setup(params, self.phase)
        energy.parse()
        self.T_avg = energy.dataframe["Temperature"].mean()
        self.beta = 1.0 / (self.kB * self.T_avg)

        self.diffusion_df = None
        self.diffusion_df_slices = None
        self.interdiffusion_df = None
        self.interdiffusion_df_slices = None
        self.viscosity_df = None
        self.viscosity_df_slices = None
        self.conductivity_df = None
        self.conductivity_df_slices = None

        self.create_dir()
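
    # A minimal construction sketch, assuming `params` is a fully initialized
    # :class:`sarkas.core.Parameters` from a completed run whose Thermodynamics data is already
    # on disk (the constructor parses it to compute the average temperature and beta):
    #
    #   tc = TransportCoefficients(params, phase="production", no_slices=4)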

    def __repr__(self):
        sortedDict = dict(sorted(self.__dict__.items(), key=lambda x: x[0].lower()))
        disp = "Transport( \n"
        for key, value in sortedDict.items():
            disp += "\t{} : {}\n".format(key, value)
        disp += ")"
        return disp

    def create_dir(self):
        """Create the directory where the transport coefficients are saved."""

        saving_dir = os_path_join(self.postprocessing_dir, self.__long_name__.replace(" ", ""))
        if not os_path_exists(saving_dir):
            os_mkdir(saving_dir)

        self.saving_dir = os_path_join(saving_dir, self.phase.capitalize())
        if not os_path_exists(self.saving_dir):
            os_mkdir(self.saving_dir)

    def save_hdf(self, df: DataFrame = None, df_slices: DataFrame = None, tc_name: str = None):
        """
        Save the HDF dataframes to disk in the TransportCoefficients folder.

        Parameters
        ----------
        df : pandas.DataFrame
            Multi-index dataframe containing the mean and std of the transport coefficient.

        df_slices : pandas.DataFrame
            Multi-index dataframe containing the transport coefficient of each slice.

        tc_name : str
            Name of the transport coefficient. See :attr:`~.tc_names` for options.

        Returns
        -------
        df : pandas.DataFrame
            Sorted multi-index dataframe containing the mean and std of the transport coefficient.

        df_slices : pandas.DataFrame
            Sorted multi-index dataframe containing the transport coefficient of each slice.

        """
        df_slices.columns = MultiIndex.from_tuples([tuple(c.split("_")) for c in df_slices.columns])
        df_slices = df_slices.sort_index()
        df_slices.to_hdf(
            os_path_join(self.saving_dir, tc_name + "_slices_" + self.job_id + ".h5"),
            mode="w",
            key=tc_name,
            index=False,
        )

        df.columns = MultiIndex.from_tuples([tuple(c.split("_")) for c in df.columns])
        df = df.sort_index()
        df.to_hdf(
            os_path_join(self.saving_dir, tc_name + "_" + self.job_id + ".h5"), mode="w", key=tc_name, index=False
        )

        return df, df_slices
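
    # The flat column labels used throughout this class encode the MultiIndex levels with "_"
    # as a separator; `save_hdf` splits them into tuples. A minimal pandas illustration of the
    # convention (the column names here are only examples):
    #
    #   from pandas import DataFrame, MultiIndex
    #
    #   df = DataFrame({"Electrical Conductivity_Mean": [1.0], "Electrical Conductivity_Std": [0.1]})
    #   df.columns = MultiIndex.from_tuples([tuple(c.split("_")) for c in df.columns])
    #   df[("Electrical Conductivity", "Mean")]  # access by (coefficient, statistic)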

    def parse(self, observable, tc_name):
        """Read the HDF files containing the transport coefficients.

        Parameters
        ----------
        observable : :class:`sarkas.tools.observables.Observable`
            Observable object containing the ACF whose time integral leads to the transport coefficient.

        tc_name : str
            Transport coefficient name.

        Raises
        ------
        ValueError
            Wrong ``tc_name``.

        """
        tc_name = tc_name.lower()
        if tc_name in self.tc_names:
            if tc_name == "electricalconductivity":
                self.conductivity_df = read_hdf(
                    os_path_join(self.saving_dir, tc_name + "_" + self.job_id + ".h5"), mode="r", index_col=False
                )
                self.conductivity_df_slices = read_hdf(
                    os_path_join(self.saving_dir, tc_name + "_slices_" + self.job_id + ".h5"),
                    mode="r",
                    index_col=False,
                )
            elif tc_name == "diffusion":
                self.diffusion_df = read_hdf(
                    os_path_join(self.saving_dir, tc_name + "_" + self.job_id + ".h5"), mode="r", index_col=False
                )
                self.diffusion_df_slices = read_hdf(
                    os_path_join(self.saving_dir, tc_name + "_slices_" + self.job_id + ".h5"),
                    mode="r",
                    index_col=False,
                )
            elif tc_name == "interdiffusion":
                self.interdiffusion_df = read_hdf(
                    os_path_join(self.saving_dir, tc_name + "_" + self.job_id + ".h5"), mode="r", index_col=False
                )
                self.interdiffusion_df_slices = read_hdf(
                    os_path_join(self.saving_dir, tc_name + "_slices_" + self.job_id + ".h5"),
                    mode="r",
                    index_col=False,
                )
            elif tc_name == "viscosities":
                self.viscosity_df = read_hdf(
                    os_path_join(self.saving_dir, tc_name + "_" + self.job_id + ".h5"), mode="r", index_col=False
                )
                self.viscosity_df_slices = read_hdf(
                    os_path_join(self.saving_dir, tc_name + "_slices_" + self.job_id + ".h5"),
                    mode="r",
                    index_col=False,
                )
        else:
            raise ValueError("Wrong tc_name. Please choose amongst ", self.tc_names)

        self.phase = observable.phase
        self.no_slices = observable.no_slices
        self.slice_steps = observable.slice_steps
        self.dump_step = observable.dump_step
        # Print some info
        self.pretty_print(tc_name=tc_name)

    def plot_tc(self, time, acf_data, tc_data, acf_name, tc_name, figname, show: bool = False):
        """
        Make dual plots with the ACF and the transport coefficient.

        Parameters
        ----------
        time : numpy.ndarray
            Time array.

        acf_data : numpy.ndarray
            Mean and Std of the ACF. \n
            Shape = (:attr:`sarkas.tools.observables.Observable.slice_steps`, 2).

        tc_data : numpy.ndarray
            Mean and Std of the transport coefficient. \n
            Shape = (:attr:`sarkas.tools.observables.Observable.slice_steps`, 2).

        acf_name : str
            y-label of the ACF plot.

        tc_name : str
            y-label of the transport coefficient plot.

        figname : str
            Name with which to save the plot.

        show : bool
            Flag for displaying the plot if using IPython or a terminal.

        Returns
        -------
        fig : matplotlib.figure.Figure
            Figure object.

        (ax1, ax2, ax3, ax4) : tuple
            Tuple with the axes handles. \n
            ax1 = ACF axes, ax2 = transport coefficient axes, ax3 = ax1.twiny(), ax4 = ax2.twiny().

        """
        # Make the plot
        fig, (ax1, ax2) = subplots(1, 2, figsize=(16, 7))
        ax3 = ax1.twiny()
        ax4 = ax2.twiny()

        # Calculate axis multipliers and labels
        xmul, ymul, _, _, xlbl, ylbl = plot_labels(time, tc_data[:, 0], "Time", tc_name, self.units)

        # ACF
        ax1.plot(xmul * time, acf_data[:, 0] / acf_data[0, 0])
        ax1.fill_between(
            xmul * time,
            (acf_data[:, 0] - acf_data[:, 1]) / (acf_data[0, 0] - acf_data[0, 1]),
            (acf_data[:, 0] + acf_data[:, 1]) / (acf_data[0, 0] + acf_data[0, 1]),
            alpha=0.2,
        )

        # Coefficient
        ax2.plot(xmul * time, ymul * tc_data[:, 0])
        ax2.fill_between(
            xmul * time, ymul * (tc_data[:, 0] - tc_data[:, 1]), ymul * (tc_data[:, 0] + tc_data[:, 1]), alpha=0.2
        )

        xlims = (xmul * time[1], xmul * time[-1] * 1.5)

        ax1.set(xlim=xlims, xscale="log", ylabel=acf_name, xlabel=r"Time difference" + xlbl)
        ax2.set(xlim=xlims, xscale="log", ylabel=tc_name + ylbl, xlabel=r"$\tau$" + xlbl)

        # ax1.legend(loc='best')
        # ax2.legend(loc='best')

        # Finish the index axes
        for axi in [ax3, ax4]:
            axi.grid(alpha=0.1)
            axi.set(xlim=(1, self.slice_steps * 1.5), xscale="log", xlabel="Index")

        fig.tight_layout()
        fig.savefig(os_path_join(self.saving_dir, figname))
        if show:
            fig.show()

        return fig, (ax1, ax2, ax3, ax4)

    def pretty_print(self, tc_name):
        """
        Print to screen the location where data is stored and other relevant information.

        Parameters
        ----------
        tc_name : str
            Name of the transport coefficient to calculate.

        """
        print("Data saved in: \n", os_path_join(self.saving_dir, tc_name + "_" + self.job_id + ".h5"))
        print(os_path_join(self.saving_dir, tc_name + "_slices_" + self.job_id + ".h5"))
        print("\nNo. of slices = {}".format(self.no_slices))
        print("No. dumps per slice = {}".format(int(self.slice_steps / self.dump_step)))
        print(
            "Time interval of autocorrelation function = {:.4e} [s] ~ {} w_p T".format(
                self.dt * self.slice_steps * self.dump_step,
                int(self.dt * self.slice_steps * self.dump_step * self.total_plasma_frequency),
            )
        )

    def electrical_conductivity(self, observable, plot: bool = True, display_plot: bool = False):
        """
        Calculate the transport coefficient from the Green-Kubo formula

        .. math::

            \\sigma = \\frac{\\beta}{V} \\int_0^{\\tau} dt \\, J(t),

        where :math:`\\beta = 1/k_B T`, :math:`V` is the volume of the simulation box, and :math:`J(t)` is the
        electric current autocorrelation function.

        Data is retrievable at :attr:`~.conductivity_df` and :attr:`~.conductivity_df_slices`.

        Parameters
        ----------
        observable : :class:`sarkas.tools.observables.ElectricCurrent`
            Observable object containing the ACF whose time integral leads to the electrical conductivity.

        plot : bool, optional
            Flag for making the dual plot of the ACF and transport coefficient. \n
            Default = True.

        display_plot : bool, optional
            Flag for displaying the plot if using IPython. Default = False.

        """
        print("\n\n{:=^70} \n".format(" Electrical Conductivity "))

        self.conductivity_df = DataFrame()
        self.conductivity_df_slices = DataFrame()

        # Check that the phase and no_slices are the same as the ones computed in the Observable
        observable.parse()
        self.phase = observable.phase
        self.no_slices = observable.no_slices
        self.slice_steps = observable.slice_steps
        self.dump_step = observable.dump_step

        # Print some info
        self.pretty_print(tc_name="ElectricalConductivity")

        # to_numpy creates a 2d-array, hence the [:, 0]
        time = observable.dataframe_acf["Time"].iloc[:, 0].to_numpy()
        self.conductivity_df["Time"] = time.copy()
        self.conductivity_df_slices["Time"] = time.copy()

        jc_str = "Electric Current ACF"
        sigma_str = "Electrical Conductivity"
        const = self.beta / observable.box_volume

        if not observable.magnetized:
            for isl in tqdm(range(observable.no_slices), disable=not observable.verbose):
                integrand = array(observable.dataframe_acf_slices[(jc_str, "Total", "slice {}".format(isl))])
                self.conductivity_df_slices[sigma_str + "_slice {}".format(isl)] = const * fast_integral_loop(
                    time, integrand
                )

            col_str = [sigma_str + "_slice {}".format(isl) for isl in range(observable.no_slices)]
            self.conductivity_df[sigma_str + "_Mean"] = self.conductivity_df_slices[col_str].mean(axis=1)
            self.conductivity_df[sigma_str + "_Std"] = self.conductivity_df_slices[col_str].std(axis=1)
        else:
            for isl in tqdm(range(observable.no_slices), disable=not observable.verbose):
                # Parallel
                par_str = (jc_str, "Z", "slice {}".format(isl))
                sigma_par_str = sigma_str + "_Parallel_slice {}".format(isl)
                integrand = observable.dataframe_acf_slices[par_str].to_numpy()
                self.conductivity_df_slices[sigma_par_str] = const * fast_integral_loop(time, integrand)

                # Perpendicular
                x_col_str = (jc_str, "X", "slice {}".format(isl))
                y_col_str = (jc_str, "Y", "slice {}".format(isl))
                perp_integrand = 0.5 * (
                    observable.dataframe_acf_slices[x_col_str].to_numpy()
                    + observable.dataframe_acf_slices[y_col_str].to_numpy()
                )
                sigma_perp_str = sigma_str + "_Perpendicular_slice {}".format(isl)
                self.conductivity_df_slices[sigma_perp_str] = const * fast_integral_loop(time, perp_integrand)

            par_col_str = [(jc_str, "Z", "slice {}".format(isl)) for isl in range(self.no_slices)]
            observable.dataframe_acf[(jc_str, "Parallel", "Mean")] = observable.dataframe_acf_slices[par_col_str].mean(
                axis=1
            )
            observable.dataframe_acf[(jc_str, "Parallel", "Std")] = observable.dataframe_acf_slices[par_col_str].std(
                axis=1
            )

            x_col_str = [(jc_str, "X", "slice {}".format(isl)) for isl in range(self.no_slices)]
            y_col_str = [(jc_str, "Y", "slice {}".format(isl)) for isl in range(self.no_slices)]
            perp_jc = 0.5 * (
                observable.dataframe_acf_slices[x_col_str].to_numpy()
                + observable.dataframe_acf_slices[y_col_str].to_numpy()
            )
            observable.dataframe_acf[(jc_str, "Perpendicular", "Mean")] = perp_jc.mean(axis=1)
            observable.dataframe_acf[(jc_str, "Perpendicular", "Std")] = perp_jc.std(axis=1)
            # Save the updated dataframe
            observable.save_hdf()

            # Average and std of the transport coefficient.
            col_str = [sigma_str + "_Parallel_slice {}".format(isl) for isl in range(observable.no_slices)]
            self.conductivity_df[sigma_str + "_Parallel_Mean"] = self.conductivity_df_slices[col_str].mean(axis=1)
            self.conductivity_df[sigma_str + "_Parallel_Std"] = self.conductivity_df_slices[col_str].std(axis=1)
            # Perpendicular
            col_str = [sigma_str + "_Perpendicular_slice {}".format(isl) for isl in range(observable.no_slices)]
            self.conductivity_df[sigma_str + "_Perpendicular_Mean"] = self.conductivity_df_slices[col_str].mean(axis=1)
            self.conductivity_df[sigma_str + "_Perpendicular_Std"] = self.conductivity_df_slices[col_str].std(axis=1)
        # Endif magnetized.

        self.conductivity_df, self.conductivity_df_slices = self.save_hdf(
            df=self.conductivity_df, df_slices=self.conductivity_df_slices, tc_name="ElectricalConductivity"
        )

        if plot or display_plot:
            if not observable.magnetized:
                acf_avg = observable.dataframe_acf[(jc_str, "Total", "Mean")].to_numpy()
                acf_std = observable.dataframe_acf[(jc_str, "Total", "Std")].to_numpy()

                tc_avg = self.conductivity_df[(sigma_str, "Mean")].to_numpy()
                tc_std = self.conductivity_df[(sigma_str, "Std")].to_numpy()

                self.plot_tc(
                    time=time,
                    acf_data=column_stack((acf_avg, acf_std)),
                    tc_data=column_stack((tc_avg, tc_std)),
                    acf_name="Electric Current ACF",
                    tc_name="Electrical Conductivity",
                    figname="ElectricalConductivity_Plot.png",
                    show=display_plot,
                )
            else:
                acf_avg = observable.dataframe_acf[(jc_str, "Parallel", "Mean")].to_numpy()
                acf_std = observable.dataframe_acf[(jc_str, "Parallel", "Std")].to_numpy()

                tc_avg = self.conductivity_df[(sigma_str, "Parallel", "Mean")].to_numpy()
                tc_std = self.conductivity_df[(sigma_str, "Parallel", "Std")].to_numpy()

                self.plot_tc(
                    time=time,
                    acf_data=column_stack((acf_avg, acf_std)),
                    tc_data=column_stack((tc_avg, tc_std)),
                    acf_name="Electric Current ACF Parallel",
                    tc_name="Electrical Conductivity Parallel",
                    figname="ElectricalConductivity_Parallel_Plot.png",
                    show=display_plot,
                )

                acf_avg = observable.dataframe_acf[(jc_str, "Perpendicular", "Mean")].to_numpy()
                acf_std = observable.dataframe_acf[(jc_str, "Perpendicular", "Std")].to_numpy()

                tc_avg = self.conductivity_df[(sigma_str, "Perpendicular", "Mean")].to_numpy()
                tc_std = self.conductivity_df[(sigma_str, "Perpendicular", "Std")].to_numpy()

                self.plot_tc(
                    time=time,
                    acf_data=column_stack((acf_avg, acf_std)),
                    tc_data=column_stack((tc_avg, tc_std)),
                    acf_name="Electric Current ACF Perpendicular",
                    tc_name="Electrical Conductivity Perpendicular",
                    figname="ElectricalConductivity_Perpendicular_Plot.png",
                    show=display_plot,
                )
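
    # `fast_integral_loop` is used above as the running Green-Kubo integral, i.e. the ACF
    # integrated up to every upper limit in `time`. A minimal NumPy sketch of that operation,
    # shown only to clarify the assumption about its behavior:
    #
    #   from numpy import trapz, zeros_like
    #
    #   def running_integral(time, acf):
    #       out = zeros_like(acf)
    #       for it in range(1, len(time)):
    #           out[it] = trapz(acf[: it + 1], x=time[: it + 1])
    #       return out
    #
    #   # sigma(t) = beta / V * running_integral(time, electric_current_acf)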

    def diffusion(self, observable, plot: bool = True, display_plot: bool = False):
        """
        Calculate the transport coefficient from the Green-Kubo formula

        .. math::

            D_{\\alpha} = \\frac{1}{3 N_{\\alpha}} \\sum_{i}^{N_{\\alpha}} \\int_0^{\\tau} dt \\,
            \\langle \\mathbf v^{(\\alpha)}_{i}(t) \\cdot \\mathbf v^{(\\alpha)}_{i}(0) \\rangle,

        where :math:`\\mathbf v_{i}^{(\\alpha)}(t)` is the velocity of particle :math:`i` of species
        :math:`\\alpha`. Notice that the diffusion coefficient is averaged over all :math:`N_{\\alpha}` particles.

        Data is retrievable at :attr:`~.diffusion_df` and :attr:`~.diffusion_df_slices`.

        Parameters
        ----------
        observable : :class:`sarkas.tools.observables.VelocityAutoCorrelationFunction`
            Observable object containing the ACF whose time integral leads to the self-diffusion coefficient.

        plot : bool, optional
            Flag for making the dual plot of the ACF and transport coefficient. Default = True.

        display_plot : bool, optional
            Flag for displaying the plot if using IPython. Default = False.

        """
        print("\n\n{:=^70} \n".format(" Diffusion Coefficient "))

        self.diffusion_df = DataFrame()
        self.diffusion_df_slices = DataFrame()

        # Check that the phase and no_slices are the same as the ones computed in the Observable
        observable.parse()
        self.phase = observable.phase
        self.no_slices = observable.no_slices
        self.slice_steps = observable.slice_steps
        self.dump_step = observable.dump_step

        # Print some info
        self.pretty_print(tc_name="Diffusion")

        # to_numpy creates a 2d-array, hence the [:, 0]
        time = observable.dataframe_acf["Time"].iloc[:, 0].to_numpy()
        self.diffusion_df["Time"] = time.copy()
        self.diffusion_df_slices["Time"] = time.copy()

        vacf_str = "VACF"
        const = 1.0 / self.dimensions

        if not observable.magnetized:
            # Loop over time slices
            for isl in tqdm(range(self.no_slices), disable=not observable.verbose):
                # Iterate over the number of species
                for i, sp in enumerate(observable.species_names):
                    sp_vacf_str = f"{sp} " + vacf_str
                    # Grab the vacf data of each slice
                    integrand = array(observable.dataframe_acf_slices[(sp_vacf_str, "Total", f"slice {isl}")])
                    df_str = f"{sp} Diffusion_slice {isl}"
                    self.diffusion_df_slices[df_str] = const * fast_integral_loop(time=time, integrand=integrand)

            # Average and std of each diffusion coefficient.
            for isp, sp in enumerate(observable.species_names):
                col_str = [f"{sp} Diffusion_slice {isl}" for isl in range(observable.no_slices)]
                self.diffusion_df[f"{sp} Diffusion_Mean"] = self.diffusion_df_slices[col_str].mean(axis=1)
                self.diffusion_df[f"{sp} Diffusion_Std"] = self.diffusion_df_slices[col_str].std(axis=1)

        else:
            # Loop over time slices
            for isl in tqdm(range(observable.no_slices), disable=not observable.verbose):
                # Iterate over the number of species
                for i, sp in enumerate(observable.species_names):
                    sp_vacf_str = f"{sp} " + vacf_str
                    # Parallel
                    par_vacf_str = (sp_vacf_str, "Z", "slice {}".format(isl))
                    integrand_par = observable.dataframe_acf_slices[par_vacf_str].to_numpy()

                    par_slice_str = "{} Diffusion_Parallel_slice {}".format(sp, isl)
                    self.diffusion_df_slices[par_slice_str] = fast_integral_loop(time=time, integrand=integrand_par)

                    # Perpendicular
                    x_vacf_str = (sp_vacf_str, "X", "slice {}".format(isl))
                    y_vacf_str = (sp_vacf_str, "Y", "slice {}".format(isl))

                    perp_slice_str = "{} Diffusion_Perpendicular_slice {}".format(sp, isl)
                    integrand_perp = 0.5 * (
                        observable.dataframe_acf_slices[x_vacf_str].to_numpy()
                        + observable.dataframe_acf_slices[y_vacf_str].to_numpy()
                    )
                    self.diffusion_df_slices[perp_slice_str] = fast_integral_loop(time=time, integrand=integrand_perp)

            # Add the average and std of the perpendicular and parallel VACF to the observable dataframe
            for isp, sp in enumerate(observable.species_names):
                sp_vacf_str = "{} ".format(sp) + vacf_str
                sp_diff_str = "{} ".format(sp) + "Diffusion"

                par_col_str = [(sp_vacf_str, "Z", "slice {}".format(isl)) for isl in range(self.no_slices)]

                observable.dataframe_acf[(sp_vacf_str, "Parallel", "Mean")] = observable.dataframe_acf_slices[
                    par_col_str
                ].mean(axis=1)
                observable.dataframe_acf[(sp_vacf_str, "Parallel", "Std")] = observable.dataframe_acf_slices[
                    par_col_str
                ].std(axis=1)

                x_col_str = [(sp_vacf_str, "X", "slice {}".format(isl)) for isl in range(self.no_slices)]
                y_col_str = [(sp_vacf_str, "Y", "slice {}".format(isl)) for isl in range(self.no_slices)]

                perp_vacf = 0.5 * (
                    observable.dataframe_acf_slices[x_col_str].to_numpy()
                    + observable.dataframe_acf_slices[y_col_str].to_numpy()
                )
                observable.dataframe_acf[(sp_vacf_str, "Perpendicular", "Mean")] = perp_vacf.mean(axis=1)
                observable.dataframe_acf[(sp_vacf_str, "Perpendicular", "Std")] = perp_vacf.std(axis=1)

                # Average and std of each diffusion coefficient.
                par_col_str = [sp_diff_str + "_Parallel_slice {}".format(isl) for isl in range(self.no_slices)]
                perp_col_str = [sp_diff_str + "_Perpendicular_slice {}".format(isl) for isl in range(self.no_slices)]

                self.diffusion_df[sp_diff_str + "_Parallel_Mean"] = self.diffusion_df_slices[par_col_str].mean(axis=1)
                self.diffusion_df[sp_diff_str + "_Parallel_Std"] = self.diffusion_df_slices[par_col_str].std(axis=1)
                self.diffusion_df[sp_diff_str + "_Perpendicular_Mean"] = self.diffusion_df_slices[perp_col_str].mean(
                    axis=1
                )
                self.diffusion_df[sp_diff_str + "_Perpendicular_Std"] = self.diffusion_df_slices[perp_col_str].std(
                    axis=1
                )

            # Save the updated dataframe
            observable.save_hdf()
        # Endif magnetized.

        self.diffusion_df, self.diffusion_df_slices = self.save_hdf(
            df=self.diffusion_df, df_slices=self.diffusion_df_slices, tc_name="diffusion"
        )

        if plot or display_plot:
            if observable.magnetized:
                for isp, sp in enumerate(observable.species_names):
                    sp_vacf_str = f"{sp} " + vacf_str
                    sp_diff_str = f"{sp} Diffusion"

                    # Parallel
                    acf_avg = observable.dataframe_acf[(sp_vacf_str, "Parallel", "Mean")].to_numpy()
                    acf_std = observable.dataframe_acf[(sp_vacf_str, "Parallel", "Std")].to_numpy()

                    tc_avg = self.diffusion_df[(sp_diff_str, "Parallel", "Mean")].to_numpy()
                    tc_std = self.diffusion_df[(sp_diff_str, "Parallel", "Std")].to_numpy()

                    self.plot_tc(
                        time=time,
                        acf_data=column_stack((acf_avg, acf_std)),
                        tc_data=column_stack((tc_avg, tc_std)),
                        acf_name=sp_vacf_str + " Parallel",
                        tc_name=sp_diff_str + " Parallel",
                        figname="{}_Parallel_Diffusion_Plot.png".format(sp),
                        show=display_plot,
                    )

                    # Perpendicular
                    acf_avg = observable.dataframe_acf[(sp_vacf_str, "Perpendicular", "Mean")]
                    acf_std = observable.dataframe_acf[(sp_vacf_str, "Perpendicular", "Std")]

                    tc_avg = self.diffusion_df[(sp_diff_str, "Perpendicular", "Mean")]
                    tc_std = self.diffusion_df[(sp_diff_str, "Perpendicular", "Std")]

                    self.plot_tc(
                        time=time,
                        acf_data=column_stack((acf_avg, acf_std)),
                        tc_data=column_stack((tc_avg, tc_std)),
                        acf_name=sp_vacf_str + " Perpendicular",
                        tc_name=sp_diff_str + " Perpendicular",
                        figname="{}_Perpendicular_Diffusion_Plot.png".format(sp),
                        show=display_plot,
                    )
            else:
                for isp, sp in enumerate(observable.species_names):
                    sp_vacf_str = f"{sp} " + vacf_str
                    acf_avg = observable.dataframe_acf[(sp_vacf_str, "Total", "Mean")].to_numpy()
                    acf_std = observable.dataframe_acf[(sp_vacf_str, "Total", "Std")].to_numpy()

                    d_str = f"{sp} Diffusion"
                    tc_avg = self.diffusion_df[(d_str, "Mean")].to_numpy()
                    tc_std = self.diffusion_df[(d_str, "Std")].to_numpy()

                    self.plot_tc(
                        time=time,
                        acf_data=column_stack((acf_avg, acf_std)),
                        tc_data=column_stack((tc_avg, tc_std)),
                        acf_name=sp_vacf_str,
                        tc_name=d_str,
                        figname="{}_Diffusion_Plot.png".format(sp),
                        show=display_plot,
                    )
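
    # A hypothetical call sequence, assuming `vacf` is a computed
    # :class:`sarkas.tools.observables.VelocityAutoCorrelationFunction` whose ACF data is on
    # disk (this method calls `vacf.parse()` internally) and "H" is one of the species names:
    #
    #   tc.diffusion(vacf, plot=True)
    #   d_running = tc.diffusion_df[("H Diffusion", "Mean")]  # running Green-Kubo integral vs time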

    def interdiffusion(self, observable, plot: bool = True, display_plot: bool = False):
        """
        Calculate the transport coefficient from the Green-Kubo formula

        .. math::

            D_{\\alpha} = \\frac{1}{3 N x_1 x_2} \\int_0^\\tau dt \\,
            \\langle \\mathbf {J}_{\\alpha}(0) \\cdot \\mathbf {J}_{\\alpha}(t) \\rangle,

        where :math:`x_{1,2}` are the concentrations of the two species and :math:`\\mathbf {J}_{\\alpha}(t)` is the
        diffusion current calculated by the :class:`sarkas.tools.observables.DiffusionFlux` class.

        Data is retrievable at :attr:`~.interdiffusion_df` and :attr:`~.interdiffusion_df_slices`.

        Parameters
        ----------
        observable : :class:`sarkas.tools.observables.DiffusionFlux`
            Observable object containing the ACF whose time integral leads to the interdiffusion coefficient.

        plot : bool, optional
            Flag for making the dual plot of the ACF and transport coefficient. Default = True.

        display_plot : bool, optional
            Flag for displaying the plot if using IPython. Default = False.

        """
        print("\n\n{:=^70} \n".format(" Interdiffusion Coefficient "))

        self.interdiffusion_df = DataFrame()
        self.interdiffusion_df_slices = DataFrame()

        # Check that the phase and no_slices are the same as the ones computed in the Observable
        observable.parse()
        self.phase = observable.phase
        self.no_slices = observable.no_slices
        self.slice_steps = observable.slice_steps
        self.dump_step = observable.dump_step

        # Print some info
        self.pretty_print(tc_name="Interdiffusion")

        # to_numpy creates a 2d-array, hence the [:, 0]
        time = observable.dataframe_acf["Time"].to_numpy()[:, 0]
        self.interdiffusion_df["Time"] = time.copy()
        self.interdiffusion_df_slices["Time"] = time.copy()

        no_fluxes_acf = observable.no_fluxes_acf
        # Normalization constant
        const = 1.0 / (3.0 * observable.total_num_ptcls * observable.species_concentrations.prod())

        df_str = "Diffusion Flux ACF"
        id_str = "Inter Diffusion Flux"

        for isl in tqdm(range(self.no_slices), disable=not self.verbose):
            # D_ij = zeros((no_fluxes_acf, jc_acf.slice_steps))
            for ij in range(no_fluxes_acf):
                acf_df_str = (df_str + " {}".format(ij), "Total", "slice {}".format(isl))
                integrand = observable.dataframe_acf_slices[acf_df_str].to_numpy()

                id_df_str = id_str + " {}_slice {}".format(ij, isl)
                self.interdiffusion_df_slices[id_df_str] = const * fast_integral_loop(time=time, integrand=integrand)

        # Average and std of the slices
        for ij in range(no_fluxes_acf):
            col_str = [id_str + " {}_slice {}".format(ij, isl) for isl in range(self.no_slices)]
            self.interdiffusion_df[id_str + " {}_Mean".format(ij)] = self.interdiffusion_df_slices[col_str].mean(axis=1)
            self.interdiffusion_df[id_str + " {}_Std".format(ij)] = self.interdiffusion_df_slices[col_str].std(axis=1)

        self.interdiffusion_df, self.interdiffusion_df_slices = self.save_hdf(
            df=self.interdiffusion_df, df_slices=self.interdiffusion_df_slices, tc_name="Interdiffusion"
        )

        if plot or display_plot:
            for flux in range(observable.no_fluxes_acf):
                flux_str = df_str + " {}".format(flux)
                acf_avg = observable.dataframe_acf[(flux_str, "Total", "Mean")].to_numpy()
                acf_std = observable.dataframe_acf[(flux_str, "Total", "Std")].to_numpy()

                d_str = id_str + " {}".format(flux)
                tc_avg = self.interdiffusion_df[(d_str, "Mean")].to_numpy()
                tc_std = self.interdiffusion_df[(d_str, "Std")].to_numpy()

                self.plot_tc(
                    time=time,
                    acf_data=column_stack((acf_avg, acf_std)),
                    tc_data=column_stack((tc_avg, tc_std)),
                    acf_name=flux_str,
                    tc_name=d_str,
                    figname="InterDiffusion_{}_Plot.png".format(flux),
                    show=display_plot,
                )
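
    # A sketch of how the result might be read back, assuming `diff_flux` is a computed
    # :class:`sarkas.tools.observables.DiffusionFlux` for a binary mixture (flux index 0):
    #
    #   tc.interdiffusion(diff_flux)
    #   d12_running = tc.interdiffusion_df[("Inter Diffusion Flux 0", "Mean")]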

    def viscosity(self, observable, plot: bool = True, display_plot: bool = False):
        """
        Calculate the transport coefficient from the Green-Kubo formula. The shear viscosity is obtained from

        .. math::

            \\eta = \\frac{\\beta V}{6} \\sum_{\\alpha} \\sum_{\\gamma \\neq \\alpha} \\int_0^{\\tau} dt \\,
            \\left \\langle \\mathcal P_{\\alpha\\gamma}(t) \\mathcal P_{\\alpha\\gamma}(0) \\right \\rangle,

        where :math:`\\beta = 1/k_B T`, :math:`\\alpha,\\gamma = {x, y, z}`, and :math:`\\mathcal P_{\\alpha\\gamma}(t)`
        is an element of the pressure tensor calculated with :class:`sarkas.tools.observables.PressureTensor`.

        The bulk viscosity is obtained from

        .. math::

            \\eta_V = \\beta V \\int_0^{\\tau} dt \\,
            \\left \\langle \\delta \\mathcal P(t) \\delta \\mathcal P(0) \\right \\rangle,

        where

        .. math::

            \\delta \\mathcal P(t) = \\mathcal P(t) - \\left \\langle \\mathcal P \\right \\rangle

        is the deviation of the scalar pressure.

        Parameters
        ----------
        observable : :class:`sarkas.tools.observables.PressureTensor`
            Observable object containing the ACF whose time integral leads to the viscosity coefficients.

        plot : bool, optional
            Flag for making the dual plot of the ACF and transport coefficient. Default = True.

        display_plot : bool, optional
            Flag for displaying the plot if using IPython. Default = False.

        """
        print("\n\n{:=^70} \n".format(" Viscosity Coefficient "))

        self.viscosity_df = DataFrame()
        self.viscosity_df_slices = DataFrame()

        # Check that the phase and no_slices are the same as the ones computed in the Observable
        observable.parse()
        self.phase = observable.phase
        self.no_slices = observable.no_slices
        self.slice_steps = observable.slice_steps
        self.dump_step = observable.dump_step

        # Print some info
        self.pretty_print(tc_name="Viscosity")

        # to_numpy creates a 2d-array, hence the [:, 0]
        time = observable.dataframe_acf["Time"].iloc[:, 0].to_numpy()
        self.viscosity_df["Time"] = time.copy()
        self.viscosity_df_slices["Time"] = time.copy()

        dim_lbl = ["x", "y", "z"]

        pt_str_list = [
            "Pressure Tensor Kinetic ACF",
            "Pressure Tensor Potential ACF",
            "Pressure Tensor Kin-Pot ACF",
            "Pressure Tensor Pot-Kin ACF",
            "Pressure Tensor ACF",
        ]
        eta_str_list = [
            "Shear Viscosity Tensor Kinetic",
            "Shear Viscosity Tensor Potential",
            "Shear Viscosity Tensor Kin-Pot",
            "Shear Viscosity Tensor Pot-Kin",
            "Shear Viscosity Tensor Total",
        ]

        start_steps = 0
        end_steps = 0
        for isl in tqdm(range(self.no_slices), disable=not observable.verbose):
            end_steps += observable.slice_steps

            const = observable.box_volume * self.beta
            # Calculate the bulk viscosity.
            # It is calculated from the fluctuations of the pressure, eq. 2.124a of Allen & Tildesley.
            integrand = observable.dataframe_acf_slices[("Delta Pressure ACF", "slice {}".format(isl))].to_numpy()
            self.viscosity_df_slices["Bulk Viscosity_slice {}".format(isl)] = const * fast_integral_loop(
                time, integrand
            )

            # Calculate the shear viscosity elements
            for _, ax1 in enumerate(dim_lbl):
                for _, ax2 in enumerate(dim_lbl):
                    for _, (pt_str, eta_str) in enumerate(zip(pt_str_list, eta_str_list)):
                        pt_str_temp = (pt_str + " {}{}{}{}".format(ax1, ax2, ax1, ax2), "slice {}".format(isl))
                        integrand = observable.dataframe_acf_slices[pt_str_temp].to_numpy()
                        eta_str_temp = eta_str + " {}{}_slice {}".format(ax1, ax2, isl)
                        self.viscosity_df_slices[eta_str_temp] = const * fast_integral_loop(time, integrand)

            start_steps += observable.slice_steps

        # Now average the slices
        col_str = ["Bulk Viscosity_slice {}".format(isl) for isl in range(observable.no_slices)]
        self.viscosity_df["Bulk Viscosity_Mean"] = self.viscosity_df_slices[col_str].mean(axis=1)
        self.viscosity_df["Bulk Viscosity_Std"] = self.viscosity_df_slices[col_str].std(axis=1)

        for _, ax1 in enumerate(dim_lbl):
            for _, ax2 in enumerate(dim_lbl):
                for _, eta_str in enumerate(eta_str_list):
                    col_str = [eta_str + " {}{}_slice {}".format(ax1, ax2, isl) for isl in range(observable.no_slices)]
                    self.viscosity_df[eta_str + " {}{}_Mean".format(ax1, ax2)] = self.viscosity_df_slices[col_str].mean(
                        axis=1
                    )
                    self.viscosity_df[eta_str + " {}{}_Std".format(ax1, ax2)] = self.viscosity_df_slices[col_str].std(
                        axis=1
                    )

        list_coord = ["xy", "xz", "yx", "yz", "zx", "zy"]
        col_str = [eta_str + " {}_Mean".format(coord) for coord in list_coord]
        self.viscosity_df["Shear Viscosity_Mean"] = self.viscosity_df[col_str].mean(axis=1)
        self.viscosity_df["Shear Viscosity_Std"] = self.viscosity_df[col_str].std(axis=1)

        self.viscosity_df, self.viscosity_df_slices = self.save_hdf(
            df=self.viscosity_df, df_slices=self.viscosity_df_slices, tc_name="Viscosities"
        )

        plot_quantities: list = ["Bulk Viscosity", "Shear Viscosity"]
        shear_list_coord = ["xyxy", "xzxz", "yxyx", "yzyz", "zxzx", "zyzy"]
        if plot or display_plot:
            # Make the plot
            for ipq, pq in enumerate(plot_quantities):
                if pq == "Bulk Viscosity":
                    acf_str = "Delta Pressure ACF"
                    acf_avg = observable.dataframe_acf[("Delta Pressure ACF", "Mean")]
                    acf_std = observable.dataframe_acf[("Delta Pressure ACF", "Std")]
                elif pq == "Shear Viscosity":
                    # The axes are the last two elements in the string
                    acf_str = "Pressure Tensor ACF"
                    acf_strs = [("Pressure Tensor ACF {}".format(coord), "Mean") for coord in shear_list_coord]
                    acf_avg = observable.dataframe_acf[acf_strs].mean(axis=1)
                    acf_std = observable.dataframe_acf[acf_strs].std(axis=1)

                tc_avg = self.viscosity_df[(pq, "Mean")]
                tc_std = self.viscosity_df[(pq, "Std")]

                self.plot_tc(
                    time=time,
                    acf_data=column_stack((acf_avg, acf_std)),
                    tc_data=column_stack((tc_avg, tc_std)),
                    acf_name=acf_str,
                    tc_name=pq,
                    figname="{}_Plot.png".format(pq),
                    show=display_plot,
                )
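
    # A sketch for extracting a single shear-viscosity estimate, assuming `pressure_tensor` is a
    # computed :class:`sarkas.tools.observables.PressureTensor` and that the running integral has
    # reached a plateau (the plateau window below is an arbitrary choice, to be judged from the plot):
    #
    #   tc.viscosity(pressure_tensor)
    #   eta_running = tc.viscosity_df[("Shear Viscosity", "Mean")]
    #   eta_estimate = eta_running.iloc[len(eta_running) // 2 :].mean()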