Source code for npat.irradiation

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import re
import os
import numpy as np
import sqlite3
import pandas as pd

from .plotter import colors
from .plotter import _init_plot
from .plotter import _close_plot
from .dbmgr import get_cursor
from .isotope import Isotope

global ZG_CONNECTIONS_DICT
ZG_CONNECTIONS_DICT = {}

#### TODO: allow new compound definitions as {'El':wt_pct} and accept .json stack

[docs]class Ziegler(object): """Method for solving energy loss within stacked-target foil experiment ... Parameters ---------- x : type Description of parameter `x`. Attributes ---------- Methods ------- Notes ----- References ---------- Examples -------- """ def __init__(self, stack, beam=None, **kwargs): ### stack is list of dicts, which must have 'compound' specified. ### Areal density specified by either 'ad' (mg/cm^2), both mass (g) and area (cm^2), ### 'thickness' (mm) or both 'density' (g/cm^3) and 'thickness' (mm) zdb = get_cursor('ziegler') self.protons = {int(i[0]):list(map(float,i[1:])) for i in zdb.execute('SELECT * FROM protons')} self.helium = {int(i[0]):list(map(float,i[1:])) for i in zdb.execute('SELECT * FROM helium')} self.ionization = {int(i[0]):list(map(float,i[1:])) for i in zdb.execute('SELECT * FROM ionization')} self.weights = {int(i[0]):list(map(float,i[1:3])) for i in zdb.execute('SELECT * FROM weights')} self.compounds = {str(i[0]):[[int(h.split(':')[0]), float(h.split(':')[1])] for h in i[2].split(',')] for i in zdb.execute('SELECT * FROM compounds')} self.compounds = {cm:[[i[0],i[1]/sum([m[1] for m in self.compounds[cm]])] for i in self.compounds[cm]] for cm in self.compounds} self.densities = {str(i[0]):float(i[1]) for i in zdb.execute('SELECT * FROM compounds')} self.elements = sorted([[str(i[0]), int(i[2].split(':')[0])] for i in zdb.execute('SELECT * FROM compounds') if len(i[2].split(':'))==2], key=lambda h:len(h[0]), reverse=True) self._meta = {} self.meta = {'beam_istp':'1H', 'E0':33.0, 'dE0':0.3, 'N':10000, 'dp':1.0, 'chunk_size':1E7, 'threads':1, 'solved':False, 'accuracy':0.01, 'min_steps':2, 'max_steps':50} self._stack = [] if beam is not None: print('Keyword `beam` deprecated: see documentation for proper input.') self.meta = beam self.meta = kwargs if type(stack)==str: stack = pd.read_csv(stack) stack = [{i:r[i] for i in stack.columns if not (np.isnan(r[i]) if type(r[i])==float else False)} for n,r in stack.iterrows()] self.stack = stack def check_db(self, db=None): if db is not None: path, fnm = os.path.split(db) if path in ['',' ']: path = os.getcwd() if fnm in ['',' ']: raise ValueError('Invalid db Filename: {}'.format(db)) db_fnm = os.path.join(path, fnm) global ZG_CONNECTIONS_DICT if os.path.exists(db_fnm): if db_fnm not in ZG_CONNECTIONS_DICT: ZG_CONNECTIONS_DICT[db_fnm] = sqlite3.connect(db_fnm) self.db_connection = ZG_CONNECTIONS_DICT[db_fnm] self.db = self.db_connection.cursor() else: print('WARNING: DB {} does not exist, creating new file.'.format(fnm)) from sqlite3 import Error try: self.db_connection = sqlite3.connect(db_fnm) ZG_CONNECTIONS_DICT[db_fnm] = self.db_connection self.db = self.db_connection.cursor() except Error as e: print(e) @property def meta(self): return self._meta @meta.setter def meta(self, meta_dict): for nm in meta_dict: self._meta[nm] = meta_dict[nm] if nm in ['beam_istp', 'istp']: _ITP = Isotope(meta_dict[nm]) self._meta['Z'] = _ITP.Z self._meta['amu'] = _ITP.mass if self._meta['min_steps']>self._meta['max_steps']: self._meta['max_steps'] = self._meta['min_steps']+1 print('WARNING: min_steps > max_steps, setting max_steps to {}'.format(self._meta['max_steps'])) self._meta['solved'] = False def __getitem__(self, key): if type(key)==str: for s in self.stack: if s['name']==key: return s return None else: return self.stack[int(key)] @property def stack(self): if not self.meta['solved']: self._solve() return self._stack @stack.setter def stack(self, _stack): self._stack = list(_stack) self._meta['solved'] = False for s in self._stack: if 'name' not in s: s['name'] = None if 'compound' not in s: raise ValueError('compound must be specified') if type(s['compound'])==dict: cs = '' for c in s['compound']: print(c) cs = c self.compounds[c] = s['compound'][c] print(self.compounds[c]) s['compound'] = cs if s['compound']=='': raise ValueError('compound must be specified') if 'ad' not in s: if 'area' in s and 'mass' in s: s['ad'] = 1e3*s['mass']/s['area'] elif 'density' in s and 'thickness' in s: s['ad'] = 100.0*s['density']*s['thickness'] elif s['compound'] in self.densities and 'thickness' in s: s['ad'], s['density'] = 100.0*self.densities[s['compound']]*s['thickness'], self.densities[s['compound']] if 'density' not in s: if s['compound'] in self.densities: s['density'] = self.densities[s['compound']] if 'thickness' not in s and 'ad' in s: s['thickness'] = s['ad']/(100.0*s['density']) if 'ad' not in s: raise ValueError('Areal density either not specified or not computable: {}'.format(s)) if s['compound'] not in self.compounds: cm = s['compound'] nums = [str(i) for i in range(10)] cm_list = [] for el,Z in self.elements: if len(cm)==0: break f = cm.split(el) if len(f)>1: c = f[0] for e in f[1:]: if len(e): if e[0] in nums: cm_list.append([Z, float(e[0])]) if len(e)>1: c += e[1:] else: cm_list.append([Z, 1.0]) c += e else: cm_list.append([Z, 1.0]) cm = c if len(cm)==0 and len(cm_list): self.compounds[s['compound']] = [[i[0], i[1]/sum([i[1] for i in cm_list])] for i in cm_list] if s['compound'] not in self.compounds: raise ValueError('compound {} not known.'.format(s['compound']))
[docs] def get_S(self, E, cm): """Description ... Parameters ---------- x : type Description of parameter `x`. Returns ------- Notes ----- References ---------- Examples -------- """ # energy E in MeV , stopping power in MeV/(mg/cm2) E, r0 = np.asarray(E), False if not E.shape: E, r0 = np.array([E]), True S = np.zeros(len(E)) A_ave = sum([self.weights[z2][0]*w for z2,w in self.compounds[cm]]) for z2, w in self.compounds[cm]: S_nucl = self.get_S_nucl(E, self.meta['Z'], self.meta['amu'], z2, self.weights[z2][0]) if self.meta['Z']==1: S += w*(S_nucl+self.get_S_p(E, z2, self.meta['amu'])) elif self.meta['Z']==2: S += w*(S_nucl+self.get_S_He(E, z2, self.meta['amu'])) else: S += w*(S_nucl+self.get_S_elec(E, z2, self.meta['amu'], self.meta['Z'])) return S[0]*0.6022140857/A_ave if r0 else S*0.6022140857/A_ave
def get_S_nucl(self, E, z1, m1, z2, m2): RM = (m1+m2)*np.sqrt((z1**(2/3.0)+z2**(2/3.0))) ER = 32.53*m2*1E3*E/(z1*z2*RM) return (0.5*np.log(1.0+ER)/(ER+0.10718+ER**0.37544))*8.462*z1*z2*m1/RM def get_S_p(self, eng, z2, M1=1.00727647): S = np.zeros(len(eng)) E = 1E3*eng/M1 A = self.protons[z2] beta_sq = np.where(E>=1E3,1.0-1.0/(1.0+E/931478.0)**2,0.9) B0 = np.where(E>=1E3,np.log(A[6]*beta_sq/(1.0-beta_sq))-beta_sq,0.0) Y = np.log(E[(1E3<=E)&(E<=5E4)]) B0[np.nonzero(np.where((1E3<=E)&(E<=5E4),B0,0))] -= A[7]+A[8]*Y+A[9]*Y**2+A[10]*Y**3+A[11]*Y**4 S[E>=1E3] = (A[5]/beta_sq[E>=1E3])*B0[E>=1E3] S_low = A[1]*E[(10<=E)&(E<1E3)]**0.45 S_high = (A[2]/E[(10<=E)&(E<1E3)])*np.log(1.0+(A[3]/E[(10<=E)&(E<1E3)])+A[4]*E[(10<=E)&(E<1E3)]) S[(10<=E)&(E<1E3)] = S_low*S_high/(S_low+S_high) S[(0<E)&(E<10)] = A[0]*E[(0<E)&(E<10)]**0.5 return S def get_S_He(self,eng,z2,M1=4.003): S = np.zeros(len(eng)) E = eng*4.0015/M1 E = np.where(E>=0.001,E,0.001) A = self.helium[z2] S_low = A[0]*(1E3*E[E<=10])**A[1] S_high = (A[2]/E[E<=10])*np.log(1.0+(A[3]/E[E<=10])+A[4]*E[E<=10]) S[E<=10] = S_low*S_high/(S_low+S_high) Y = np.log(1.0/E[E>10]) S[E>10] = np.exp(A[5]+A[6]*Y+A[7]*Y**2+A[8]*Y**3) return S def get_S_elec(self, eng, z2, M1, z1): S = np.zeros(len(eng)) E_keV = 1E3*eng S[E_keV/M1<1000] = self.get_eff_Z_ratio(E_keV[E_keV/M1<1000],z1,M1)**2*self.get_S_p(eng[E_keV/M1<1000],z2,M1) Y = E_keV[E_keV/M1>=1000]/M1 beta_sq = 1.0-1.0/(1.0+Y/931478.0)**2 FX = np.log(2E6*0.511003*beta_sq/(1.0-beta_sq))-beta_sq ZHY = 1.0-np.exp(-0.2*np.sqrt(Y)-0.0012*Y-0.00001443*Y**2) Z1EFF = self.get_eff_Z_ratio(E_keV[E_keV/M1>=1000],z1,M1)*ZHY S[E_keV/M1>=1000] = 4E-1*np.pi*(1.9732857/137.03604)**2*Z1EFF**2*z2*(FX-np.log(self.ionization[z2][0]))/(0.511003*beta_sq) return S def get_eff_Z_ratio(self, E_keV, z1, M1): if z1==1: return np.ones(len(eng)) elif z1==2: Y = np.log(E_keV/M1) return z1*(1.0-np.exp(-0.7446-0.1429*Y-0.01562*Y**2+0.00267*Y**3-0.000001325*Y**8)) elif z1==3: Y = E_keV/M1 return z1*(1.0-np.exp(-0.7138-0.002797*Y-0.000001348*Y**2)) BB = -0.886*np.sqrt(0.04*E_keV/M1)/z1**(2/3.0) return z1*(1.0-np.exp(BB-0.0378*np.sin(0.5*np.pi*BB))*(1.034-0.1777*np.exp(-0.08114*z1))) def _calc_bins(self): return np.arange(0.0, self.meta['E0']+10.0*self.meta['dE0'], min([0.1, self.meta['E0']/500.0])) def _solve_chunk(self, N): E0 = self.meta['E0']+self.meta['dE0']*np.random.normal(size=int(N)) bins = self._calc_bins() hists = [] dp = self.meta['dp'] for n, sm in enumerate(self._stack): E_bar = [E0] if np.average(E0)<=0.0: hists.append(np.concatenate([[N],np.zeros(len(bins)-2)])) else: steps = int((1.0/self.meta['accuracy'])*sm['ad']*dp*self.get_S(np.average(E0), sm['compound'])/np.average(E0)) steps = min([max([self.meta['min_steps'], steps]), self.meta['max_steps']]) dr = (1.0/float(steps)) for i in range(steps): S1 = self.get_S(E0, sm['compound']) E1 = E0 - dr*dp*sm['ad']*S1 E1 = np.where(E1>0, E1, 0.0) E0 = E0 - dr*0.5*dp*sm['ad']*(S1+self.get_S(E1, sm['compound'])) E0 = np.where(E0>0, E0, 0.0) E_bar.append(E0) hists.append(np.histogram(np.concatenate(E_bar), bins=bins)[0]) return hists def _solve(self): if self.meta['solved']: return self.meta['solved'] = True dN = np.linspace(0, self.meta['N'], int(np.ceil(self.meta['N']/float(self.meta['chunk_size'])))+1, dtype=int) histos = list(map(self._solve_chunk, dN[1:]-dN[:-1])) bins = self._calc_bins() energy = 0.5*(bins[1:]+bins[:-1]) for n,sm in enumerate(self._stack): sm['flux'] = np.sum([h[n] for h in histos], axis=0) sm['flux'] = sm['flux']/np.sum(sm['flux']) sm['mu_E'] = np.sum(sm['flux']*energy) sm['sig_E'] = np.sqrt(np.sum(sm['flux']*(energy-sm['mu_E'])**2)) lh = np.where(sm['flux']>0)[0] if lh.size: if lh[0]==0: nm = sm['name'] if sm['name'] is not None else sm['compound']+str(n+1) print('WARNING: Beam stopped in foil {}'.format(nm)) sm['flux'] = sm['flux'][lh[0]:lh[-1]] sm['energy'] = energy[lh[0]:lh[-1]] sm['bins'] = bins[lh[0]:lh[-1]+1]
[docs] def saveas(self, *fnms): """Description ... Parameters ---------- x : type Description of parameter `x`. Returns ------- Notes ----- References ---------- Examples -------- """ cols = ['name','compound','thickness','density','ad','mu_E','sig_E'] stack = pd.DataFrame([{c:(sm[c] if c in sm else None) for c in cols} for sm in self.stack], columns=cols) cols = ['name','energy','flux'] fluxes = pd.concat([pd.DataFrame({c:sm[c] for c in cols}, columns=cols) for sm in self.stack if sm['name'] is not None], ignore_index=True) for fl in fnms: if any([fl.endswith(e) for e in ['.png','.pdf','.eps','.pgf','.ps','.raw','.rgba','.svg','.svgz']]): self.plot(saveas=fl, show=False) if fl.endswith('.csv'): stack.to_csv(fl.replace('.csv','_stack.csv'), index=False) fluxes.to_csv(fl.replace('.csv','_fluxes.csv'), index=False) if fl.endswith('.db'): self.check_db(fl) stack.to_sql('stack', self.db_connection, if_exists='replace', index=False) fluxes.to_sql('fluxes', self.db_connection, if_exists='replace', index=False)
[docs] def summarize(self, samples=None): """Description ... Parameters ---------- x : type Description of parameter `x`. Returns ------- Notes ----- References ---------- Examples -------- """ for n,sm in enumerate(self.stack): if sm['name'] is not None: if samples is not None: if not any([re.match(s, sm['name']) for s in samples]): continue nm = sm['name'] if sm['name'] is not None else sm['compound']+str(n+1) print(nm+': '+str(round(sm['mu_E'], 2))+' +/- '+str(round(sm['sig_E'], 2))+' (MeV)')
[docs] def plot_S(self, compound, energy=None, **kwargs): """Description ... Parameters ---------- x : type Description of parameter `x`. Returns ------- Notes ----- References ---------- Examples -------- """ if energy is None: energy = 10.0**np.arange(0.1,2.8,0.1) f,ax = _init_plot(**kwargs) ax.plot(energy, self.get_S(energy, compound), label=compound.title()) ax.set_xlabel('Energy (MeV)') ax.set_ylabel(r'Stopping Power (MeV$\cdot$mg$^{-1}\cdot$cm$^{-2}$)') ax.set_xscale('log') ax.legend(loc=0) return _close_plot(f, ax, **kwargs)
[docs] def plot(self, samples=None, **kwargs): """Description ... Parameters ---------- x : type Description of parameter `x`. Returns ------- Notes ----- References ---------- Examples -------- """ if type(samples)==str: samples = [samples] f,ax = _init_plot(**kwargs) for sm in self.stack: if sm['name'] is not None: if samples is not None: if not any([re.match(s, sm['name']) for s in samples]): continue x, y = np.array([sm['bins'][:-1],sm['bins'][1:]]).T.flatten(), np.array([sm['flux'],sm['flux']]).T.flatten() ax.plot(x,y,label=sm['name']) ax.set_xlabel('Energy (MeV)') ax.set_ylabel('Flux (a.u.)') ax.legend(loc=0) return _close_plot(f, ax, **kwargs)