# TODO: something against noise in the beginning --> weights?
import os
import numpy as np
from scipy.linalg import svd
from scipy.integrate import odeint
import matplotlib.pyplot as plt
import trtoolbox.pclasses as pclasses
[docs]class Results(pclasses.Data):
""" Object containing fit results.
Attributes
----------
type : str
Results object type.
taus: np.array
Time constants used for constructing D-matrix.
dmatrix : np.array
D-matrix.
alphas : np.array
Used alpha values for computation (tik method).
lmatrix : np.array
L-matrix (tik method).
lcurve : np.array
L-curve.
k : int
Used SVD Components (tsvd method).
method : string
Used method (tik or tsvd).
x_k : np.array
Resulting exponential pre-factors.
fitdata : np.array
Constructed data (dmatrix.dot(x_k))
"""
def __init__(self):
super().__init__()
self.type = 'lda'
self.taus = np.array([])
self.dmatrix = np.array([])
self.alphas = np.array([])
self.lmatrix = np.array([])
self.lcurve = np.array([])
self.k = None
self.method = ''
self.x_k = np.array([])
self.fitdata = np.array([])
[docs] def get_alpha(self, alpha=-1, index_alpha=-1):
""" Gets alpha value and index.
Parameters
----------
alpha : float
Plot for the closest alpha as specified.
index_alpha : int
Plot for specified alpha at index.
Returns
-------
alpha : float
Alpha value.
index_alpha : int
Index of alpha value.
"""
if index_alpha == -1 and alpha == -1:
# if no alpha is specified take the middle of the alpha array
index_alpha = int(np.ceil(self.alphas.size/2))
elif alpha != -1:
# search for closest alpha value
index_alpha = (np.abs(self.alphas - alpha)).argmin()
alpha = self.alphas[index_alpha]
return alpha, index_alpha
[docs] def get_xk(self, alpha=-1, index_alpha=-1):
""" Gets selected LDA map.
Parameters
----------
alpha : float
Plot for the closest alpha as specified.
index_alpha : int
Plot for specified alpha at index.
Returns
-------
x_k : np.array
LDA map.
title : str
Title for figure.
"""
# check for used method
if len(self.x_k.shape) == 3:
_, index_alpha = self.get_alpha(alpha, index_alpha)
x_k = self.x_k[:, :, index_alpha]
title = 'LDA map at alpha = %f' % (self.alphas[index_alpha])
else:
x_k = self.x_k
title = 'LDA map using TSVD'
return x_k, title
[docs] def plot_fitdata(self, alpha=-1, index_alpha=-1, interpolate=False, step=.5):
""" Plots a nice looking heatmap of fitted data.
Parameters
----------
alpha : float
Plot for the closest alpha as specified.
index_alpha : int
Plot for specified alpha at index.
interpolate : boolean
True for interpolation
step : float
Step size for frequency interpolation.
"""
self.init_phelper()
if self.fitdata.size == 0:
print('First start a LDA.')
return
if len(self.fitdata.shape) == 3:
_, index_alpha = self.get_alpha(alpha, index_alpha)
fitdata = self.fitdata[:, :, index_alpha]
title = 'Fitted data at alpha = %f' % (self.alphas[index_alpha])
else:
fitdata = self.fitdata
title = 'Fitted data using TSVD'
self._phelper.plot_heatmap(
fitdata, self.time, self.wn,
title=title, newfig=True,
interpolate=interpolate, step=step
)
plt.ylabel('%s / %s' % (self.wn_name, self.wn_unit))
plt.xlabel('%s / %s' % (self.time_name, self.time_unit))
plt.title(title)
[docs] def plot_fitdata_3d(self, alpha=-1, index_alpha=-1, interpolate=False, step=.5):
""" Plots a 3D surface of fitted data.
Parameters
----------
alpha : float
Plot for the closest alpha as specified.
index_alpha : int
Plot for specified alpha at index.
interpolate : boolean
True for interpolation
step : float
Step size for frequency interpolation.
"""
self.init_phelper()
if self.fitdata.size == 0:
print('First start a LDA.')
return
if len(self.fitdata.shape) == 3:
_, index_alpha = self.get_alpha(alpha, index_alpha)
fitdata = self.fitdata[:, :, index_alpha]
title = 'Fitted data at alpha = %f' % (self.alphas[index_alpha],)
else:
fitdata = self.fitdata
title = 'Fitted data using TSVD'
self._phelper.plot_surface(
fitdata, self.time, self.wn,
title=title,
interpolate=interpolate, step=step
)
plt.ylabel('%s / %s' % (self.wn_name, self.wn_unit))
plt.xlabel('%s / %s' % (self.time_name, self.time_unit))
plt.title(title)
[docs] def plot_ldamap(self, alpha=-1, index_alpha=-1):
""" Plots a nice looking contourmap.
Parameters
----------
alpha : float
Plot for the closest alpha as specified.
index_alpha : int
Plot for specified alpha at index.
"""
self.init_phelper()
if self.x_k.size == 0:
print('First start a LDA.')
return
x_k, title = self.get_xk(alpha, index_alpha)
self._phelper.plot_contourmap(
x_k, self.taus, self.wn,
title=title, newfig=True)
plt.ylabel('%s / %s' % (self.wn_name, self.wn_unit))
plt.xlabel('%s / %s' % ('tau', self.time_unit))
plt.title(title)
[docs] def plot_traces(self, alpha=-1, index_alpha=-1):
""" Plots interactive time traces.
"""
self.init_phelper()
self._phelper.plot_traces(self, alpha, index_alpha)
[docs] def plot_spectra(self, alpha=-1, index_alpha=-1):
""" Plots interactive spectra.
"""
self.init_phelper()
self._phelper.plot_spectra(self, alpha, index_alpha)
[docs] def plot_lcurve(self):
""" Plots L-curve.
"""
plt.figure()
plt.plot(self.lcurve[:, 0], self.lcurve[:, 1], 'o-', markersize=2)
plt.xlabel('res. norm (||Dx-b||)')
plt.ylabel('smooth norm (||Lx||)')
[docs] def plot_solutionvector(self, alpha=-1, index_alpha=-1):
""" Plots the sum of amplitudes over time constants.
Parameters
----------
alpha : float
Plot for the closest alpha as specified.
index_alpha : int
Plot for specified alpha at index.
"""
self.init_phelper()
self._phelper.plot_solutionvector(self, alpha, index_alpha)
[docs] def plot_results(self):
""" Plots interactive contourmaps of original and LDA data,
"""
self.init_phelper()
self._phelper.plot_ldaresults(self)
[docs] def save_to_files(self, path, alpha=-1, index_alpha=-1, comment=''):
""" Saving results to .dat files.
Parameters
----------
path : str
Path for saving.
alpha : float
Plot for the closest alpha as specified.
index_alpha : int
Plot for specified alpha at index.
comment : str
Personal comment.
"""
if os.path.exists(path) is False:
answer = input('Path not found. Create (y/n)? ')
if answer == 'y':
os.mkdir(path)
else:
return
to_save = ['alphas', 'data', 'lcurve', 'taus']
for k, i in vars(self).items():
if k in to_save:
fname = k + '.dat'
print('Writing ' + fname)
np.savetxt(
os.path.join(path, fname),
i,
delimiter=',',
fmt='%.4e'
)
fname = 'fitdata.dat'
print('Writing ' + fname)
alpha, index_alpha = self.get_alpha(alpha, index_alpha)
fitdata = self.fitdata[:, :, index_alpha]
np.savetxt(
os.path.join(path, fname),
fitdata,
delimiter=',',
fmt='%.4e'
)
fname = 'ldamap.dat'
print('Writing ' + fname)
x_k, _ = self.get_xk(alpha, index_alpha)
np.savetxt(
os.path.join(path, fname),
x_k,
delimiter=',',
fmt='%.4e'
)
if comment != '':
comment = '----------------------\n\nComment:\n' + comment
f = open(os.path.join(path, '00_comments.txt'), 'w')
print('Writing 00_comments.txt')
f.write('Created with trtoolbox\n' +
'----------------------\n\n' +
'Time constants limits: %.2e to %.2e\n'
% (self.taus[0, 0], self.taus[0, -1]) +
'Alpha value limits: %.2f to %.2f\n'
% (self.alphas[0], self.alphas[-1]) +
'Chosen alpha value: %.2f\n' % alpha +
'----------------------\n\n' +
'Files:\n' +
'\t- alphas.dat (Alpha values)\n' +
'\t- data.dat (Raw data)\n' +
'\t- fitdata.dat (Fitted data)\n' +
'\t- lcurve.dat (L-curve)\n' +
'\t- taus.dat (Used time constants)' +
comment
)
f.close()
[docs]def gen_taus(t1, t2, n):
""" Generates logarithmic spaced time constants.
Parameters
----------
t1 : float
Bottom limit.
t2 : float
Upper limit.
n : int
Number of time constants.
Returns
-------
taus : np.array
Generated time constants.
"""
if t1 == t2:
raise ValueError('Choose different limits!')
if t1 > t2:
t = t1
t1 = t2
t2 = t
if n <= 0:
print('Choose positive number. Take 100 now...')
n = 100
taus = np.logspace(np.log10(t1), np.log10(t2), n)
return taus
[docs]def gen_dmatrix(time, taus, seqmodel=False):
""" Generates D-matrix.
Parameters
----------
time : np.array
Time array.
taus : np.array
Time constants array
seqmodel : boolean
Sets a sequential model.
Returns
-------
dmatrix : np.array
D-matrix
"""
if seqmodel is True:
def model(s, time, ks):
arr = [-ks[0] * s[0], ks[0] * s[0] - ks[1] * s[1]]
return arr
dmatrix = np.zeros([time.size, taus.size])
for i in range(len(taus)):
if i > 0 and seqmodel is True:
# dmatrix[:, i] = \
# (-1*dmatrix[:, i-1] + np.exp(-time/taus[i])).reshape(-1)
ks = 1./taus[np.array([i-1, i])]
res = odeint(model, [1, 0], time.flatten(), (ks,))
dmatrix[:, i] = res[:, 1]
else:
dmatrix[:, i] = (np.exp(-time/taus[i])).reshape(-1)
return dmatrix
[docs]def gen_lmatrix(dmatrix):
""" Generates L-matrix
Parameters
----------
dmatrix : np.array
D-matrix
Returns
-------
lamtrix : np.array
L-matrix
"""
lmatrix = np.identity(np.shape(dmatrix)[1])
b = np.ones(np.shape(dmatrix)[1])
np.fill_diagonal(lmatrix[:, 1:], -b)
return lmatrix
[docs]def gen_alphas(a1, a2, n, space='log'):
""" Generates logarihmic spaced alpha values.
Adds [1e-5, 1e-4, 1e-3, 1e-2] and [10, 40, 70, 100]
for a better visualization of the L-curve.
Parameters
----------
a1 : float
Bottom limit.
a2 : float
Upper limit.
n : int
Number of alpha values.
space : str
*log* for logarithmic and *lin* for linear spaced
Returns
-------
alphas : np.array
Generated alpha values.
"""
if a1 == a2:
raise ValueError('Choose different limits!')
if a1 > a2:
a = a1
a1 = a2
a2 = a
if n <= 0:
print('Choose positive number. Take 100 now...')
n = 100
if space == 'log':
alphas = np.logspace(np.log10(a1), np.log10(a2), n)
elif space == 'lin':
alphas = np.linspace(a1, a2, n)
# code snippet to append alpha values for a better lcurce representation
# if a1 > 1e-2:
# alphas = np.insert(alphas, 0, [1e-5, 1e-4, 1e-3, 1e-2])
# if a2 < 10:
# alphas = np.append(alphas, [10, 40, 50, 100])
return alphas
[docs]def inversesvd(dmatrix, k=-1):
""" Returns the inverse of matrix computed via SVD.
Parameters
----------
dmatrix : np.array
Matrix to be inversed
k : int
Point of truncation. If *-1* then all singular values are used.
Returns
-------
v : np.array
Inverse of input matrix.
"""
# noinspection PyTupleAssignmentBalance
u, s, vt = svd(dmatrix, full_matrices=False)
if k == -1:
k = len(s)
s = 1/s
sigma = np.array([s[i] if i < k else 0 for i in range(len(s))])
sigma = np.diag(sigma)
ut = np.transpose(u)
v = np.transpose(vt)
return v.dot(sigma).dot(ut)
[docs]def tik(data, dmatrix, alpha):
""" Function for Tikhonov regularization:
min_x ||Dx - A|| + alpha*||Lx||
D-matrix contains exponential profiles,
x are prefactors/amplitudes,
A is the dataset,
alpha is the regularization factor and
L is the identity matrix.
Details can be found in
Dorlhiac, Gabriel F. et al.
"PyLDM-An open source package for lifetime density analysis
of time-resolved spectroscopic data."
PLoS computational biology 13.5 (2017)
Parameters
----------
data : np.array
Data matrix to be analyzed
dmatrix : np.array
D-matrix
alpha : float
Regularization factor
Returns
-------
x_k : np.array
Expontential prefactors/amplitudes.
"""
lmatrix = gen_lmatrix(dmatrix)
# constructing augmented D- and A-matrices.
# d_aug = (D, sqrt(alpha)*L)
# a_aug = (A, zeros)
if alpha != 0:
d_aug = np.concatenate((dmatrix, alpha ** 0.5 * lmatrix))
a_aug = np.concatenate(
(data, np.zeros([np.shape(data)[0], len(lmatrix)])),
axis=1)
else:
d_aug = dmatrix
a_aug = data
d_tilde = inversesvd(d_aug)
x_k = d_tilde.dot(np.transpose(a_aug))
return x_k
[docs]def tiks(data, dmatrix, alphas):
""" Wrapper for computing LDA for various alpha values.
Parallelization makes execution actually slower. I suspect that the svd numpy method already optimizes
CPU usage.
Parameters
----------
data : np.array
Data matrix to be analyzed.
dmatrix : np.array
D-matrix.
alphas : np.array
Array of regularization factors.
Returns
-------
x_k : np.array
3D matrix of expontential prefactors/amplitudes.
"""
x_ks = np.empty([np.shape(dmatrix)[1], np.shape(data)[0], len(alphas)])
for i, alpha in enumerate(alphas):
x_k = tik(data, dmatrix, alpha)
x_ks[:, :, i] = x_k
return x_ks
[docs]def calc_lcurve(data, dmatrix, lmatrix, x_ks):
""" Calculates L-curve.
Parameters
----------
data : np.array
Data matrix.
dmatrix : np.array
D-matrix.
lmatrix : np.array
L-matrix.
x_ks : np.array
LDA maps.
Returns
-------
lcurve : np.array
First column is resdiual norm.
Second column is smoothed norm.
"""
lcurve = np.empty((np.shape(x_ks)[2], 2))
for i in range(np.shape(x_ks)[2]):
lcurve[i, 0] = np.sum(
(dmatrix.dot(x_ks[:, :, i])-np.transpose(data))**2) ** 0.5
lcurve[i, 1] = np.sum((lmatrix.dot(x_ks[:, :, i]))**2) ** 0.5
return lcurve
[docs]def tik_lstsq(data, dmatrix, alpha):
""" Different implementation of the tik function.
Uses the ordinary lstsq solver of numpy.
Parameters
----------
data : np.array
Data matrix to be analyzed.
dmatrix : np.array
D-matrix.
alpha : float
Regularization factor.
Returns
-------
res : np.array
Expontential prefactors/amplitudes.
"""
lmatrix = gen_lmatrix(dmatrix)
if alpha != 0:
d_aug = np.concatenate((dmatrix, alpha ** 2 * lmatrix))
a_aug = np.concatenate(
(data, np.zeros([np.shape(data)[0], len(lmatrix)])),
axis=1)
else:
d_aug = dmatrix
a_aug = data
res = np.linalg.lstsq(d_aug, np.transpose(a_aug), rcond=None)
return res[0]
[docs]def tsvd(data, dmatrix, k):
""" Truncated SVD for LDA. Similar to Tikhonov regularization
but here we have a clear cut-off after a specified singular value.
Details can be found in
Hansen PC.
The truncated SVD as a method for regularization.
Bit. 1987
Parameters
----------
data : np.array
Data matrix to be analyzed.
dmatrix : np.array
D-matrix.
k : int
Cut-off for singular values.
Returns
-------
x_k : np.array
Expontential prefactors/amplitudes.
"""
d_tilde = inversesvd(dmatrix, k)
x_k = d_tilde.dot(np.transpose(data))
return x_k
[docs]def dolda(
data,
time,
wn,
tlimits=[],
tnum=100,
alimits=[0.1, 5],
anum=100,
method='tik',
seqmodel=False,
k=5,
prompt=False):
""" Wrapper for doing a LDA.
Parameters
----------
data : np.array
Data matrix subjected to SVD. Assuming *m x n* with m as frequency
and n as time. But it is actually not important.
time : np.array
Time array.
wn : np.array
Frequency array.
tlimits : list
Limits for time constants.
tnum : int
Number of time constants.
alimits : list
Limits for alpha values.
anum : int
Number if alpha values.
method : str
Chosen method for LDA. Either 'tik' or 'tsvd'.
seqmodel : boolean
True for constructing the D-matrix assuming a sequential model.
k : int
Just used for 'tsvd'. Specifies the position of truncation.
prompt : boolean
True for user prompts.
Returns
-------
res : *lda.results*
Results object.
"""
data, time, wn = pclasses.check_input(data, time, wn)
if prompt is False:
if not tlimits:
tlimits = [time[0, 0], time[0, -1]]
elif prompt is True:
method = input('Which method (tik or tsvd)? ')
if method == 'tik':
t1 = float(input('Bottom limit for time constants: '))
t2 = float(input('Upper limit for time constants: '))
tlimits = [t1, t2]
tnum = int(input('Number of time constants: '))
a1 = float(input('Bottom limit for alpha values: '))
a2 = float(input('Upper limit for alpha values: '))
alimits = [a1, a2]
anum = int(input('Number of alpha values: '))
elif method == 'tsvd':
k = int(input('How many singular values? '))
taus = gen_taus(tlimits[0], tlimits[1], tnum)
dmatrix = gen_dmatrix(time, taus, seqmodel=seqmodel)
lmatrix = gen_lmatrix(dmatrix)
res = Results()
res.data = data
res.time = time
res.wn = wn
res.taus = taus.reshape((1, taus.size))
res.dmatrix = dmatrix
res.method = method
if method == 'tik':
res.alphas = gen_alphas(alimits[0], alimits[1], anum)
x_k = tiks(data, dmatrix, res.alphas)
res.lmatrix = gen_lmatrix(dmatrix)
res.lcurve = calc_lcurve(data, dmatrix, res.lmatrix, x_k)
fitdata = np.empty(data.shape + (np.shape(x_k)[2], ))
for i in range(np.shape(x_k)[2]):
fitdata[:, :, i] = np.transpose(dmatrix.dot(x_k[:, :, i]))
res.fitdata = fitdata
res.lcurve = calc_lcurve(data, dmatrix, lmatrix, x_k)
elif method == 'tsvd':
res.k = k
x_k = tsvd(data, dmatrix, k)
res.fitdata = np.transpose(dmatrix.dot(x_k))
res.x_k = np.swapaxes(x_k, 0, 1)
return res
#############################################
# Functions for lcurve curverature
# (currently not working!)
#############################################
# def calc_k(lcurve, alphas):
# a = alphas[4:-4]
# x = medfilt(np.log10(lcurve[3:-3,0]))
# y = medfilt(np.log10(lcurve[3:-3,1]))
# x = x[1:-1]
# y = y[1:-1]
# x_new = np.linspace(x[1], x[-1], 1000)
# #da = np.gradient(a)
# da = np.arange(x_new.size)
# f = interp1d(x, y, kind='cubic')
# plt.figure(); plt.plot(x_new, f(x_new))
# dx = np.gradient(x_new)
# dy = np.gradient(f(x_new))
# dx2 = np.gradient(dx)
# dy2 = np.gradient(dy)
# k = 2*(dx*dy2 - dx2*dy) / (dx**2 + dy**2)**(1.5)
# return k
# def calc_k_angle(lcurve, alphas):
# #x = medfilt(np.log10(lcurve[:,0]), 7)
# x = np.log(lcurve[:,0])
# y = np.log(lcurve[:,1])
# f = interp1d(x[0::10], y[0::10], kind='cubic')
# npoints = 100
# x_espaced = np.linspace(x[0], x[-10], npoints)
# xy = np.transpose([x_espaced, f(x_espaced)])
# plt.figure(); plt.plot(xy[:,0], xy[:,1])
# angle = np.zeros([npoints, 1])
# diff = np.zeros([npoints, 1])
# max_diff = 0
# knee = 0
# for i in range(1,npoints-1):
# v = xy[i,:]-xy[i-1,:]
# w = xy[i+1,:]-xy[i,:]
# angle[i] = np.arccos(v.dot(w)/(np.linalg.norm(v)*np.linalg.norm(w)))
# a = angle[i-1]
# a1 = angle[i-2]
# d1 = a1 - a
# a2 = angle[i]
# d2 = a2 - a
# diff[i] = d1 + d2
# if diff[i] > max_diff:
# max_diff = diff[i]
# knee = lcurve[i-1,:]
# return angle, diff, knee