Preproccesing, Feature Extraction and Classification - ManuJackPel/Brain-Powered-2022-2023 GitHub Wiki
Background Terminology
In the context of Brain-Computer Interfaces (BCI), the process of transforming raw brain signals into meaningful commands involves several key steps: preprocessing, feature extraction, and classification. Preprocessing involves enhancing the quality of raw brain signals. Brain signals, especially EEG, will be extremely noisy. This noise can stem from muscle activity, eye movements or surrounding electronics. The goal is to improve the signal-to-noise ratio and ensure that subsequent stages work with the highest quality data. Feature extraction occurs after preprocessing, the purpose being to extract the relevant information from the processed signals. Brain signals contain a lot of data, but not all of it is useful for the task at hand. Feature extraction aims to identify and exttract the most important characteristics. In the context of EEG data this can include filtering for a specific band(alpha, gamma, mu activity) or transforming the data to a different domain, for example when using a fast fourier transform. Lastly, classification aims to take the extracted features and turn them into useful commands. Classification involves using models to categorize the extracted features into distinct classes or output, in our case the models are trained on labeled data. For instance, different patterns of brain activity could be classified as "move left," "move right," or "stay still."
Pipeline Used
The pipeline used in the academic year of 2022-2023 was relatively simple. Participants were tasked with opening and closing their left and right hand. The mu band activity of these actions were taken. A common spatial pattern(CSP) was trained and the logarithmic variance was taken. The two CSP features with the highest variance were taken. We then made use of several classifiers including Support Vector Machines(SVM), Linear Discriminant Analysis(LDA) and K-Nearest Neighbhours(KNN).
This pipeline was adapted from Dr. Marijn van Vliet. We recommend consulting his videos and source code for a better explanation. We will only indicate the main differences between his setup and ours. Notably, we use motor movements as opposed to motor imagery but this does not make much of a difference in the pipeline.
A link to the source code can be found here. And a link to the video lectures can be found below.
Importing Data
Here's a revised version of your writing while retaining the style:
The first distinction is how we import the data. The original code taps uses a public dataset from a BCI competition, housed within a Matlab file. Consequently. Meanwhile we rely on data from our experimental paradigm, which was recorded by EventIDE. EventIDE archives the recorded data using two distinct files. The primary file showcases raw data, where each column signifies a channel, and rows depict voltage at specific times. Notably, this file includes an additional 'marker' row, pinpointing the onset of a condition. This row will not indicate the exact type of condition. This is fixed by using the second file, termed the 'Base Report'. This report holds the relevant condition information and some supplementary metadata. To craft a comprehensive pandas DataFrame, one must combine data from both these files.
The code you see is from 2022-2023. But, depending on how you've set up EventIDE, it might not fit perfectly. One key point here is the LEN_SLICE variable. EventIDE sometimes misses samples during a condition. This means two conditions, even if they last the same amount of time, could have different sample counts. So, to keep things consistent, we use LEN_SLICE to set a fixed sample size, even if that means cutting some off.
import pandas as pd
import numpy as np
# Constants
BASE_REPORT_PATH = '/eventide_data/BaseReport.csv'
RAW_DATA_PATH = '../eventide_data/manumm/SignalData_Manu-MM.csv'
LEN_SLICE = 820 # length of file in indices
# Importing Base Report
base_report = pd.read_csv(BASE_REPORT_PATH, skiprows=8, index_col=None)
base_report.rename(columns={"Trial Number": 'trial number',
"Hand Condition": 'condition'}, inplace=True)
# Extract the trial conditions as integers
trial_condition = base_report['condition'].apply(int).to_numpy()
# Import raw data and adjust column names
raw_data = pd.read_csv(RAW_DATA_PATH, delimiter='\t', skiprows=1)
raw_data.reset_index(inplace=True)
raw_data.columns = ['Timestamp', 'Fp1', 'Fp2', 'F3', 'F4', 'C3', 'C4', 'P3', 'P4',
'O1', 'O2', 'F7', 'F8', 'T3', 'T4', 'T5', 'T6', 'Fz', 'Cz',
'Pz', 'Hardware timestamp', 'marker', 'marker time', 'marker type', 'bs1']
# Determine indices for task start and end based on markers
task_start_indices = raw_data[raw_data['marker type'] == 'trial'].index
task_end_indices = raw_data[raw_data['marker type'] == 'Pause'].index
# Extract relevant data for each trial and sort by class
trial_data = {'cl1': [], 'cl2': []}
for trial, (start, end, _class) in enumerate(zip(task_start_indices, task_end_indices, trial_condition)):
trial_interval = raw_data[start:end].drop(columns=['Timestamp', 'Hardware timestamp', 'bs1',
'marker', 'marker time', 'marker type'])
trial_interval.reset_index(drop=True, inplace=True)
# Classify the data based on its type and slice to 820 rows
if _class == 0:
trial_data['cl1'].append(trial_interval.iloc[:LEN_SLICE])
else:
trial_data['cl2'].append(trial_interval.iloc[:LEN_SLICE])
Making Common Spatial Patterns easier to use
The code for the Common Spatial Pattern(CSP) was changed to improve readability and ease of use. We moved all the functions of the orginal code to a class. This way the user only has to provide the samples between two conditons to initialize the CSP Object. After initilization new samples can be passed to the CSP object, which then applies the mixing matrix and returns the components.
class CSP():
def __init__(self, cl_one_data, cl_two_data):
self.n_channels, self.n_samples, self.n_trials = cl_one_data.shape
self.W = self.csp(cl_one_data, cl_two_data)
def csp(self, trials_r, trials_f):
'''
Calculate the CSP transformation matrix W.
arguments:
trials_r - Array (channels x samples x trials) containing right hand movement trials
trials_f - Array (channels x samples x trials) containing foot movement trials
returns:
Mixing matrix W
'''
cov_r = self.cov(trials_r)
cov_f = self.cov(trials_f)
P = self.whitening(cov_r + cov_f)
B, _, _ = linalg.svd( P.T.dot(cov_f).dot(P) )
W = P.dot(B)
return W
def cov(self, trials):
''' Calculate the covariance for each trial and return their average '''
ntrials = trials.shape[2]
covs = [ trials[:,:,i].dot(trials[:,:,i].T) / self.n_samples for i in range(ntrials) ]
return np.mean(covs, axis=0)
def whitening(self, sigma):
''' Calculate a whitening matrix for covariance matrix sigma. '''
U, l, _ = linalg.svd(sigma)
return U.dot( np.diag(l ** -0.5) )
def __call__(self, trials):
''' Apply a mixing matrix to each trial (basically multiply W with the EEG signal matrix)'''
ntrials = trials.shape[2]
trials_csp = np.zeros((self.n_channels, self.n_samples, self.n_trials))
for i in range(ntrials):
trials_csp[:,:,i] = self.W.T.dot(trials[:,:,i])
return trials_csp
# Calculating the CSP matrix
csp = CSP(trials_condition_a, trials_condition_b)
# Applying the matrix to new data
csp_components = csp(new_trials)
Use of Scikit learn
Instead of writing the classifiers from scratch we highly advise to use the scikit-learn library. The library is the standard Python library for classic machine learning. It allows users to easily fit classifers and use them to predict future data. Other benefits include confidence that the classifiers work and a wide array of documentation. Below is an example of how the accuracy of different classifiers can be evaluated during offline classification.
from sklearn.svm import SVC
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import numpy as np
# Prepare the data
# Concatenate data for each class and reshape for classifier input.
# Create target labels for training and test data.
train_data = np.transpose(np.concatenate((train[cl1], train[cl2]), axis=1))
test_data = np.transpose(np.concatenate((test[cl1], test[cl2]), axis=1))
train_labels = np.concatenate((np.ones(train[cl1].shape[1]), np.zeros(train[cl2].shape[1])))
test_labels = np.concatenate((np.ones(test[cl1].shape[1]), np.zeros(test[cl2].shape[1])))
# Create SVM, LDA, and KNN models.
svm = SVC(kernel='linear')
lda = LinearDiscriminantAnalysis()
knn = KNeighborsClassifier(n_neighbors=3)
# Fit the models on the training data.
for model in [svm, lda, knn]:
model.fit(train_data, train_labels)
# Predictions
# Predict labels for test and train data using the trained models.
predictions = {
'svm': {
'test': svm.predict(test_data),
'train': svm.predict(train_data),
},
'lda': {
'test': lda.predict(test_data),
'train': lda.predict(train_data),
},
'knn': {
'test': knn.predict(test_data),
'train': knn.predict(train_data),
}
}
# Calculate accuracies
# Compute accuracy scores for both training and test data for each model.
accuracies = {}
for model_name, model_data in predictions.items():
accuracies[model_name] = {}
# Compute accuracies for both train and test data
accuracies[model_name]['train'] = accuracy_score(train_labels, model_data['train'])
accuracies[model_name]['test'] = accuracy_score(test_labels, model_data['test'])
# Display results
# Print accuracy scores for each classifier on train and test datasets.
for model_name, model_data in accuracies.items():
print(f"{model_name.upper()} Accuracy (Train Data): {model_data['train']}")
print(f"{model_name.upper()} Accuracy (Test Data): {model_data['test']}")