Source code for curie.decay_chain

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import datetime as dtm
import json

import numpy as np
import pandas as pd
from scipy.optimize import curve_fit

from .data import _get_connection
from .plotting import _init_plot, _draw_plot, colormap
from .isotope import Isotope
from .spectrum import Spectrum

pd.options.mode.chained_assignment = None

class DecayChain(object):
	"""Radioactive Decay Chain

	Uses the Bateman equations to calculate the activities and number of decays
	from a radioactive decay chain as a function of time, both in production
	and decay.  Also, initial isotope activities and production rates can be
	fit to observed count data, or directly fit to HPGe spectra using the
	`get_counts()` function.

	Parameters
	----------
	parent_isotope : str
		Parent isotope in the chain.

	R : array_like, dict, str or pd.DataFrame
		Production rate for each isotope in the decay chain as a function of
		time.  If a Nx2 np.ndarray, element n gives the production rate R_n
		up until time t_n for the parent isotope.  E.g. if the production
		rate of the parent is 5 for 1 hour, and 8 for 3 hours, the array will
		be [[5, 1], [8, 4]].  If instead time intervals are preferred to a
		monotonically increasing grid of timestamps, set `timestamp=False`;
		in that case the same schedule is [[5, 1], [8, 3]].  If R is a dict,
		it specifies the production rate for multiple isotopes, where the
		keys are the isotopes and the values are of type np.ndarray.  If R is
		a pd.DataFrame, it must have columns 'R' and 'time', and optionally
		'isotope' if R>0 for any isotopes other than the parent.  If R is a
		str, it must be a path to a file with the same data.  Supported file
		types are .csv, .json ('records' format) and .db files (table named
		'R').  Each isotope must share the same time grid, for both
		timestamp=True and timestamp=False.

	A0 : float or dict
		Initial activity.  If a float, the initial activity of the parent
		isotope.  If a dict, the keys are the isotopes for which the values
		represent the initial activity.

	units : str, optional
		Units of time for the chain.  Options are 'ns', 'us', 'ms', 's', 'm',
		'h', 'd', 'y', 'ky', 'My', 'Gy'.  Default is 's'.

	timestamp : bool, optional
		Determines if the 'time' variable in R is to be read as a
		timestamp-like grid, i.e. in monotonically increasing order, or as a
		series of time intervals.  Default is `True`.

	Attributes
	----------
	R : pd.DataFrame
		Production rate as a function of time, for each isotope in the chain.
		This will be modified if `fit_R()` is called.

	A0 : dict
		Initial activity of each isotope in the chain.

	isotopes : list
		List of isotopes in the decay chain.

	counts : pd.DataFrame
		Observed counts from isotopes in the decay chain, which can be used
		to determine the initial activities or average production rates using
		the `fit_R()` or `fit_A0()` functions.

	R_avg : pd.DataFrame
		Time-averaged production rate for each isotope where R>0.  This will
		be modified if `fit_R()` is called.

	Examples
	--------
	>>> dc = ci.DecayChain('Ra-225', R=[[1.0, 1.0], [0.5, 1.5], [2.0, 6]], units='d')
	>>> print(dc.isotopes)
	['225RAg', '225ACg', '221FRg', '217ATg', '213BIg', '217RNg', '209TLg', '213POg', '209PBg', '209BIg']
	>>> print(dc.R_avg)
	      R_avg isotope
	0  1.708333  225RAg

	>>> dc = ci.DecayChain('152EU', A0=3.7E3, units='h')
	>>> print(dc.isotopes)
	['152EUg', '152GDg', '152SMg']

	"""

	def __init__(self, parent_isotope, R=None, A0=None, units='s', timestamp=True):
		# Normalize common unit aliases to the internal single-letter codes.
		if units.lower() in ['hr', 'min', 'yr', 'sec']:
			units = {'hr':'h', 'min':'m', 'yr':'y', 'sec':'s'}[units.lower()]
		self.units = units

		# Discover the full chain by walking decay products until every
		# branch terminates on an (effectively) stable isotope.  Each row of
		# self._chain is [decay_const, [branching ratios], [parent indices]].
		istps = [Isotope(parent_isotope)]
		self.isotopes = [istps[0].name]
		self.R, self.A0, self._counts = None, {self.isotopes[0]: 0.0}, None
		self._chain = [[istps[0].decay_const(units), [], []]]
		stable_chain = [False]

		while not all(stable_chain):
			for n in [n for n, ch in enumerate(stable_chain) if not ch]:
				stable_chain[n] = True
				for prod in istps[n].decay_products:
					br = istps[n].decay_products[prod]
					I = Isotope(prod)
					if I.name in self.isotopes:
						# Isotope already in chain: record the extra feeding path.
						self._chain[self.isotopes.index(I.name)][1].append(br)
						self._chain[self.isotopes.index(I.name)][2].append(n)
					else:
						istps.append(I)
						self.isotopes.append(I.name)
						self._chain.append([I.decay_const(units), [br], [n]])
						# Decay constants below 1E-12 are treated as stable.
						stable_chain.append(self._chain[-1][0] < 1E-12)
						if not stable_chain[-1]:
							self.A0[self.isotopes[-1]] = 0.0

		self._chain = np.array(self._chain, dtype=object)
		self._branches = self._generate_branches()

		if A0 is not None:
			# isinstance replaces exact type() checks (accepts subclasses,
			# e.g. numpy scalars that subclass int/float).
			if isinstance(A0, (float, int)):
				self.A0[self.isotopes[0]] = float(A0)
			else:
				for i in A0:
					self.A0[self._filter_name(i)] = float(A0[i])

		if R is not None:
			if isinstance(R, str):
				if R.endswith('.json'):
					# BUGFIX: json was used without being imported at module
					# level, and the file handle was never closed.
					with open(R) as f:
						self.R = pd.DataFrame(json.load(f))
				elif R.endswith('.csv'):
					# .ffill() replaces the deprecated fillna(method='ffill').
					self.R = pd.read_csv(R, header=0).ffill()
				elif R.endswith('.db'):
					self.R = pd.read_sql('SELECT * FROM R', _get_connection(R))
				if 'isotope' not in self.R.columns.to_list():
					self.R['isotope'] = self.isotopes[0]

			elif isinstance(R, pd.DataFrame):
				self.R = R.copy(deep=True)
				if 'isotope' not in self.R.columns.to_list():
					self.R['isotope'] = self.isotopes[0]

			elif isinstance(R, dict):
				self.R = pd.DataFrame({'isotope':[], 'R':[], 'time':[]})
				for ip in R:
					rate = np.array(R[ip])
					rt = pd.DataFrame({'isotope':self._filter_name(ip), 'R':rate[:,0], 'time':rate[:,1]})
					self.R = pd.concat([self.R, rt], ignore_index=True).reset_index(drop=True)

			elif isinstance(R, (list, np.ndarray)):
				R = np.asarray(R)
				self.R = pd.DataFrame({'isotope':self.isotopes[0], 'R':R[:,0], 'time':R[:,1]})

			self.R['isotope'] = [self._filter_name(i) for i in self.R['isotope']]
			if not timestamp:
				# Convert interval durations to a cumulative timestamp grid.
				for ip in pd.unique(self.R['isotope']):
					self.R.loc[self.R['isotope']==ip, 'time'] = np.cumsum(self.R.loc[self.R['isotope']==ip, 'time'])

			# Propagate activities through each production interval so that
			# self.A0 holds the activities at end-of-production (t=0).
			time = np.insert(np.unique(self.R['time']), 0, [0.0])
			for n, dt in enumerate(time[1:]-time[:-1]):
				_R_dict = {p:self.R[self.R['isotope']==p].iloc[n]['R'] for p in pd.unique(self.R['isotope'])}
				self.A0 = {p:self.activity(p, dt, _R_dict=_R_dict) for p in self.A0}

	def __str__(self):
		return self.isotopes[0]

	def _filter_name(self, istp):
		# Canonical isotope name, e.g. '152EU' -> '152EUg'.
		return Isotope(istp).name

	def _index(self, istp):
		# Index of an isotope within self.isotopes / self._chain.
		return self.isotopes.index(self._filter_name(istp))

	def _generate_branches(self):
		"""Enumerate every linear decay path through the chain.

		Returns a tuple (branches, br_ratios): `branches` is a list of index
		paths from the parent (index 0) to a terminal isotope; `br_ratios`
		gives the branching ratio for each step along the matching path
		(terminated with 0.0).
		"""
		# Invert the parent lists in self._chain into a daughters map.
		daughters = {i: [] for i in range(len(self._chain))}
		for i, pars in enumerate(self._chain[1:, 2]):
			for p in pars:
				daughters[p].append(i+1)

		branches = [[0]]
		stable_chain = [len(daughters[br[-1]])==0 for br in branches]
		while not all(stable_chain):
			for par in list(set([b[-1] for b in branches])):
				ds = daughters[par]
				if len(ds)==0:
					continue
				if len(ds)>1:
					# Duplicate every branch ending at `par` for each extra
					# daughter, then extend the originals with the first one.
					to_dup = [br for br in branches if br[-1]==par]
					for m in range(len(ds)-1):
						for b in to_dup:
							branches.append(b+[ds[m+1]])
					for br in branches:
						if br[-1]==par:
							br.append(ds[0])
				else:
					for br in branches:
						if br[-1]==par:
							br.append(ds[0])
			stable_chain = [len(daughters[br[-1]])==0 for br in branches]

		# Branching ratio for each parent->daughter step along each branch.
		br_ratios = []
		for br in branches:
			r = []
			for n, i in enumerate(br[:-1]):
				r.append(self._chain[br[n+1]][1][self._chain[br[n+1]][2].index(i)])
			br_ratios.append(r+[0.0])

		return branches, br_ratios

	def _get_branches(self, istp):
		"""Unique decay paths from the parent down to `istp`.

		Returns (br_ratios, branches), truncated at `istp`, with duplicate
		truncated paths removed.  Empty lists if `istp` is not in the chain.
		"""
		if self._filter_name(istp) not in self.isotopes:
			return [], []

		m = self._index(istp)
		branches, br_ratios = [], []
		for n, br in enumerate(self._branches[0]):
			if m in br:
				k = br.index(m)
				new_br = np.array(br[:k+1])
				# Different full branches can share the same truncated path.
				if not any([np.array_equal(b, new_br) for b in branches]):
					branches.append(new_br)
					br_ratios.append(np.array(self._branches[1][n][:k] + [0.0]))

		return br_ratios, branches

	def _r_lm(self, units=None, r_half_conv=False):
		"""Conversion factor from `units` to the chain's units.

		With r_half_conv=True, returns the conversion to seconds instead.
		Returns 1.0 when `units` is None (no conversion).
		"""
		if units is None:
			return 1.0

		if units.lower() in ['hr', 'min', 'yr', 'sec']:
			units = {'hr':'h', 'min':'m', 'yr':'y', 'sec':'s'}[units.lower()]

		# Seconds per unit; year = 365.25 d.
		half_conv = {'ns':1E-9, 'us':1E-6, 'ms':1E-3, 's':1.0, 'm':60.0,
					 'h':3600.0, 'd':86400.0, 'y':31557.6E3, 'ky':31557.6E6,
					 'My':31557.6E9, 'Gy':31557.6E12}

		if r_half_conv:
			return half_conv[units]
		return half_conv[units]/half_conv[self.units]
	def activity(self, isotope, time, units=None, _R_dict=None, _A_dict=None):
		"""Activity of an isotope in the chain

		Computes the activity of a given isotope in the decay chain at a
		given time, by summing the Bateman-equation solution over every decay
		path leading to the isotope.  Units of activity are in Bq.  Units of
		time must be either the units for the DecayChain (default 's'), or
		specified by the `units` keyword.

		Parameters
		----------
		isotope : str
			Isotope for which the activity is calculated.

		time : array_like
			Time to calculate the activity.  Units of time must be the same
			as the decay chain, or be given by `units`.  Note that if R!=0,
			time=0 is defined as the end of production time.  Else, if A0!=0,
			time=0 is defined as the time at which the specified activities
			equaled A0.  t<0 is not allowed.

		units : str, optional
			Units of time, if different from the units of the decay chain.

		Returns
		-------
		activity : np.ndarray
			Activity of the given isotope in Bq.

		Examples
		--------
		>>> dc = ci.DecayChain('152EU', A0=3.7E3, units='h')
		>>> print(dc.activity('152EU', time=0))
		3700.0
		>>> print(dc.activity('152EU', time=13.537, units='y'))
		1849.999906346199

		"""
		time = np.asarray(time)
		# Scalar input gives a 0-d result; array input gives an array.
		A = np.zeros(len(time)) if time.shape else np.array(0.0)

		# Distinct branches can share an identical tail sub-chain; track the
		# sub-chains already summed so their contribution is added only once.
		finished = []
		for m, (BR, chain) in enumerate(zip(*self._get_branches(isotope))):
			# Decay constants along this branch, converted to `units`.
			lm = self._r_lm(units)*self._chain[chain, 0]
			L = len(chain)
			for i in range(L):
				sub = ''.join(map(str, chain[i:]))
				if sub in finished:
					continue
				finished.append(sub)

				ip = self.isotopes[chain[i]]
				A0 = self.A0[ip] if _A_dict is None else _A_dict[ip]
				if A0==0.0 and _R_dict is None:
					continue

				# Bateman coefficients: A_i scales the initial activity of
				# isotope i; B_i is the product of lambda*BR down the chain.
				A_i = lm[-1]*(A0/lm[i])
				B_i = np.prod(lm[i:-1]*BR[i:-1])
				for j in range(i, L):
					K = np.arange(i, L)
					# Guard the denominator against (near-)equal decay
					# constants by clamping differences to +/-1E-12.
					d_lm = lm[K[K!=j]]-lm[j]
					C_j = np.prod(np.where(np.abs(d_lm)>1E-12, d_lm, 1E-12*np.sign(d_lm)))

					A += A_i*B_i*np.exp(-lm[j]*time)/C_j
					if _R_dict is not None:
						if ip in _R_dict:
							# Constant-production term; linear limit when the
							# decay constant is effectively zero.
							if lm[j]>1E-12:
								A += _R_dict[ip]*lm[-1]*B_i*(1.0-np.exp(-lm[j]*time))/(lm[j]*C_j)
							else:
								A += _R_dict[ip]*lm[-1]*B_i*time/C_j

		return A
[docs] def decays(self, isotope, t_start, t_stop, units=None, _A_dict=None): """Number of decays in a given time interval Computes the number of decays from a given isotope in the decay chain in the time interal t_start to t_stop. The units of t_start and t_stop must be either the same units as the decay chain, or be specified by the `units` keyword. Parameters ---------- isotope : str Isotope for which the number of decays is calculated. t_start : array_like Time of the start of the interval. t_stop : array_like Time of the end of the interval. units : str, optional Units of time, if different from the units of the decay chain. Returns ------- decays : np.ndarray Number of decays Examples -------- >>> dc = ci.DecayChain('152EU', A0=3.7E3, units='h') >>> print(dc.decays('152EU', t_start=1, t_stop=2)) 13319883.293399204 >>> print(dc.decays('152EU', t_start=50, t_stop=50.1, units='y')) 900151618.5228329 """ t_start, t_stop = np.asarray(t_start), np.asarray(t_stop) D = np.zeros(len(t_start)) if t_start.shape else (np.zeros(len(t_stop)) if t_stop.shape else np.array(0.0)) for m,(BR, chain) in enumerate(zip(*self._get_branches(isotope))): lm = self._r_lm(units)*self._chain[chain,0] L = len(chain) for i in range(L): if i==L-1 and m>0: continue ip = self.isotopes[chain[i]] A0 = self.A0[ip] if _A_dict is None else _A_dict[ip] A_i = lm[-1]*(A0/lm[i]) B_i = np.prod(lm[i:-1]*BR[i:-1]) for j in range(i, len(chain)): K = np.arange(i, len(chain)) d_lm = lm[K[K!=j]]-lm[j] C_j = np.prod(np.where(np.abs(d_lm)>1E-12, d_lm, 1E-12*np.sign(d_lm))) if lm[j]>1E-12: D += A_i*B_i*(np.exp(-lm[j]*t_start)-np.exp(-lm[j]*t_stop))/(lm[j]*C_j) else: D += A_i*B_i*(t_stop-t_start)/C_j return D*self._r_lm((self.units if units is None else units), True)
@property def counts(self): return self._counts @counts.setter def counts(self, N_c): if N_c is not None: if type(N_c)==pd.DataFrame: self._counts = N_c.copy(deep=True) self._counts['isotope'] = [self._filter_name(ip) for ip in self._counts['isotope']] elif type(N_c)!=dict: N_c = np.asarray(N_c) self._counts = pd.DataFrame({'isotope':self.isotopes[0], 'start':N_c[:,0], 'stop':N_c[:,1], 'counts':N_c[:,2], 'unc_counts':N_c[:,3]}) else: self._counts = pd.DataFrame({'isotope':[],'start':[],'stop':[],'counts':[],'unc_counts':[]}) for ip in N_c: ct = np.array(N_c[ip]) if len(ct.shape)==1: ct = np.array([ct]) ct = pd.DataFrame({'isotope':self._filter_name(ip), 'start':ct[:,0], 'stop':ct[:,1], 'counts':ct[:,2], 'unc_counts':ct[:,3]}) self._counts = pd.concat([self._counts, ct], ignore_index=True).reset_index(drop=True) self._counts['activity'] = [p['counts']*self.activity(p['isotope'], p['start'])/self.decays(p['isotope'], p['start'], p['stop']) for n,p in self._counts.iterrows()] self._counts['unc_activity'] = self._counts['unc_counts']*self.counts['activity']/self._counts['counts']
[docs] def get_counts(self, spectra, EoB, peak_data=None): """Retrieves the number of measured decays Takes the number of measured decays from one of the following: a list of spectra, a file with peak data, or a pandas DataFrame with peak data. Parameters ---------- spectra : list or str List of ci.Spectrum objects, or str of spectra filenames. If list of str, peak_data **must** be specified. In this case the filenames must be an exact match of the filenames in `peak_data`. If spectra is a str, it is assumed to be a regex match for the filenames in `peak_data`. EoB : str or datetime.datetime Date/time of end-of-bombardment (t=0). Must be a datetime object or a string in the format '%m/%d/%Y %H:%M:%S'. This is used to calculate the decay time for the count. peak_data : str or pd.DataFrame, optional Either a file path to a file that was created using `ci.Spectrum.saveas()` or a DataFrame with the same structure as `ci.Spectrum.peaks`. Examples -------- >>> sp = ci.Spectrum('eu_calib_7cm.Spe') >>> sp.isotopes = ['152EU'] >>> sp.saveas('test_spec.json') >>> dc = ci.DecayChain('152EU', A0=3.7E3, units='h') >>> dc.get_counts([sp], EoB='01/01/2016 08:39:08') >>> dc.get_counts(['eu_calib_7cm.Spe'], EoB='01/01/2016 08:39:08', peak_data='test_spec.json') >>> print(dc.counts) """ counts = [] if type(EoB)==str: EoB = dtm.datetime.strptime(EoB, '%m/%d/%Y %H:%M:%S') if peak_data is not None: if type(peak_data)==str: if peak_data.endswith('.json'): peak_data = pd.read_json(peak_data, orient='records') elif peak_data.endswith('.csv'): peak_data = pd.read_csv(peak_data, header=0) elif peak_data.endswith('.db'): peak_data = pd.read_sql('SELECT * FROM peaks', _get_connection(peak_data)) else: peak_data = pd.DataFrame(peak_data) if type(spectra)==str and peak_data is not None: df = peak_data['filename'] spectra = list(set(map(str, df[df.str.contains(spectra)].to_list()))) for sp in spectra: if type(sp)==str: if peak_data is not None: df = peak_data[peak_data['filename']==sp] 
df['isotope'] = [self._filter_name(i) for i in df['isotope']] df = df[df['isotope'].isin(self.isotopes)] if len(df): start_time = df.iloc[0]['start_time'] if type(start_time)==str or type(start_time)==unicode: start_time = dtm.datetime.strptime(start_time, '%m/%d/%Y %H:%M:%S') start = (start_time-EoB).total_seconds()*self._r_lm('s') stop = start+(df.iloc[0]['real_time']*self._r_lm('s')) else: raise ValueError('peak_data must be specified if type(spectra)==str') else: if peak_data is not None: df = peak_data[peak_data['filename']==sp.filename] else: df = sp.peaks.copy() df['isotope'] = [self._filter_name(i) for i in df['isotope']] df = df[df['isotope'].isin(self.isotopes)] if len(df): start = (sp.start_time-EoB).total_seconds()*self._r_lm('s') stop = start+(sp.real_time*self._r_lm('s')) if len(df): counts.append(pd.DataFrame({'isotope':df['isotope'], 'start':start, 'stop':stop, 'counts':df['decays'], 'unc_counts':df['unc_decays']})) self.counts = pd.concat(counts, sort=True, ignore_index=True).sort_values(by=['start']).reset_index(drop=True)
@property def R_avg(self): df = [] for ip in np.unique(self.R['isotope']): time = np.insert(np.unique(self.R['time']), 0, [0.0]) df.append({'isotope':ip, 'R_avg':np.average(self.R[self.R['isotope']==ip]['R'], weights=time[1:]-time[:-1])}) return pd.DataFrame(df)
	def fit_R(self, max_error=0.4, min_counts=1):
		"""Fit the production rate to count data

		Fits a scalar multiplier to the production rate (as a function of
		time) for each isotope specified in self.R.  The fit minimizes to the
		number of measured decays (self.counts) as a function of time, rather
		than the activity, because the activity at each time point may be
		sensitive to the shape of the decay curve.

		Parameters
		----------
		max_error : float, optional
			The maximum relative error of a count datum to include in the
			fit.  E.g. 0.25=25% (only points with less than 25% error will be
			used).  Default, 0.4 (40%).

		min_counts : float or int, optional
			The minimum number of counts (decays) for a datum in self.counts
			to be included in the fit.  Default, 1.

		Returns
		-------
		isotopes : list
			List of isotopes where R>0.  Same indices as fit.  (i.e.
			isotope[0] corresponds to fit[0] and cov[0][0].)

		fit : np.ndarray
			The fitted time-averaged production rate for each isotope where
			R>0.

		cov : np.ndarray
			Covariance matrix on the fit.

		Examples
		--------
		>>> sp = ci.Spectrum('eu_calib_7cm.Spe')
		>>> sp.isotopes = ['152EU']
		>>> dc = ci.DecayChain('152EU', R=[[3E5, 36.0]], units='d')
		>>> dc.get_counts([sp], EoB='01/01/2016 08:39:08')
		>>> print(dc.fit_R())
		(array(['152EUg'], dtype=object), array([1291584.51735774]), array([[1.67412376e+09]]))

		"""
		if self.R is None:
			raise ValueError('Cannot fit R: R=0.')

		X = []
		R_isotopes = [i for i in self.isotopes if i in pd.unique(self.R['isotope'])]
		time = np.insert(np.unique(self.R['time']), 0, [0.0])
		# Drop count data with too few counts or too large relative error.
		filter_counts = self.counts[(self.counts['counts']>min_counts)&(self.counts['unc_counts']<max_error*self.counts['counts'])]

		# Build the design matrix X: one row per produced isotope, giving the
		# predicted decays in each counting interval for unit production rate
		# of that isotope alone.
		for ip in R_isotopes:
			A0 = {p:0.0 for p in self.A0}
			# Step the activities through each production interval with only
			# isotope ip being produced.
			for n, dt in enumerate(time[1:]-time[:-1]):
				_R_dict = {ip:self.R[self.R['isotope']==ip].iloc[n]['R']}
				A0 = {p:self.activity(p, dt, _R_dict=_R_dict, _A_dict=A0) for p in self.A0}
			X.append([self.decays(c['isotope'], c['start'], c['stop'], _A_dict=A0) for n, c in filter_counts.iterrows()])

		X = np.array(X)
		Y = filter_counts['counts'].to_numpy()
		dY = filter_counts['unc_counts'].to_numpy()

		# Initial guess: average ratio of observed to predicted decays,
		# using only columns where all predictions are positive.
		wh = np.array([np.all(x>0) for x in X.T])
		p0 = np.average(Y[wh]/X[:,wh], axis=1)
		p0 = np.where((p0>0)&(np.isfinite(p0)), p0, 1.0)

		# Linear model: counts = R . X, with R constrained non-negative.
		func = lambda X_f, *R_f: np.dot(np.asarray(R_f), X_f)
		fit, cov = curve_fit(func, X, Y, sigma=dY, p0=p0, bounds=(0.0*p0, np.inf*p0))

		# Scale the stored production rates by the fitted multipliers.
		for n, ip in enumerate(R_isotopes):
			df_sub = self.R[self.R['isotope']==ip]
			self.R.loc[df_sub.index, 'R'] = df_sub['R']*fit[n]

		# Re-propagate A0 (end-of-production activities) with the fitted R.
		self.A0 = {i:0.0 for i in self.A0}
		for n, dt in enumerate(time[1:]-time[:-1]):
			_R_dict = {p:self.R[self.R['isotope']==p].iloc[n]['R'] for p in pd.unique(self.R['isotope'])}
			self.A0 = {p:self.activity(p, dt, _R_dict=_R_dict) for p in self.A0}

		# Re-assign counts through the setter to refresh the derived
		# 'activity' columns with the updated A0.
		self.counts = self.counts

		# Express the result as time-averaged production rates, and rescale
		# the covariance to match.
		R_avg = self.R_avg
		R_norm = np.array([R_avg[R_avg['isotope']==i]['R_avg'].to_numpy()[0] for i in R_isotopes])

		# Fallback when curve_fit returns a non-finite covariance: estimate
		# it from the average relative uncertainty of the data.
		if not np.any(np.isfinite(np.diag(cov))):
			cov = np.ones(cov.shape)*((np.average(dY/Y))*fit)**2

		return R_isotopes, R_norm, cov*(R_norm/fit)**2
	def fit_A0(self, max_error=0.4, min_counts=1):
		"""Fit the initial activity to count data

		Fits a scalar multiplier to the initial activity for each isotope
		specified in self.A0.  The fit minimizes to the number of measured
		decays (self.counts) as a function of time, rather than the activity,
		because the activity at each time point may be sensitive to the shape
		of the decay curve.

		Parameters
		----------
		max_error : float, optional
			The maximum relative error of a count datum to include in the
			fit.  E.g. 0.25=25% (only points with less than 25% error will be
			used).  Default, 0.4 (40%).

		min_counts : float or int, optional
			The minimum number of counts (decays) for a datum in self.counts
			to be included in the fit.  Default, 1.

		Returns
		-------
		isotopes : list
			List of isotopes where A0>0.  Same indices as fit.  (i.e.
			isotope[0] corresponds to fit[0] and cov[0][0].)

		fit : np.ndarray
			The initial activity for each isotope where A0>0.

		cov : np.ndarray
			Covariance matrix on the fit.

		Examples
		--------
		>>> sp = ci.Spectrum('eu_calib_7cm.Spe')
		>>> sp.isotopes = ['152EU']
		>>> dc = ci.DecayChain('152EU', A0=3.7E4, units='d')
		>>> dc.get_counts([sp], EoB='01/01/2016 08:39:08')
		>>> print(dc.fit_A0())
		(['152EUg'], array([6501.93665952]), array([[42425.53832341]]))

		"""
		# A0 can only be fit in pure-decay mode; production is fit_R().
		if self.R is not None:
			raise ValueError('Cannot fit A0 when R!=0.')

		X = []
		A0_isotopes = [i for i in self.isotopes if i in self.A0]
		# Drop count data with too few counts or too large relative error.
		filter_counts = self.counts[(self.counts['counts']>min_counts)&(self.counts['unc_counts']<max_error*self.counts['counts'])]

		# Design matrix: predicted decays in each interval when only isotope
		# ip starts with its nominal A0 (all others zero).
		for ip in A0_isotopes:
			A0 = {p:(self.A0[p] if p==ip else 0.0) for p in self.A0}
			X.append([self.decays(c['isotope'], c['start'], c['stop'], _A_dict=A0) for n, c in filter_counts.iterrows()])

		X = np.array(X)
		Y = filter_counts['counts'].to_numpy()
		dY = filter_counts['unc_counts'].to_numpy()

		# Linear model: counts = multipliers . X, constrained non-negative.
		func = lambda X_f, *R_f: np.dot(np.asarray(R_f), X_f)
		p0 = np.ones(len(X))
		fit, cov = curve_fit(func, X, Y, sigma=dY, p0=p0, bounds=(0.0*p0, np.inf*p0))

		# Scale the stored initial activities by the fitted multipliers.
		for n, ip in enumerate(A0_isotopes):
			self.A0[ip] *= fit[n]

		# Re-assign counts through the setter to refresh the derived
		# 'activity' columns with the updated A0.
		self.counts = self.counts

		# Return the fitted activities with a matching rescaled covariance.
		A_norm = np.array([self.A0[i] for i in A0_isotopes])
		return A0_isotopes, A_norm, cov*(A_norm/fit)**2
	def plot(self, time=None, max_plot=10, max_label=10, max_plot_error=0.4, max_plot_chi2=10, **kwargs):
		"""Plot the activities in the decay chain

		Plots the activities as a function of time for all radioactive
		isotopes in the decay chain.  Can plot along a specified time grid,
		else the time will be inferred from the half-life of the parent
		isotope, or any count information given to self.counts.

		Parameters
		----------
		time : array_like, optional
			Time grid along which to plot.  Units must be the same as the
			decay chain.

		max_plot : int, optional
			Maximum number of isotope activities to plot in the decay chain.
			Default, 10.

		max_label : int, optional
			Maximum number of isotope activities to label in the legend.
			Default, 10.

		max_plot_error : float, optional
			The maximum relative error of a count point to include on the
			plot.  E.g. 0.25=25% (only points with less than 25% error will
			be shown).  Default, 0.4.

		max_plot_chi2 : float or int, optional
			Maximum chi^2 of a count point to include on the plot.  Only
			points with a chi^2 less than this value will be shown.
			Default, 10.

		Other Parameters
		----------------
		**kwargs
			Optional keyword arguments for plotting.  See the plotting
			section of the curie API for a complete list of kwargs.

		Examples
		--------
		>>> sp = ci.Spectrum('eu_calib_7cm.Spe')
		>>> sp.isotopes = ['152EU']
		>>> dc = ci.DecayChain('152EU', R=[[3E5, 36.0]], units='d')
		>>> dc.get_counts([sp], EoB='01/01/2016 08:39:08')
		>>> dc.fit_R()
		>>> dc.plot()

		>>> dc = ci.DecayChain('99MO', A0=350E6, units='d')
		>>> dc.plot()

		"""
		f, ax = _init_plot(**kwargs)

		# Choose a time grid: explicit, ~5 half-lives of the parent, or
		# 1.25x the last counting time.
		if time is not None:
			time = np.asarray(time)
		elif self.counts is None:
			time = np.linspace(0, 5.0*np.log(2)/self._chain[0,0], 1000)
		else:
			time = np.linspace(0, 1.25*self.counts['stop'].max(), 1000)

		if max_plot is None:
			max_plot = len(self.isotopes)

		# Pick an SI prefix for the activity axis from the parent's
		# average activity over the plotted window.
		ordr = int(np.floor(np.log10(np.average(self.activity(self.isotopes[0], time)))/3.0))
		lb_or = {-5:'f', -4:'p', -3:'n', -2:r'$\mu$', -1:'m', 0:'', 1:'k', 2:'M', 3:'G', 4:'T', 5:'E'}[ordr]
		mult = 10**(-3*ordr)

		# If there was production, also compute the activity grid during the
		# production period (plotted at negative times, t=0 being EoB).
		if self.R is not None:
			A0 = {p:0.0 for p in self.A0}
			T = np.insert(np.unique(self.R['time']), 0, [0.0])
			T_grid = np.array([])
			A_grid = {p:np.array([]) for p in A0}
			for n, dt in enumerate(T[1:]-T[:-1]):
				_R_dict = {p:self.R[self.R['isotope']==p].iloc[n]['R'] for p in pd.unique(self.R['isotope'])}
				dT = np.linspace(0, dt, 50)
				T_grid = dT if n==0 else np.append(T_grid, T_grid[-1]+dT)
				A_grid = {p:np.append(A_grid[p], self.activity(p, dT, _R_dict=_R_dict, _A_dict=A0)) for p in A0}
				A0 = {p:A_grid[p][-1] for p in A0}

		plot_time = time if self.R is None else np.append(T_grid-T_grid[-1], time.copy())

		for n, istp in enumerate(self.isotopes):
			# Only plot radioactive isotopes, up to max_plot curves.
			if self._chain[n,0]>1E-12 and n<max_plot:
				A = self.activity(istp, time)
				if self.R is not None:
					A = np.append(A_grid[istp], A)

				label = Isotope(istp).TeX if n<max_label else None
				line, = ax.plot(plot_time, A*mult, label=label)

				if self.counts is not None:
					df = self.counts[self.counts['isotope']==istp]
					if len(df):
						x, y, yerr = df['start'].to_numpy(), df['activity'].to_numpy(), df['unc_activity'].to_numpy()
						# Keep only points with finite, positive error below
						# the max_plot_error threshold.
						idx = np.where((max_plot_error*y>yerr)&(yerr>0.0)&(np.isfinite(yerr)))
						# NOTE(review): len(x[idx]>0) is almost certainly
						# meant to be len(x[idx])>0; behavior happens to be
						# identical (len of the boolean array equals
						# len(x[idx])), so it is left unchanged here.
						if len(x[idx]>0):
							x, y, yerr = x[idx], y[idx], yerr[idx]
							# Also drop points with chi^2 above max_plot_chi2.
							idx = np.where((self.activity(istp, x)-y)**2/yerr**2<max_plot_chi2)
							if len(x[idx]>0):
								x, y, yerr = x[idx], y[idx], yerr[idx]
							ax.errorbar(x, y*mult, yerr=yerr*mult, ls='None', marker='o', color=line.get_color(), label=None)

		ax.set_xlabel('Time ({})'.format(self.units))
		ax.set_ylabel('Activity ({}Bq)'.format(lb_or))
		ax.legend(loc=0)

		return _draw_plot(f, ax, **kwargs)