Waveforms are provided in the form of numpy arrays packed into npz files, see Input/output of numpy objects)
let's define a scheleton analysis from scratch in order to:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
Open the file and inspect the content
data = np.load('sample.npz')
data.files
Access a given element:
data['wf'], data['wf'].shape, data['wf'].size, data['wf'].ndim
Assign a variable to the array (more convenient) and plot it:
wf = data['wf']
plt.plot(wf) # memo: if x is omitted is automatically set to an integer index.
plt.show()
Let's implement some trivial analysis: find every time that the signal has crossed a given threshold (e.g.: 1 standar deviation above zero baseline).
## begin with an absolute number and then change to something more relative
thr = wf.std()
crossings = []
for i,v in enumerate(wf):
if i>0 and v > thr and wf[i-1]<=thr:
crossings.append([i,v])
# make it a numpy array for easier handling
crossings = np.array(crossings)
print(crossings)
fig, ax = plt.subplots(figsize=(15,5))
#plt.axis([0,400,0,16000])
plt.plot(wf, linewidth=1)
plt.plot(crossings[:,0],crossings[:,1],'or' )
plt.show()
Implement in a function and set the threashold as a function argument:
def find_crossing(array, thr):
crossings = []
for i,v in enumerate(array):
if i>0 and v > thr and array[i-1]<=thr:
crossings.append([i,v])
return np.array(crossings)
crossings = find_crossing(wf, 0.2*wf.std())
print(crossings)
fig, ax = plt.subplots(figsize=(15,5))
crossings1 = find_crossing(wf, wf.std())
crossings2 = find_crossing(wf, 0.2*wf.std())
#plt.axis([0,400,0,16000])
plt.plot(wf, linewidth=1)
plt.plot(crossings1[:,0],crossings1[:,1],'or' )
plt.plot(crossings2[:,0],crossings2[:,1],'^b' )
plt.show()
How to define amplitude? Define a "find_pulses" function and find the max.
def find_pulses(array, thr):
pulse_begin, pulse_end = [] , []
for i,v in enumerate(array):
if i>0:
if v > thr and array[i-1]<=thr:
pulse_begin.append([i,v])
if v<thr and array[i-1]>=thr:
pulse_end.append([i,v])
return np.array(pulse_begin), np.array(pulse_end)
def find_peaks(array, pulse_begin, pulse_end):
peaks=[]
for i,p in enumerate(pulse_begin):
pulse_times = []
pulse_values = []
#print('start at: ', pulse_begin[i][0], end=' ')
#print('stop at: ',pulse_end[i][0], end=' ')
for j in range(pulse_begin[i][0], pulse_end[i][0]):
pulse_times.append(j)
pulse_values.append(array[j])
pulse_times = np.array(pulse_times)
pulse_values = np.array(pulse_values)
#print(' ==> max is: ',pulse_values.max())
peaks.append( [ pulse_times[pulse_values.argmax()] , pulse_values.max() ])
return np.array(peaks)
wf = data['wf']
b, e = find_pulses(wf, 1000)
peaks = find_peaks(wf, b, e)
fig, ax = plt.subplots(figsize=(15,5))
#plt.axis([0,400,0,16000])
plt.plot(wf, linewidth=1)
plt.plot(b[:,0],b[:,1],'or' )
plt.plot(e[:,0],e[:,1],'^b' )
plt.plot(peaks[:,0], peaks[:,1],'*g' )
plt.show()
Now pack the results and store in a file
original_data = {k:data[k] for k in data.files}
new_data = {'peaks_time':peaks[:,0], 'peaks_ampl':peaks[:,1]}
np.savez_compressed('outfile.npz', **original_data, **new_data)
Verify its content
c = np.load('outfile.npz')
c.files