# -*- coding: utf-8 -*-
# Copyright (C) 2017-2023 Phillip Alday <me@phillipalday.com>
# License: BSD (3-clause)
"""MNE-based functionality not further categorized."""
from collections import namedtuple # noqa: I100
import matplotlib.pyplot as plt
import mne # noqa: F401
import numpy as np
import pandas as pd
from scipy import stats
from scipy.ndimage import center_of_mass
from scipy.signal import argrelmin, savgol_filter
# Result container for the IAF estimators: peak alpha frequency (PAF),
# alpha-band center of gravity (CoG), and the (lower, upper) band bounds.
IafEst = namedtuple('IAFEstimate',
                    'PeakAlphaFrequency CenterOfGravity AlphaBand')
def savgol_iaf(raw, picks=None,  # noqa: C901
               fmin=None, fmax=None,
               resolution=0.25,
               average=True,
               ax=None,
               window_length=11, polyorder=5,
               pink_max_r2=0.9):
    """Estimate individual alpha frequency (IAF).

    Parameters
    ----------
    raw : instance of Raw
        The raw data to do these estimations on.
    picks : array-like of int | None
        List of channels to use.
    fmin : int | None
        Lower bound of alpha frequency band. If None, it will be
        empirically estimated using a polynomial fitting method to
        determine the edges of the central parabolic peak density,
        with assumed center of 10 Hz.
    fmax : int | None
        Upper bound of alpha frequency band. If None, it will be
        empirically estimated using a polynomial fitting method to
        determine the edges of the central parabolic peak density,
        with assumed center of 10 Hz.
    resolution : float
        The resolution in the frequency domain for calculating the PSD.
    average : bool
        Whether to average the PSD estimates across channels or provide
        a separate estimate for each channel. Currently, only True is
        supported.
    ax : instance of matplotlib Axes | None | False
        Axes to plot PSD analysis into. If None, axes will be created
        (and plot not shown by default). If False, no plotting will be done.
    window_length : int
        Window length in samples to use for Savitzky-Golay smoothing of
        PSD when estimating IAF.
    polyorder : int
        Polynomial order to use for Savitzky-Golay smoothing of
        PSD when estimating IAF.
    pink_max_r2 : float
        Maximum R^2 allowed when comparing the PSD distribution to the
        pink noise 1/f distribution on the range 1 to 30 Hz.
        If this threshold is exceeded, then IAF is assumed unclear and
        None is returned for both PAF and CoG.

    Returns
    -------
    IafEst : instance of ``collections.namedtuple`` called IAFEstimate
        Named tuple with fields for the peak alpha frequency (PAF),
        alpha center of gravity (CoG), and the bounds of the alpha band
        (as a tuple).

    Raises
    ------
    ValueError
        If the edges of the alpha band could not be determined
        automatically (no suitable local minima around 10 Hz).

    Notes
    -----
    Based on method developed by
    `Andrew Corcoran <https://zenodo.org/badge/latestdoi/80904585>`_.
    In addition to appropriate software citation (Zenodo DOI or
    git commit), please cite:

    Corcoran, A. W., Alday, P. M., Schlesewsky, M., &
    Bornkessel-Schlesewsky, I. (2018). Toward a reliable, automated method
    of individual alpha frequency (IAF) quantification. Psychophysiology,
    e13064. doi:10.1111/psyp.13064
    """
    n_fft = int(raw.info['sfreq'] / resolution)
    spectrum = raw.compute_psd(method="welch", picks=picks, n_fft=n_fft,
                               fmin=1., fmax=30.)
    psd = spectrum.get_data()
    freqs = spectrum.freqs

    if ax is None:
        fig = plt.figure()  # noqa: F841
        ax = plt.gca()

    if average:
        psd = np.mean(psd, axis=0)

    if fmin is None or fmax is None:
        # search for band edges as local minima around the canonical
        # alpha band (default search interval 5-15 Hz)
        fmin_bound = 5 if fmin is None else fmin
        fmax_bound = 15 if fmax is None else fmax
        alpha_search = np.logical_and(freqs >= fmin_bound,
                                      freqs <= fmax_bound)
        freqs_search = freqs[alpha_search]
        # a window spanning the entire search interval turns the
        # Savitzky-Golay filter into a plain polynomial fit
        psd_search = savgol_filter(psd[alpha_search],
                                   window_length=psd[alpha_search].shape[0],
                                   polyorder=10)
        # argrelmin returns a tuple, so we flatten that with [0];
        # then we take the last minimum below 10 Hz / the first minimum
        # above 10 Hz, i.e. the one closest to the 'median' alpha of 10 Hz
        if fmin is None:
            try:
                left_min = argrelmin(psd_search[freqs_search < 10])[0][-1]
                fmin = freqs_search[freqs_search < 10][left_min]
            except IndexError:
                raise ValueError("Unable to automatically determine lower end of alpha band.") from None  # noqa: E501
        if fmax is None:
            try:
                right_min = argrelmin(psd_search[freqs_search > 10])[0][0]
                fmax = freqs_search[freqs_search > 10][right_min]
            except IndexError:
                raise ValueError("Unable to automatically determine upper end of alpha band.") from None  # noqa: E501

    psd_smooth = savgol_filter(psd,
                               window_length=window_length,
                               polyorder=polyorder)
    alpha_band = np.logical_and(freqs >= fmin, freqs <= fmax)

    # fit a 1/f (pink noise) distribution in log-log space: a good fit
    # means there is no alpha peak rising above the background spectrum
    slope, intercept, r, p, se = stats.linregress(np.log(freqs),
                                                  np.log(psd_smooth))
    if r**2 > pink_max_r2:
        paf = None
        cog = None
    else:
        paf_idx = np.argmax(psd_smooth[alpha_band])
        paf = freqs[alpha_band][paf_idx]

        cog_idx = center_of_mass(psd_smooth[alpha_band])
        try:
            cog_idx = int(np.round(cog_idx[0]))
            cog = freqs[alpha_band][cog_idx]
        except ValueError:
            # a NaN center of mass cannot be cast to int
            cog = None
            # set PAF to None as well, because this is a pathological case
            paf = None

    if ax:
        plt_psd, = ax.plot(freqs, psd, label="Raw PSD")
        plt_smooth, = ax.plot(freqs, psd_smooth, label="Smoothed PSD")
        plt_pink, = ax.plot(freqs,
                            np.exp(slope * np.log(freqs) + intercept),
                            label='$1/f$ fit ($R^2={:0.2}$)'.format(r**2))
        try:
            plt_search, = ax.plot(freqs_search, psd_search,
                                  label='Alpha-band Search Parabola')
            ax.legend(handles=[plt_psd, plt_smooth, plt_search, plt_pink])
        except UnboundLocalError:
            # this happens when the user fully specified an alpha band
            ax.legend(handles=[plt_psd, plt_smooth, plt_pink])
        ax.set_ylabel("PSD")
        ax.set_xlabel("Hz")

    return IafEst(paf, cog, (fmin, fmax))
def attenuation_iaf(raws, picks=None,  # noqa: C901
                    fmin=None, fmax=None,
                    resolution=0.25,
                    average=True,
                    ax=None,
                    savgol=False,
                    window_length=11, polyorder=5,
                    flat_max_r=0.98):
    """Estimate individual alpha frequency (IAF).

    Parameters
    ----------
    raws : list-like of Raw
        Two Raws to calculate IAF from difference (attenuation) in PSD from.
    picks : array-like of int | None
        List of channels to use.
    fmin : int | None
        Lower bound of alpha frequency band. If None, it will be
        empirically estimated using a polynomial fitting method to
        determine the edges of the central parabolic peak density,
        with assumed center of 10 Hz.
    fmax : int | None
        Upper bound of alpha frequency band. If None, it will be
        empirically estimated using a polynomial fitting method to
        determine the edges of the central parabolic peak density,
        with assumed center of 10 Hz.
    resolution : float
        The resolution in the frequency domain for calculating the PSD.
    average : bool
        Whether to average the PSD estimates across channels or provide
        a separate estimate for each channel. Currently, only True is
        supported.
    ax : instance of matplotlib Axes | None | False
        Axes to plot PSD analysis into. If None, axes will be created
        (and plot not shown by default). If False, no plotting will be done.
    savgol : False | 'each' | 'diff'
        Use Savitzky-Golay filtering to smooth PSD estimates -- either applied
        to either each PSD estimate or to the difference (i.e. the attenuation
        estimate).
    window_length : int
        Window length in samples to use for Savitzky-Golay smoothing of
        PSD when estimating IAF.
    polyorder : int
        Polynomial order to use for Savitzky-Golay smoothing of
        PSD when estimating IAF.
    flat_max_r : float
        Maximum (Pearson) correlation allowed when comparing the raw PSD
        distributions to each other in the range 1 to 30 Hz.
        If this threshold is exceeded, then IAF is assumed unclear and
        None is returned for both PAF and CoG. Note that the sign of the
        coefficient is ignored.

    Returns
    -------
    IafEst : instance of ``collections.namedtuple`` called IAFEstimate
        Named tuple with fields for the peak alpha frequency (PAF),
        alpha center of gravity (CoG), and the bounds of the alpha band
        (as a tuple).

    Raises
    ------
    ValueError
        If ``savgol`` is not one of False, 'each' or 'diff', or if the
        edges of the alpha band could not be determined automatically.

    Notes
    -----
    Based on method developed by
    `Andrew Corcoran <https://zenodo.org/badge/latestdoi/80904585>`_.
    In addition to appropriate software citation (Zenodo DOI or
    git commit), please cite:

    Corcoran, A. W., Alday, P. M., Schlesewsky, M., &
    Bornkessel-Schlesewsky, I. (2018). Toward a reliable, automated method
    of individual alpha frequency (IAF) quantification. Psychophysiology,
    e13064. doi:10.1111/psyp.13064
    """
    # fail fast on invalid smoothing mode instead of silently not smoothing
    if savgol not in (False, 'each', 'diff'):
        raise ValueError(
            "savgol must be one of False, 'each' or 'diff', "
            "got {!r}".format(savgol))

    def psd_est(r):
        # Welch PSD for a single Raw, restricted to the 1-30 Hz range
        n_fft = int(r.info['sfreq'] / resolution)
        spectrum = r.compute_psd(method="welch", picks=picks, n_fft=n_fft,
                                 fmin=1., fmax=30.)
        return spectrum.get_data(), spectrum.freqs

    psd, freqs = zip(*[psd_est(r) for r in raws])
    # both recordings must have been computed on the same frequency grid
    assert np.allclose(*freqs)

    if savgol == 'each':
        psd = [savgol_filter(p,
                             window_length=window_length,
                             polyorder=polyorder) for p in psd]

    att_psd = psd[1] - psd[0]

    if average:
        att_psd = np.mean(att_psd, axis=0)
        psd = [np.mean(p, axis=0) for p in psd]

    # only the magnitude of the attenuation matters, not its direction
    att_psd = np.abs(att_psd)
    att_freqs = freqs[0]

    if ax is None:
        fig = plt.figure()  # noqa: F841
        ax = plt.gca()

    if fmin is None or fmax is None:
        # search for band edges as local minima around the canonical
        # alpha band (default search interval 5-15 Hz)
        fmin_bound = 5 if fmin is None else fmin
        fmax_bound = 15 if fmax is None else fmax
        alpha_search = np.logical_and(att_freqs >= fmin_bound,
                                      att_freqs <= fmax_bound)
        freqs_search = att_freqs[alpha_search]
        # set the window to the entire interval, which turns the
        # Savitzky-Golay filter into a plain polynomial fit;
        # don't use the name window_length because that's used as a
        # parameter for the function as a whole
        wlen = att_psd[alpha_search].shape[0]
        psd_search = savgol_filter(att_psd[alpha_search],
                                   window_length=wlen,
                                   polyorder=10)
        # argrelmin returns a tuple, so we flatten that with [0];
        # then we take the last minimum below 10 Hz / the first minimum
        # above 10 Hz, i.e. the one closest to the 'median' alpha of 10 Hz
        if fmin is None:
            try:
                left_min = argrelmin(psd_search[freqs_search < 10])[0][-1]
                fmin = freqs_search[freqs_search < 10][left_min]
            except IndexError:
                raise ValueError("Unable to automatically determine lower end of alpha band.") from None  # noqa: E501
        if fmax is None:
            try:
                right_min = argrelmin(psd_search[freqs_search > 10])[0][0]
                fmax = freqs_search[freqs_search > 10][right_min]
            except IndexError:
                raise ValueError("Unable to automatically determine upper end of alpha band.") from None  # noqa: E501

    if savgol == 'diff':
        att_psd = savgol_filter(att_psd,
                                window_length=window_length,
                                polyorder=polyorder)

    alpha_band = np.logical_and(att_freqs >= fmin, att_freqs <= fmax)

    # near-identical PSDs mean there was no attenuation between the two
    # recordings and hence no usable alpha estimate
    r, p = stats.pearsonr(psd[0], psd[1])
    if np.abs(r) > np.abs(flat_max_r):
        paf = None
        cog = None
    else:
        paf_idx = np.argmax(att_psd[alpha_band])
        paf = att_freqs[alpha_band][paf_idx]

        cog_idx = center_of_mass(att_psd[alpha_band])
        try:
            cog_idx = int(np.round(cog_idx[0]))
            cog = att_freqs[alpha_band][cog_idx]
        except ValueError:
            # a NaN center of mass cannot be cast to int; treat as a
            # pathological case and return None (matches savgol_iaf)
            cog = None
            paf = None

    if ax:
        sgnote = '(with SG-Smoothing)' if savgol == 'each' else ''
        plt_psd1, = ax.plot(freqs[0], psd[0],
                            label="Raw PSD #1 {}".format(sgnote))
        plt_psd2, = ax.plot(freqs[1], psd[1],
                            label="Raw PSD #2 {}".format(sgnote))
        sgnote = '(with SG-Smoothing)' if savgol == 'diff' else ''
        plt_att_psd, = ax.plot(att_freqs, att_psd,
                               label="Attenuated PSD {}".format(sgnote))
        ax.text(np.max(att_freqs) * 0.5, np.max(att_psd) * 0.67,
                'Raw PSD Pearson $r={:0.2}$'.format(r))
        try:
            plt_search, = ax.plot(freqs_search, psd_search,
                                  label='Alpha-band Search Parabola')
            ax.legend(handles=[plt_psd1, plt_psd2, plt_att_psd, plt_search])
        except UnboundLocalError:
            # this happens when the user fully specified an alpha band
            ax.legend(handles=[plt_psd1, plt_psd2, plt_att_psd])
        ax.set_ylabel("PSD")
        ax.set_xlabel("Hz")

    return IafEst(paf, cog, (fmin, fmax))
def abs_threshold(epochs, threshold,
                  eeg=True, eog=False, misc=False, stim=False):
    """Compute mask for dropping epochs based on absolute voltage threshold.

    Parameters
    ----------
    epochs : instance of Epochs
        The epoched data to do threshold rejection on.
    threshold : float
        The absolute threshold (in *volts*) to reject at.
    eeg : bool
        If True include EEG channels in thresholding procedure.
    eog : bool
        If True include EOG channels in thresholding procedure.
    misc : bool
        If True include miscellaneous channels in thresholding procedure.
    stim : bool
        If True include stimulus channels in thresholding procedure.

    Returns
    -------
    rej : instance of ndarray
        Boolean mask for whether or not the epochs exceeded the rejection
        threshold at any time point for any channel.

    Notes
    -----
    More precise selection of channels can be performed by passing a
    'reduced' Epochs instance from the various ``picks`` methods.
    """
    # forward eog as well -- it was previously documented but never passed
    # NOTE(review): pick_types operates in place on `epochs`
    data = epochs.pick_types(eeg=eeg, eog=eog, misc=misc,
                             stim=stim).get_data()
    # channels and times are last two dimension in MNE ndarrays,
    # and we collapse across them to get a (n_epochs,) shaped array
    rej = np.any(np.abs(data) > threshold, axis=(-1, -2))
    return rej
def retrieve(epochs, windows, items=None,
             summary_fnc=dict(mean=np.mean), **kwargs):
    """Retrieve summarized epoch data for further statistical analysis.

    Parameters
    ----------
    epochs : instance of Epochs
        The epoched data to extract windowed summary statistics from.
    windows : dict of tuples
        Named tuples defining time windows for extraction (relative to
        epoch-locking event). Units are dependent on the keyword argument
        scale_time. Default is milliseconds.
    items : ndarray | None
        Items corresponding to the individual epoch / trials (for
        e.g. repeated measure designs). Shape should be (n_epochs,). If
        None (default), then item numbers will not be included in the
        generated data frame.
    summary_fnc : dict of functions
        Functions to apply to generate summary statistics in each time
        window. The keys serve as column names.
    kwargs :
        Keyword arguments to pass to Epochs.to_data_frame. Particularly
        relevant are ``scalings`` and ``scale_time``.

    Returns
    -------
    dat : instance of pandas.DataFrame
        Long-format data frame of summarized data
    """
    df = epochs.to_data_frame(index=['epoch', 'time'], **kwargs)
    # the trailing comma matters: a bare ('condition') is just a string,
    # and the substring test would drop any channel whose name happens to
    # be a substring of 'condition' (e.g. 'on')
    chs = [c for c in df.columns if c not in ('condition',)]
    # the order is important here!
    # otherwise the shortcut with items later won't work
    factors = ['epoch', 'condition']
    sel = factors + chs
    df = df.reset_index()

    id_vars = ['epoch', 'condition', 'win', 'wname']
    if items is not None:
        id_vars += ['item']

    dat = pd.DataFrame(columns=id_vars)
    for fnc_name, fnc in summary_fnc.items():
        d = []
        for w in windows:
            # restrict to the current time window (inclusive bounds)
            temp = df[df.time >= windows[w][0]]
            dfw = temp[temp.time <= windows[w][1]]
            dfw_summary = dfw[sel].groupby(factors).apply(fnc)
            if items is not None:
                # relies on the groupby preserving epoch order
                dfw_summary["item"] = items
            dfw_summary["win"] = "{}..{}".format(*windows[w])
            dfw_summary["wname"] = w
            d.append(dfw_summary)
        d = pd.concat(d)
        # get rid of epoch and condition if they're already columns
        # before we can move them from the index to columns
        d.drop('epoch', axis=1, inplace=True, errors='ignore')
        d.drop('condition', axis=1, inplace=True, errors='ignore')
        d.reset_index(inplace=True)
        d = pd.melt(d,
                    id_vars=id_vars,
                    value_vars=chs,
                    var_name="channel",
                    value_name=fnc_name)
        # accumulate one summary column per function into long format
        dat = pd.merge(dat, d, how='outer')
    return dat