Communication between processes - ManuJackPel/Brain-Powered-2022-2023 GitHub Wiki
Sharing information between tasks in a Brain-Computer Interface depends on where and how the tasks are set up. If tasks are in the same script or on the same computer, they can share information more easily. But if they're on different computers or use different internet connections, it can be trickier. This can affect how well your interface works. We'll explore this more in the next sections of this manual.
Communicating when written in the same Python scripts
Navigating concurrency and parallelism in Python can be complex due to the Global Interpreter Lock (GIL), which allows only one thread to execute at a time. Yet, Python provides three methods to work around this:
-
Multithreading: The simplest method, enabling several operations to run concurrently within the same program space. Bear in mind that the GIL means only one thread can execute at a time. For more information, refer to Python's threading documentation.
-
Asyncio: A more complicated method that doesn't facilitate easy parallelism but can handle asynchronous I/O operations more efficiently. Learn more from Python's asyncio documentation.
-
Multiprocessing: This is a more complex method but allows for real parallelism as it uses separate processes instead of threads. Each process runs in its own Python interpreter on multiple CPUs. Additionally, the multiprocessing module provides certain datatypes for easier communication between processes. We opted for multiprocessing in our setup, and a detailed explanation and implementation will follow. Explore Python's multiprocessing documentation for more insight.
Below you can find a simplified code snippet on how we utilized Multiprocessing using Queues. The important take away is that you have to process running in parallel. One for acquiring the data and one for processing it. First we define in a function what we want each process to do. Then we initialize them into multiprocessing Process variables. Once we want the processes to begin we use the .start() method. We use a Queue with a maxsize of 5, the maxsize is arbritary as you should never really data building up in your Queue.
from multiprocessing import Process, Queue
import time
import random
# Function to simulate a long-running data acquisition
def data_acquisition(queue_to_store_data):
for _ in range(10000):
time.sleep(1/250) # Simulate time taken to acquire data
data = random.randint(1, 100) # Simulate acquired data
print(f"Data acquired: {data}")
queue_to_store_data.put(data) # Put the data in the queue
# Function to process data
def data_processing(queue_with_data):
while True:
if not queue_with_data.empty():
data = queue_with_data.get() # Get the data from the queue
processed_data = data ** 2 # Process data
print(f"Data processed: {processed_data}")
if __name__ == "__main__":
data_queue = Queue(maxsize=5) # Initialize a Queue to store data
# Define new processes that call the data_acquisition and data_processing functions
data_acquisition_process = Process(target=data_acquisition, args=(data_queue,))
data_processing_process = Process(target=data_processing, args=(data_queue,))
# Start the processes
data_acquisition_process.start()
data_processing_process.start()
Common Issues
Timing
When not keeping track of the timing of processes signifcant details can build up. This kind of delay can arise when one process is significantly slower than another. If Process 1 is producing data faster than Process 2 can consume and process it, this can lead to a backlog of unprocessed data. In context of BCIs, this makes it that your classifications become more and more delayed. Therefore data processing should always be quicker then data acquisition. For example if you are processing arrays of size 250, the time it takes to get 250 values should be higher then the time it takes to process those values.
Datatypes
Always be aware of the types of data that you're passing between processes. Python's multiprocessing uses the pickle module under the hood to serialize and deserialize data that's being transferred between processes. However, not all data types can be serialized by pickle.
A common issue arises with ctypes, a library for Python that provides C-compatible data types. ctypes can't be pickled, and trying to pass a ctypes object between processes will raise an error. This is where understanding the nature of data serialization and the constraints of pickle can save a lot of debugging time.
To get around this issue, you could:
- Convert the data to a type that can be pickled before passing it to another process. This might involve changing your data structures or the way your code operates.
- Alternatively, you could initialize the variable in the process where you intend to use it. This way, you avoid having to pass it between processes.
Communicating on different devices with same internet
There are various reasons why you might not be able to run all processes in a single Python script. For example, if a process is running outside Python or on a different computer, using Python's multiprocessing library will not be possible. In such situations, where both devices are connected to the same Wi-Fi network, the Lab Streaming Layer (LSL) protocol can be utilized. LSL is a protocol used for handling real-time streaming of time-series data across a network, making it particularly well-suited for handling data in research and experimentation contexts. Within Python LSL is handled by the pylsl library. However, it's important to note that LSL will not work over secured networks like eduroam. We recommend using a personal hotspot.
With LSL, you can create 'inlets' which receive the data, and 'outlets' which transmit that data. Alternatively, when two processes are on the same device you just can read and write to a CSV. In theory, this would be slower then LSL, but in practice this likely will not matter when dealing with small amount of data.
When creating an outlet in pylsl, you need to specify a few details:
- name: This is a unique identifier for your stream. It should be something that describes what the data represents. For example, 'EEG' or 'SineWave'.
- type: This describes the type of data your stream is handling. It could be 'EEG', 'EyeTracking', 'MotionCapture', 'Audio', etc. This makes it easier for outlets to find the correct stream.
- channel_count: This is the number of channels of data. For example, if your EEG setup has 64 electrodes, this value would be 64.
- nominal_srate: This is the sampling rate (in Hz) of your stream. If your stream doesn't have a fixed sampling rate, you can specify this as pylsl.cf_irregular_rate.
- channel_format: This is the format of the data in your stream. It could be one of several formats, including 'float32', 'double64', 'int32', etc. This determines how much memory each data point uses.
- source_id: This is an additional unique identifier for the stream. It should remain the same across different sessions of the same experiment or setup.
Creating an outlet is a bit simpler. You just need to specify which stream you want to connect to. In most documentation, you do this by specifying the type of stream you want. However, we recommend using ad unique identifier like the name or source_id to avoid confusion when multiple inlets are present. To find and connect to a stream, we use something known as a 'resolver'. This function essentially searches for streams that match the criteria you have set, returning a list of suitable matches. You then create an inlet by passing one of the found streams to the StreamInlet constructor. Now you can pull data from this inlet using its pull_sample method. This will return a list (sample) containing the current data from all channels of the stream, as well as a timestamp indicating when the sample was taken.
from pylsl import StreamInfo, StreamOutlet
import numpy as np
import time
# Set up the LSL stream
# name: 'RandomNoise'
# type: 'EEG'
# channel_count: 2
# freq_rate: 100
info = StreamInfo('RandomNoise', 'EEG', 2, 100, 'float32', 'random_noise_id')
outlet = StreamOutlet(info)
# Set up the noise parameters
sample_rate = 100 # sample rate of the LSL stream
while True:
sample = np.random.uniform(-1, 1, 2) # generate 2 random values between -1 and 1
# Push the sample into the LSL stream
outlet.push_sample(sample)
# Simulate time it takes to generate next sample
time.sleep(1.0 / sample_rate)
from pylsl import StreamInlet, resolve_stream
import time
# Wait until a stream named 'RandomNoise' is available
print("Looking for 'RandomNoise' stream...")
streams = resolve_stream('name', 'RandomNoise')
# Create a new inlet to read from the stream
inlet = StreamInlet(streams[0])
print("Connected to 'RandomNoise' stream.")
# Continuously read samples from the inlet
while True:
sample, timestamp = inlet.pull_sample()
print(f"Received sample at time {timestamp}: {sample}")
Common Issues
Lingering Stream Outlets when Multiprocessing
With multiprocessing, StreamOutlets might not shut down correctly, causing the script to run in the background. This can lead to data coming from the old stream when a new inlet is initialized. You can avoid this by ensuring processes are terminated properly. If a lingering stream outlet is found, manually terminate it using tools like Windows' Task Manager or Linux's 'kill' command.
Issues with Connectivity
As LSL works on the network level, it's subject to the same connectivity and network issues that affect other networked applications. Issues can arise due to firewall settings, network configurations, or hardware problems.
Communicating on different devices without same internet
Sometimes, two devices are unable to share the same internet connection, which eliminates the possibility of using multiprocessing and the Lab Streaming Layer (LSL) for data sharing. For example, when both a drone and an EEG recorder are connected to the computer through wifi, like in our case. However, there are several other methods that can be used for communication between devices, such as Bluetooth or TCP/IP. For our setup, we chose to use TCP/IP communication via an Ethernet cable. This method was chosen because we felt that it would be quicker and stabler relative to Bluetooth. Using Ethernet can often provide a more stable and faster connection than wireless alternatives.
In order to accomplish a TCP connection between two laptops through ethernet, a few steps have to be undertaken.
-
Wiring: The laptops have to be connected directly using an ethernet cable. A USB ethernet dongle can be used if one or both laptops lack an ethernet port.
-
IP address setup: In order for the laptops to establish a connection, their ethernet adapters need to be set to a static IPV4 address. Importantly, the network portion of both addresses needs to be the same. The network ID of the IPV4 address is the first three numbers, and the host ID is the last number, which identifies the computer on the network. For example, one might choose the address 192.168.1.1 for one computer and 192.168.1.2 for the other, assuming that these addresses do not coincide with any other devices connected to the computers. In any case, the subnet mask of the ethernet adapters needs to be set to 255.255.255.0.
-
Setting the network priorities: If you want the computers to be connected to the drone and EEG device over Wi-Fi, but communicate with each other over Ethernet, you need to ensure the routing tables on the computers are configured correctly. This might require changing the 'metric' or 'priority' of the Ethernet connection to be higher than the Wi-Fi connection. The exact steps to do this will depend on your operating system.
-
Establishing a TCP connection using Python socket: When using TCP, there is always a host and a client. Though communication between host and client can be bidirectional, it is conventional practise to pick the host as the computer sending the most data. In our case, this is the laptop sending drone commands to the computer that connects to the drone. The following are example codes that use Python's socket library:
Server code (to run on the host computer):
import socket
# create a socket object
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# get local machine name (replace with your manually set IP)
host = "192.168.1.1"
port = 9999
# bind to the port
serversocket.bind((host, port))
# queue up to 5 requests
serversocket.listen(5)
while True:
# establish a connection
clientsocket, addr = serversocket.accept()
print("Got a connection from %s" % str(addr))
msg = 'Thank you for connecting'+ "\r\n"
clientsocket.send(msg.encode('ascii'))
clientsocket.close()
Client code (to run on the second computer):
import socket
# create a socket object
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# get local machine name (replace with the server's manually set IP)
host = "192.168.1.1"
port = 9999
# connection to hostname on the port.
s.connect((host, port))
# receive no more than 1024 bytes
msg = s.recv(1024)
s.close()
print (msg.decode('ascii'))