"""
prep
==============
A module for io and preprocessing
"""
[docs]def emg_io(emg_fName, skiprows, sep = ' ', emg_chs_selected='all'):
"""
this function takes .txt format emg file and renders to a pandas dataframe for further processing.
Parameters
-----------
emg_fName : string
emg_fName should be in .txt or in .csv form.
emg_chs_selected : list of int
the index of emg channels (columns) that are supposed to be used in further analysis, defaults to 'all'
sep : string
the seperator to be specified when reading the txt file with pandas.read_csv
skiprows : int
rows to be skipped. This applies for data containing emg information at the begining of the data file.
Returns
-------
pandas's dataframe
Examples
--------
>>>emg_fName = r'D:\Data\RuiJinFirstStroke11Jan\EMG\subj1_healthy_session1.txt'
>>>emg_io(emg_fName, skiprows = 3, emg_chs_selected='all')
See Also
--------
Warnings
--------
Notes
--------
The .txt file or .csv file should not have headers. If so, please use skiprow to trim them. The first column should be 0 in the emg_chs_selected parameter
"""
import pandas as pd
emg_data = pd.read_csv(emg_fName, header = None, skiprows=skiprows, sep = sep, engine = 'python')
if emg_chs_selected == 'all':
return emg_data
else:
emg_data = emg_data[emg_chs_selected]
return emg_data
[docs]def eeg_emg_alignment(eeg_fName, emg_df, sfreq_final, emg_freq, report_fName = None, start_marker = True, fir=[1,None], PREP=True,montage = 'standard_1020'):
"""
This function takes .set format for eeg and txt format for emg.
Parameters
-----------
eeg_fName : string
eeg_fName should be in .set form
emg_fName : string
emg_fName should be in .txt form
report_fName: string
optional, default to None which will not generate a report for the reprocessing result. When reproty_fName is specified, two reports will be generated at the suggested directory including one in HTML,
and one in .h5 format. The .h5 format is an edittable report.
montage: string
The defaut montage is set to standard 1020 montage
start_marker : boolean
if start_marker is true, the segment before the first marker will be cropped, defaults to True
fir: list
the lower and upper boundary of filter. If the boundary is set to None, then the filter become high-pass or low-pass filter.
PREP: boolean
the EEG preprocessing pipeline process, defauts to True. It can de deactivated when set to false.
emg_chs_selected : list of int
the index of emg channels (columns) that are supposed to be used in further analysis, defaults to 'all'
Returns
-------
mne raw object containing aligned eeg and emg data
Examples
--------
>>>eeg_fName = r'D:\Data\RuiJinFirstStroke11Jan\EEG\subj1_healthy_session1.set'
>>>emg_fName = r'D:\Data\RuiJinFirstStroke11Jan\EMG\subj1_healthy_session1.txt'
>>>emg_df = pd.read_csv(emg_fName, header = None, skiprows=3,
sep = ' ',engine = 'python')
>>>eeg_emg_alignment(eeg_fName,emg_df,emg_freq=1000,sfreq_final=500,report_fName=None,PREP=False)
See Also
--------
Warnings
--------
Notes
--------
Make sure the eeg recording are no less than the emg recording. The current version of this function crop emg signal
with respect to emg in order to keep these data aligned.
"""
import mne,os
import pandas as pd
raw_eeg = mne.io.read_raw_eeglab(eeg_fName)
raw_eeg.set_montage(montage)
if start_marker==True:
eeg_onset=mne.events_from_annotations(raw_eeg)[0][0,0]
raw_eeg.crop(tmin = eeg_onset/raw_eeg.info["sfreq"])
if report_fName != None:
#### report raw properties ###
report = mne.Report(verbose=True)
report_fname_editable = os.path.join(report_fName,'.h5')
report_fname_2check = os.path.join(report_fName,'.html')
fig_raw = raw_eeg.plot(scalings=4e-5,n_channels=32,duration =8)
fig_raw_psd = raw_eeg.plot_psd()
report.add_figs_to_section(fig_raw, captions='raw_data', section='raw')
report.add_figs_to_section(fig_raw_psd, captions='raw_psd', section='raw')
raw_eeg.filter(fir[0], h_freq= fir[1])
if PREP ==True:
#### prep steps, require pyprep
eeg_index = mne.pick_types(raw_eeg.info, eeg=True, eog=False, meg=False,emg=False)
ch_names = raw_eeg.info["ch_names"]
ch_names_eeg = list(np.asarray(ch_names)[eeg_index])
sample_rate = raw_eeg.info["sfreq"]
montage_kind = "standard_1020"
montage = mne.channels.make_standard_montage(montage_kind)
raw_eeg_copy=raw_eeg.copy()
# Fit prep
prep_params = {'ref_chs': ch_names_eeg,
'reref_chs': ch_names_eeg,
'line_freqs': np.arange(50, sample_rate/2, 50)}
prep = PrepPipeline(raw_eeg_copy, prep_params, montage)
prep.fit()
raw_eeg = prep.raw
if report_fName != None:
fig_raw = raw_eeg.plot(scalings=4e-5,n_channels=32,duration=8)
fig_raw_psd = raw_eeg.plot_psd()
report.add_figs_to_section(fig_raw, captions='prep signal space', section='prep')
report.add_figs_to_section(fig_raw_psd, captions='prep_psd', section='prep')
report.add_htmls_to_section(htmls =[str(prep.interpolated_channels),str(prep.noisy_channels_original['bad_all'])
,str(prep.still_noisy_channels)],
captions= ['interpolated channels','bad channels detected', "still noisy channels"],
section='prep', replace=False)
report.save(report_fname_2check, overwrite=True)
report.save(report_fname_editable, overwrite=True)
eeg_len = raw_eeg._data.shape[1]
emg_df = emg_df.iloc[0:round(emg_freq/raw_eeg.info["sfreq"])* eeg_len]
ch_types = ['emg']*len(emg_df.columns)
ch_names = []
for i in range(len(emg_df.columns)):
ch_names.append('emg'+str(i+1))
info = mne.create_info(ch_names=ch_names, sfreq=emg_freq, ch_types=ch_types)
raw_emg = mne.io.RawArray(emg_df.T, info)
raw_emg.resample(sfreq=sfreq_final)
raw_emg.info['highpass'] = fir[0]
raw_hybrid = raw_eeg.copy().add_channels([raw_emg])
return raw_hybrid
[docs]def epochs_basedon_emg(raw_hybrid, ref_emg, windowLen, step=100, threshold =0.5,report_fName=None,
add_to_existed_report=False, reject_criteria = dict(eeg=30e-5,emg=1e10),
flat_criteria = dict(eeg=5e-7),tmin = 0.0, tmax = 3.0, save_fName=None):
"""
this function takes .set format for eeg and txt format for emg.
Parameters
-----------
raw_hybrid: mne raw object
The raw object containing aligned eeg and emg
ref_emg: list of string
the emg channels that the algorithm refers to. They should be channels in input raw_hybrid. The algorithm will
consider 'the length of the string' as number of movements.
report_fName: string
optional, default to None which will not generate a report for the epoching result.
When reproty_fName is specified, two reports will be generated at the suggested directory including one in HTML,
and one in .h5 format. The .h5 format is an edittable report.
add_to_existed_report: boolean
It is supposed to be true when one wants to add the epoching report into an existed report.
windowLen: int
rough duration estimation of the movement
step: int
equivalent to resolution, defaults to 100
threshold: float
the threshold should be in [0,1], represnting the quantile of the energy for all windows
reject_criteria: dict
epochs containing eeg and emg that exceed rejection criteria will be rejected. defaults to dict(eeg=30e-5,emg=1e10)
flat_criteria: dict
epochs containing eeg and emg whose amplitude are lower than flat criteria will be rejected.
Defaults to dict(eeg=1e-6). If reject_criteria and flat_criteria are both set to None, then no epochs rejection
would take place
tmin: float
the begining of the epochs with respect to the movement onsets
tmax: float
the end of the epochs with respect to the movement onsets
savefName: string
the path and filename of the saving epochs. It defaults to None, that is the epochs would not be saved
Returns
-------
mne epochs
Examples
---------
See Also
--------
eeg_emg_alignment
Warnings
--------
Notes
--------
"""
import numpy as np
import mne
onsets = np.array([])
descriptions = np.array([])
for i in range(len(ref_emg)):
raw2cut = raw_hybrid.copy()
raw2cut = raw2cut.pick_channels([ref_emg[i]])
df = raw2cut.to_data_frame()[ref_emg[i]].abs()
emgEnergy = df.rolling(windowLen, win_type='boxcar').sum().dropna()[::step]
possibleOnsets =np.array(emgEnergy[emgEnergy>emgEnergy.quantile(threshold)].index.tolist())-windowLen
newOnsets=np.array(firstOnsetD(possibleOnsets))/1000 # convect to seconds
onsets = np.concatenate((onsets,newOnsets))
descriptions =np.concatenate(( descriptions, ['movement'+str(i+1)+'Onset']*len(newOnsets)))
durations = np.zeros_like(onsets)
annot = mne.Annotations(onset=onsets, duration=durations,
description=descriptions,
orig_time=raw2cut.info['meas_date'])
#change to raw2cut see effetcs
raw_validate = raw_hybrid.copy()
raw_validate.set_annotations(annot)
events,events_id = mne.events_from_annotations (raw_validate)
# Here we simply thresholding the epochs to get rid of noisy segment characterized by huge fluctuation.
if reject_criteria ==None and flat_criteria==None:
epochs = mne.Epochs(raw_validate,events,tmin=0.0,tmax=3.0,baseline=(0, 0))
else:
epochs = mne.Epochs(raw_validate,events,tmin=tmin,reject=reject_criteria, flat=flat_criteria,tmax=tmax,
baseline=(0, 0))
if save_fName!=None:
epochs.save(save_fName,overwrite=True)
return epochs
[docs]def firstOnsetD(possibleOnsets):
"""
an auxilary function that identify true movement onsets for all the possible onsets identified based on energy threshold.
Parameters
----------
possibleOnsets: list
list of all the possible onsets
Returns
-------
true movement onsets
Examples
---------
See Also
--------
eeg_emg_alignment
Warnings
--------
Notes
--------
"""
n=len(possibleOnsets)
st_idx = 0
onsets = []
while st_idx < n-1:
end = st_idx + 1
dif = possibleOnsets[end] - possibleOnsets[st_idx]
while end < n - 1 and possibleOnsets[end + 1] - possibleOnsets[end] == dif:
end += 1
onsets.append(possibleOnsets[st_idx])
st_idx = end+1
return onsets