# -*- coding: utf-8 -*-
"""
This module contains all different kinds of tools to be used in the other main
modules.
"""
import numpy as np
import math
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
from tqdm import tqdm
import multiprocessing
from copy import deepcopy
import os, json
import pickle #_pickle as pickle
import time
from collections.abc import Iterable
from collections import namedtuple
Results_OBEs_rateeqs = namedtuple("Results_OBEs_rateeqs", ["vals", "iters"])
#%%
[docs]
def save_object(obj,filename=None):
"""Save any python object as a ``.pkl`` pickle file.
This is especially suitable for saving the :class:`MoleCool.System.System`
object with all its attributes.
Parameters
----------
obj : object
The object you want to save.
filename : str, optional
the filename to save the data. The extension '.pkl' will be added for
saving the file. If no filename is provided, it is set to the attribute
`description` of the object and if the object does not have this
attribute, the filename is set to the name of the class belonging to
the object.
"""
if filename == None:
if hasattr(obj,'description'): filename = obj.description
else: filename = type(obj).__name__ # instance is set to name of its class
if type(obj).__name__ == 'System':
if 'args' in obj.__dict__:
if 'return_fun' in obj.args:
del obj.args['return_fun'] #problem when an external function is tried to be saved
if 'intensity_func_sum' in obj.lasers.__dict__:
obj.lasers.__dict__['intensity_func_sum'] = None
if 'intensity_func' in obj.lasers.__dict__:
obj.lasers.__dict__['intensity_func'] = None
with open(filename+'.pkl','wb') as output:
pickle.dump(obj,output,protocol=4)
[docs]
def open_object(filename):
"""Open or load a saved python object from a ``.pkl`` file.
Parameters
----------
filename : str
filename without the '.pkl' extension.
Returns
-------
output : Object
"""
with open(filename+'.pkl','rb') as input:
output = pickle.load(input)
return output
[docs]
def return_fun_default(system):
"""Default function defining which values are returned after an evaluation of
:meth:`~.System.System.calc_OBEs` or :meth:`~.System.System.calc_rateeqs`.
This is especially important to collect important quantities when
simulating these internal dynamics for many times configurations
using :mod:`multiprocessing` module.
Parameters
----------
system : :class:`MoleCool.System.System`
instance of System after simulating the internal dynamics.
Returns
-------
dic : dict
dictionary with important quantities, such as
- execution time ``exectime`` as ``system.exectime``
- scattered photons ``photons`` as ``system.photons``
- success message ``success`` as ``system.success``
- mean force ``F``
- mean excited state population ``Ne``
"""
NeNum = system.levels.uNum
dic = dict(
exectime = system.exectime,
photons = system.photons, #np.squeeze()
success = system.success,
)
if 'steadystate' in system.args and system.args['steadystate']:
dic.update(
F = system.F.mean(axis=-1),
Ne = system.N[-NeNum:,:].sum(axis=0).mean(),
steps = system.step+1,
)
else:
tNum = system.t.size//10
dic.update(
F = system.F[:,-tNum:].mean(axis=-1),
Ne = system.N[-NeNum:,-tNum:].sum(axis=0).mean(),
)
return dic
# return dict(system=system)
#%%
[docs]
def get_constants_dict(name=''):
"""Load the level system constants / properties from a ``.json`` file and
return it as a dictionary.
Parameters
----------
name : str, optional
filename of the json file without the `.json`. The default is ''.
Returns
-------
dict
dictionary with all constants / properties.
"""
def openjson(root_dir):
with open(os.path.join(root_dir, f"{name}.json"), "r") as read_file:
data = json.load(read_file)
return data
if name:
try:
return openjson(".")
except FileNotFoundError:
try:
script_dir = os.path.dirname(os.path.abspath(__file__)) #directory where this script is stored.
constants_dir = os.path.join(os.path.dirname(script_dir), "constants")
return openjson(constants_dir)
except FileNotFoundError:
try:
return openjson("./constants/")
except FileNotFoundError:
return openjson("./mymodules/constants/")
else:
return {}
[docs]
def make_axes_invisible(axes,xaxis=False,yaxis=False,
invisible_spines=['top','bottom','left','right']):
"""For advanced plotting: This function makes certain properties of an
matplotlib axes object invisible. By default everything of a new created
axes object is invisible.
Parameters
----------
axes : matplotlib.axes.Axes object or iterable of objects
axes for which properties should be made inivisible.
xaxis : bool, optional
If xaxis is made invisible. The default is False.
yaxis : bool, optional
If yaxis is made invisible. The default is False.
invisible_spines : list of strings, optional
spines to be made invisible. The default is ['top','bottom','left','right'].
"""
if not isinstance(axes,Iterable): axes = [axes]
for ax in axes:
ax.axes.get_xaxis().set_visible(xaxis)
ax.axes.get_yaxis().set_visible(yaxis)
for pos in invisible_spines:
ax.spines[pos].set_visible(False)
[docs]
def auto_subplots(nplots, ratio=2/1, axs=[], xlabel='', ylabel='',**subplots_kwargs):
"""
Generate rows and cols for a subplot layout
aiming for a given rows/cols ratio (default 2:1).
"""
if len(axs):
if not isinstance(axs, Iterable):
axs = [axs]
axs = np.array(axs).ravel()
if len(axs) != nplots:
raise Exception(
(f"length of given axes {len(axs)} doesn't match "
f"the number of subplots {nplots} to be generated"))
# x- and y-labels on each subplot
for ax in axs:
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
else:
# start with a square-ish grid
cols = math.ceil(math.sqrt(nplots / ratio))
rows = math.ceil(nplots / cols)
fig,axs = plt.subplots(rows,cols,**subplots_kwargs,squeeze=False)
# draw one xlabel (ylabel) on each column (row)
if xlabel:
for axs_col in axs[-1]:
axs_col.set_xlabel(xlabel)
if ylabel:
for axs_row in axs:
axs_row[0].set_ylabel(ylabel)
# Flatten axes to easily iterate
axs = np.array(axs).ravel()
# Hide unused axes (if any)
for j in range(nplots, len(axs)):
axs[j].set_visible(False)
return axs
#%%
[docs]
def multiproc(obj,kwargs):
#___problem solving with keyword arguments
for kwargs2 in kwargs['kwargs']:
kwargs[kwargs2] = kwargs['kwargs'][kwargs2]
del kwargs['self']
del kwargs['kwargs']
#no looping through magnetic field strength or direction for rateeqs so far
if obj.calcmethod == 'rateeqs': obj.Bfield.reset()
#___recursive function to loop through all iterable laser variables and append result objects
def recursive(_laser_iters,index):
if not _laser_iters:
for i,dic in enumerate(laser_list):
for key,value in dic.items():
# if kwargs['verbose']: print('Laser {}: key {} is set to {}'.format(i,key,value[index[key]]))
obj.lasers[i].__dict__[key] = value[index[key]]
#or more general here: __setattr__(self, attr_name, value)
# result_objects.append(pool.apply_async(np.sum,args=(np.arange(3),)))
if obj.calcmethod == 'OBEs':
result_objects.append(pool.apply_async(deepcopy(obj).calc_OBEs,kwds=(kwargs)))
elif obj.calcmethod == 'rateeqs':
result_objects.append(pool.apply_async(deepcopy(obj).calc_rateeqs,kwds=(kwargs)))
# print('next evaluation..')
else:
for l1 in range(laser_iters_N[ _laser_iters[0] ]):
index[_laser_iters[0]] = l1
recursive(_laser_iters[1:],index)
#___Parallelizing using Pool.apply()
pool = multiprocessing.Pool(obj.multiprocessing['processes'],
maxtasksperchild=obj.multiprocessing['maxtasksperchild']) #Init multiprocessing.Pool()
result_objects = []
# identify which parameters to loop through (including Bfield, v0, r0, laser parameters)
laser_iters_N, laser_list = obj.lasers._identify_iter_params()
iters_dict = dict(obj._identify_iter_params(), **laser_iters_N)
#___expand dimensions of v0, r0 in order to be able to loop through them
r0_arr = np.atleast_2d(obj.r0)
v0_arr = np.atleast_2d(obj.v0)
#if v0_arr and r0_arr have the same length they should be varied at the same time and not all combinations should be calculated.
if len(r0_arr) == len(v0_arr) and len(r0_arr) > 1:
del iters_dict['v0']
#___looping through all iterable parameters of system and laser
for b1,strength in enumerate(np.atleast_1d(obj.Bfield.strength)):
for b2,direction in enumerate(np.atleast_2d(obj.Bfield.direction)):
obj.Bfield.turnon(strength,direction)
for b3,r0 in enumerate(r0_arr):
obj.r0 = r0
for b4,v0 in enumerate(v0_arr):
if (len(r0_arr) == len(v0_arr)) and (b3 != b4): continue
obj.v0 = v0
recursive(list(laser_iters_N.keys()),{})
if kwargs['verbose']: print('starting calculations for iterations: {}'.format(iters_dict))
time.sleep(.5)
# print( [r.get() for r in result_objects])
# results = [list(r.get().values()) for r in result_objects]
# keys = result_objects[0].get().keys() #switch this task with the one above?
results, keys = [], []
if obj.multiprocessing['show_progressbar']:
iterator = tqdm(result_objects,smoothing=0.0)
else:
iterator = result_objects
for r in iterator:
results.append(list(r.get().values()))
keys = result_objects[0].get().keys() #switch this task with the one above?
pool.close() # Prevents any more tasks from being submitted to the pool.
pool.join() # Wait for the worker processes to exit.
out = {}
for i,key in enumerate(keys):
first_el = np.array(results[0][i])
if first_el.size == 1:
out[key] = np.squeeze(np.reshape(np.concatenate(
np.array(results,dtype=object)[:,i], axis=None), tuple(iters_dict.values())))
else:
out[key] = np.squeeze(np.reshape(np.concatenate(
np.array(results,dtype=object)[:,i], axis=None), tuple([*iters_dict.values(),*(first_el.shape)])))
return Results_OBEs_rateeqs(out, iters_dict)
#%%
[docs]
def vtoT(v,mass=157):
"""function to convert a velocity v in m/s to a temperatur in K."""
from scipy.constants import k, u
return v**2 * (mass*u)/k #v**2 * 0.5*(mass*u)/k
[docs]
def Ttov(T,mass=157):
"""function to convert a temperatur in K to a velocity v in m/s."""
from scipy.constants import k, u
return np.sqrt(k*T/(mass*u)) #np.sqrt(k*T*2/(mass*u))
[docs]
def gaussian(x, a=1.0, x0=0.0, std=1.0, y_off=0):
"""Standard Gaussian function :math:`a \exp(-0.5(x-x_0)^2/std^2)+y_{off}`."""
return a * np.exp(-0.5 * ((x - x0) / std)**2) + y_off
[docs]
def FWHM2sigma(FWHM):
"""Convert full width at half maximum (FWHM) to standard deviation
of a Gaussian (sigma)."""
return FWHM/2.3548200450309493
[docs]
def sigma2FWHM(sigma):
"""Convert standard deviation of a Gaussian (sigma) to full width
at half maximum (FWHM)."""
return sigma*2.3548200450309493
#%%
[docs]
def get_results(fname, Z_keys='F', XY_keys=[],
Z_data_fmt={}, XY_data_fmt={}, XYY_inds=[],
add_v0=False, add_flip_v=False, add_I0=False, scale_F='N'):
"""Extract :class:`Results_OBEs_rateeqs` results from an .pkl file as observables
of a high-dimensional parameter space in a certain order of parameters
and their values as numpy arrays.
Optionally, one can add e.g. zero force values, manually.
Parameters
----------
fname : str or ~System.System
filename where the instance of System is saved or the instance itself.
Z_keys : str, optional
Observable calculated in the results. The default is 'F'.
XY_keys : list of str, optional
Parameters that the results are dependent on, e.g. ``['v0','I']``.
The default is [].
Z_data_fmt : dict of func, optional
Dictionary with the observables as keys and functions as their respective
values. The argument of these functions is the individual calculated value
from the result :class:`Results_OBEs_rateeqs` object for each parameter
combination.
When the calculated value is e.g. a force array ``np.array([0,0,2.4])``,
then only the 'z' component can be exctracted using ``{'F': lambda x: x[2]}``.
The default is {}.
XY_data_fmt : dict of func, optional
Dictionary with iteration parameters as keys and functions as their
respective values. These functions take the loaded system as argument
to return the actual XY data of the iteration parameters to be plotted,
e.g. ``{'strength': lambda system: system.Bfield.strength}``.
The default is {} and includes e.g. ``{'I': lambda x: x.lasers.I_sum,
'v0': lambda x: x.v0[:,2]}``.
XYY_inds : list of inds, optional
list of indices to choose values of further parameters included in the
results dataset for more than 3 dimensions. The default is [].
add_v0 : bool, optional
Manually adding velocity v=0 and zero force. The default is True.
add_flip_v : bool, optional
add a flip velocity axis to get the full range. The default is True.
add_I0 : bool, optional
adding intensity I=0 where the force is zero. The default is True.
scale_F : str, optional
scale F to either 'N' or 'hbar*k*Gamma/2'. Do nothing if ``scale_F==''``,
The default is 'N'.
Returns
-------
Z : np.ndarray
observables or results as shape of both axes of XY_keys.
XY : dict
names and data of X axis and Y axis as str and np.ndarray
(length of both axis together equal the shape of Z).
XYY : dict
key and single values of the parameters of further parameters included
in the results dataset for more than 3 dimensions.
"""
# Format values for iterating parameters
XY_data_fmt_default = {'I': lambda x: x.lasers.I_sum,
'v0': lambda x: x.v0[:,2],
'strength': lambda x: x.Bfield.strength}
XY_data_fmt_default.update(XY_data_fmt)
XY_data_fmt = XY_data_fmt_default
# loading system instance
if isinstance(fname, str):
s4 = open_object(fname)
fname_bn= os.path.basename(fname)
print(f"File <{fname_bn}> with iteration variables: {s4.results[1]}")
else:
s4 = fname
print(f"Instance <{fname_bn}> loaded with iteration variables: {s4.results[1]}")
# iterating over multiple Z_keys:
if isinstance(Z_keys, str): Z_keys = [Z_keys]
Z_dict = dict()
for Z_key in Z_keys:
# initiating the parameters that are shown on the X and Y axes
iterinds = dict(zip(s4.results[1].keys(),range(len(s4.results[1].keys()))))
iterinds_keys = list(iterinds.keys())
if not XY_keys:
if 'v0' in iterinds_keys:
iterinds_keys.remove('v0')
XY_keys = ['v0',iterinds_keys[0]]
else:
XY_keys = iterinds_keys[:2]
else:
for XY_key in XY_keys:
if XY_key not in iterinds:
raise ValueError(f'Value <{XY_key}> not included in results from file {fname_bn}!')
if len(XY_keys) != 2:
iterinds_keys.remove(XY_keys[0])
XY_keys.append(iterinds_keys[0])
# actual X,Y axes data arrays
XY_data = []
for XY_key in XY_keys:
if XY_key not in XY_data_fmt:
raise Exception(f'XY_data_fmt must be given for XY_key {XY_key}')
XY_data.append(np.array(XY_data_fmt[XY_key](s4)))
# loading the Z data (can be 2D or high-dimensional)
if Z_key not in s4.results[0].keys():
raise ValueError("Z_key '{}' not included in results, try one of {}".format(
Z_key, list(s4.results[0].keys())))
Z = s4.results[0][Z_key]
if Z_key in Z_data_fmt:
Z = np.apply_along_axis(Z_data_fmt[Z_key], -1, Z)
if Z.ndim != len(s4.results[1]):
raise Exception((
f"The number of iteration parameters ({len(s4.results[1])}) does "
f"not match the dimension of the calculated value ({Z.ndim}). "
"Use argument <Z_data_fmt> to reduce the dimensionality."
))
Z = Z.transpose((iterinds.pop(XY_keys[0]),
iterinds.pop(XY_keys[1]),
*iterinds.values()))
# values of further parameters in dataset if Z has more than 2 dimensions
XYY = dict()
if len(iterinds) != 0: # pick single value of the parameter of higher dimensions
if not XYY_inds:
XYY_inds = [0]*len(iterinds)
elif len(XYY_inds) != len(iterinds):
raise ValueError(f'len of XYY_inds not equal to number of further dims ({len(iterinds)})')
for XYY_key, XYY_ind in zip(iterinds.keys(), XYY_inds):
XYY[XYY_key] = XY_data_fmt[XYY_key](s4)[XYY_ind]
for XYY_ind in XYY_inds[::-1]:
Z = Z[...,XYY_ind]
# add manually zero forces e.g. for zero intensity or zero velocity
if (add_flip_v or add_v0) and ('v0' in XY_keys) and Z_key in ['F','Ne']:
axis = XY_keys.index('v0')
Z = np.moveaxis(Z, axis, 0) # move old axis to zero axis
v_arr = XY_data[axis]
if add_flip_v:
v_arr = np.array([*v_arr,*(-np.flip(v_arr))])
if Z_key == 'F':
Z = np.array([*Z,*(-np.flip(Z,axis=0))])
elif Z_key == 'Ne':
Z = np.array([*Z,*(+np.flip(Z,axis=0))])
if add_v0:
v_arr = np.array([*v_arr,0])
Z = np.array([*Z, 0*Z[0]])
v_inds = np.argsort(v_arr)
Z = Z[v_inds]
Z = np.moveaxis(Z, 0, axis) # move zero axis back to old axis
XY_data[axis] = v_arr[v_inds]
if add_I0 and ('I' in XY_keys):
axis = XY_keys.index('I')
Z = np.moveaxis(Z, axis, 0) # move old axis to zero axis
I_arr = XY_data[axis]
Z = np.array([0*Z[0], *Z])
Z = np.moveaxis(Z, 0, axis) # move zero axis back to old axis
XY_data[axis] = np.array([0.,*I_arr])
if scale_F and (Z_key == 'F'):
# Further scaling of the Z data, i.e. the force to more intuitive unit
unit = s4.hbarkG2
if (scale_F == 'hbar*k*Gamma/2') and (abs(Z.max()) < 100*unit):
Z /= unit
elif (scale_F == 'N') and (abs(Z.max()) > 100*unit):
Z *= unit
else:
print('No Scaling of the force applied.')
Z_dict[Z_key] = Z
if not np.all(np.array([arr.shape for arr in Z_dict.values()])==Z_dict[Z_keys[0]].shape):
raise Exception('Shapes of generated Z data is not the same for every Z_key!')
return Z_dict, dict(zip(XY_keys,XY_data)), XYY
[docs]
def plot_results(fname, Z_keys=['F'], XY_data_fmt={}, scale_F='hbar*k*Gamma/2',
XY_labels={}, Z_labels={}, cmap='RdBu', levels = 12,
Xlim=[], Ylim=[], Zlim={}, Z_percent=['F','Ne'],
axs=[], figname='', savefig=False, **kwargs):
"""plot results for calculating one or multiple observables in a high-dimensional
parameter space.
Parameters
----------
fname : str or ~System.System
see function get_results.
Z_keys : str or list of str, optional
names of calculated observables to be plotted (see function get_results).
The default is ['F'].
XY_data_fmt : dict, optional
see function get_results. The default is {}.
XY_labels : dict, optional
dictionary to convert the names of certain parameters to other more suitable
axis labels. The default is {}.
Z_labels : dict, optional
dictionary to convert the names of the observables to other more suitable
axis labels. The default is {}.
cmap : str, optional
color map from matplotlib. The default is 'RdBu'.
levels : int, optional
number of color levels. The default is 12.
Xlim : tuple, optional
lower and upper limit. The default is [].
Ylim : tuple, optional
lower and upper limit. The default is [].
Zlim : tuple, optional
lower and upper limit. The default is {}.
Z_percent : list, optional
list with the names of observables to be plotted in percent.
The default is ['F','Ne'].
axs : list of ``matplotlib.pyplot.axis`` objects, optional
axis/axes to put the plot(s) on. The default is [].
figname : str, optional
name of the figure. The default is ''.
savefig : bool or str, optional
if True the figure is saved. If str, the figure is saved using the
provided string. The default is False.
**kwargs : keyword arguments
keyword arguments for the function get_results.
"""
# defining some default dictionaries for axes labels, and formatters for the axes data
Z_labels_default = dict(F='Force $F$ ($\hbar k \Gamma/2$)',
Ne='Ex. state population $n_e$',
exectime='Execution time (s)',
steps='Iter. steps till steady state')
XY_data_fmt_default = {'I': lambda x: x.lasers.I_sum/1000,
'strength': lambda x: x.Bfield.strength*1e4}
XY_labels_default = {'I':'Intensity $I_{tot}$ (kW/m$^2$)',
'v0':'Velocity $v$ (m/s)',
'strength':'Bfield strength (G)'}
Z_labels_default.update(Z_labels)
XY_labels_default.update(XY_labels)
XY_data_fmt_default.update(XY_data_fmt)
Z_labels = Z_labels_default
XY_labels = XY_labels_default
XY_data_fmt = XY_data_fmt_default
if isinstance(Z_keys, str): Z_keys = [Z_keys] # make sure that Z_keys is a list of str
# Initializing figure
axs = auto_subplots(len(Z_keys), ratio=3/1, axs=axs, xlabel='', ylabel='',
sharex=True, num=figname if figname else None)
# iterating over keys for Z (actual results), e.g. Force F, excited state fraction Ne
for i,(ax,Z_key) in enumerate(zip(axs,Z_keys)):
# load results and make meshgrid for plotting
Z_dict, XY, XYY = get_results(fname, Z_keys=Z_key, scale_F=scale_F,
XY_data_fmt=XY_data_fmt,**kwargs)
Z = Z_dict[Z_key]
X,Y = np.meshgrid(*XY.values())
# update axes labels for undefined values:
for XY_key in [*list(XY.keys()),*(XYY.keys())]:
if XY_key not in XY_labels:
XY_labels.update({XY_key:XY_key})
if Z_key in Z_percent:
Z *= 1e2 # in percent
# ======== PLOTTING ========
# set title
if i == 0 and len(XYY) != 0:
title = ',\n'.join([f'{XY_labels[key]}: {data:4g}'
for key,data in XYY.items()])
ax.set_title(title)
# set x, y labels
if i == len(Z_keys)-1:
ax.set_xlabel(XY_labels[list(XY.keys())[0]])
if i == len(Z_keys) //2:
ax.set_ylabel(XY_labels[list(XY.keys())[1]])
# set axes limits
if Xlim: ax.set_xlim(*Xlim)
if Ylim: ax.set_ylim(*Ylim)
vmin,vmax = None, None
if Z_key in Zlim:
vmin, vmax= Zlim[Z_key]
# draw 2D countour data and colourbar
CS = ax.contourf(X,Y,Z.T,levels=levels,cmap=cmap,vmin=vmin,vmax=vmax)#np.linspace(1.,2.,11))
CS2= ax.contour(CS, levels=[0.0], colors='k', origin='lower',linestyles='dashed')
cbar = plt.colorbar(CS, ax=ax, aspect=14, pad=0.01, shrink=0.90)
cbar.add_lines(CS2)
cbar.ax.set_ylabel(Z_labels[Z_key])
if Z_key in Z_percent: # showing Z data in percent
cbar.ax.yaxis.set_major_formatter(mtick.PercentFormatter())
# saving figure
if savefig:
if isinstance(savefig, str):
figname = savefig
elif not figname:
figname = f"{fname}_{'-'.join(Z_keys)}"
plt.savefig(figname)