'''Derived module from spod_base.py for streaming SPOD.'''
# import standard python packages
import os
import time
import numpy as np
from numpy import linalg as la
import pyspod.utils.parallel as utils_par
from pyspod.spod.base import Base
[docs]
class Streaming(Base):
'''
Class that implements a distributed streaming version of the
Spectral Proper Orthogonal Decomposition algorithm to the input data.
The computation is performed on the data passed
to the `fit` method of the `Streaming` class, derived
from the `Base` class.
'''
[docs]
def fit(self, data_list):
'''
Class-specific method to fit the data matrix using the SPOD
streaming algorithm.
:param list data_list: list containing data matrices for which
to compute the SPOD.
'''
start = time.time()
## if user forgets to pass list for single data list,
## make it to be a list
if not isinstance(data_list, list): data_list = [data_list]
## initialize data and variables
self._initialize(data_list, streaming = True)
## sqrt of weights
sqrt_w = np.sqrt(self._weights)
## separation between adjacent blocks
dn = self._n_dft - self._n_overlap
## number of blocks being updated in parallel if segments overlap
n_blocks_par = int(np.ceil(self._n_dft / dn))
## sliding, relative time index for each block
t_idx = np.zeros([n_blocks_par,1], dtype=int)
for block_i in range(0,n_blocks_par):
t_idx[block_i] = t_idx[block_i] - (block_i) * dn
self._pr0(f' ')
self._pr0(f'Calculating temporal DFT (streaming)')
self._pr0(f'------------------------------------')
## obtain first snapshot to determine data size
data = self._reader.get_data(ts=0)
assert data.shape[0] == 1, 'Data returned should have been a single snapshot'
flat_dim = int(data[0,...].size)
n_m_save = self._n_modes_save
n_freq = self._n_freq
x_new = data[0,...]
x_new = np.reshape(x_new,(flat_dim,1))
## allocate data arrays
mu = np.zeros([flat_dim,1], dtype=self._complex)
x_hat = np.zeros([flat_dim,n_freq],dtype=self._complex)
x_sum = np.zeros([flat_dim,n_freq,n_blocks_par],dtype=self._complex)
phi = np.zeros([flat_dim,n_freq,n_m_save],dtype=self._complex)
u_hat = np.zeros([flat_dim,n_freq,n_m_save],dtype=self._complex)
self._eigs = np.zeros([n_m_save,n_freq],dtype=self._complex)
## dft matrix
dft = np.fft.fft(np.identity(self._n_dft))
## check if real for frequency axis
if self._isrealx:
dft[:,1:n_freq-1] = 2 * dft[:,1:n_freq-1]
if self._fullspectrum:
freq_idx = np.arange(0, int(self._n_dft), 1)
else:
freq_idx = np.arange(0, int(self._n_dft/2+1))
dft = dft[:,freq_idx]
# ## convergence tests
# mse_prev = np.empty([int(1e3),n_m_save,n_freq],dtype=complex) * np.nan
# proj_prev = np.empty([n_freq,int(1e3),n_m_save],dtype=complex) * np.nan
# S_hat_prev = np.zeros([n_m_save,n_freq],dtype=complex)
## initialize counters
block_i = 0
ti = -1
z = np.zeros([1,n_m_save], dtype=self._float)
while True:
ti = ti + 1
## get new snapshot and abort if data stream runs dry
if ti > 0:
try:
x_new = self._reader.get_data(ti)
x_new = np.reshape(x_new,(flat_dim,1))
except:
self._pr0(f'--> Data stream ended.')
break
## update sample mean
mu_old = mu
mu = (ti * mu_old + x_new) / (ti + 1)
## update incomplete dft sums, eqn (17)
update = False
window = self._window
for block_j in range(0,n_blocks_par):
if t_idx[block_j] > -1:
x_sum[:,:,block_j] = \
x_sum[:,:,block_j] + window[t_idx[block_j]] * \
dft[t_idx[block_j],:] * x_new
## check if sum is completed, and if so, initiate update
if t_idx[block_j] == self._n_dft - 1:
update = True
x_hat = x_sum[:,:,block_j].copy()
x_sum[:,:,block_j] = 0
t_idx[block_j] = min(t_idx) - dn
else:
t_idx[block_j] = t_idx[block_j] + 1
del x_new
## update basis if a dft sum is completed
if update:
block_i = block_i + 1
## subtract mean contribution to dft sum
for row_idx in range(0,self._n_dft):
x_hat = x_hat - (window[row_idx] * dft[row_idx,:]) * mu
## correct for windowing function and apply
## 1/self._n_dft factor
x_hat = self._win_weight / self._n_dft * x_hat
if block_i == 0:
## initialize basis with first vector
self._pr0(
f'--> Initializing left singular vectors; '
f' Time {str(ti)} / block {str(block_i)}')
u_hat[:,:,0] = x_hat * sqrt_w
self._eigs[0,:] = np.sum(abs(u_hat[:,:,0]**2))
else:
## update basis
self._pr0(
f'--> Updating left singular vectors; '
f' Time {str(ti)} / block {str(block_i)}')
# S_hat_prev = self._eigs.copy()
for i_freq in range(0,n_freq):
## new data (weighted)
x = x_hat[:,[i_freq]] * sqrt_w[:]
## old basis
U = np.squeeze(u_hat[:,i_freq,:])
## old singular values
S = np.diag(np.squeeze(self._eigs[:,i_freq]))
## product U^H*x needed in eqns. (27,32)
Ux = np.matmul(U.conj().T, x)
Ux = utils_par.allreduce(Ux, comm=self._comm)
## orthogonal complement to U, eqn. (27)
u_p = x - np.matmul(U, Ux)
## norm of orthogonal complement
abs_up = np.matmul(u_p.conj().T, u_p)
abs_up = utils_par.allreduce(abs_up, comm=self._comm)
abs_up = np.sqrt(abs_up)
## normalized orthogonal complement
u_new = u_p / abs_up
del u_p
## build K matrix and compute its SVD, eqn. (32)
K_1 = np.hstack((np.sqrt(block_i+2) * S, Ux))
del Ux
K_2 = np.hstack((z, abs_up))
K = np.vstack((K_1, K_2))
del K_1, K_2
K = np.sqrt((block_i+1) / (block_i+2)**2) * K
## calculate partial svd
Up, Sp, _ = la.svd(K, full_matrices=False)
del K
## update U as in eqn. (33)
## for simplicity, we could not rotate here and instead
## update U<-[U p] and Up<-[Up 0;0 1]*Up and rotate
## later; see Brand (LAA ,2006, section 4.1)
U_tmp = np.hstack((U, u_new))
U = np.dot(U_tmp, Up)
del U_tmp
## best rank-k approximation, eqn. (37)
u_hat[:,i_freq,:] = U[:,0:self._n_modes_save]
self._eigs[:,i_freq] = Sp[0:self._n_modes_save]
## reset dft sum
x_hat[:,:] = 0
# phi_prev = phi
# phi = u_hat * (1 / sqrt_w[:,:,np.newaxis])
# ## convergence
# for i_freq in range(0,n_freq):
# proj_i_freq = (np.squeeze(phi_prev[:,i_freq,:]) * \
# self._weights).conj().T @ np.squeeze(phi[:,i_freq,:])
# proj_prev[i_freq,block_i,:] = \
# np.amax(np.abs(proj_i_freq), axis=0)
# mse_prev[block_i,:,:] = (np.abs(S_hat_prev**2 - \
# self._eigs**2)**2) / (S_hat_prev**2)
## rescale such that <U_i,U_j>_E = U_i^H * W * U_j = delta_ij
phi = u_hat[:,:,0:n_m_save] * (1 / sqrt_w[:,:,np.newaxis])
# ## shuffle and reshape
phi = np.einsum('ijk->jik', phi)
## save modes
for f in range(0,n_freq):
filename = f'freq_idx_{f:08d}.npy'
path_modes = os.path.join(self._modes_dir, filename)
shape = [*self._xshape, self._nv, self._n_modes_save]
if self._comm:
shape[self._max_axis] = -1
phif = phi[f,...]
phif.shape = shape
utils_par.npy_save(
self._comm, path_modes, phif, axis=self._max_axis)
# ## save modes
# self._modes_dir = 'modes.npy'
# path_modes = os.path.join(self._savedir_sim, self._modes_dir)
# shape = [self._n_freq, *self._xshape, self._nv, self._n_modes_save]
# if self._comm:
# shape[self._max_axis+1] = -1
# phi.shape = shape
# utils_par.npy_save(self._comm, path_modes, phi, axis=self._max_axis+1)
## transpose eigs
self._eigs = self._eigs.T
# store and save results
self._store_and_save()
self._pr0(f'------------------------------------')
self._pr0(f' ')
self._pr0(f'Results saved in folder {self._savedir_sim}')
self._pr0(f'Time to compute SPOD: {time.time() - start} s.')
if self._comm: self._comm.Barrier()
return self