Advanced concepts - REHYB/LegoArmExoskeleton GitHub Wiki

WiFi speed (Decrease latency)

In order to further decrease the latency in the system the WiFi router can be set to 5 GHz. If the router is in close proximity to both the PC which runs the Unity program and the Raspberry Pi which controls the exoskeleton, this change would result in faster communication. However, if the devices are located further from each other, the regular 2,4 GHz frequency is recommended to be used. How to change the frequency of the router depends on the model, so visit the documentation or support page of your router.

Override SendMsg method in UdpHost.cs

If you want to send not only string messages, but other, more complicated object through UDP from Unity using the UdpUnityPackage which is used in the tutorial, the SendMsg method can be overriden. Keep in mind, that most likely you want to serialize something and at the end of the day actually want to send a string format, however the input parameter of the method mustn't be a string.

Time synchronization

In some scenarios it is useful to measure the latency between the LEGO exoskeleton and Unity (or another system). To measure the latency in the experiment setting the following method was used:

The Unity program sends the time as a string timestamp of the PC which runs it. The Raspberry Pi grabs this message and sets its own clock with the following code:

    if message == 'time':
        date_str = str(message.payload.decode("utf-8"))
        os.system('sudo date -s %s' % date_str)

Even though this synchronization provides the ground for an estimated latency measure, it is NOT PRECISE. The NTP protocol is advised to be used to synchronize the Raspberry Pi and Unity. The ntplib python library should allow the Raspberry Pi to fetch the time of the "server" (in this case the PC which is running Unity).

Threading

The BrickPi system does not support by default asynchronous operation. This means that for example if we want to provide a nudging movement by setting the motor to a new target position we have to wait for the motor movement to be finished. This means that the scripts other functionalities in the meantime are paused for that time being. In this example the motor is giving a small nudge movement by rotating by 90 degrees. time.sleep(0.5) ensures that the motor movement is done. If that line is not there the movement will not happen.

target = BP.get_motor_encoder(BP.PORT_B) + 90
BP.set_motor_position(BP.PORT_B, target)
time.sleep(0.5)
BP.set_motor_power(BP.PORT_B, BP.MOTOR_FLOAT)

However, in multiple scenarios the actuation of the motors of the exoskeleton system needs to be asynchronous. For example streaming the motor position to the Unity program needs to be continuous just as logging the data locally. To do so we can introduce threading.

One possible tested way of doing it is the following:

First of all threading needs to be imported.

import threading

In the main function of the script we start a thread called receiveMsgThread:

# Starting a thread which is listening to the UDP messages until the program is closed
receiveMsgThread = threading.Thread(target=receiveMsg, args=())
receiveMsgThread.daemon = True # Making the Thread daemon so it stops when the main program has quit
receiveMsgThread.start()

Since it's a daemon thread we don't have to worry about closing it when exiting the script.

This thread is calling the following function, receiveMsg. It continuously checks for UDP messages on a different thread. It means that in the main endless loop of the script (while True:) our logging will run without interruption.

def receiveMsg():
    while True:
        data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
        if (addr[0] == SERVER_IP):
            messageCallback(data)

This function calls another function once again, called messageCallback. In this function we once again start a new thread which is only responsible for once actuating the motor(s).

def messageCallback(data):
    message = data.decode("utf-8")

    if message == 'left' or message == 'right' or message == 'up' or message == 'down': 
        actuateThread = threading.Thread(target=executeNudging, args=(message,))
        actuateThread.start()

This actuateThread is calling the executeNudging function which is rotating the motors as described above.

def executeNudging(direction: str):
    if direction == "up":
        target = BP.get_motor_encoder(BP.PORT_C) - 90
        BP.set_motor_position(BP.PORT_C + BP.PORT_A, target)
        time.sleep(0.5)
        BP.set_motor_power(BP.PORT_C + BP.PORT_A, BP.MOTOR_FLOAT)
    elif direction == "down":
        target = BP.get_motor_encoder(BP.PORT_C) + 90
        BP.set_motor_position(BP.PORT_C + BP.PORT_A, target)
        time.sleep(0.5)
        BP.set_motor_power(BP.PORT_C + BP.PORT_A, BP.MOTOR_FLOAT)
    elif direction == "left":
        target = BP.get_motor_encoder(BP.PORT_B) + 90
        BP.set_motor_position(BP.PORT_B, target)
        time.sleep(0.5)
        BP.set_motor_power(BP.PORT_B, BP.MOTOR_FLOAT)
    elif direction == "right":
        target = BP.get_motor_encoder(BP.PORT_B) - 90
        BP.set_motor_position(BP.PORT_B, target)
        time.sleep(0.5)
        BP.set_motor_power(BP.PORT_B, BP.MOTOR_FLOAT)

By having this also as a new thread multiple nudges are not blocking each other but can be executed in parallel. This is useful when multiple motors are actuated frequently sometimes overlapping.

The full example code is here:


#!/usr/bin/env python
from __future__ import print_function # use python 3 syntax but make it compatible with python 2
from __future__ import division

import paho.mqtt.client as mqtt
import time     # import the time library for the sleep function
import brickpi3 # import the BrickPi3 drivers
from datetime import datetime
import threading
import logging
import socket
import os

print("Setting up logging...")
logging.basicConfig(filename='exoskeleton.log', filemode="w", format='%(message)s', level=logging.DEBUG)
logging.info("value_wrist,value_elbow,timestamp")
print("Logging is set up!")

BP = brickpi3.BrickPi3() # Create an instance of the BrickPi3 class. BP will be the BrickPi3 object.
BP.set_led(0) # For succesfull setup indication, we turn off the LED and turn it on if it's succesfully set up

print("Setting up UDP connection...")

# Unity program
SERVER_IP = '192.168.0.101'
SERVER_PORT = 5013

# Exoskeleton
CLIENT_IP = '192.168.0.200'
CLIENT_PORT = 5011
sock = socket.socket(socket.AF_INET, # Internet
                     socket.SOCK_DGRAM) # UDP

sock.bind((CLIENT_IP, CLIENT_PORT))
print("UDP connection set up!")

print("Sending test UDP message...")
sock.sendto(str.encode("from pi"), (SERVER_IP, SERVER_PORT))

def executeNudging(direction: str):
    if direction == "up":
        target = BP.get_motor_encoder(BP.PORT_C) - 90
        BP.set_motor_position(BP.PORT_C + BP.PORT_A, target)
        time.sleep(0.5)
        BP.set_motor_power(BP.PORT_C + BP.PORT_A, BP.MOTOR_FLOAT)
    elif direction == "down":
        target = BP.get_motor_encoder(BP.PORT_C) + 90
        BP.set_motor_position(BP.PORT_C + BP.PORT_A, target)
        time.sleep(0.5)
        BP.set_motor_power(BP.PORT_C + BP.PORT_A, BP.MOTOR_FLOAT)
    elif direction == "left":
        target = BP.get_motor_encoder(BP.PORT_B) + 90
        BP.set_motor_position(BP.PORT_B, target)
        time.sleep(0.5)
        BP.set_motor_power(BP.PORT_B, BP.MOTOR_FLOAT)
    elif direction == "right":
        target = BP.get_motor_encoder(BP.PORT_B) - 90
        BP.set_motor_position(BP.PORT_B, target)
        time.sleep(0.5)
        BP.set_motor_power(BP.PORT_B, BP.MOTOR_FLOAT)

def messageCallback(data):
    message = data.decode("utf-8")

    if message == 'left' or message == 'right' or message == 'up' or message == 'down': 
        actuateThread = threading.Thread(target=executeNudging, args=(message,))
        actuateThread.start()

    if message == 'time':
        date_str = str(message.payload.decode("utf-8"))
        print("date:")
        print(date_str)
        os.system('sudo date -s %s' % date_str)

def receiveMsg():
    while True:
        data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
        if (addr[0] == SERVER_IP):
            messageCallback(data)

def main():
    try:
        try:
            BP.offset_motor_encoder(BP.PORT_A, BP.get_motor_encoder(BP.PORT_A))
            BP.offset_motor_encoder(BP.PORT_B, BP.get_motor_encoder(BP.PORT_B))
            BP.offset_motor_encoder(BP.PORT_C, BP.get_motor_encoder(BP.PORT_C))
        except IOError as error:
            print(error)

        # Starting a thread which is listening to the UDP messages until the program is closed
        receiveMsgThread = threading.Thread(target=receiveMsg, args=())
        receiveMsgThread.daemon = True # Making the Thread daemon so it stops when the main program has quit
        receiveMsgThread.start()

        BP.set_led(100) # Light up the LED to show the setup is successful
        print("Logging motor value and ready to take nudging messages through UDP...")
    
        while True:       
            timeStamp = datetime.now().strftime('%H:%M:%S.%f')
            
            value_wrist = float(-BP.get_motor_encoder(BP.PORT_B))
            value_wrist = str(value_wrist)
            value_elbow = (BP.get_motor_encoder(BP.PORT_A) + BP.get_motor_encoder(BP.PORT_C)) * -1 / 2
            value_elbow = str(value_elbow)

            udp_message = str.encode(f"{value_elbow},{value_wrist},{timeStamp}")
            sock.sendto(udp_message, (SERVER_IP, SERVER_PORT))
        
            logging.info(value_wrist + "," + value_elbow + "," + timeStamp)

            time.sleep(0.01) # Without sleep the system logs and sends data to the server ~820 times per second (820 Hz)

    except KeyboardInterrupt: # except the program gets interrupted by Ctrl+C on the keyboard.
        sock.close()
        print('\nsocket closed')
        BP.reset_all()        # Unconfigure the sensors, disable the motors, and restore the LED to the control of the BrickPi3 firmware.
        logging.info("program stopped")
        print('program stopped')

if __name__ == "__main__":
    main()