Waveform analysis - 1

  • Waveforms are provided in the form of numpy arrays packed into npz files, see Input/output of numpy objects)

  • let's define a scheleton analysis from scratch in order to:

    • open the input file
    • analyse the waveform and find light pulses
    • store the results in an output file toghether with the input data (for later comparison)
In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

Open the file and inspect the content

In [2]:
data = np.load('sample.npz')
data.files
Out[2]:
['wf', 'lstime', 'lsampl', 'htime', 'hampl', 'htype']

Access a given element:

In [3]:
data['wf'], data['wf'].shape, data['wf'].size, data['wf'].ndim 
Out[3]:
(array([-1,  3, -1, ...,  0,  0,  2], dtype=int32), (14551,), 14551, 1)

Assign a variable to the array (more convenient) and plot it:

In [4]:
wf = data['wf']
plt.plot(wf) # memo: if x is omitted is automatically set to an integer index.
plt.show()

Let's implement some trivial analysis: find every time that the signal has crossed a given threshold (e.g.: 1 standar deviation above zero baseline).

In [5]:
## begin with an absolute number and then change to something more relative

thr = wf.std()
crossings = []
for i,v in enumerate(wf):
    if i>0 and v > thr and wf[i-1]<=thr:
        crossings.append([i,v])
        
# make it a numpy array for easier handling
crossings = np.array(crossings)
In [6]:
print(crossings)
[[   53  2197]
 [  353  2670]
 [  953  2100]
 [ 1852  2097]
 [ 3053  1742]
 [ 4552  1763]
 [ 6357  1793]
 [ 8456  1823]
 [10856  1976]
 [13553  2103]]
In [7]:
fig, ax = plt.subplots(figsize=(15,5))
                
#plt.axis([0,400,0,16000])
plt.plot(wf, linewidth=1) 
plt.plot(crossings[:,0],crossings[:,1],'or' )
plt.show()

Implement in a function and set the threashold as a function argument:

In [8]:
def find_crossing(array, thr):
    crossings = []
    for i,v in enumerate(array):
        if i>0 and v > thr and array[i-1]<=thr:
            crossings.append([i,v])
    return np.array(crossings)


crossings = find_crossing(wf, 0.2*wf.std())
print(crossings)
[[   52  1095]
 [  351   605]
 [  951   432]
 [ 1851   702]
 [ 3052   932]
 [ 4551   515]
 [ 6353   580]
 [ 8453   631]
 [10852   385]
 [13551   348]]
In [9]:
fig, ax = plt.subplots(figsize=(15,5))

crossings1 = find_crossing(wf, wf.std())
crossings2 = find_crossing(wf, 0.2*wf.std())
#plt.axis([0,400,0,16000])
plt.plot(wf, linewidth=1) 
plt.plot(crossings1[:,0],crossings1[:,1],'or' )
plt.plot(crossings2[:,0],crossings2[:,1],'^b' )
plt.show()

How to define amplitude? Define a "find_pulses" function and find the max.

In [10]:
def find_pulses(array, thr):
    pulse_begin, pulse_end = [] , []
    for i,v in enumerate(array):
        if i>0:
            if v > thr and array[i-1]<=thr:
                pulse_begin.append([i,v])
            if v<thr and array[i-1]>=thr:
                pulse_end.append([i,v])
    return np.array(pulse_begin), np.array(pulse_end)


def find_peaks(array, pulse_begin, pulse_end):
    peaks=[]
    for i,p in enumerate(pulse_begin):
        pulse_times  = []
        pulse_values = []
        #print('start at: ', pulse_begin[i][0], end=' ')
        #print('stop at: ',pulse_end[i][0], end=' ')
        for j in range(pulse_begin[i][0], pulse_end[i][0]):
            pulse_times.append(j)
            pulse_values.append(array[j])
            
        pulse_times = np.array(pulse_times)
        pulse_values = np.array(pulse_values)

        #print(' ==> max is: ',pulse_values.max())
        
        peaks.append( [ pulse_times[pulse_values.argmax()] , pulse_values.max() ])
        
    return np.array(peaks)
In [11]:
wf = data['wf']
b, e = find_pulses(wf, 1000)
peaks = find_peaks(wf, b, e)
In [12]:
fig, ax = plt.subplots(figsize=(15,5))

#plt.axis([0,400,0,16000])
plt.plot(wf, linewidth=1) 
plt.plot(b[:,0],b[:,1],'or' )
plt.plot(e[:,0],e[:,1],'^b' )
plt.plot(peaks[:,0], peaks[:,1],'*g' )
plt.show()

Now pack the results and store in a file

In [13]:
original_data = {k:data[k] for k in data.files}
new_data = {'peaks_time':peaks[:,0], 'peaks_ampl':peaks[:,1]}
np.savez_compressed('outfile.npz', **original_data, **new_data)

Verify its content

In [14]:
c = np.load('outfile.npz')
c.files
Out[14]:
['wf',
 'lstime',
 'lsampl',
 'htime',
 'hampl',
 'htype',
 'peaks_time',
 'peaks_ampl']
In [ ]: