Source code for neurodsp.rhythm.swm

"""The sliding window matching algorithm for identifying recurring patterns in a neural signal."""

import numpy as np

from neurodsp.utils.decorators import multidim

###################################################################################################
###################################################################################################

[docs]@multidim() def sliding_window_matching(sig, fs, win_len, win_spacing, max_iterations=100, window_starts_custom=None, var_thresh=None): """Find recurring patterns in a time series using the sliding window matching algorithm. Parameters ---------- sig : 1d array Time series. fs : float Sampling rate, in Hz. win_len : float Window length, in seconds. This is L in the original paper. win_spacing : float Minimum spacing between windows, in seconds. This is G in the original paper. max_iterations : int, optional, default: 100 Maximum number of iterations of potential changes in window placement. window_starts_custom : 1d array, optional, default: None Custom pre-set locations of initial windows. var_thresh: float, optional, default: None Removes initial windows with variance below a set threshold. This speeds up runtime proportional to the number of low variance windows in the data. Returns ------- windows : 2d array Putative patterns discovered in the input signal. window_starts : 1d array Indices at which each window begins for the final set of windows. Notes ----- - This algorithm is originally described in [1]_. This re-implementation is a minimal, modified version of original ([2]_), which has more available options. - The `win_len` parameter should be chosen to be about the size of the motif of interest. The larger this window size, the more likely the pattern to reflect slower patterns. - The `win_spacing` parameter also determines the number of windows that are used. - If looking at high frequency activity, you may want to apply a highpass filter, so that the algorithm does not converge on a low frequency motif. - This version has the following changes to speed up convergence: 1. Each iteration is similar to an epoch, randomly moving all windows in random order. The original implementation randomly selects windows and does not guarantee even resampling. 2. New window acceptance is determined via increased correlation coefficients and reduced variance across windows. 3. Phase optimization / realignment to escape local minima. References ---------- .. [1] Gips, B., Bahramisharif, A., Lowet, E., Roberts, M. J., de Weerd, P., Jensen, O., & van der Eerden, J. (2017). Discovering recurring patterns in electrophysiological recordings. Journal of Neuroscience Methods, 275, 66-79. DOI: https://doi.org/10.1016/j.jneumeth.2016.11.001 .. [2] Matlab Code implementation: https://github.com/bartgips/SWM Examples -------- Search for reoccurring patterns using sliding window matching in a simulated beta signal: >>> from neurodsp.sim import sim_combined >>> components = {'sim_bursty_oscillation' : {'freq' : 20, 'phase' : 'min'}, ... 'sim_powerlaw' : {'f_range' : (2, None)}} >>> sig = sim_combined(10, fs=500, components=components, component_variances=(1, .05)) >>> windows, starts = sliding_window_matching(sig, fs=500, win_len=0.05, ... win_spacing=0.05, var_thresh=.5) """ # Compute window length and spacing in samples win_len = int(win_len * fs) win_spacing = int(win_spacing * fs) # Initialize window positions if window_starts_custom is None: window_starts = np.arange(0, len(sig) - win_len, win_spacing).astype(int) else: window_starts = window_starts_custom windows = np.array([sig[start:start + win_len] for start in window_starts]) # Compute new window bounds lower_bounds, upper_bounds = _compute_bounds(window_starts, win_spacing, 0, len(sig) - win_len) # Remove low variance windows to speed up runtime if var_thresh is not None: thresh = np.array([np.var(sig[loc:loc + win_len]) > var_thresh for loc in window_starts]) windows = windows[thresh] window_starts = window_starts[thresh] lower_bounds = lower_bounds[thresh] upper_bounds = upper_bounds[thresh] # Modified SWM procedure window_idxs = np.arange(len(windows)).astype(int) corrs, variance = _compute_cost(sig, window_starts, win_len) mae = np.mean(np.abs(windows - windows.mean(axis=0))) for _ in range(max_iterations): # Randomly shuffle order of windows np.random.shuffle(window_idxs) for win_idx in window_idxs: # Find a new, random window start _window_starts = window_starts.copy() _window_starts[win_idx] = np.random.choice(np.arange(lower_bounds[win_idx], upper_bounds[win_idx] + 1)) # Accept new window if correlation increases and variance decreases _corrs, _variance = _compute_cost(sig, _window_starts, win_len) if _corrs[win_idx].sum() > corrs[win_idx].sum() and _variance < variance: corrs = _corrs.copy() variance = _variance window_starts = _window_starts.copy() lower_bounds, upper_bounds = _compute_bounds(\ window_starts, win_spacing, 0, len(sig) - win_len) # Phase optimization _window_starts = window_starts.copy() for shift in np.arange(-int(win_len/2), int(win_len/2)): _starts = _window_starts + shift # Skip windows shifts that are out-of-bounds if (_starts[0] < 0) or (_starts[-1] > len(sig) - win_len): continue _windows = np.array([sig[start:start + win_len] for start in _starts]) _mae = np.mean(np.abs(_windows - _windows.mean(axis=0))) if _mae < mae: window_starts = _starts.copy() windows = _windows.copy() mae = _mae lower_bounds, upper_bounds = _compute_bounds(\ window_starts, win_spacing, 0, len(sig) - win_len) return windows, window_starts
def _compute_cost(sig, window_starts, win_n_samps): """Compute the cost, as correlation coefficients and variance across windows. Parameters ---------- sig : 1d array Time series. window_starts : list of int The list of window start definitions. win_n_samps : int The length of each window, in samples. Returns ------- corrs : 2d array Window correlation matrix. variance : float Sum of the variance across windows. """ windows = np.array([sig[start:start + win_n_samps] for start in window_starts]) corrs = np.corrcoef(windows) variance = windows.var(axis=1).sum() return corrs, variance def _compute_bounds(window_starts, win_spacing, start, end): """Compute bounds on a new window. Parameters ---------- window_starts : list of int The list of window start definitions. win_spacing : float Minimum spacing between windows, in seconds. start, end : int Start and end edges for the possible window. Returns ------- lower_bounds, upper_bounds : 1d array Computed upper and lower bounds for the window position. """ lower_bounds = window_starts[:-1] + win_spacing lower_bounds = np.insert(lower_bounds, 0, start) upper_bounds = window_starts[1:] - win_spacing upper_bounds = np.insert(upper_bounds, len(upper_bounds), end) return lower_bounds, upper_bounds