from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import datetime as dtm
from scipy.optimize import curve_fit
from .isotope import Isotope
from .spectroscopy import Spectrum
from .plotter import colors
from .plotter import _init_plot
from .plotter import _close_plot
#### FUTURE: Implement Irradiation history class
[docs]class DecayChain(object):
"""Calculating activities within a decay chain using Bateman equations
...
Parameters
----------
x : type
Description of parameter `x`.
Attributes
----------
Methods
-------
Notes
-----
References
----------
Examples
--------
"""
def __init__(self, parent, units='s', R=None, A0=None, time=None):
self.units = units
istps = [Isotope(parent)]
self.isotopes = [istps[0].name]
self.chain = [[istps[0].decay_const(units), [], []]]
stable_chain = [False]
while not all(stable_chain):
for n in [n for n,ch in enumerate(stable_chain) if not ch]:
stable_chain[n] = True
for prod,br in istps[n].decay_products():
I = Isotope(prod)
if I.name in self.isotopes:
self.chain[self.isotopes.index(I.name)][1].append(br)
self.chain[self.isotopes.index(I.name)][2].append(n)
else:
istps.append(I)
self.isotopes.append(I.name)
self.chain.append([I.decay_const(units), [br], [n]])
stable_chain.append(self.chain[-1][0]<1E-12)
self.chain = np.array(self.chain, dtype=object)
self.set_R(R)
self.A0 = np.zeros(len(self.chain))
self.update_A0(A0)
self.time = time
self._counts = [[] for i in self.chain]
self._others = []
self._prev = []
self._R_fit = None
self._A0_fit = None
self._EoB = None
def filter_name(self, istp):
if istp[-1] in 'g0123456789':
return istp
return istp+('1' if istp.endswith('m') else 'g')
def index(self, istp):
return self.isotopes.index(self.filter_name(istp))
def _get_branches(self, istp):
if self.filter_name(istp) not in self.isotopes:
return [], []
m = self.index(istp)
BR = [[0.0]]+[[r] for r in self.chain[m,1]]
CH = [[m]]+[[n] for n in self.chain[m,2]]
while not all([c[-1]==0 for c in CH]):
BR = [BR[n]+[i] for n,c in enumerate(CH) for i in self.chain[c[-1],1]]
CH = [c+[i] for c in CH for i in self.chain[c[-1],2]]
BR = [np.array(r)[::-1] for n,r in enumerate(BR)]
CH = [np.array(c)[::-1] for n,c in enumerate(CH)]
return BR, CH
[docs] def activity(self, istp, t=None, units=None, _R=None, _A0=None):
"""Description
...
Parameters
----------
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
t = t if t is not None else self.time
if t is None:
raise ValueError('Time must be specified.')
t = np.asarray(t)
A = np.zeros(len(t)) if t.shape else 0.0
R_lm = 1.0
if units is not None:
conv = {'ns':1e-9,'us':1e-6,'ms':1e-3,'s':1.0,'m':60.0,'h':3600.0,'d':86400.0,'y':31557.6E3,'ky':31557.6E6}
R_lm = conv[units]/conv[self.units]
for m,(BR, chain) in enumerate(zip(*self._get_branches(istp))):
lm = R_lm*self.chain[chain,0]
A0 = self.A0[chain] if _A0 is None else _A0[chain]
R = self.R[chain] if _R is None else _R[chain]
n = len(chain)
for i in range(n):
if i==n-1 and m>0:
continue
A_i = lm[-1]*(A0[i]/lm[i])
B_i = np.prod(lm[i:-1]*BR[i:-1])
for j in range(i, n):
K = np.arange(i, n)
C_j = np.prod(lm[K[K!=j]]-lm[j])
if np.any((lm[K[K!=j]]-lm[j])==0):
C_j = np.where((lm[K[K!=j]]-lm[j])!=0,lm[K[K!=j]]-lm[j],1E-12)
C_j = np.prod(C_j)
A += A_i*B_i*np.exp(-lm[j]*t)/C_j
if lm[j]>1E-12:
A += R[i]*lm[-1]*B_i*(1.0-np.exp(-lm[j]*t))/(lm[j]*C_j)
else:
A += R[i]*lm[-1]*B_i*t/C_j
for ot in self._others:
if self.filter_name(istp) in ot.isotopes:
A += ot.activity(istp, t, units=(self.units if units is None else units))
return A
[docs] def decays(self, istp, t_start, t_stop, units=None, _A0=None):
"""Description
...
Parameters
----------
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
if np.any(self.R > 0):
print('WARNING: decays during production not implemented.')
t_start, t_stop = np.asarray(t_start), np.asarray(t_stop)
D = np.zeros(len(t_start)) if t_start.shape else (np.zeros(len(t_stop)) if t_stop.shape else 0.0)
R_lm = 1.0
conv = {'ns':1e-9,'us':1e-6,'ms':1e-3,'s':1.0,'m':60.0,'h':3600.0,'d':86400.0,'y':31557600.0}
if units is not None:
R_lm = conv[units]/conv[self.units]
for m,(BR, chain) in enumerate(zip(*self._get_branches(istp))):
lm = R_lm*self.chain[chain,0]
A0 = self.A0[chain] if _A0 is None else _A0[chain]
n = len(chain)
for i in range(n):
if i==n-1 and m>0:
continue
A_i = lm[-1]*(A0[i]/lm[i])
B_i = np.prod(lm[i:-1]*BR[i:-1])
for j in range(i, len(chain)):
K = np.arange(i, len(chain))
C_j = np.prod(lm[K[K!=j]]-lm[j])
if np.any((lm[K[K!=j]]-lm[j])==0):
C_j = np.where((lm[K[K!=j]]-lm[j])!=0,lm[K[K!=j]]-lm[j],1E-12)
C_j = np.prod(C_j)
if lm[j]>1E-12:
D += A_i*B_i*(np.exp(-lm[j]*t_start)-np.exp(-lm[j]*t_stop))/(lm[j]*C_j)
else:
D += A_i*B_i*(t_stop-t_start)/C_j
D = D*conv[(self.units if units is None else units)]
for ot in self._others:
if self.filter_name(istp) in ot.isotopes:
D += ot.decays(istp, t_start, t_stop, units=(self.units if units is None else units))
return D
def set_R(self, R):
self.R = np.zeros(len(self.isotopes))
if R is not None:
if type(R)==dict:
for istp in R:
if self.filter_name(istp) in self.isotopes:
self.R[self.index(istp)] += R[istp]
else:
self.R[0] += float(R)
def update_A0(self, A0):
if A0 is not None:
if type(A0)==dict:
for istp in A0:
if self.filter_name(istp) in self.isotopes:
self.A0[self.index(istp)] += A0[istp]
else:
self.A0[0] += float(A0)
@property
def EoB(self):
return self._EoB
@EoB.setter
def EoB(self, eob):
if type(eob)==str:
eob = dtm.datetime.strptime(eob, '%m/%d/%Y %H:%M:%S')
self._EoB = eob
@property
def counts(self):
return self._counts
@counts.setter
def counts(self, N_c):
if N_c is not None:
if type(N_c)!=dict:
N_c = {self.isotopes[0]:np.array(N_c)}
for istp in N_c:
if self.filter_name(istp) in self.isotopes:
x = np.array(N_c[istp])
i = self.index(istp)
if x.shape:
if len(x.shape)==1:
x = np.array([x])
if len(self._counts[i]):
self._counts[i] = np.append(self._counts[i], x, axis=0)
else:
self._counts[i] = x
@property
def time(self):
return self._time
@time.setter
def time(self, time):
self._time = None
if time is not None:
t = np.asarray(time)
if t.shape:
self._time = t
else:
self._time = np.linspace(0.0, float(t), 1000)
@property
def A_meas(self):
A_meas = []
for n,ct in enumerate(self.counts):
itp = self.isotopes[n]
if len(ct):
meas = np.array([self.calc_M(itp, itp, c[0], c[1])*c[2] for c in ct])
if len(ct[0])==4:
A_meas.append(np.column_stack((ct[:,0], meas, ct[:,3]*meas/ct[:,2])))
else:
A_meas.append(np.column_stack((ct[:,0], meas)))
else:
A_meas.append([])
return A_meas
[docs] def calc_L(self, i, t_m):
"""Description
...
Parameters
----------
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
a_0 = self.activity(i, 0.0)
a_m = self.activity(i, t_m)
if a_m>0:
return a_0/a_m
return 0.0
[docs] def calc_M(self, i, j, t_start, t_stop):
"""Description
...
Parameters
----------
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
a_n = self.activity(i, t_start)
d_m = self.decays(j, t_start, t_stop)
if d_m>0:
return a_n/d_m
return 0.0
[docs] def calc_P(self, i):
"""Description
...
Parameters
----------
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
a_0 = self.activity(i, 0.0)
R_norm = self.R_norm[self.index(i)]
if a_0>0:
return R_norm/a_0
return 0.0
[docs] def calc_Q(self, i, t_m):
"""Description
...
Parameters
----------
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
a_m = self.activity(i, t_m)
R_norm = self.R_norm[self.index(i)]
if a_m>0:
return R_norm/a_m
return 0.0
[docs] def fit_spectra(self, spectra, db=None, max_unc=0.15, EoB=None):
"""Description
...
Parameters
----------
x : type
Description of parameter `x`.
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
if EoB is not None:
self.EoB = EoB
if self.EoB is None:
print('WARNING: EoB must be set.')
return
counts = {}
conv = {'ns':1e-9,'us':1e-6,'ms':1e-3,'s':1.0,'m':60.0,'h':3600.0,'d':86400.0,'y':31557600.0}
for sp in spectra:
if type(sp)==str:
sp = Spectrum(sp, db)
start_time = (sp.meta['start_time']-self.EoB).total_seconds()/conv[self.units]
end_time = start_time+(sp.meta['real_time']/conv[self.units])
if not sp.meta['istp']:
sp.meta['istp'] = self.isotopes
for n,p in sp.peaks.iterrows():
if not (p['converged'] and p['unc_decays']<max_unc*p['decays']):
continue
if self.filter_name(p['isotope']) not in self.isotopes:
continue
if p['isotope'] in counts:
counts[p['isotope']].append([start_time, end_time, p['decays'], p['unc_decays']])
else:
counts[p['isotope']] = [[start_time, end_time, p['decays'], p['unc_decays']]]
self.counts = counts
@property
def R_norm(self):
R_p,t = [p.R for p in self._prev if p.R.any()],[p.time[-1] for p in self._prev if p.R.any()]
return np.dot(np.array(R_p).T,np.array(t))/np.sum(t)
[docs] def fit_R(self, istp=None, _update=True, unc=False):
"""Description
...
Parameters
----------
x : type
Description of parameter `x`.
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
if self._R_fit is not None:
if unc:
if istp is None:
return self._R_fit, self._unc_R_fit
return self._R_fit[self.index(istp)], self._unc_R_fit[self.index(istp)]
if istp is None:
return self._R_fit
return self._R_fit[self.index(istp)]
R_norm = self.R_norm
nz = np.where(R_norm>0)[0]
Y = []
dY = []
X = []
time = []
itp = []
for n,ct in enumerate(self.counts):
if len(ct):
Y += ct[:,2].tolist()
if len(ct[0])==4:
dY += ct[:,3].tolist()
X += [np.zeros(len(nz)) for i in ct]
time += ct[:,:2].tolist()
itp += (n*np.ones(len(ct),dtype=int)).tolist()
Y = np.array(Y)
dY = np.array(dY) if len(dY) else np.ones(len(Y))
X = np.asarray(X)
I = np.arange(len(self.isotopes))
for m,z in enumerate(nz):
A0 = np.copy(self._prev[0].A0) if len(self._prev) else np.zeros(len(self.isotopes))
for p in self._prev:
R_p = p.R/np.where(R_norm>0, R_norm, 1.0)
R_p[I!=z] = 0.0
A_old = np.copy(A0)
for n in I:
A0[n] = self.activity(self.isotopes[n], p.time[-1], _R=R_p, _A0=A_old)
X[:,m] = np.array([self.decays(self.isotopes[itp[n]], time[n][0], time[n][1], _A0=A0) for n in range(len(X))])
func = lambda X_f, *R_f: np.dot(X_f, np.asarray(R_f).T)
fit, cov = curve_fit(func, X, Y, sigma=dY, p0=R_norm[nz], bounds=([0.0 for i in nz],[np.inf for i in nz]))
self._R_fit = np.zeros(len(R_norm))
self._R_fit[nz] = fit
self._unc_R_fit = np.zeros(len(R_norm))
self._unc_R_fit[nz] = (np.sqrt(np.diag(cov)) if np.all(np.isfinite(cov)) else np.diag(cov))
if _update:
A0 = np.copy(self._prev[0].A0) if len(self._prev) else np.zeros(len(self.isotopes))
for p in self._prev:
p.A0 = np.copy(A0)
p.R = p.R*self._R_fit/np.where(R_norm>0, R_norm, 1.0)
for n in I:
A0[n] = p.activity(self.isotopes[n], p.time[-1])
self.A0 = np.copy(A0)
if unc:
if istp is None:
return self._R_fit, self._unc_R_fit
return self._R_fit[self.index(istp)], self._unc_R_fit[self.index(istp)]
if istp is None:
return self._R_fit
return self._R_fit[self.index(istp)]
[docs] def fit_A0(self, istp=None, _update=True, unc=False):
"""Description
...
Parameters
----------
x : type
Description of parameter `x`.
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
if self._A0_fit is not None:
if unc:
if istp is None:
return self._A0_fit, self._unc_A0_fit
return self._A0_fit[self.index(istp)], self._unc_A0_fit[self.index(istp)]
if istp is None:
return self._A0_fit
return self._A0_fit[self.index(istp)]
if len(self._prev):
print('WARNING: Previous activities and production rates will not be fit.')
nz = np.where(self.A0>0)[0]
Y = []
dY = []
X = []
time = []
itp = []
for n,ct in enumerate(self.counts):
if len(ct):
Y += ct[:,2].tolist()
if len(ct[0])==4:
dY += ct[:,3].tolist()
X += [np.zeros(len(nz)) for i in ct]
time += ct[:,:2].tolist()
itp += (n*np.ones(len(ct),dtype=int)).tolist()
Y = np.array(Y)
dY = np.array(dY) if len(dY) else np.ones(len(Y))
X = np.asarray(X)
I = np.arange(len(self.isotopes))
for z in nz:
A0 = np.where(I!=z, np.zeros(len(self.isotopes)), 1.0)
X[:,z] = np.array([self.decays(self.isotopes[itp[n]], time[n][0], time[n][1], _A0=A0) for n in range(len(X))])
func = lambda X_f, *R_f: np.dot(X_f, np.asarray(R_f).T)
fit, cov = curve_fit(func, X, Y, sigma=dY, p0=self.A0[nz], bounds=([0.0 for i in nz],[np.inf for i in nz]))
self._A0_fit = np.copy(self.A0)
self._A0_fit[self.A0>0] = fit
self._unc_A0_fit = np.zeros(len(self.A0))
self._unc_A0_fit[self.A0>0] = (np.sqrt(np.diag(cov)) if np.all(np.isfinite(cov)) else np.diag(cov))
if _update:
self.A0 = np.copy(self._A0_fit)
if unc:
if istp is None:
return self._A0_fit, self._unc_A0_fit
return self._A0_fit[self.index(istp)], self._unc_A0_fit[self.index(istp)]
if istp is None:
return self._A0_fit
return self._A0_fit[self.index(istp)]
def __add__(self, other):
self._others.append(other)
return self
[docs] def append(self, other, time=None):
"""Description
...
Parameters
----------
x : type
Description of parameter `x`.
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
if time is not None:
self.time = time
if self.time is None:
raise ValueError('Decay Chain time is not set.')
if self.isotopes[0]!=other.isotopes[0]:
raise ValueError('Decay chains must have same parent.')
if self.units!=other.units:
raise ValueError('Units must be the same.')
tmp = [self.R, self.A0, self.time, self._others, self.counts]
new_A0 = {i:self.activity(i, self.time[-1]) for n,i in enumerate(self.isotopes) if self.chain[n,0]>1E-12}
self.R = other.R
self.A0 = other.A0
self.update_A0(new_A0)
self.time = other.time
self._others = other._others
self._counts = other.counts
other.R = tmp[0]
other.A0 = tmp[1]
other.time = tmp[2]
other._others = tmp[3]
other._counts = tmp[4]
self._prev += other._prev
self._prev.append(other)
other._prev = []
[docs] def plot(self, time=None, N_plot=None, N_label=10, **kwargs):
"""Description
...
Parameters
----------
x : type
Description of parameter `x`.
Returns
-------
Notes
-----
References
----------
Examples
--------
"""
f, ax = _init_plot(**kwargs)
if time is not None:
self.time = time
if self.time is not None:
t = self.time
else:
t = np.linspace(0, 5.0*np.log(2)/self.chain[0,0], 1000)
if N_plot is None:
N_plot = len(self.isotopes)
A_meas = self.A_meas
ordr = int(np.floor(np.log10(np.average(self.activity(self.isotopes[0], t)))/3.0))
lb_or = {-4:'p',-3:'n',-2:r'$\mu$',-1:'m',0:'',1:'k',2:'M',3:'G',4:'T'}[ordr]
mult = 10.0**(-3*ordr)
for n,istp in enumerate(self.isotopes):
if self.chain[n,0]>1E-12 and n<N_plot:
dt = 0.0
tm, A = np.array([]),np.array([])
for p in self._prev:
dt += p.time[-1]
t_p, A_p = p.time, p.activity(istp)
tm, A = np.append(tm, t_p+(tm[-1] if len(tm) else 0.0)), np.append(A, A_p)
tm, A = np.append(tm-dt, t), np.append(A, self.activity(istp, t))
label = Isotope(istp).TeX if n<N_label else None
line, = ax.plot(tm, mult*A, label=label)
if len(A_meas[n]):
am = A_meas[n]
if len(am[0])==3:
ax.errorbar(am[:,0], mult*am[:,1], yerr=mult*am[:,2], ls='None', marker='o', color=line.get_color())
else:
ax.plot(am[:,0], mult*am[:,1], ls='None', marker='o', color=line.get_color())
ax.set_xlabel('Time ({})'.format(self.units))
ax.set_ylabel('Activity ({}Bq)'.format(lb_or))
ax.legend(loc=0)
return _close_plot(f, ax, default_log=False, **kwargs)