'''Derived module from spod_base.py for standard SPOD.'''
# Import standard Python packages
import os
import sys
import time
import math
import numpy as np
from numpy import linalg as la
import scipy.io.matlab as siom
# Import custom Python packages
from pyspod.spod.base import Base
import pyspod.utils.parallel as utils_par
try:
from mpi4py import MPI
except:
pass
[docs]
class Standard(Base):
'''
Class that implements a distributed batch version of the
Spectral Proper Orthogonal Decomposition algorithm to the input data.
The computation is performed on the *data* passed
to the `fit` method of the `Standard` class, derived
from the `Base` class.
'''
[docs]
def fit(self, data_list, variables = None):
'''
Class-specific method to fit the data matrix using the SPOD
batch algorithm.
:param list data_list: list containing data matrices for which
to compute the SPOD.
'''
start0 = time.time()
start = time.time()
## initialize data and variables
self._pr0(f' ')
self._pr0(f'Initialize data ...')
self._initialize(data_list, variables)
self._pr0(f'Time to initialize: {time.time() - start} s.')
start = time.time()
self._pr0(f' ')
self._pr0(f'Calculating temporal DFT (parallel)')
self._pr0(f'------------------------------------')
# check if blocks are already saved in memory
blocks_present = False
if self._reuse_blocks:
blocks_present = self._are_blocks_present(
self._n_blocks, self._n_freq, self._blocks_folder, self._comm)
# loop over number of blocks and generate Fourier realizations,
# if blocks are not saved in storage
## check if blocks already computed or not
if blocks_present:
# load blocks if present
Q_hats = {}
size_qhat = [*self._xshape, self._n_blocks]
for f in range(self._n_freq):
Q_hats[f] = np.empty(size_qhat, dtype=self._complex)
for i_blk in range(0, self._n_blocks):
print(f'Loading block {i_blk}/{self._n_blocks}')
for i_freq in range(0, self._n_freq):
file = f'fft_block{i_blk:08d}_freq{i_freq:08d}.npy'
path = os.path.join(self._blocks_folder, file)
Q_hats[i_freq][...,i_blk] = np.load(path)
for f in range(self._n_freq):
Q_hats[f] = utils_par.distribute_dimension(
data=Q_hats[f], max_axis=self._max_axis, comm=self._comm)
qhat = Q_hats[f]
shape = [qhat[...,0].size, qhat.shape[-1]]
Q_hats[f] = np.reshape(Q_hats[f], shape)
del self.data
else:
# loop over number of blocks and generate Fourier realizations
if isinstance(self.data, dict):
last_key = list(self.data)[-1]
last_val = self.data[last_key]["v"]
xvsize = last_val[0,...].size
else:
xvsize = self.data[0,...].size
Q_hats = {}
for i_blk in range(0,self._n_blocks):
st = time.time()
# compute block
qhat = np.empty([self._n_freq, xvsize], dtype=self._complex)
qhat[:], offset = self._compute_blocks(i_blk)
Q_hats[i_blk] = {}
for f in range(self._n_freq):
Q_hats[i_blk][f] = qhat[f,:].copy()
# save FFT blocks in storage memory
if self._savefft == True:
Q_blk_hat = qhat
for i_freq in range(0, self._n_freq):
Q_blk_hat_fr = Q_blk_hat[i_freq,:]
file = f'fft_block{i_blk:08d}_freq{i_freq:08d}.npy'
path = os.path.join(self._blocks_folder, file)
shape = [*self._xshape]
if self._comm: shape[self._max_axis] = -1
Q_blk_hat_fr.shape = shape
utils_par.npy_save(
self._comm, path, Q_blk_hat_fr,
axis=self._max_axis)
# print info file
self._pr0(f'block {(i_blk+1)}/{(self._n_blocks)}'
f' ({(offset)}:{(self._n_dft+offset)}); '
f'Elapsed time: {time.time() - st} s.')
del self.data
st = time.time()
# move from Q_hats[i_blk][f] to Q_hats[f]
Q_hats = self._flip_qhat(Q_hats)
self._pr0(f'- Time spent transposing Q_hats dictionaries: {time.time() - st} s.')
self._pr0(f'------------------------------------')
self._pr0(f'Time to compute DFT: {time.time() - start} s.')
if self._comm: self._comm.Barrier()
start = time.time()
# Loop over all frequencies and calculate SPOD
self._pr0(f' ')
self._pr0(f'Calculating SPOD (parallel)')
self._pr0(f'------------------------------------')
self._eigs = np.zeros([self._n_freq,self._n_blocks],
dtype=self._complex)
## compute standard spod
self._compute_standard_spod(Q_hats)
# store and save results
self._store_and_save()
self._pr0(f'------------------------------------')
self._pr0(f' ')
self._pr0(f'Results saved in folder {self._savedir_sim}')
self._pr0(f'Time to compute SPOD: {time.time() - start} s.')
self._pr0(f'------------------------------------')
self._pr0(f' ')
self._pr0(f'Total time: {time.time() - start0} s.')
if self._comm: self._comm.Barrier()
return self
def _compute_blocks(self, i_blk):
'''Compute FFT blocks.'''
# get time index for present block
offset = min(i_blk * (self._n_dft - self._n_overlap) \
+ self._n_dft, self._nt) - self._n_dft
Q_blk = self._get_block(offset, offset+self._n_dft)
# Subtract longtime or provided mean
Q_blk -= self._t_mean
# if block mean is to be subtracted,
# do it now that all data is collected
if self._mean_type.lower() == 'blockwise':
Q_blk -= np.mean(Q_blk, axis=0)
# normalize by pointwise variance
if self._normalize_data:
den = self._n_dft - 1
Q_var = np.sum((Q_blk - np.mean(Q_blk, axis=0))**2, axis=0) / den
# address division-by-0 problem with NaNs
Q_var[Q_var < 4 * np.finfo(float).eps] = 1
Q_blk /= Q_var
Q_blk *= self._window
Q_blk = self._set_dtype(Q_blk)
if self._isrealx and not self._fullspectrum:
Q_blk_hat = (self._win_weight / self._n_dft) * np.fft.rfft(Q_blk, axis=0)
else:
Q_blk_hat = (self._win_weight / self._n_dft) * np.fft.fft(Q_blk, axis=0)[0:self._n_freq,:]
return Q_blk_hat, offset
def _compute_standard_spod(self, Q_hats):
'''Compute standard SPOD.'''
comm = self._comm
# compute inner product in frequency space, for given frequency
st = time.time()
M = [None]*self._n_freq
for f in range(0,self._n_freq):
Q_hat_f = np.squeeze(Q_hats[f])
M[f] = Q_hat_f.conj().T @ (Q_hat_f * self._weights) / self._n_blocks
del Q_hat_f
M = np.stack(M)
M = utils_par.allreduce(data=M, comm=self._comm)
self._pr0(f'- M computation: {time.time() - st} s.')
st = time.time()
## compute eigenvalues and eigenvectors
L, V = la.eig(M)
L = np.real_if_close(L, tol=1000000)
del M
# reorder eigenvalues and eigenvectors
## double non-zero freq and non-Nyquist
for f, Lf in enumerate(L):
idx = np.argsort(Lf)[::-1]
L[f,:] = L[f,idx]
vf = V[f,...]
vf = vf[:,idx]
V[f] = vf
self._pr0(f'- Eig computation: {time.time() - st} s.')
st = time.time()
# compute spatial modes for given frequency
L_diag = np.sqrt(self._n_blocks) * np.sqrt(L)
L_diag_inv = 1. / L_diag
if not self._savefreq_disk2:
for f in range(0,self._n_freq):
s0 = time.time()
## compute
phi = np.matmul(Q_hats[f], V[f,...] * L_diag_inv[f,None,:])
phi = phi[...,0:self._n_modes_save]
del Q_hats[f]
sstime = time.time()
## save modes
if self._savefreq_disk:
filename = f'freq_idx_{f:08d}.npy'
p_modes = os.path.join(self._modes_dir, filename)
shape = [*self._xshape,self._nv,self._n_modes_save]
if comm:
shape[self._max_axis] = -1
phi.shape = shape
utils_par.npy_save(self._comm, p_modes, phi, axis=self._max_axis)
self._pr0(
f'freq: {f+1}/{self._n_freq}; (f = {self._freq[f]:.5f}); '
f'Elapsed time: {(time.time() - s0):.5f} s.')
####################################
####################################
####################################
else: # savefreq_disk2
####################################
####################################
####################################
assert self._reader._flattened, "savefreq_disk2 currently only works with flattened data"
rank = comm.rank
ftype = MPI.C_FLOAT_COMPLEX if self._complex==np.complex64 else MPI.C_DOUBLE_COMPLEX
cum_cctime = 0
cum_sstime = 0
phi_dict = {}
for f in range(0,self._n_freq):
s0 = time.time()
phi_dict[f] = {}
phi = np.matmul(Q_hats[f], V[f,...] * L_diag_inv[f,None,:])[:,:self._n_modes_save]
Q_hats[f] = None
cum_cctime += time.time() - s0
s1 = time.time()
for m in range(0,self._n_modes_save):
phi_dict[f][m] = phi[:,m].copy() # make sure modes beyond n_modes_save can be deallocated
del phi
cum_sstime += time.time() - s1
self._pr0(
f'freq: {f+1}/{self._n_freq}; (f = {self._freq[f]:.5f}); '
f'Elapsed time: {(time.time() - s0):.5f} s.')
del V
del Q_hats
sstime = time.time()
# get max phi shape
phi0_max = comm.allreduce(phi_dict[0][0].shape[0], op=MPI.MAX)
phi_dtype = phi_dict[0][0].dtype
mpi_dtype = ftype.Create_contiguous(phi0_max).Commit()
local_elements = np.array(phi_dict[0][0].shape[0])
recvcounts = np.zeros(comm.size, dtype=np.int64)
comm.Allgather(local_elements, recvcounts)
total_files = self._n_freq * self._n_modes_save
for ipass in range(0,math.ceil(total_files/comm.size)):
write_s = ipass * comm.size
write_e = min((ipass+1) * comm.size, total_files)
write = None
data = np.zeros(phi0_max*comm.size, dtype=phi_dtype)
s_msgs = {}
reqs_r = []
reqs_s = []
for i in range(write_s, write_e):
f = i // self._n_modes_save
m = i % self._n_modes_save
writer = i % comm.size
s_msgs[i] = [np.zeros(phi0_max, dtype=phi_dtype), mpi_dtype]
s_msgs[i][0][0:phi_dict[f][m].shape[0]] = phi_dict[f][m][:] # phi0_max-shaped and 0-padded
del phi_dict[f][m]
reqs_s.append(comm.Isend(s_msgs[i], dest=writer))
if rank == writer:
write = (f,m)
for irank in range(comm.size):
reqs_r.append(comm.Irecv([data[phi0_max*irank:],mpi_dtype],source=irank))
MPI.Request.Waitall(reqs_s)
s_msgs = {}
if write:
f, m = write
xtime = time.time()
MPI.Request.Waitall(reqs_r)
self._pr0(f' Waitall({len(reqs_r)}) {time.time()-xtime} seconds')
for irank in range(comm.size):
start = irank*phi0_max
end = start+recvcounts[irank]
start_nopad = np.sum(recvcounts[:irank])
end_nopad = np.sum(recvcounts[:irank+1])
data[start_nopad:end_nopad,...] = data[start:end,...]
# write to disk
data = data[:np.sum(recvcounts)].reshape(self._xshape+(self._nv,))
filename = f'freq_idx_f{f:08d}_m{m:08d}.npy'
print(f'rank {rank} saving {filename}')
p_modes = os.path.join(self._modes_dir, filename)
np.save(p_modes, data)
mpi_dtype.Free()
cum_sstime += time.time() - sstime
self._pr0(f'- Modes computation {cum_cctime} s. Saving: {cum_sstime} s.')
## correct Fourier for one-sided spectrum
if self._isrealx:
L[1:-1,:] = 2 * L[1:-1,:]
# get eigenvalues and confidence intervals
self._eigs = np.abs(L)
fac_lower = 2 * self._n_blocks / self._xi2_lower
fac_upper = 2 * self._n_blocks / self._xi2_upper
self._eigs_c[...,0] = self._eigs * fac_lower
self._eigs_c[...,1] = self._eigs * fac_upper
def _get_block(self, start, end):
if isinstance(self.data, dict):
last_key = list(self.data)[-1]
last_val = self.data[last_key]["v"]
Q_blk = np.empty((self._n_dft,)+last_val.shape[1:],dtype=last_val.dtype)
cnt = 0
bstart = start
for k,v in self.data.items():
v_s = v["s"]
v_e = v["e"]
read_here_s = max(v_s, bstart)
read_here_e = min(v_e, end)
read_here_cnt = read_here_e - read_here_s
if read_here_cnt > 0:
vals = v["v"]
Q_blk[cnt:cnt+read_here_cnt,...] = vals[read_here_s-v_s:read_here_e-v_s,...]
cnt += read_here_cnt
bstart += read_here_cnt
assert cnt == end-start, f'Not enough data read: cnt ({cnt}) != end-start ({end-start})'
# delete blocks that are no longer needed
keys_to_del = []
for k,v in self.data.items():
v_s = v["s"]
v_e = v["e"]
if start > v_e:
keys_to_del.append(k)
for k in keys_to_del:
del self.data[k]
Q_blk = Q_blk.reshape(self._n_dft, last_val[0,...].size)
return Q_blk
else:
Q_blk = self.data[start:end,...].copy()
Q_blk = Q_blk.reshape(self._n_dft, self.data[0,...].size)
return Q_blk
def _flip_qhat(self, Q_hats):
last_blk = list(Q_hats)[-1]
last_frq = Q_hats[last_blk]
last_val = last_frq[list(last_frq)[-1]]
xvsize = last_val.size
Q_hat_f = {}
for f in range(0,self._n_freq):
Q_hat_f[f] = np.zeros((xvsize, self._n_blocks),dtype=last_val.dtype)
for b,v in Q_hats.items():
Q_hat_f[f][:,b] = v[f][:]
for b,_ in Q_hats.items():
del Q_hats[b][f]
return Q_hat_f