In this example a peak search is performed just using internal numpy functions (no for loops).
List of useful functions:
import numpy as np
import matplotlib.pyplot as plt
wf = np.load('sample_expo300.npz')['wf']
# wf = np.load('data/poiss500.npz')['wf']
%matplotlib inline
fig, ax = plt.subplots(figsize=(18,4))
ax.plot(wf[:2000], color='k')
plt.show()
Search of the pulses using the 1st derivative
wfDER = np.diff(wf, n=1)
fig, ax = plt.subplots(figsize=(18,4))
ax.plot(wf[:2000], color='k', alpha=0.4)
ax2 = ax.twinx()
ax2.tick_params(axis ='y', labelcolor = 'b')
ax2.plot(wfDER[:2000], color='b')
plt.show()
Search of the start: beginning of the light pulse. It happens when the 1st derivative goes beyond a given threshold
Search of the stop: end of the light pulse. It happens when the 1st derivative goes below a given threshold
thrUP = 25
thrDOWN = 0
crossesUP = (wfDER>thrUP) & (np.roll(wfDER,1)<=thrUP)
start, = np.where(crossesUP)
crossesDOWN = (wfDER<thrDOWN) & (np.roll(wfDER,1)>=thrDOWN)
stop, = np.where(crossesDOWN)
time = np.arange(wf.size) # simple time array
fig, ax = plt.subplots(figsize=(18,4))
ax.plot(wf, color='k', alpha=0.4)
ax2 = ax.twinx()
ax2.tick_params(axis ='y', labelcolor = 'b')
ax2.plot(wfDER, color='b')
ax2.plot(start, np.ones(start.size)*110, '>g', label='Start')
ax2.axhline(thrUP, color='g', linestyle='--', linewidth=1)
ax2.plot(stop, np.ones(stop.size)*90, '<r', label='Stop')
ax2.axhline(thrDOWN, color='r', linestyle='--', linewidth=1)
plt.legend()
plt.show()
Zoom
fig, ax = plt.subplots(figsize=(18,4))
ax.plot(wf[:500], color='k', alpha=0.4)
ax2 = ax.twinx()
ax2.tick_params(axis ='y', labelcolor = 'b')
ax2.plot(wfDER[:500], color='b')
ax2.plot(start[0], np.ones(1)*110, '>g', label='Start')
ax2.axhline(thrUP, color='g', linestyle='--', linewidth=1)
ax2.plot(stop[:100], np.ones(100)*90, '<r', label='Stop')
ax2.axhline(thrDOWN, color='r', linestyle='--', linewidth=1)
plt.legend()
plt.show()
A possible problem is that there could be more stops than starts.
This problem can be solved by using a function that scan along time the waveform:
def scan_and_find_pulses(wf, thrUP=30, thrDOWN=0):
starts, stops = [], []
pulseON = False
for i in range(wfDER.size):
if i>0:
if (wfDER[i]>thrUP) & (wfDER[i-1]<=thrUP) & (not pulseON):
starts.append(i)
pulseON = True
if (wfDER[i]<thrDOWN) & (wfDER[i-1]>=thrDOWN) & pulseON:
stops.append(i)
pulseON = False
starts = np.array(starts)
stops = np.array(stops)
return np.vstack((starts, stops))
scan_and_find_pulses(wfDER, 30, 0)
starts = np.array([1, 10, 30, 50, 95])
stops = np.array([6, 15, 19, 35, 39, 67, 110])
Graphical representation
plt.subplots(figsize=(12,1))
plt.plot(starts, np.ones(starts.size)*5, '>g')
for i in range(starts.size):
plt.text(starts[i], 0,f'{starts[i]}', {'color':'g', 'ha':'center'})
plt.plot(stops, np.ones(stops.size)*4, '<r')
for i in range(stops.size):
plt.text(stops[i], 0,f'{stops[i]}', {'color':'r', 'ha':'center'})
plt.ylim(-3,6)
plt.axis('off')
plt.show()
The numpy searchsorted() function can be used. For each element of the second array (stops), find where it should be inserted in the first array to be properly sorted.
The return value is an index array with the same shape of stops
np.searchsorted(starts, stops)
Notice that where the stops are not folloing a start, the index does not change. The, if a np.diff() is calculated on this array (with a prepend=0 argument), a series of 1s and 0s will be returned where 1s indicate the position of a "real" stop.
np.diff( np.searchsorted(starts, stops), prepend=0)
In order to have the list of index where this happens the np.where() function can be used:
np.where( np.diff( np.searchsorted(starts, stops), prepend=0) )
Visually:
real_stops, = np.where( np.diff( np.searchsorted(starts, stops), prepend=0) )
plt.subplots(figsize=(12,1))
plt.plot(starts, np.ones(starts.size)*5, '>g')
for i in range(starts.size):
plt.text(starts[i], 0,f'{starts[i]}', {'color':'g', 'ha':'center'})
plt.plot(stops, np.ones(stops.size)*4, '<r')
for i in range(stops.size):
plt.text(stops[i], 0,f'{stops[i]}', {'color':'r', 'ha':'center'})
# plt.text(stops[i], -2,f'{np.searchsorted(starts, stops)[i]}', {'color':'r', 'ha':'center'})
plt.text(stops[i], -4,f'{np.diff( np.searchsorted(starts, stops), prepend=0)[i]}', {'color':'r', 'ha':'center'})
plt.ylim(-3,6)
for p in stops[real_stops]:
plt.gca().add_artist(plt.Circle((p, 0.5), 2, fill=False, edgecolor='red'))
plt.axis('off')
plt.show()
What happen if one start is not followed by a stop?
starts = np.array([1, 10, 30, 50, 56, 95, 100])
stops = np.array([6, 15, 19, 35, 39, 67, 110])
np.searchsorted(starts, stops)
np.diff( np.searchsorted(starts, stops), prepend=0)
There is a number greater that one in the diff array. But how can a "start without a stop" be detected?
Using the searchsorted the other way around and spotting for repeated numbers...
np.searchsorted(stops, starts)
... and using the diff to highlight them (whenever a start is preceded by another start, the first is the one to be kept):
np.diff(np.searchsorted(stops, starts), prepend=-1)
real_starts, = np.where(np.diff(np.searchsorted(stops, starts), prepend=-1))
real_starts
real_stops, = np.where( np.diff( np.searchsorted(starts, stops), prepend=0) )
plt.subplots(figsize=(12,1))
plt.plot(starts, np.ones(starts.size)*5, '>g')
for i in range(starts.size):
plt.text(starts[i], 0,f'{starts[i]}', {'color':'g', 'ha':'center'})
plt.text(starts[i], -4,f'{np.diff( np.searchsorted(stops, starts), prepend=-1)[i]}', {'color':'g', 'ha':'center'})
plt.plot(stops, np.ones(stops.size)*4, '<r')
for i in range(stops.size):
plt.text(stops[i], 0,f'{stops[i]}', {'color':'r', 'ha':'center'})
# plt.text(stops[i], -2,f'{np.searchsorted(starts, stops)[i]}', {'color':'r', 'ha':'center'})
plt.text(stops[i], -4,f'{np.diff( np.searchsorted(starts, stops), prepend=0)[i]}', {'color':'r', 'ha':'center'})
plt.ylim(-3,6)
for p in stops[real_stops]:
plt.gca().add_artist(plt.Circle((p, 0.5), 2, fill=False, edgecolor='red'))
for p in starts[real_starts]:
plt.gca().add_artist(plt.Circle((p, 0.5), 2, fill=False, edgecolor='green'))
plt.axis('off')
plt.show()
Then the two arrays of "unique" coupled starts and stops can be vstacked
np.vstack((starts[real_starts], stops[real_stops]))
Let's build a function which returs a 2D array of matched starts and stops
def get_pulses(wf, thrUP=30, thrDOWN=0):
starts, = np.where((wfDER>thrUP) & (np.roll(wfDER,1)<=thrUP) )
stops, = np.where((wfDER<thrDOWN) & (np.roll(wfDER,1)>=thrDOWN) )
real_starts, = np.where(np.diff(np.searchsorted(stops, starts), prepend=-1))
real_stops, = np.where( np.diff( np.searchsorted(starts, stops), prepend=0) )
return np.vstack((starts[real_starts], stops[real_stops]))
pulses = get_pulses(wfDER, 50, 0)
pulses.shape
Now that the pulses' location is available, a way of measuring the pulse amplitude can be a sum in that precise ranges. For example the 1st derivative integral can be used.
We can do it "manually" by a for loop:
pulses_ampl = []
# loop on all pulses
for i in range(pulses.shape[1]):
tot = 0
# loop on the pulse range
p_start = pulses[0,i]
p_stop = pulses[1,i]
for j in range(p_start, p_stop + 1):
tot += wfDER[j]
pulses_ampl.append(tot)
Or we can implement a faster function using the numpy.reduceat() function which extend an operation (e.g. "add") to a given set of indeces.
Before, the arrays of starts and stops must be merged "inline" in a single array:
# creating an empty array with 2 x start shape (better to force the dtype since it has to be used later as index array)
alternated = np.zeros(pulses.shape[1]*2, dtype='int')
# filling the array alternatively with start and stop, start and stop, ...
alternated[::2] = pulses[0,:]
alternated[1::2] = pulses[1,:]
print(pulses.shape, alternated.shape)
The same result can be obtained by using numpy.flatten() with "F" ordering:
alternated = pulses.flatten('F')
Now the "add" operation can be applied repeat repetitively across an array of index, taking the results every other index ([::2])
amplitudes = np.add.reduceat(wfDER, alternated)[::2]
amplitudes.shape
# wf = np.load('sample_expo300.npz')['wf']
wf = np.load('data/expo500.npz')['wf']
wfDER = np.diff(wf, n=1)
def find_pulses_explicit(wfDER, thrUP=30, thrDOWN=0):
starts, stops = [], []
pulseON = False
for i in range(wfDER.size):
if i>0:
if (wfDER[i]>thrUP) & (wfDER[i-1]<=thrUP) & (not pulseON):
starts.append(i)
pulseON = True
if (wfDER[i]<thrDOWN) & (wfDER[i-1]>=thrDOWN) & pulseON:
stops.append(i)
pulseON = False
starts = np.array(starts)
stops = np.array(stops)
return np.vstack((starts, stops))
def find_amplitudes_explicit(wfDER, thrUP=30, thrDOWN=0):
pulses_ampl = []
pulses = find_pulses_explicit(wfDER, thrUP, thrDOWN)
# loop on all pulses
for i in range(pulses.shape[1]):
tot = 0
# loop on the pulse range
p_start = pulses[0,i]
p_stop = pulses[1,i]
for j in range(p_start, p_stop + 1):
tot += wfDER[j]
pulses_ampl.append(tot)
return np.vstack((np.array(pulses_ampl), pulses))
%time ps1 = find_amplitudes_explicit(wfDER, thrUP=50, thrDOWN=0)
def find_pulses_numpy(wfDER, thrUP=30, thrDOWN=0):
'''
This function returns a (2,N) array with N being the number of the found pulses and:
[0,:] = start of the pulse
[1,:] = stop of the pulse
'''
starts, = np.where((wfDER>thrUP) & (np.roll(wfDER,1)<=thrUP) )
stops, = np.where((wfDER<thrDOWN) & (np.roll(wfDER,1)>=thrDOWN) )
real_starts, = np.where(np.diff(np.searchsorted(stops, starts), prepend=-1))
real_stops, = np.where( np.diff( np.searchsorted(starts, stops), prepend=0) )
return np.vstack((starts[real_starts], stops[real_stops]))
def find_amplitudes_numpy(wfDER, thrUP=30, thrDOWN=0):
'''
This function returns a (3,N) array with N being the number of the found pulses and:
[0,:] = amplitude of the pulse
[1,:] = start of the pulse
[2,:] = stop of the pulse
'''
pulses = find_pulses_numpy(wfDER, thrUP, thrDOWN)
ampl = np.add.reduceat(wfDER, pulses.flatten('F'))[::2]
return np.vstack((ampl, pulses))
%time ps2 = find_amplitudes_numpy(wfDER, thrUP=50, thrDOWN=0)
print(ps1.shape, ps1.dtype, ps2.shape, ps2.dtype)
Found amplitudes
fig, ax = plt.subplots(1,2,figsize=(12,5))
ax[0].hist(ps1[0,:], 50)
ax[1].hist(ps1[0,:], 50)
plt.show()
Found deltat
fig, ax = plt.subplots(1,2,figsize=(12,5))
ax[0].hist(np.diff(ps1[1,:]), 50, log=True)
ax[1].hist(np.diff(ps2[1,:]), 50, log=True)
plt.show()
Correlation between deltat and amplitude
fig, ax = plt.subplots(1,2,figsize=(12,5))
ax[0].hist2d(ps1[0,:], np.diff(ps1[1,:], prepend=0), 50)
ax[1].hist2d(ps1[0,:], np.diff(ps2[1,:], prepend=0), 50)
plt.show()
Further debug, start and stop comparison
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection
start, stop = 1000, 1100
fig, ax = plt.subplots(figsize=(14,3))
ax.plot(np.arange(start, stop),wfDER[start:stop], '-k', marker='.', lw=1)
ax.axhline(0, ls='--', color='r', lw=1)
ax.axhline(30, ls='--', color='r', lw=1)
ybot, ytop = ax.get_ylim()
# get pulses within the choosen range
rlist1=[]
for i in range(ps1.shape[1]):
if (ps1[1,i]>start) & (ps1[1,i]<stop):
rlist1.append( Rectangle((ps1[1,i], ybot), ps1[2,i]-ps1[1,i], (ytop-ybot)/2 ) )
pc1 = PatchCollection(rlist1, facecolor='cyan', alpha=0.1)
rlist2=[]
for i in range(ps2.shape[1]):
if (ps2[1,i]>start) & (ps2[1,i]<stop):
rlist2.append( Rectangle((ps2[1,i], ybot+(ytop-ybot)/2), ps2[2,i]-ps2[1,i], (ytop-ybot)/2 ) )
pc2 = PatchCollection(rlist2, facecolor='yellow', alpha=0.1)
plt.gca().add_collection(pc1)
plt.gca().add_collection(pc2)
plt.show()