"""Utils for SPOD method."""
# Import standard Python packages
import os
import sys
import math
import time
import yaml
import psutil
import warnings
import numpy as np
import scipy.io.matlab as siom
# Import custom Python packages
import pyspod.utils.parallel as utils_par
import pyspod.utils.postproc as post
CWD = os.getcwd()
B2GB = 9.313225746154785e-10
[docs]
def check_orthogonality(results_dir, mode_idx1, mode_idx2,
freq_idx, dtype='double', savedir=None, comm=None):
'''
Check orthogonality of SPOD modes.
:param str results_dir: path to results folder where to find SPOD modes.
:param int mode_idx1: id first mode used for comparison.
:param int mode_idx2: id second mode used for comparison.
:param int freq_idx: frequency id to be used.
:param str dtype: datatype to be used. Default is 'double'.
:param str savedir: path where to save the data.
:param MPI.Comm comm: MPI communicator.
:return: orthogonality check and value.
:rtype: bool, float
'''
s0 = time.time()
st = time.time()
utils_par.pr0(f'\nComputing orthogonality check', comm)
utils_par.pr0(f'-------------------------------', comm)
rank, size = _configure_parallel(comm=comm)
## get dtypes and required data and params
dt_float, dt_complex = _get_dtype(dtype)
params, eigs_freq, weights = _get_required_data(results_dir)
xdim = params['n_space_dims']
nv = params['n_variables']
freq = eigs_freq['freq']
n_freq = params['n_freq']
n_modes_save = params['n_modes_save']
weights = _set_dtype(weights, dtype)
## distribute weights and reshape
weights, max_axis, _ = utils_par.distribute(data=weights, comm=comm)
xsize = weights.size
weights = np.reshape(weights, [xsize, 1])
## get modes
phir = _get_modes(results_dir, n_freq,
freq_idx, max_axis, xsize, n_modes_save, dt_complex, comm)
phir1 = phir[:,mode_idx1]
phir2 = phir[:,mode_idx2]
del phir
utils_par.pr0(f'- retrieved modes: {time.time() - st} s.', comm)
st = time.time()
## perform orthogonality check
O = phir1.conj().T @ (weights * phir2)
O = utils_par.allreduce(data=O, comm=comm)
tol = 2e-6
if mode_idx1 == mode_idx2:
ortho_check = ((O < 1+tol) and (O>1-tol))
else:
ortho_check = ((O < 0+tol) and (O>0-tol))
utils_par.barrier(comm)
return ortho_check, O
[docs]
def compute_coeffs_op(
data, results_dir, modes_idx=None, freq_idx=None, T_lb=None, T_ub=None,
tol=1e-10, svd=False, savedir=None, dtype='double', comm=None):
'''
Compute coefficients through oblique projection.
:param numpy.ndarray data: data.
:param str results_dir: path to results folder.
:param list mode_idx: ids modes used for building coefficients.
Default is None.
:param list freq_idx: frequency ids to be used. Default is None.
:param float T_lb: lower bound period. Default is None.
:param float T_ub: upper bound period. Default is None.
:param float tol: tolerance for pseudoinverse. Default is 1e-10.
:param float svd: whether to use svd pseudoinverse. Default is None.
:param str savedir: path where to save the data. Default is None.
:param str dtype: datatype to be used. Default is 'double'.
:param MPI.Comm comm: MPI communicator. Default is None.
:return: where the file with the coefficients is saved,
and associated folder.
:rtype: str, str
'''
s0 = time.time()
st = time.time()
utils_par.pr0(f'\nComputing coefficients' , comm)
utils_par.pr0(f'------------------------------', comm)
rank, size = _configure_parallel(comm=comm)
## get dtypes and required data and params
dt_float, dt_complex = _get_dtype(dtype)
params, eigs_freq, weights = _get_required_data(results_dir)
nt = data.shape[0]
xdim = params['n_space_dims']
nv = params['n_variables']
freq = eigs_freq['freq']
n_freq = params['n_freq']
n_modes_save = params['n_modes_save']
data = _set_dtype(data, dtype)
weights = _set_dtype(weights, dtype)
## initialize frequencies
if ((T_lb is None) or (T_ub is None)) and (freq_idx is None):
f_idx_lb = 0
f_idx_ub = n_freq - 1
f_lb = freq[f_idx_lb]
f_ub = freq[f_idx_ub]
else:
f_lb, f_idx_lb = post.find_nearest_freq(freq_req=1/T_ub, freq=freq)
f_ub, f_idx_ub = post.find_nearest_freq(freq_req=1/T_lb, freq=freq)
n_freq_r = f_idx_ub - f_idx_lb + 1
if freq_idx is None:
freq_idx = np.arange(f_idx_lb, f_idx_ub + 1)
utils_par.pr0(f'- identified frequencies: {time.time() - st} s.', comm)
st = time.time()
## initialize coeffs matrix
shape_tmp = (n_freq_r*n_modes_save, nt)
coeffs = np.zeros(shape_tmp, dtype=dt_complex)
## distribute data and weights if parallel
data, max_axis, _ = utils_par.distribute_data(data, comm)
weights = utils_par.distribute_dimension(weights, max_axis, comm)
## add axis for single variable
if not isinstance(data,np.ndarray): data = data.values
if (nv == 1) and (data.ndim != xdim + 2):
data = data[...,np.newaxis]
xsize = weights.size
xshape_nv = data[0,...].shape
## flatten spatial x variable dimensions
data = np.reshape(data, [nt, xsize])
weights = np.reshape(weights, [xsize, 1])
## compute time mean and subtract from data (reuse the one from fit?)
lt_mean = np.mean(data, axis=0); data = data - lt_mean
utils_par.pr0(f'- data and time mean: {time.time() - st} s.', comm)
st = time.time()
## get modes
phir = _get_modes(results_dir, n_freq_r,
freq_idx, max_axis, xsize, n_modes_save, dt_complex, comm)
utils_par.pr0(f'- retrieved modes: {time.time() - st} s.', comm)
st = time.time()
## create coeffs folder
coeffs_dir = os.path.join(results_dir, f'coeffs_{f_idx_lb}_{f_idx_ub}')
if savedir is not None: coeffs_dir = os.path.join(coeffs_dir, savedir)
if rank == 0:
if not os.path.exists(coeffs_dir): os.makedirs(coeffs_dir)
utils_par.barrier(comm)
# evaluate the coefficients by oblique projection
coeffs = _oblique_projection(
phir, weights, data, tol=tol, svd=svd, dtype=dtype, comm=comm)
utils_par.pr0(f'- oblique projection done: {time.time() - st} s.', comm)
st = time.time()
utils_par.barrier(comm)
del data, weights
## save auxiliary files
file_phir = os.path.join(coeffs_dir, 'modes_r.npy')
file_lt_mean = os.path.join(coeffs_dir, 'ltm.npy')
shape_tmp = (*xshape_nv,n_freq_r*n_modes_save)
shape_phir = [*shape_tmp]
shape_lt_mean = [*xshape_nv]
if comm:
shape_phir[max_axis] = -1
shape_lt_mean[max_axis] = -1
phir.shape = shape_tmp
lt_mean.shape = xshape_nv
utils_par.npy_save(comm, file_phir, phir, axis=max_axis)
utils_par.npy_save(comm, file_lt_mean, lt_mean, axis=max_axis)
utils_par.barrier(comm)
del lt_mean, phir
# save coefficients
file_coeffs = os.path.join(coeffs_dir, 'coeffs.npy')
if rank == 0: np.save(file_coeffs, coeffs)
utils_par.barrier(comm)
del coeffs
## dump file with coeffs params
params['coeffs_dir' ] = str(coeffs_dir)
params['modes_idx' ] = modes_idx
if T_lb is not None: params['T_lb'] = float(T_lb)
if T_ub is not None: params['T_ub'] = float(T_ub)
params['n_freq_r' ] = int(n_freq_r)
params['freq_lb' ] = float(f_lb)
params['freq_ub' ] = float(f_ub)
params['freq_idx_lb'] = int(f_idx_lb)
params['freq_idx_ub'] = int(f_idx_ub)
params['max_axis' ] = int(max_axis)
path_params_coeffs = os.path.join(coeffs_dir, 'params_coeffs.yaml')
with open(path_params_coeffs, 'w') as f: yaml.dump(params, f)
utils_par.pr0(f'- saving completed: {time.time() - st} s.' , comm)
utils_par.pr0(f'-----------------------------------------' , comm)
utils_par.pr0(f'Coefficients saved in folder: {file_coeffs}', comm)
utils_par.pr0(f'Elapsed time: {time.time() - s0} s.' , comm)
utils_par.barrier(comm)
return file_coeffs, coeffs_dir
[docs]
def compute_coeffs_conv(
data, results_dir, modes_idx=None, freq_idx=None, T_lb=None, T_ub=None,
tol=1e-10, svd=False, savedir=None, dtype='double', comm=None):
'''
Continuously-discrete temporal expansion coefficients of SPOD
modes via convolution.
:param numpy.ndarray data: data.
:param str results_dir: path to results folder.
:param list mode_idx: ids modes used for building coefficients.
Default is None.
:param list freq_idx: frequency ids to be used. Default is None.
:param float T_lb: lower bound period. Default is None.
:param float T_ub: upper bound period. Default is None.
:param float tol: tolerance for pseudoinverse. Default is 1e-10.
:param float svd: whether to use svd pseudoinverse. Default is None.
:param str savedir: path where to save the data. Default is None.
:param str dtype: datatype to be used. Default is 'double'.
:param MPI.Comm comm: MPI communicator. Default is None.
:return: where the file with the coefficients is saved,
and associated folder.
:rtype: str, str
'''
s0 = time.time()
st = time.time()
utils_par.pr0(f'\nComputing coefficients' , comm)
utils_par.pr0(f'------------------------------', comm)
rank, size = _configure_parallel(comm=comm)
## get dtypes and required data and params
dt_float, dt_complex = _get_dtype(dtype)
params, eigs_freq, weights = _get_required_data(results_dir)
nt = data.shape[0]
xdim = params['n_space_dims']
nv = params['n_variables']
freq = eigs_freq['freq']
n_freq = params['n_freq']
n_dft = params['n_dft']
n_modes_save = params['n_modes_save']
data = _set_dtype(data, dtype)
weights = _set_dtype(weights, dtype)
# get default spectral estimation parameters and options
# define default spectral estimation parameters
if isinstance(n_dft, int):
window = _hamming_window(n_dft)
window = _set_dtype(window, dtype)
window_name = 'hamming'
else:
raise TypeError('n_dft must be an integer.')
# determine correction for FFT window gain
win_weight = 1 / np.mean(window)
window = window.reshape(window.shape[0], 1)
## initialize frequencies
if ((T_lb is None) or (T_ub is None)) and (freq_idx is None):
f_idx_lb = 0
f_idx_ub = n_freq - 1
f_lb = freq[f_idx_lb]
f_ub = freq[f_idx_ub]
else:
f_lb, f_idx_lb = post.find_nearest_freq(freq_req=1/T_ub, freq=freq)
f_ub, f_idx_ub = post.find_nearest_freq(freq_req=1/T_lb, freq=freq)
n_freq_r = f_idx_ub - f_idx_lb + 1
if freq_idx is None:
freq_idx = np.arange(f_idx_lb, f_idx_ub + 1)
utils_par.pr0(f'- identified frequencies: {time.time() - st} s.', comm)
st = time.time()
## initialize coeffs matrix
shape_tmp = (n_freq_r, nt, n_modes_save)
coeffs = np.zeros(shape_tmp, dtype=dt_complex)
## distribute data and weights if parallel
data, max_axis, _ = utils_par.distribute_data(data, comm)
weights = utils_par.distribute_dimension(weights, max_axis, comm)
## add axis for single variable
if not isinstance(data,np.ndarray): data = data.values
if (nv == 1) and (data.ndim != xdim + 2):
data = data[...,np.newaxis]
xsize = weights.size
xshape_nv = data[0,...].shape
## flatten spatial x variable dimensions
data = np.reshape(data, [nt, xsize])
weights = np.reshape(weights, [xsize, 1])
## compute time mean and subtract from data (reuse the one from fit?)
lt_mean = np.mean(data, axis=0, dtype=np.float64)
data = data - lt_mean
utils_par.pr0(f'- data and time mean: {time.time() - st} s.', comm)
st = time.time()
## get modes
# phir = _get_modes(results_dir, n_freq_r,
# freq_idx, max_axis, xsize, n_modes_save, dt_complex, comm)
phir = np.empty([n_freq, xsize, n_modes_save], dtype=dt_complex)
for i_freq in freq_idx:
phi = post.get_modes_at_freq(results_dir, freq_idx=i_freq)
phi = utils_par.distribute_dimension(phi, max_axis, comm)
phi = np.reshape(phi,[xsize,n_modes_save])
phir[i_freq,...] = phi
utils_par.pr0(f'- retrieved modes: {time.time() - st} s.', comm)
st = time.time()
## padding the data with zeroes at both ends
## probably hstack needed. (need to add two columns of zeros)
z = np.zeros([int(np.ceil(n_dft/2)), xsize])
data = np.concatenate((z, data, z), axis=0)
win_corr_fac = n_dft / win_weight;
# siom.savemat(os.path.join(CWD, 'X_tcoeffs.mat'), {'X': data})
# siom.savemat(os.path.join(CWD, 'P_tcoeffs.mat'), {'P': phir})
## create coeffs folder
coeffs_dir = os.path.join(results_dir, f'coeffs_{f_idx_lb}_{f_idx_ub}')
if savedir is not None: coeffs_dir = os.path.join(coeffs_dir, savedir)
if rank == 0:
if not os.path.exists(coeffs_dir): os.makedirs(coeffs_dir)
utils_par.barrier(comm)
## calculate expansion coefficients
for i in range(0, nt):
utils_par.pr0(f'--- time {i+1}/{nt}', comm)
Q_blk = np.fft.fft(data[i:i+n_dft,:] * window, axis=0)
Q_blk = Q_blk[freq_idx,:]
## correction for windowing of zero-padded data
thres = int(np.ceil(n_dft / 2))
if (i < thres - 2):
lb = thres - i - 1
ub = n_dft
corr = math.sqrt(win_corr_fac / np.sum(window[lb:ub]))
elif (i >= nt - thres):
lb = 0
ub = nt + thres - i - 1
corr = math.sqrt(win_corr_fac / np.sum(window[lb:ub]))
else:
corr = 1
# ## loop over freqs and modes
# for j in freq_idx:
# for l in range(0,3):
# p_tmp = np.squeeze(phir[j,:,l]).conj().T
# x_tmp = (Q_blk[j,:] * np.squeeze(weights)).T
# p_tmp = p_tmp[None,...]
# x_tmp = x_tmp[...,None]
# coeffs[j,i,l] = corr * (p_tmp @ x_tmp)
# print(f'{coeffs.shape = :}')
# print(f'{x_tmp.shape = :}')
# print(f'{p_tmp.shape = :}')
# print(f'{phir.shape = :}')
# print(f'{Q_blk.shape = :}')
## loop over freqs and modes
p_tmp = np.squeeze(phir).conj().T
x_tmp = (Q_blk * np.squeeze(weights))
# p_tmp = p_tmp[None,...]
# x_tmp = x_tmp[...]
# print(f'{coeffs.shape = :}')
# print(f'{x_tmp.shape = :}')
# print(f'{p_tmp.shape = :}')
# print(f'{phir.shape = :}')
# print(f'{Q_blk.shape = :}')
coeffs[:,i,:] = corr * np.einsum('ij,kji->ik', x_tmp, p_tmp)
utils_par.pr0(f'- calculation expansion coeffs done: '
f'{time.time() - st} s.', comm)
st = time.time()
utils_par.barrier(comm)
# siom.savemat(os.path.join(CWD, 'a_tcoeffs.mat'), {'a': coeffs})
del data, weights
## save auxiliary files
file_phir = os.path.join(coeffs_dir, 'modes_r.npy')
file_lt_mean = os.path.join(coeffs_dir, 'ltm.npy')
shape_tmp = (*xshape_nv,n_freq_r,n_modes_save)
shape_phir = [*shape_tmp]
shape_lt_mean = [*xshape_nv]
if comm:
shape_phir[max_axis] = -1
shape_lt_mean[max_axis] = -1
phir.shape = shape_tmp
lt_mean.shape = xshape_nv
utils_par.npy_save(comm, file_phir, phir, axis=max_axis)
utils_par.npy_save(comm, file_lt_mean, lt_mean, axis=max_axis)
utils_par.barrier(comm)
del lt_mean, phir
# save coefficients
file_coeffs = os.path.join(coeffs_dir, 'coeffs.npy')
if rank == 0: np.save(file_coeffs, coeffs)
utils_par.barrier(comm)
del coeffs
## dump file with coeffs params
params['coeffs_dir' ] = str(coeffs_dir)
params['modes_idx' ] = modes_idx
if T_lb is not None: params['T_lb'] = float(T_lb)
if T_ub is not None: params['T_ub'] = float(T_ub)
params['n_freq_r' ] = int(n_freq_r)
params['freq_lb' ] = float(f_lb)
params['freq_ub' ] = float(f_ub)
params['freq_idx_lb'] = int(f_idx_lb)
params['freq_idx_ub'] = int(f_idx_ub)
params['max_axis' ] = int(max_axis)
path_params_coeffs = os.path.join(coeffs_dir, 'params_coeffs.yaml')
with open(path_params_coeffs, 'w') as f: yaml.dump(params, f)
utils_par.pr0(f'- saving completed: {time.time() - st} s.' , comm)
utils_par.pr0(f'-----------------------------------------' , comm)
utils_par.pr0(f'Coefficients saved in folder: {file_coeffs}', comm)
utils_par.pr0(f'Elapsed time: {time.time() - s0} s.' , comm)
utils_par.barrier(comm)
return file_coeffs, coeffs_dir
[docs]
def compute_reconstruction(
coeffs_dir, time_idx, coeffs=None, savedir=None, filename=None,
dtype='double', comm=None):
'''
Reconstruct original data through oblique projection.
:param str coeffs_dir: path to coefficients folder.
:param list time_idx: ids of times to be used for building reconstruction.
:param list coeffs: coefficients. Default is None.
:param str savedir: path where to save the data. Default is None.
:param str filename: filename to use for saving reconstruction.
Default is None.
:param str dtype: datatype to be used. Default is 'double'.
:param MPI.Comm comm: MPI communicator. Default is None.
:return: where the file with the reconstruction is saved,
and associated folder.
:rtype: str, str
'''
s0 = time.time()
st = time.time()
utils_par.pr0(f'\nReconstructing data from coefficients' , comm)
utils_par.pr0(f'------------------------------------------', comm)
rank, size = _configure_parallel(comm=comm)
## get dtypes
dt_float, dt_complex = _get_dtype(dtype)
## load required files
coeffs_dir = os.path.join(CWD, coeffs_dir)
file_lt_mean = os.path.join(coeffs_dir, 'ltm.npy')
file_phir = os.path.join(coeffs_dir, 'modes_r.npy')
file_params = os.path.join(coeffs_dir, 'params_coeffs.yaml')
lt_mean = np.lib.format.open_memmap(file_lt_mean)
phir = np.lib.format.open_memmap(file_phir)
with open(file_params) as f:
params = yaml.load(f, Loader=yaml.FullLoader)
xshape_nv = lt_mean.shape
## try to load coefficients from file if not provided
if coeffs is None:
try:
file_coeffs = os.path.join(coeffs_dir, 'coeffs.npy')
coeffs = np.lib.format.open_memmap(file_coeffs)
except:
raise Exception('`coeffs` file not found.')
## set datatypes
coeffs = _set_dtype(coeffs, dtype)
phir = _set_dtype(phir, dtype)
# get time snapshots to be reconstructed
nt = coeffs.shape[1]
if time_idx is None:
time_idx = [0,nt%2,nt-1]
elif isinstance(time_idx, str):
if time_idx.lower() == 'all' : time_idx = np.arange(0, nt)
elif time_idx.lower() == 'half' : time_idx = np.arange(0, nt, 2)
elif time_idx.lower() == 'quarter' : time_idx = np.arange(0, nt, 4)
elif time_idx.lower() == 'tenth' : time_idx = np.arange(0, nt, 10)
elif time_idx.lower() == 'hundredth': time_idx = np.arange(0, nt, 100)
elif isinstance(time_idx, list):
time_idx = time_idx
else:
raise TypeError('`time_idx` parameter type not recognized.')
## distribute modes_r and longtime mean
max_axis = params['max_axis']
phir = utils_par.distribute_dimension(phir, max_axis, comm)
lt_mean = utils_par.distribute_dimension(lt_mean, max_axis, comm)
if coeffs.ndim == 2:
Q_rec = _compute_rec_op(phir, coeffs, lt_mean, time_idx, comm)
elif coeffs.ndim == 3:
Q_rec = _compute_rec_conv(phir, coeffs, lt_mean, time_idx, comm)
else:
raise ValueError('dimension of `coeffs` not recognized.')
## reshape and save reconstructed solution
if filename is None: filename = 'reconstructed'
if savedir is not None:
coeffs_dir = os.path.join(coeffs_dir, savedir)
if rank == 0:
if not os.path.exists(coeffs_dir): os.makedirs(coeffs_dir)
if comm: comm.Barrier()
file_dynamics = os.path.join(coeffs_dir, filename+'.npy')
shape = [*xshape_nv, len(time_idx)]
if comm:
shape[max_axis] = -1
Q_rec.shape = shape
Q_rec = np.moveaxis(Q_rec, -1, 0)
utils_par.npy_save(comm, file_dynamics, Q_rec, axis=max_axis+1)
utils_par.pr0(f'- data saved: {time.time() - st} s.' , comm)
utils_par.pr0(f'--------------------------------------------', comm)
utils_par.pr0(f'Reconstructed data saved in: {file_dynamics}', comm)
utils_par.pr0(f'Elapsed time: {time.time() - s0} s.' , comm)
utils_par.barrier(comm)
return file_dynamics, coeffs_dir
def _compute_rec_op(phir, coeffs, lt_mean, time_idx, comm=None):
st = time.time()
## phi x coeffs
Q_rec = phir @ coeffs[:,time_idx]
utils_par.pr0(f'- phi x a completed: {time.time() - st} s.', comm)
del phir, coeffs
st = time.time()
## add time mean
Q_rec = Q_rec + lt_mean[...,None]
utils_par.pr0(f'- added time mean: {time.time() - st} s.', comm)
st = time.time()
return Q_rec
def _compute_rec_conv(phir, coeffs, lt_mean, time_idx, comm=None):
st = time.time()
## reshape modes and coeffs
nt = coeffs.shape[1]
xshape_nv = phir[...,0,0].shape
n_freq_r = coeffs.shape[0]
n_modes_save = coeffs.shape[-1]
phir = np.reshape(phir, (xshape_nv + (n_modes_save * n_freq_r,)))
coeffs = np.einsum('ijk->ikj', coeffs)
coeffs = np.reshape(coeffs, [n_modes_save * n_freq_r, nt])
## phi x coeffs
Q_rec = phir @ coeffs[:,time_idx]
utils_par.pr0(f'- phi x a completed: {time.time() - st} s.', comm)
del phir, coeffs
st = time.time()
## add time mean
Q_rec = Q_rec + lt_mean[...,None]
utils_par.pr0(f'- added time mean: {time.time() - st} s.', comm)
st = time.time()
return Q_rec
def _oblique_projection(phir, weights, data, tol, svd=False,
dtype='double', comm=None):
'''Compute oblique projection for time coefficients.'''
## get dtypes
dt_float, dt_complex = _get_dtype(dtype)
data = data.T
M = phir.conj().T @ (weights * phir)
Q = phir.conj().T @ (weights * data)
del weights, data, phir
M = utils_par.allreduce(data=M, comm=comm)
Q = utils_par.allreduce(data=Q, comm=comm)
coeffs = np.zeros([Q.shape[1], Q.shape[0]])
if svd:
u, l, v = np.linalg.svd(M)
l_inv = np.zeros([len(l),len(l)], dtype=dt_complex)
l_max = np.max(l)
for i in range(len(l)):
if (l[i] > tol * l_max):
l_inv[i,i] = 1 / l[i]
M_inv = (v.conj().T @ l_inv) @ u.conj().T
coeffs = M_inv @ Q
del u, l, v
del l_inv
del l_max
del M_inv
del Q, M
else:
M_inv = np.linalg.pinv(M, tol)
coeffs = M_inv @ Q
del M_inv
del Q, M
return coeffs
# def _oblique_projection(phir, weights, data, tol, svd=False,
# dtype='double', comm=None):
# '''Compute oblique projection for time coefficients.'''
# ## get dtypes
# dt_float, dt_complex = _get_dtype(dtype)
# data = data.T
# M = phir.conj().T @ (weights * phir)
# Q = phir.conj().T @ (weights * data)
# print(f'{M.shape = :}')
# print(f'{Q.shape = :}')
# sys.exit(2)
# del weights, data, phir
# M = utils_par.allreduce(data=M, comm=comm)
# Q = utils_par.allreduce(data=Q, comm=comm)
# coeffs = np.zeros([Q.shape[1], Q.shape[0]])
# if svd:
# u, l, v = np.linalg.svd(M)
# l_inv = np.zeros([len(l),len(l)], dtype=dt_complex)
# l_max = np.max(l)
# for i in range(len(l)):
# if (l[i] > tol * l_max):
# l_inv[i,i] = 1 / l[i]
# M_inv = (v.conj().T @ l_inv) @ u.conj().T
# coeffs = M_inv @ Q
# del u, l, v
# del l_inv
# del l_max
# del M_inv
# del Q, M
# else:
# M_inv = np.linalg.pinv(M, tol)
# coeffs = M_inv @ Q
# del M_inv
# del Q, M
# return coeffs
def _hamming_window(N):
'''
Standard Hamming window of length N
'''
x = np.arange(0,N,1)
window = (0.54 - 0.46 * np.cos(2 * np.pi * x / (N-1))).T
return window
# def _slepsec(n_dft, bw, n_tapers):
# '''
# SLEPSEC Discrete prolate spheroidal sequences of length nDFT and
# time-halfbandwidth product bw
# '''
# df = bw / n_dft
# j = np.arange(0:n_dft-1)
# r1 = [df * 2 * np.pi, np.sin(2 * np.pi * df * j) ./ j]
# # S = toeplitz(r1)
# S = toeplitz(r1)
# [U,L] = np.eig(S)
# [~,idx] = np.sort(diag(L),'descend')
# U = U[:,idx]
# window = U[:,1:n_tapers]
# return window
def _configure_parallel(comm):
if comm:
rank = comm.rank
size = comm.size
else:
rank = 0
size = 1
return rank, size
def _get_required_data(results_dir):
## load required files
results_dir = os.path.join(CWD, results_dir)
file_eigs_freq = os.path.join(results_dir, 'eigs_freq.npz')
file_weights = os.path.join(results_dir, 'weights.npy')
file_params = os.path.join(results_dir, 'params_modes.yaml')
## try to load basic file from modes calculation
try: eigs_freq = np.load(file_eigs_freq)
except:
raise Exception(
'eigs_freq.npz not found. Consider running fit to '
'compute SPOD modes before computing coefficients.')
## load rest of files if found
weights = np.lib.format.open_memmap(file_weights)
with open(file_params) as f:
params = yaml.load(f, Loader=yaml.FullLoader)
return params, eigs_freq, weights
def _get_modes(results_dir, n_freq, freq_idx,
max_axis, xsize, n_modes_save, dt_complex, comm):
## order weights and modes such that each frequency contains
## all required modes (n_modes_save)
## - freq_0: modes from 0 to n_modes_save
## - freq_1: modes from 0 to n_modes_save
## ...
# initialize modes
shape = (xsize, n_freq*n_modes_save)
phir = np.zeros(shape, dtype=dt_complex)
cnt_freq = 0
for i_freq in freq_idx:
phi = post.get_modes_at_freq(results_dir, freq_idx=i_freq)
phi = utils_par.distribute_dimension(phi, max_axis, comm)
phi = np.reshape(phi,[xsize,n_modes_save])
for i_mode in range(n_modes_save):
jump_freq = n_modes_save * cnt_freq + i_mode
phir[:,jump_freq] = phi[:,i_mode]
cnt_freq = cnt_freq + 1
del phi
return phir
def _get_dtype(dtype):
if (dtype == 'double') or (dtype == np.float64):
d_float = np.float64
d_complex = np.complex128
elif (dtype == 'single') or (dtype == np.float32):
d_float = np.float32
d_complex = np.complex64
else:
raise ValueError(f'invalid dtype {dtype}.')
return d_float, d_complex
def _set_dtype(d, dtype):
## set data type
dt_float, dt_complex = _get_dtype(dtype)
if d.dtype == float : d = d.astype(dt_float )
elif d.dtype == complex: d = d.astype(dt_complex)
return d