Example: Visual Discrimination ‐ experiment
The code given below runs a touchscreen-based discrimination task for mice.
It works as is, but it provides no visual interface for following the experiment. If the logs are sufficient for you, run this code alone. If you prefer to follow the experiment through the visual interface, you still need this code, but you must run the code of the visual interface instead (it automatically starts the experiment by calling the VisualDiscriminationExample class).
Context for the code
The touchscreen can display two images, one on its left side and one on its right side. A trial is a task given to the mouse in which it must touch the correct image on the touchscreen. When the mouse succeeds at the trial (the correct image depends on the phase), it receives sugar water as a reward. If the mouse fails the trial, the experiment waits 10 seconds before starting another trial. Only one mouse is allowed in the test room at a time. The mouse can leave the test room at any time.
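The 10-second wait after a failed trial can be implemented without blocking the rest of the program. The sketch below is only an illustration built on the standard threading.Timer class; the names schedule_next_trial and start_new_trial are hypothetical and do not come from MiceCraft.

```python
from threading import Timer

started_trials: list[str] = []


def start_new_trial(room_name: str) -> None:
    """Stand-in for whatever starts a trial in the real experiment."""
    started_trials.append(room_name)


def schedule_next_trial(delay_sec: float, room_name: str) -> Timer:
    """Start a new trial after `delay_sec` seconds, without blocking."""
    # threading.Timer receives the callback arguments through `args=`.
    timer = Timer(delay_sec, start_new_trial, args=(room_name,))
    timer.start()
    return timer


# The experiment waits 10 seconds; a short delay keeps this demo quick.
timer = schedule_next_trial(0.05, "room_A")
timer.join()  # wait until the callback has run
print(started_trials)

# If the mouse leaves during the timeout, the pending trial can be cancelled.
pending = schedule_next_trial(10, "room_A")
pending.cancel()
pending.join()
print(started_trials)
```

Cancelling the timer matters because the mouse can leave the room at any time: a trial scheduled for a mouse that has already left should never start.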
How to set up and modify this example
Parameters to change are located at the beginning of the VisualDiscriminationExample.__init__ method, under this line:
# ================ EXPERIMENT PARAMETERS ================
This experiment is divided into 3 phases (the END phase is not part of the experiment). They are defined during the initialisation of the VisualDiscriminationExample class as follows:
# Phases creation
# ----------------
Phase("BLACK_WHITE", 1, Criteria(accuracy=(0.9, 30)), TSImage.LIGHT)
Phase("FLOWER_PLANE", 2, Criteria(accuracy=(0.8, 100)))
Phase("REVERSAL", 3, Criteria(accuracy=(0.8, 100)), invert_image=True)
Phase("END", 4, Criteria(), invert_image=True)
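The accuracy argument of Criteria is a pair (minimum accuracy, number of most recent trials). As a rough standalone illustration of how such a criterion can be evaluated, here is a minimal sketch; the helper name accuracy_reached is hypothetical, and the real check is done by Criteria.is_fulfilled.

```python
def accuracy_reached(
    outcomes: list[bool], min_accuracy: float, window: int
) -> bool:
    """Return True when the last `window` outcomes reach `min_accuracy`."""
    if window <= 0:
        return True  # no accuracy requirement
    if len(outcomes) < window:
        return False  # not enough trials yet
    recent = outcomes[-window:]
    return sum(recent) / len(recent) >= min_accuracy


# BLACK_WHITE uses accuracy=(0.9, 30): 27 successes out of the last 30 trials
# are enough, 26 are not.
print(accuracy_reached([True] * 27 + [False] * 3, 0.9, 30))
print(accuracy_reached([True] * 26 + [False] * 4, 0.9, 30))
```

With accuracy=(0.8, 100), as in FLOWER_PLANE and REVERSAL, a mouse therefore needs at least 100 trials in the phase before it can progress.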
This experiment has only one room but is designed to support multiple rooms if necessary. The rooms are instantiated during the initialisation of the VisualDiscriminationExample class as follows:
# Room creation
# ----------------
wp_alpha = WaterPump(comPort="COM22")
ts_alpha = TouchScreen(comPort="COM20")
gate_alpha = Gate(
COM_Servo="COM36",
COM_Arduino="COM30",
COM_RFID="COM27",
name="gate",
weightFactor=0.6,
mouseAverageWeight=25,
)
Room(
name="room_A",
expe_data_saver=self.save_animals_data,
video_recorder=self.record_video,
gate=gate_alpha,
touchscreen=ts_alpha,
waterpump=wp_alpha,
)
This room creation step is important because this is where you define which COM port is linked to which device. The COM port of the room sensor device must also be defined, under # Global devices.
[!NOTE] When a room is instantiated, all its devices get the room name as a prefix of their own name, in the form roomname-devicename (e.g. "room_A-TouchScreen"). The hyphen "-" (the 6 key on an AZERTY keyboard) appears only once in the device name, so it can be used to separate the room name from the device name. This is useful in the code, especially in listener functions.
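For instance, a listener that only receives a device name can recover the room name by splitting on the hyphen. This is a minimal sketch; the helper split_device_name is hypothetical and not part of MiceCraft.

```python
def split_device_name(full_name: str) -> tuple[str, str]:
    """Split 'roomname-devicename' into (room name, device name)."""
    room_name, separator, device_name = full_name.partition("-")
    if not separator:
        raise ValueError(f"'{full_name}' has no room prefix")
    return room_name, device_name


room_name, device_name = split_device_name("room_A-TouchScreen")
print(room_name, device_name)
```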
The images displayed on the touchscreen can be defined when creating a Phase (to force the same image on all mice for that phase) or during the initialisation of the VisualDiscriminationExample class:
# Images choice
# ----------------
self.experiment_ts_images = [TSImage.PLANE, TSImage.FLOWER]
"""List of images that must be attributed to the animals."""
The images listed here are the ones assigned at random to each mouse when the experiment reads its RFID tag for the first time. In this experiment, a mouse gets either the FLOWER image or the PLANE image. The attribution is random but balanced, so that every image is attributed and no image is given more often than another (provided the number of animals is a multiple of the number of images in the list). To see the attribution method, look inside the register_rfid method of VisualDiscriminationExample.
You now have control over many parameters and can modify this example to suit your experiment. If you want finer tuning, especially over the room states and their logic, you can modify the corresponding methods of the Room class; their names all start with set_state_ followed by the state name. Make sure to keep an INITIAL state and an EXIT state, both for the gate logic and to simplify the parsing of your logs.
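One possible way to obtain a random but balanced attribution is to draw only among the images that have been given the fewest times so far. The sketch below illustrates that idea with plain strings; it is an assumption about the approach, not a copy of register_rfid, and pick_balanced_image is a hypothetical name.

```python
import random
from collections import Counter


def pick_balanced_image(images: list[str], assigned: list[str]) -> str:
    """Pick at random among the images that were assigned the least often."""
    counts = Counter(assigned)
    fewest = min(counts[image] for image in images)
    candidates = [image for image in images if counts[image] == fewest]
    return random.choice(candidates)


images = ["PLANE", "FLOWER"]
assigned: list[str] = []
for _ in range(4):  # four mice whose RFID is read for the first time
    assigned.append(pick_balanced_image(images, assigned))
print(Counter(assigned))
```

With four mice and two images, each image ends up assigned to exactly two mice, whatever the random draws are.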
Experiment logic
- Data Logging: Every event or piece of data (gate entries, touchscreen interactions, rewards, etc.) is logged with a timestamp in a file.
- Animal Creation: The experiment waits for a mouse to enter one of the gates, then creates the corresponding Animal object from its RFID tag.
- Room State: When a mouse is in a test room, the Room class switches between different states that follow the protocol logic. Based on the animal's current Phase and assigned TSImage, the Room class displays the correct and incorrect images on the touchscreen during the TRIAL state.
- Mouse Interaction: The mouse makes a choice by touching one of the images.
  - Correct Choice: If the mouse touches the correct image, it receives a reward (success state), and the trial is recorded as a success.
  - Incorrect Choice: If the mouse touches the incorrect image, a timeout period is initiated (fail state), during which the mouse cannot start a new trial. The trial is recorded as a failure.
- Phase Progression: When a mouse exits the room, the script checks whether the mouse has met the Criteria to advance to the next Phase. If the criteria are met, the mouse's phase is updated.
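The room logic described above can be summarised as a small transition table. This is a simplified sketch for reading purposes only: the RoomState enum and the event names are hypothetical, while the actual code implements the states as set_state_ methods of the Room class.

```python
from enum import Enum, auto


class RoomState(Enum):
    INITIAL = auto()
    TRIAL = auto()
    SUCCESS = auto()
    FAIL = auto()
    EXIT = auto()


# (current state, event) -> next state; event names are illustrative only.
TRANSITIONS: dict[tuple[RoomState, str], RoomState] = {
    (RoomState.INITIAL, "ready"): RoomState.TRIAL,
    (RoomState.TRIAL, "correct_touch"): RoomState.SUCCESS,
    (RoomState.TRIAL, "incorrect_touch"): RoomState.FAIL,
    (RoomState.SUCCESS, "reward_picked"): RoomState.TRIAL,
    (RoomState.FAIL, "timeout_elapsed"): RoomState.TRIAL,
}


def next_state(state: RoomState, event: str) -> RoomState:
    """Return the state reached from `state` when `event` occurs."""
    if event == "animal_exit":  # the mouse can leave at any time
        return RoomState.EXIT
    # Events that are not listed for the current state are ignored.
    return TRANSITIONS.get((state, event), state)


state = RoomState.INITIAL
for event in [
    "ready",
    "incorrect_touch",
    "correct_touch",  # ignored: touches do not count during the timeout
    "timeout_elapsed",
    "correct_touch",
    "reward_picked",
    "animal_exit",
]:
    state = next_state(state, event)
    print(f"{event} -> {state.name}")
```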
Classes presentation
- TouchScreenExperiment: This is the main orchestrator of the experiment. It manages the rooms, the animals, and the overall state of the experiment. It contains the listeners for hardware events (like a mouse entering a gate) that drive the experimental logic.
- ExperimentSettings: A helper class that handles the naming and commenting of experiment sessions. It saves these details so they can be easily reloaded for subsequent sessions, ensuring consistent naming.
- Room: Represents a single physical experimental chamber. It manages all the hardware associated with that chamber, such as the Gate, TouchScreen, and WaterPump.
- Animal: Represents a single mouse in the experiment. It stores all the data relevant to that animal, including its RFID, its assigned correct image (ts_image), its current phase, and a history of all its trials and rewards.
- Phase: Defines a specific stage of training. Each phase has a Criteria that an animal must meet to progress to the next phase. Phases are ordered by their rank.
- Criteria: This class specifies the conditions for completing a phase. These conditions can include a minimum number of trials, a minimum number of rewards, or achieving a certain accuracy over a set number of recent trials.
- TSImage: An enumeration (Enum) that defines the different images that can be displayed on the touchscreen. It provides a convenient way to manage and reference the images used in the experiment.
"""
This code defines a visual discrimination experiment for mice, where they are
trained to discriminate between two images displayed on a touchscreen.
Use it as an example or a template to create your own MiceCraft experiment.
"""
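# Annotations below reference classes defined later in the file (Phase,
# Criteria, Animal, Room); postponed evaluation avoids a NameError.
from __future__ import annotations
from typing import Any, Callable, Literal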
from pathlib import Path
from time import sleep
from datetime import datetime
import logging
import os
import sys
from blocks.autogate.Gate import Gate, GateOrder
from blocks.waterpump.WaterPump import WaterPump
from random import shuffle
from mail.Mail import LMTMail
from enum import Enum
from blocks.DeviceEvent import DeviceEvent
from math import exp
from blocks.alarm.Alarm import Alarm
import traceback
from threading import Thread, Timer
# import serial.tools.list_ports
from blocks.DiscoverDevices import DiscoverDevices
from blocks.display.oled.OledDisplay import OledDisplay
from blocks.lever.Lever import Lever
from blocks.FED3.Fed3Manager2 import Fed3Manager2
from blocks.touchscreen.TouchScreen import TouchScreen
from blocks.camera.CameraRecorder import CameraRecorder, CRText
from blocks.sensors.roomSensor.RoomSensorDigest import RoomSensorDigest
from experiments.api.loguploader.LogUploader import LogUploader
from experiments.api.WaitForAllThreads import WaitForAllThreads
from datetime import timedelta
from experiments.api.ParameterSaver import ParameterSaver
class ExperimentSettings:
"""Class that handles the identification of the experiment, with its name
and comment.
It also handles the saving and loading of the previous experiment name, and
asks the user if they want to reload it.
"""
def __init__(self):
"""Initialise the experiment name, with loading and saving of the
previous experiment name."""
script_path = Path(__file__).parent
self.settings_saver = ParameterSaver(
str(script_path), "previous_experiment"
)
self.reset_experiment()
self.load_previous_experiment()
def start_experiment(self):
"""Start the experiment by asking the user if they want to reload the
previous experiment, and if not, ask them to input the experiment name
and comment."""
self.user_settings_selection()
self.save_experiment()
def reset_experiment(self):
"""Reset the experiment name and comment."""
self.name: str = ""
self.comment: str = ""
self.auto_random_attribution: bool = True
def user_settings_selection(self):
"""Ask the user if they want to reload the previous experiment,
if it exists."""
if self.name == "":
print("No previous experiment found.")
reload = None
else:
print("Do you want to reload the previous experiment ?")
reload = input(f"Reload {self.name} ? [Y/N]: ").casefold()
if reload is not None and reload.startswith("y"):
print(f"Loaded experiment: {self.name}")
self.comment += " (reloaded)"
print(f"Loaded comment: {self.comment}")
else:
self.user_input_identification()
self.ask_user_image_assignment()
def ask_user_image_assignment(self):
"""Ask the user whether to assign TSImage at random during experiment
or to wait until the user assigns them manually."""
choice = input("Assign animals image at random? [Y/N]: ").casefold()
if choice.startswith("y"):
self.auto_random_attribution = True
else:
self.auto_random_attribution = False
def save_experiment(self):
"""Save the experiment name and comment for the next session."""
data_to_save = {
"experiment_name": self.name,
"experiment_comment": self.comment,
"random_assignment": self.auto_random_attribution,
}
self.settings_saver.setData(data_to_save)
def load_previous_experiment(self):
"""Load the previous experiment name and comment, if they exist."""
exp_name = self.settings_saver.getValue("experiment_name")
if exp_name is not None and isinstance(exp_name, str):
self.name = exp_name
exp_comment = self.settings_saver.getValue("experiment_comment")
if exp_comment is not None and isinstance(exp_comment, str):
self.comment = exp_comment
random_attr = self.settings_saver.getValue("random_assignment")
if random_attr is not None and isinstance(random_attr, bool):
self.auto_random_attribution = random_attr
def user_input_identification(self):
"""Ask the user to input the experiment name and comment."""
name = input("Enter the id for this experiment: ")
if name is None or name.strip() == "":
raise ValueError("Experiment ID cannot be empty.")
name = name.replace("-", "_")
self.name = name
self.comment = input("Experiment comments: ")
class TSImage(Enum):
"""All possible displayed images on the touchscreen."""
NONE = -1
DARK = 0
LIGHT = 1
FLOWER = 2
PLANE = 3
def get_opposite(self):
"""Get the opposite TSImage."""
opposites = {
TSImage.LIGHT: TSImage.DARK,
TSImage.DARK: TSImage.LIGHT,
TSImage.FLOWER: TSImage.PLANE,
TSImage.PLANE: TSImage.FLOWER,
TSImage.NONE: TSImage.NONE,
}
return opposites[self]
def get_image_id(self) -> int:
"""Get the index of the image in the image bank of the touchscreen.
This index is needed in order to display it on the touchscreen."""
ids = {
TSImage.DARK: 8,
TSImage.LIGHT: 7,
TSImage.FLOWER: 1,
TSImage.PLANE: 0,
TSImage.NONE: 8,
}
return ids[self]
def __str__(self) -> str:
"""Return the name of the TSImage."""
return self.name
class Phase:
"""Object that handles the phases for an animal during the experiment."""
ALL: list[Phase] = []
@classmethod
def get(cls, id: int | str) -> Phase:
"""Get a Phase object by its name or rank."""
name = None
rank = None
if isinstance(id, int):
rank = id
if isinstance(id, str):
name = id
for phase in cls.ALL:
if name is not None and phase.name == name:
return phase
# 'is not None' so that a phase of rank 0 can also be found
if rank is not None and phase.rank == rank:
return phase
raise ValueError(f"Phase ID '{id}' ({type(id)}) not found.")
@classmethod
def get_first(cls) -> Phase:
"""Get the first Phase object based on rank."""
if not cls.ALL:
raise ValueError("No phases available.")
return min(cls.ALL, key=lambda phase: phase.rank)
@classmethod
def get_last(cls) -> Phase:
"""Get the last Phase object based on rank."""
if not cls.ALL:
raise ValueError("No phases available.")
return max(cls.ALL, key=lambda phase: phase.rank)
def __init__(
self,
name: str,
rank: int,
criteria: Criteria,
correct_image: TSImage | None = None,
invert_image: bool = False,
) -> None:
"""Initialise a Phase object with a name and its criteria completion
as optional keyword arguments.
Parameters
----------
name : str
Name of the phase, used for display and identification. Must be
unique among all phases.
rank : int
Rank of the phase, used for ordering all phases. The lower the
rank, the earlier the phase. Must be unique among all phases.
criteria : Criteria
Criteria for phase completion.
correct_image : TSImage | None, optional
The correct image for the phase. If None, use the image given to
the animal. None by default.
invert_image : bool, optional
Whether to invert the correct image for this phase. Will call the
TSImage.get_opposite() method on the *correct_image* if True.
False by default.
"""
for phase in Phase.ALL:
if phase.name == name:
raise ValueError(f"Phase name '{name}' already exists.")
if phase.rank == rank:
raise ValueError(f"Phase rank '{rank}' already exists.")
self.name: str = name.replace("-", "_")
self.rank: int = rank
self.criteria: Criteria = criteria
self.correct_image: TSImage | None = correct_image
self.invert_image: bool = invert_image
Phase.ALL.append(self)
Phase.ALL.sort(key=lambda p: p.rank)
def __str__(self) -> str:
return f"{Phase.ALL.index(self):02d}-{self.name}"
def next(self) -> Phase:
"""Return the next phase in rank order (the last phase returns itself)."""
idx = Phase.ALL.index(self)
if idx == len(Phase.ALL) - 1:
return self
else:
return Phase.ALL[idx + 1]
def previous(self) -> Phase:
"""Return the previous phase in rank order (the first phase returns
itself)."""
idx = Phase.ALL.index(self)
if idx > 0:
return Phase.ALL[idx - 1]
else:
return self
class Criteria:
"""Object that handles the criteria a mouse must meet to complete a phase.
The animal must meet at least these criteria to complete the phase."""
@classmethod
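def from_repr(cls, repr: str):
"""Initialise a Criteria object from its string representation
(the "rewards_trials_accuracy_N" format produced by __str__)."""
values = repr.split("_")
return cls(
min_rewards=int(values[0]),
min_trials=int(values[1]),
# the accuracy threshold is a float (e.g. "0.9"), so int() would fail
accuracy=(float(values[2]), int(values[3])),
)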
def __init__(
self,
min_rewards: int = 0,
min_trials: int = 0,
accuracy: tuple[float, int] = (0.0, 0),
) -> None:
"""Initialise a Criteria object with the criteria as optional keyword arguments.
Parameters
----------
min_rewards : int, optional
Minimum number of rewards an animal needs to pick up, by default 0.
min_trials : int, optional
Minimum number of trials an animal needs to take, by default 0.
accuracy : tuple of (float, int), optional
Minimum accuracy needed over the last N trials (accuracy, N), by default (0.0, 0).
"""
if min_rewards < 0:
raise ValueError("'min_rewards' must be a non-negative integer.")
if min_trials < 0:
raise ValueError("'min_trials' must be a non-negative integer.")
if accuracy[0] < 0 or accuracy[0] > 1:
raise ValueError("'accuracy[0]' must be in [0,1].")
if accuracy[1] < 0:
raise ValueError("'accuracy[1]' must be a non-negative integer.")
self.rewards: int = min_rewards
"""Minimum number of rewards an animal needs to pick up."""
self.trials: int = min_trials
"""Minimum number of trials an animal needs to take."""
self.accuracy: tuple[float, int] = accuracy
"""Minimum accuracy needed over the last N trials (accuracy, N)."""
def __str__(self) -> str:
str_repr = [
self.rewards,
self.trials,
self.accuracy[0],
self.accuracy[1],
]
str_repr = map(str, str_repr)
return "_".join(str_repr)
def is_fulfilled(self, animal: Animal) -> bool:
"""Check if an animal has completed the criteria."""
if self.rewards > 0:
rewards_list = animal.get_rewards(animal.phase)
if sum(rewards_list.values()) < self.rewards:
return False
if self.trials > 0 or self.accuracy[1] > 0:
trials_list = animal.get_trials(animal.phase)
if len(trials_list) < self.trials:
return False
if len(trials_list) < self.accuracy[1]:
return False
if self.accuracy[0] > 0:
last_results = list(trials_list.values())[-self.accuracy[1] :]
accuracy = sum(last_results) / len(last_results)
if accuracy < self.accuracy[0]:
return False
return True
def get_progression(self, animal: Animal) -> list[str]:
"""Return a list containing all information regarding criteria
progression."""
progress_list = []
if self.rewards > 0:
rewards_list = animal.get_rewards(animal.phase)
rewards_taken = sum(rewards_list.values())
if rewards_taken < self.rewards:
progress_list.append(
f"rewards: {rewards_taken}/{self.rewards} "
f"({rewards_taken / self.rewards * 100:.0f}%)"
)
trials = animal.get_trials(animal.phase)
if self.trials > 0:
if len(trials) < self.trials:
progress_list.append(
f"trials: {len(trials)}/{self.trials} "
f"({len(trials) / self.trials * 100:.0f}%)"
)
if self.accuracy[1] > 0:
if len(trials) < self.accuracy[1]:
progress_list.append(
f"need {self.accuracy[1] - len(trials)} more trials for "
f"accuracy ({(len(trials) / self.accuracy[1]) * 100:.0f}%)"
)
else:
last_results = list(trials.values())[-self.accuracy[1] :]
accuracy = sum(last_results) / len(last_results)
progress_list.append(f"accuracy: {accuracy * 100:.0f}%")
if not progress_list:
progress_list = ["Phase completed."]
return progress_list
class Animal:
"""Animal object for MiceCraft experiment."""
def __init__(self, rfid: str):
"""
Initialise an Animal object for the experiment.
Parameters
----------
rfid : str
Unique identifier (RFID) for the animal.
"""
self.rfid: str = rfid
self.full_description: str = ""
self.ts_image: TSImage = TSImage.LIGHT
"""Assigned touchscreen image of the animal."""
self.phase: Phase = Phase.get_first()
"""Current phase of the animal."""
self.phases_start_time: dict[Phase, datetime] = {
self.phase: datetime.now()
}
"""Dict with Phase keys and datetime values of the start time of each
phase."""
self.phases_success_time: dict[Phase, datetime] = {}
"""Dict with Phase keys and datetime values of the success time of each
phase."""
self.trials_dic: dict[datetime, bool] = {}
"""Dict with datetime keys when animal performs a trial,
True= success, False= fail."""
self.touched_left_dic: dict[datetime, bool] = {}
"""Dict with datetime keys when touchscreen is touched meaningfully,
True= choose left, False= choose right."""
self.rewards_dic: dict[datetime, bool] = {}
"""Dict with datetime keys when a reward is delivered,
True= reward picked, False= reward not picked."""
self.progression_display: list[str] = ["not started yet"]
"""List describing current criteria progression."""
@staticmethod
def datetime_to_str(date: datetime):
"""Transform a datetime object into str (used in 'save_as_dict')."""
return date.strftime("%Y/%m/%d, %H:%M:%S")
@staticmethod
def str_to_datetime(date: str):
"""Transform a str into datetime object (used in 'load_from_dict')."""
return datetime.strptime(date, "%Y/%m/%d, %H:%M:%S")
def save_as_dict(self) -> dict[str, Any]:
"""Save mouse data in a dict for further JSON save.
Returns
-------
dict
A JSON-serialisable dictionary.
"""
dic = {}
dic["rfid"] = self.rfid
dic["full_description"] = self.full_description
dic["ts_image"] = str(self.ts_image)
dic["phase"] = str(self.phase)
dic["criteria"] = str(self.phase.criteria)
dic["phases_start_time"] = {
str(phase): self.datetime_to_str(date)
for phase, date in self.phases_start_time.items()
}
dic["phases_success_time"] = {
str(phase): self.datetime_to_str(date)
for phase, date in self.phases_success_time.items()
}
dic["trials_dic"] = {
self.datetime_to_str(date): result
for date, result in self.trials_dic.items()
}
dic["choice_left_dic"] = {
self.datetime_to_str(date): result
for date, result in self.touched_left_dic.items()
}
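# saved as a dict (date -> picked) because load_from_dict reads it back
# with .items()
dic["rewards_picked"] = {
self.datetime_to_str(date): picked
for date, picked in self.rewards_dic.items()
}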
return dic
@classmethod
def load_from_dict(cls, dic: dict[str, Any]) -> Animal:
"""Load mouse data from a previously saved dictionary.
Parameters
----------
dic : dict
Dictionary containing the saved attributes needed to recreate an
Animal instance.
"""
instance = cls(dic["rfid"])
instance.full_description = dic["full_description"] + " (reloaded)"
instance.ts_image = TSImage[dic["ts_image"]]
_, phase_name = dic["phase"].split("-")
instance.phase = Phase.get(phase_name)
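# the saved criteria string must match the criteria of the reloaded phase
# (both sides are strings; comparing a str to a Criteria is always False)
assert str(instance.phase.criteria) == dic["criteria"]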
instance.phases_start_time = {
Phase.get(phase.split("-")[1]): cls.str_to_datetime(date)
for phase, date in dic["phases_start_time"].items()
}
instance.phases_success_time = {
Phase.get(phase.split("-")[1]): cls.str_to_datetime(date)
for phase, date in dic["phases_success_time"].items()
}
instance.trials_dic = {
cls.str_to_datetime(date): result
for date, result in dic["trials_dic"].items()
}
instance.touched_left_dic = {
cls.str_to_datetime(date): result
for date, result in dic["choice_left_dic"].items()
}
instance.rewards_dic = {
cls.str_to_datetime(date): result
for date, result in dic["rewards_picked"].items()
}
instance.progression_display = instance.phase.criteria.get_progression(
instance
)
return instance
def add_trial(self, result: bool):
"""Save a trial datetime and its outcome in 'trials_dic'.
Parameters
----------
result : bool
Outcome of the trial (True if successful, False otherwise).
"""
if result:
result_str = "success"
else:
result_str = "fail"
self.trials_dic[datetime.now()] = result
logging.info(f"[trial_outcome] rfid {self.rfid} outcome {result_str}")
self.progression_display = self.phase.criteria.get_progression(self)
def add_side_choice(self, choice_left: bool):
"""Save a side choice in 'touched_left_dic', True if left, False
otherwise."""
self.touched_left_dic[datetime.now()] = choice_left
if choice_left:
choice_str = "left"
else:
choice_str = "right"
logging.info(
f"[side_choosing] rfid {self.rfid} touched_side {choice_str}"
)
def add_picked_reward(self, picked: bool):
"""Save a reward picked in 'rewards_dic', True if picked, False
otherwise."""
self.rewards_dic[datetime.now()] = picked
if picked:
picked_str = "picked"
else:
picked_str = "try_picking_but_no_reward_delivered"
logging.info(f"[reward_picking] rfid {self.rfid} reward {picked_str}")
self.progression_display = self.phase.criteria.get_progression(self)
def proceed_to_next_phase(self):
"""Updates animal data to initialise next phase."""
time = datetime.now()
logging.info(f"[phase_completion] rfid {self.rfid} phase {self.phase}")
self.phases_success_time[self.phase] = time
self.phase = self.phase.next()
self.phases_start_time[self.phase] = time
def get_trials(self, phase: Phase) -> dict[datetime, bool]:
"""Get all trials corresponding to specified phase."""
if phase not in self.phases_start_time:
return {}
start_time = self.phases_start_time[phase]
end_time = self.phases_success_time.get(phase, datetime.max)
trials_filtered = {
date: result
for date, result in self.trials_dic.items()
if start_time <= date and date <= end_time
}
return trials_filtered
def get_rewards(self, phase: Phase) -> dict[datetime, bool]:
"""Get all rewards picked datetime corresponding to animal phase."""
if phase not in self.phases_start_time:
return {}
start_time = self.phases_start_time[phase]
end_time = self.phases_success_time.get(phase, datetime.max)
rewards_filtered = {
date: result
for date, result in self.rewards_dic.items()
if start_time <= date and date <= end_time
}
return rewards_filtered
def phase_completed(self) -> bool:
"""Check if an animal has completed the criteria of its phase."""
return self.phase.criteria.is_fulfilled(self)
def get_correct_image(self) -> TSImage:
"""Get the correct image for the animal in its current phase."""
if self.phase.correct_image is not None:
correct_image = self.phase.correct_image
else:
correct_image = self.ts_image
if self.phase.invert_image:
correct_image = correct_image.get_opposite()
return correct_image
class Room:
"""Class for managing room logic and devices."""
ALL: list[Room] = []
@classmethod
def get_from_name(cls, name: str) -> Room | None:
"""Get a room by its name or by the name of one of its devices."""
if not cls.ALL:
return None
room_name = name.split("-")[0]
for room in cls.ALL:
if str(room) == room_name:
return room
return None
@classmethod
def get_from_rfid_in(cls, rfid: str | None) -> Room | None:
"""Get the room where the RFID is currently in."""
if rfid is None:
return None
for room in cls.ALL:
# animal_in is an Animal object, so compare its rfid attribute
if room.animal_in is not None and room.animal_in.rfid == rfid:
return room
return None
def __init__(
self,
name: str,
expe_data_saver: Callable,
video_recorder: Callable | None,
gate: Gate,
touchscreen: TouchScreen,
waterpump: WaterPump,
):
"""Initialise a room with its name and devices."""
# check room name unicity
correct_name = name.replace("-", "_")
for room in Room.ALL:
if room.name == correct_name:
raise ValueError(f"Room name '{correct_name}' already exists.")
self.name: str = correct_name
"""Name of the room, used for display and identification. Must be
unique among all rooms."""
self.animal_in: Animal | None = None
"""Animal currently in the room, None if no animal."""
self.expe_data_saver: Callable = expe_data_saver
"""Function to call to save all experiment data (not just room data).
Used after each trial outcome."""
self.video_recorder: Callable | None = video_recorder
"""Function to call to record a video of the animal."""
self.action_enabled: bool = False
"""Whether the animal actions are enabled or not (used for example to
avoid taking into account touchscreen touches during the inter-trial
interval)."""
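self.running_timers: list[Timer] = []
"""List of currently running timers."""
self.gate_listener: Callable | None = None
"""Gate listener, set later by *set_gate_listener*. Defined here so that
*init_hardware* can test it even before a listener is set."""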
self.gate: Gate = gate
self.gate.name = self.name + "-" + self.gate.name.replace("-", "_")
self.ts: TouchScreen = touchscreen
self.ts.name = self.name + "-" + self.ts.name.replace("-", "_")
self.wp: WaterPump = waterpump
self.wp.name = self.name + "-" + self.wp.name.replace("-", "_")
Room.ALL.append(self)
def __str__(self) -> str:
"""Return the name of the room."""
return self.name
def set_gate_listener(self, listener: Callable):
"""Set the gate listener."""
self.gate_listener = listener
self.init_hardware()
def init_hardware(self, display_log: bool = True):
"""Initialise all hardware of the room."""
# gate
# ----------------
if self.gate_listener is not None:
self.gate.addDeviceListener(self.gate_listener)
self.gate.setSpeedAndTorqueLimits(140, 140)
self.gate.weightFactor = 0.6 # type: ignore
self.gate.setOrder(
GateOrder.ONLY_ONE_ANIMAL_IN_B, options=["no rfid check on return"]
)
logging.info(
"[init_hardware] "
f"room {str(self)} "
f"device {type(self.gate).__name__} "
f"COM_SERVO {self.gate.COM_Servo} "
f"COM_ARDUINO {self.gate.COM_Arduino} "
f"COM_RFID {self.gate.COM_RFID} "
f"name {self.gate.name} "
)
# touchscreen
# ----------------
self.ts.addDeviceListener(self.touchscreen_listener)
self.ts.setTransparency(0.5)
self.ts.setMouseMode()
self.ts.clear()
self.ts.setConfig(1, 1, 900)
self.ts.showCalibration(False)
logging.info(
"[init_room] "
f"room {str(self)} "
f"device {type(self.ts).__name__} "
f"COM_PORT {self.ts.comPort} "
f"name {self.ts.name} "
)
# waterpump
# ----------------
self.wp.addDeviceListener(self.waterpump_listener)
self.wp.setDropParameters(255, 17, 0.025)
logging.info(
f"[init_room] "
f"room {str(self)} "
f"device {type(self.wp).__name__} "
f"COM_PORT {self.wp.comPort} "
f"name {self.wp.name} "
)
def check_in_warning(self, info: str):
"""Log a warning when an event occurs while no animal is checked in."""
logging.info(
"[warning] [check_in_failed] " f"room {str(self)} " f"info {info} "
)
def touchscreen_listener(self, event: DeviceEvent):
"""Function called by the touchscreen when it fires an event."""
if self.animal_in is None:
self.check_in_warning(event.description)
return
if "symbol xy touched" in event.description:
if "ts_left_image" in event.description:
self.animal_in.add_side_choice(choice_left=True)
if "ts_right_image" in event.description:
self.animal_in.add_side_choice(choice_left=False)
# Compare against the image that is correct for the current phase
# (phase-forced image and reversal included), not the raw ts_image.
correct_image = self.animal_in.get_correct_image()
if (
str(correct_image) in event.description
or str(TSImage.LIGHT) in event.description
):
logging.info(
"[trial_result] "
f"room {str(self)} "
f"rfid_in {self.animal_in.rfid} "
f"phase {str(self.animal_in.phase)} "
f"result SUCCESS "
)
if self.video_recorder:
self.video_recorder(self.animal_in, True)
self.set_state_success(1)
self.animal_in.add_trial(True)
self.expe_data_saver()
if (
str(correct_image.get_opposite()) in event.description
or str(TSImage.DARK) in event.description
):
logging.info(
"[trial_result] "
f"room {str(self)} "
f"rfid_in {self.animal_in.rfid} "
f"phase {str(self.animal_in.phase)} "
f"result FAIL "
)
if self.video_recorder:
self.video_recorder(self.animal_in, False)
self.set_state_fail(10)
self.animal_in.add_trial(False)
self.expe_data_saver()
def waterpump_listener(self, event: DeviceEvent):
"""Function called by the waterpump when it fires an event."""
if self.animal_in is None:
self.check_in_warning(event.description)
return
if "reward picked" in event.description:
if self.wp.rewardDelivered and self.wp.rewardPicked:
self.animal_in.add_picked_reward(True)
self.set_state_trial()
else:
self.animal_in.add_picked_reward(False)
def get_all_devices(self) -> list[Any]:
"""Get all devices of the room in a list."""
return [self.gate, self.ts, self.wp]
def set_animal_weight(self, weight: int):
"""Set the animal weight for the gate parameters."""
self.gate.mouseAverageWeight = weight
logging.info(
"[gate_expected_weight] "
f"room {str(self)} "
f"gate_name {self.gate.name} "
f"expected_weight_set_to_(g) {weight}"
)
def shutdown_hardware(self):
"""Shutdown all hardware of the room."""
self.gate.shutdown()
self.ts.shutdown()
self.wp.shutdown()
def reset(self):
"""Reset room parameters and re-init hardware."""
self.animal_in = None
self.ts.enabled = False
self.wp.rewardPicked = False
self.wp.rewardDelivered = False
self.init_hardware()
def start_timer(
self, duration_sec: int, callback: Callable, *args, **kwargs
):
"""Start a timer that will call the specified callback after the given
duration (*in seconds*)."""
self.running_timers.append(
# threading.Timer expects the callback arguments as args=/kwargs=
Timer(duration_sec, callback, args=args, kwargs=kwargs)
)
self.running_timers[-1].start()
def cancel_all_timers(self):
"""Cancel all running timers."""
for timer in self.running_timers:
timer.cancel()
self.running_timers = []
def ts_display(self, left_img: TSImage, right_img: TSImage):
"""Displays images on left and right side."""
self.ts.clear()
logging.info(
"[touchscreen_display] "
f"room {str(self)} "
f"touchscreen_name {self.ts.name} "
f"left_image_name {str(left_img)} "
f"left_image_id {left_img.get_image_id()} "
f"right_image_name {str(right_img)} "
f"right_image_id {right_img.get_image_id()} "
)
self.ts.setXYImage(
f"ts_left_image_{str(left_img)}",
left_img.get_image_id(),
1920 / 2 - 400,
750,
0,
1,
)
self.ts.setXYImage(
f"ts_right_image_{str(right_img)}",
right_img.get_image_id(),
1920 / 2 + 400,
750,
0,
1,
)
def ts_random_display(self, img: TSImage):
"""
Display the image and its opposite on the touchscreen, but choose their
sides randomly."""
random_display = [img, img.get_opposite()]
shuffle(random_display)
logging.info(
f"[touchscreen_random_display] "
f"room {str(self)} "
f"touchscreen_name {self.ts.name} "
f"image {str(img)} "
)
self.ts_display(
left_img=random_display[0], right_img=random_display[1]
)
def simulate_ts_event(self, result: bool = False):
"""Trigger a simulated touchscreen event (a touch on LIGHT if *result*
is True, on DARK otherwise)."""
if result:
img_name = str(TSImage.LIGHT)
else:
img_name = str(TSImage.DARK)
self.ts.fireEvent(
DeviceEvent(
deviceType="touchscreen",
deviceObject=self,
description="symbol xy touched " + f"ts_left_image_{img_name}",
data=("simulate", 0, 0, 0, 0, 0),
)
)
logging.info(
"[touch_simulation] "
f"room {str(self)} "
f"device_name {type(self.ts).__name__ + '_' + self.ts.name} "
f"img_touch {img_name} "
)
def state_clear(self, flush_duration: int = 0):
"""Clear the current state of the room to prepare for the
next one.
- disable actions
- turn off waterpump light
- turn off touchscreen
- flush any reward if needed (flush_duration > 0) and log it
"""
self.ts.enabled = False
self.wp.lightOff()
self.ts.clear()
logging.info(f"[room_state] room {str(self)} state CLEAR")
if flush_duration > 0:
self.wp.flush(255, flush_duration)
logging.info(f"[reward_flushing] room {str(self)}")
def set_state_initial(self, animal: Animal):
"""Set room in INITIAL state. Called by *gate_listener* when an animal
enters the room."""
self.cancel_all_timers()
self.state_clear()
self.animal_in = animal
logging.info(
"[animal_in] "
f"room {str(self)} "
f"animal {self.animal_in.rfid} "
)
logging.info(f"[room_state] room {str(self)} state INITIAL")
self.set_state_trial()
def set_state_exit(self):
"""Set room in EXIT state. Called by *gate_listener* when an animal
exits the room."""
self.cancel_all_timers()
self.state_clear(500)
logging.info(f"[room_state] room {str(self)} state EXIT")
if self.animal_in is None:
self.check_in_warning("set exit state but no animal in room")
else:
if self.animal_in.phase_completed():
self.animal_in.proceed_to_next_phase()
logging.info(
"[animal_out] "
f"room {str(self)} "
f"animal {self.animal_in.rfid} "
)
self.expe_data_saver()
self.animal_in = None
self.init_hardware(display_log=False) # re-init in case a device was unplugged
def set_state_success(self, reward_size: int):
"""Set room in SUCCESS state."""
self.cancel_all_timers()
self.state_clear()
self.wp.deliverDrop(reward_size)
self.wp.lightOn(30)
logging.info(
f"[reward_delivery] room {str(self)} reward_size {reward_size}"
)
logging.info(f"[room_state] room {str(self)} state SUCCESS")
def set_state_fail(self, wait_time: int):
"""Set room in FAIL state."""
self.cancel_all_timers()
self.state_clear(1000)
self.start_timer(wait_time, self.set_state_trial)
logging.info(f"[room_state] room {str(self)} state FAIL")
def set_state_trial(self):
"""Set room in TRIAL state: display the correct image of the animal
in the room and its opposite on random sides, then enable actions.
"""
self.expe_data_saver()
if self.animal_in is None:
self.check_in_warning("tried to set room in trial state")
return
self.state_clear()
self.ts_random_display(self.animal_in.get_correct_image())
self.action_enabled = True
logging.info(f"[room_state] room {str(self)} state TRIAL")
class VisualDiscriminationExample:
"""Class that handles the whole touchscreen experiment."""
def __init__(self):
"""Initialise the touchscreen experiment, with all hardware and
parameters setup.
Attributes
----------
experiment_ts_images : list of TSImage
The images to attribute to the animals. If
self.info.auto_random_attribution is True, they are attributed at
random but in equal proportions (the list is shuffled, given to new
animals in order, and shuffled again each time its end is reached).
"""
# ================ EXPERIMENT PARAMETERS ================
# Phases creation
# ----------------
Phase("BLACK_WHITE", 1, Criteria(accuracy=(0.9, 30)), TSImage.LIGHT)
Phase("FLOWER_PLANE", 2, Criteria(accuracy=(0.8, 100)))
Phase("REVERSAL", 3, Criteria(accuracy=(0.8, 100)), invert_image=True)
Phase("END", 4, Criteria(), invert_image=True)
# Room creation
# ----------------
wp_alpha = WaterPump(comPort="COM22")
ts_alpha = TouchScreen(comPort="COM20")
gate_alpha = Gate(
COM_Servo="COM36",
COM_Arduino="COM30",
COM_RFID="COM27",
weightFactor=0.6, # type: ignore
mouseAverageWeight=25,
)
Room(
name="room_A",
expe_data_saver=self.save_animals_data,
video_recorder=self.record_video,
gate=gate_alpha,
touchscreen=ts_alpha,
waterpump=wp_alpha,
)
# Images choice
# ----------------
self.experiment_ts_images = [TSImage.PLANE, TSImage.FLOWER]
"""List of images that must be attributed to the animals."""
# Global devices
# ----------------
self.camRecorder = CameraRecorder(
deviceNumber=0, bufferDurationS=50, showStream=True
) # camera recorder for saving videos
self.roomSensorDigest = RoomSensorDigest(
comPort="COM25", delayS=5 * 60
) # room sensors (get data every 5 minutes)
# ================ EXPERIMENT ================
# experiment settings
# ----------------
self.info = ExperimentSettings()
self.info.start_experiment()
current_date = datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
current_exp = self.info.name + "-"
current_exp += current_date
os.makedirs(current_exp)
os.chdir(current_exp)
# logs setup
# ----------------
log_file = current_exp + ".log.txt"
print("Logfile: ", log_file)
logging.basicConfig(
level=logging.INFO,
filename=log_file,
format="%(asctime)s.%(msecs)03d: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
) # log message in appropriate file with time and date
# log also in console
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
# animals
# ----------------
self.animals: dict[str, Animal] = {}
"""{RFID: Animal} Dictionary with RFID as key and Animal object as
value for all animals that entered the experiment."""
self.animals_saver = ParameterSaver(
Path(__file__).parent, self.info.name
)
# load saved animals data, if any, and fill self.animals with it
for data in self.animals_saver.getData().values():
animal = Animal.load_from_dict(data)
self.animals[animal.rfid] = animal
logging.info(
f"[animals_loading] rfid {animal.rfid} phase {animal.phase} ts_image {animal.ts_image}"
)
# Rooms
# ----------------
for room in Room.ALL:
room.set_gate_listener(self.gate_listener)
self.init_experiment()
logging.info("application started")
logging.info(f"NAME: {self.info.name}")
logging.info(f"DATE: {current_date}")
logging.info(f"COMMENT: {self.info.comment}")
# ================ EXPERIMENT MANAGEMENT ================
def init_experiment(self):
"""Initialise all systems."""
for room in Room.ALL:
room.init_hardware()
self.roomSensorDigest.addDeviceListener(self.room_sensor_listener)
self.roomSensorDigest.delayS = 5 * 60
logging.info(
"[init_sensors] "
f"device {type(self.roomSensorDigest).__name__} "
f"COM_PORT {self.roomSensorDigest.comPort} "
)
def shutdown_experiment(self):
"""Shutdown all systems."""
for room in Room.ALL:
room.shutdown_hardware()
self.camRecorder.shutdown()
self.roomSensorDigest.shutdown()
WaitForAllThreads()
def record_video(self, animal: Animal, result: bool | None):
"""Save a short video recording of the trial.
Parameters
----------
animal : Animal
The animal that performed the trial. Its RFID, correct image and
current phase are written on the video.
result : bool | None
The result of the trial. It will be written on the video.
If None, there is no correct answer for this trial.
"""
if result is None:
txt = "Trial without answer"
specific_color = (255, 0, 0)
else:
if result:
txt = "Successful trial"
specific_color = (0, 0, 255)
else:
txt = "Failed trial"
specific_color = (0, 255, 0)
txt_settings = {"fontScale": 0.5, "centered": False}
text_list = []
text_list.append(CRText(txt, x=10, y=10, centerX=True, **txt_settings))
txt = f"RFID {animal.rfid}"
text_list.append(CRText(txt, x=10, y=300, **txt_settings))
txt = f"correct image: {str(animal.ts_image)}"
text_list.append(CRText(txt, x=10, y=340, **txt_settings))
txt = f"current phase: {str(animal.phase)}"
text_list.append(CRText(txt, x=10, y=380, **txt_settings))
text_list.append(
CRText(
"X",
x=0,
y=0,
bgColor=specific_color,
color=specific_color,
**txt_settings,
)
)
self.camRecorder.delayedSave(
delayS=5,
minDateTime=datetime.now() - timedelta(seconds=5),
textList=text_list,
)
def get_all_rfid(self):
"""Get all registered RFID in *animals* dictionary."""
return list(self.animals.keys())
def save_animals_data(self):
"""Save all animals data."""
data = {}
for rfid, animal in self.animals.items():
data[rfid] = animal.save_as_dict()
logging.info(f"[animals_data_saving] rfid {rfid}")
self.animals_saver.setData(data)
self.animals_saver.save()
def register_rfid(self, rfid: str | None):
"""Register the RFID if it is not already in animals."""
if rfid is None:
logging.info(f"[warning] [rfid_registration] rfid {rfid}")
return
all_rfid = self.get_all_rfid()
if rfid not in all_rfid:
new_animal = Animal(rfid)
if self.info.auto_random_attribution:
img_idx = len(all_rfid) % len(self.experiment_ts_images)
if img_idx == 0:
shuffle(self.experiment_ts_images)
chosen_image = self.experiment_ts_images[img_idx]
new_animal.ts_image = chosen_image
logging.info(f"[rfid_registration] rfid {rfid}")
logging.info(
"[ts_image_attribution] "
f"rfid {rfid} " # trailing space keeps the key/value pairs separated
f"touchscreen_image {str(new_animal.ts_image)}"
)
else:
new_animal.ts_image = TSImage.NONE
self.animals[rfid] = new_animal
# ================ VISUAL APP INTERACTIONS ================
def get_ts_image(self, rfid: str):
"""Get the TSImage of the corresponding animal."""
if rfid in self.animals.keys():
return self.animals[rfid].ts_image
else:
return None
def set_ts_image(self, rfid: str, ts_image: TSImage):
"""Set the TSImage of the corresponding animal."""
if rfid in self.animals.keys():
self.animals[rfid].ts_image = ts_image
logging.info(
"[user_ts_image_attribution] "
f"rfid {rfid} "
f"touchscreen_image {str(ts_image)}"
)
def get_phase(self, rfid: str) -> Phase | None:
"""Get the phase of corresponding animal."""
if rfid in self.animals.keys():
return self.animals[rfid].phase
else:
return None
def set_phase(self, rfid: str, phase: Phase):
"""Set the phase of the corresponding animal."""
if rfid in self.animals.keys():
self.animals[rfid].phase = phase
logging.info(
"[user_phase_attribution] "
f"rfid {rfid} "
f"phase {str(phase)}"
)
def get_all_rooms(self) -> list[Room]:
"""Get all rooms of the experiment in a list."""
return Room.ALL
def get_room(
self, name: str | None = None, rfid_in: str | None = None
) -> Room | None:
"""Get the room from its *name* or from its *rfid_in*."""
if name is not None:
return Room.get_from_name(name)
elif rfid_in is not None:
return Room.get_from_rfid_in(rfid_in)
else:
return None
# ================ LISTENERS ================
def room_sensor_listener(self, event: DeviceEvent):
"""Function called when room sensors fire an event."""
logging.info(event.description)
def gate_listener(self, event: DeviceEvent):
"""Function called by the gates when they fire an event."""
# get room and device corresponding to event
room = self.get_room(name=event.deviceObject.name)
if room is None:
logging.info(
"[gate_listener] "
f"room not_found "
f"type {str(event.deviceType)} "
f"name {event.deviceObject.name} "
f"event {event.description}"
)
return
# Animal in
# ----------------
if "allowed to cross" in event.description:
if "TO SIDE B" in event.description:
read_rfid: str = event.data # type: ignore
self.register_rfid(read_rfid)
room.set_state_initial(self.animals[read_rfid])
# the remaining events need an animal in the room: warn and stop otherwise
if room.animal_in is None:
logging.info(
"[warning] "
"[gate_listener] "
f"room {str(room)} "
f"rfid {None} "
f"event_desc {event.description}"
)
return
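# Animal out
# ----------------
# Keep a reference to the animal: set_state_exit() resets room.animal_in
# to None, so it cannot be read from the room afterwards.
animal = room.animal_in
if "FREE TO GET TO SIDE A" in event.description:
room.set_state_exit()
logging.info(
"[animal_progression] "
f"{' '.join(animal.progression_display)}"
)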
if __name__ == "__main__":
print("Starting experiment")
expe = VisualDiscriminationExample()
print("Done")
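
Most log messages written by the code above follow the same pattern: a timestamp, one or more tags in square brackets, then alternating keys and values. If you run the experiment without the visual interface, the log file can be read back with a few lines of Python. The sketch below is not part of micecraft: `parse_log_line` is a hypothetical helper, and it assumes that values contain no spaces (for example that `str(room)` gives a name such as `room_A`). Lines without tags, such as `application started`, are skipped.

```python
import re
from datetime import datetime

# Prefix produced by format="%(asctime)s.%(msecs)03d: %(message)s"
# with datefmt="%Y-%m-%d %H:%M:%S".
LINE_RE = re.compile(
    r"^(?P<stamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}): (?P<message>.*)$"
)
# One leading "[tag]" followed by optional whitespace.
TAG_RE = re.compile(r"\[([^\]]+)\]\s*")


def parse_log_line(line: str):
    """Return (timestamp, tags, fields), or None for lines that do not
    follow the "[tag] key value key value" pattern."""
    match = LINE_RE.match(line.rstrip("\n"))
    if match is None:
        return None
    timestamp = datetime.strptime(match["stamp"], "%Y-%m-%d %H:%M:%S.%f")
    message = match["message"]

    # Collect every leading tag, e.g. "[warning] [gate_listener]".
    tags = []
    while True:
        tag_match = TAG_RE.match(message)
        if tag_match is None:
            break
        tags.append(tag_match.group(1))
        message = message[tag_match.end():]
    if not tags:
        return None

    # The rest alternates keys and values; an unpaired last token is dropped.
    tokens = message.split()
    fields = dict(zip(tokens[0::2], tokens[1::2]))
    return timestamp, tags, fields


if __name__ == "__main__":
    example = "2024-05-01 12:00:00.123: [reward_delivery] room room_A reward_size 10"
    print(parse_log_line(example))
```

Messages whose values contain spaces, such as the gate event descriptions, would need dedicated handling; this sketch only pairs whitespace-separated tokens.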