implementing a filter - TUMFARSynchrony/SynthARium GitHub Wiki
Please always test changes and additions both with multiprocessing enabled and with it disabled, as filters may be executed on a subprocess.
A template is included in both the `filters` and `group_filters` folders. These are basic filters and group filters that can easily be adapted to your needs.
This section outlines 7 steps for how to implement a custom filter depending on your intended filter functionality. Some steps are added to ensure a good project structure.
- In `backend/filters`, copy and paste the template folder to create a new filter module (a folder with an `__init__.py`). Choose a descriptive and unique name for your filter that is not yet used by other filters, and rename the folder to this name. The naming convention is lowercase, with underscores separating multiple words. Note: if you do not use the template to create the new folder, make sure to extend `Filter` to implement a custom filter (i.e. `from filters import Filter`).
- Rename the .py file to describe what your filter does. The naming convention is `process_channel_filter.py`, where:
  - `process` is the task the filter is supposed to perform.
  - `channel` states whether it performs the task on the audio channel, the video channel, or both.

  If a filter does more than one task, chain the various processes it performs. For example: `display_speaking_time_audio_filter.py`. In this case, even though the objective of the filter is to display the speaking time, the speaking time is derived mainly from the audio and not the video, so audio is specified.
- Change the filter class name to your filter name in camel case (e.g. `TemplateFilter`: no spaces, and the first letter of each word is capitalized). Filters should be in the filters directory.
- Import the filter class from the previous step in `backend/filters/*new_module*/__init__.py`, e.g. `from .new_file_name import NewFilterClass`.
- Before implementing your filter in `process`, some changes are required in the `name`, `type`, `channel`, and `default_config` static functions.
  - `name`: change this function to return a unique name for your filter as it will appear in the frontend. The name needs to be unique because it is used to identify the filter in the backend. The naming convention is all caps without spaces.
  - `type`: change this to SESSION, TEST or NONE. A SESSION filter is enabled both in the lobby room and on the main experiment page once the experiment is started (most likely you are implementing a SESSION filter). A TEST filter only runs on the lobby page (e.g. if you want to implement a pre-experiment filter that detects whether the participant is wearing glasses; this feature can be extended from the glasses_detection_filter as shown in the demo in the development branch).
  - `channel`: change the `channel` attribute to "video", "audio", or "both", depending on which channel (or track handler) your filter processes in order to analyze or manipulate the stream.
  - `default_config`: if your filter needs parameters, options or inputs when it is applied to a participant, you can create input fields through this config. Currently we offer a dropdown selection of strings and a number/integer text input. After linking and defining the default configs here, the frontend elements will be created automatically. This function is also responsible for sending the whole filter structure (i.e. name, type, groupFilter, channel, config) to be displayed and included in the frontend.
- Import your filter class in `backend/filters/__init__.py`, e.g. place `from .new_file_name import NewFilterClass` after the imports of all other subclasses of filter in this `__init__.py` file.
- Celebrate! You have just added a filter you can use in experiments to the experimental-hub! You can now implement your filter's logic in `process`, which modifies or analyzes the ndarray. Don't see your filter in the frontend? Please see the troubleshooting FAQ below. If you would like to contribute your filter to the main branch, please see our guidelines.
If the experimenter wants to dynamically change values of a filter from the frontend when applying a filter to a participant, optional variables can be set in the `default_config` to dynamically add a string dropdown or integer text field. If no inputs are required for your filter, skip this section.
Adding dynamic variables:
In the `default_config` method, there are two options for dynamic values:
- Dropdown List: this includes several pre-determined values in an array. This type needs a defaultValue (an array of options), a value (one value of the defaultValue array) and requiresOtherFilter (boolean, required to handle filter dependencies).
- Number Input: a number requires a min and max value, the number step (can also be decimal), a value and a defaultValue (the values need to be numbers).

For each type of dynamically added input, all fields are required. `validate_filter_json` checks that all fields are correct and raises an error if not.
For an example, take a look at the `RotationFilter` in `backend/filters/rotate/rotate_filter.py`.
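As a sketch of the two input types described above, the following shows what a `default_config` return value could look like, together with a small check that every required field is present. The field names come from the two lists above; the variable names `direction` and `angle`, and the helpers `input_kind` and `missing_fields`, are hypothetical and only for illustration. This is not the project's `validate_filter_json`.

```python
# Required keys per input type, taken from the field lists above.
REQUIRED_FIELDS = {
    "dropdown": {"defaultValue", "value", "requiresOtherFilter"},
    "number": {"min", "max", "step", "value", "defaultValue"},
}

# Hypothetical config with one dropdown and one number input.
example_config = {
    "direction": {
        "defaultValue": ["clockwise", "anti-clockwise"],
        "value": "clockwise",
        "requiresOtherFilter": False,
    },
    "angle": {"min": 1, "max": 180, "step": 1, "value": 45, "defaultValue": 45},
}


def input_kind(entry: dict) -> str:
    """Guess the input type: dropdowns carry a list of options as defaultValue."""
    return "dropdown" if isinstance(entry.get("defaultValue"), list) else "number"


def missing_fields(config: dict) -> dict:
    """Return {variable name: sorted missing keys} for every incomplete entry."""
    problems = {}
    for name, entry in config.items():
        missing = REQUIRED_FIELDS[input_kind(entry)] - set(entry)
        if missing:
            problems[name] = sorted(missing)
    return problems
```

Running `missing_fields(example_config)` returns an empty dict, while an entry that lacks, say, `step` is reported under its variable name.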
If the filter requires access to data or functionality not yet provided by the Filter API, the Filter API must be extended.
The Filter API is also required if a filter wants to access filters from other tracks, since their TrackHandler may run on a separate subprocess. We are working on documentation about how multiprocessing is handled by the server. In the meantime, please consult our backend architecture.
- Define the new abstract function in `FilterAPIInterface` at `backend/modules/filter_api_interface.py`.
- Implement the function in `FilterAPI` at `backend/modules/filter_api.py`, which handles the requests on the main process with direct access to the data.
- Implement the function in `FilterSubprocessAPI` at `backend/modules/filter_subprocess_api.py`, which is used if multiprocessing is enabled.
  - `FilterSubprocessAPI` sends the requests to the main process, where a `FilterSubprocessReceiver` must handle them. So also handle the command in `FilterSubprocessReceiver` at `backend/modules/filter_subprocess_receiver.py`.
  - Please take a look at the code documentation and the Backend Architecture before making these changes.
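The pattern behind these steps can be pictured with the following sketch. All names in it (`ApiInterface`, `MainProcessApi`, `SubprocessApi`, `handle_request`, `get_participant_count` and the command string) are hypothetical stand-ins, not the project's classes or message format. A pair of `queue.Queue` objects and a thread stand in for the channel between the subprocess and the main process.

```python
import queue
import threading
from abc import ABC, abstractmethod


class ApiInterface(ABC):
    """Shared interface: every new API function is declared here first."""

    @abstractmethod
    def get_participant_count(self) -> int: ...


class MainProcessApi(ApiInterface):
    """Main-process implementation with direct access to the data."""

    def __init__(self, participants: list[str]) -> None:
        self._participants = participants

    def get_participant_count(self) -> int:
        return len(self._participants)


class SubprocessApi(ApiInterface):
    """Subprocess implementation: forwards the request and waits for the answer."""

    def __init__(self, requests: queue.Queue, responses: queue.Queue) -> None:
        self._requests = requests
        self._responses = responses

    def get_participant_count(self) -> int:
        self._requests.put("GET_PARTICIPANT_COUNT")
        return self._responses.get(timeout=5)


def handle_request(api: MainProcessApi, requests: queue.Queue, responses: queue.Queue) -> None:
    """Receiver on the main-process side: answers one forwarded command."""
    command = requests.get(timeout=5)
    if command == "GET_PARTICIPANT_COUNT":
        responses.put(api.get_participant_count())


def demo() -> int:
    """Wire both sides together and ask for the count through the subprocess API."""
    requests, responses = queue.Queue(), queue.Queue()
    main_api = MainProcessApi(["alice", "bob"])
    receiver = threading.Thread(target=handle_request, args=(main_api, requests, responses))
    receiver.start()
    count = SubprocessApi(requests, responses).get_participant_count()
    receiver.join()
    return count
```

Because both implementations share one interface, a filter can call the same function regardless of whether multiprocessing is enabled.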
Q: My filter didn’t show up in the drop down when I went to add it to a participant!
A:
Q: Adding my filter crashes/freezes the participant stream.
A: Unfortunately, there is an error in the newly implemented filter logic. To help debug your filter, you can use the logging library to confirm the code is working as expected. To print to the console, add `self._logger.info("PRINT ME!")` to your code.
Q: My filter runs with too much latency! What can I do about it?
A: Everything in the asynchronous `process()` method is executed for every frame/ndarray. Avoiding unnecessary variable instantiations and avoiding printing to the console or onto the image are a few simple ways to optimize your code and minimize lag. However, some filters simply demand a high computational load, especially if they have to send frames back and forth to another program through either ZMQ (as for our adapted OpenFace filter) or Spout (e.g. to communicate with Touch Designer or Unity on Windows). Apart from the obvious option of using a more powerful computer, we will link some reports about methods we used that could potentially inspire a solution for your needs (TODO: link Norma's and Aykut's thesis). The latency could also be caused by the network connection (TODO: link to latency test tool).
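One way to find out where the time goes is to measure how long each call takes and to log only an occasional summary, so that the logging itself does not add to the lag. The sketch below is generic Python and makes no assumption about the project's classes: `FrameTimer` and `process_with_timing` are hypothetical helpers, the processing function is synchronous for simplicity, and inside a filter you would log through `self._logger` instead of the module-level logger used here.

```python
import logging
import time

logger = logging.getLogger("filter_timing")


class FrameTimer:
    """Accumulates per-frame durations and logs the mean every `report_every` frames."""

    def __init__(self, report_every: int = 100) -> None:
        self.report_every = report_every
        self.count = 0
        self.total = 0.0

    def record(self, seconds: float) -> None:
        self.count += 1
        self.total += seconds
        if self.count % self.report_every == 0:
            mean_ms = 1000 * self.total / self.count
            logger.info("mean frame time over %d frames: %.2f ms", self.count, mean_ms)


def process_with_timing(timer: FrameTimer, process, frame):
    """Run `process(frame)` and record how long the call took."""
    start = time.perf_counter()
    result = process(frame)
    timer.record(time.perf_counter() - start)
    return result
```

If the mean frame time approaches the interval between frames (about 33 ms at 30 frames per second), the filter cannot keep up and frames will lag.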
This section outlines steps to implement a custom group filter depending on your intended group filter functionality. Some steps are added to ensure a good project structure. The necessary changes are also commented in the code with TODO.
- In `backend/group_filters`, copy and paste the template folder to create a new group filter module (a folder with an `__init__.py`). Choose a descriptive and unique name for your group filter that is not yet used by other group filters, and rename the folder to this name. The naming convention is lowercase, with underscores separating multiple words. Note: if you do not use the template to create the new folder, make sure to extend `GroupFilter` to implement a custom group filter (i.e. `from group_filters import GroupFilter`).
- Rename the .py file to describe what your group filter does. The naming convention is `process_group_filter.py`, where `process` is the task the group filter is supposed to perform.
- Change the group filter class name to your group filter name in camel case (e.g. `TemplateGroupFilter`: no spaces, and the first letter of each word is capitalized). Group filters should be in the group_filters directory.
- Import the group filter class from the previous step in `backend/group_filters/*new_module*/__init__.py`, e.g. `from .new_file_name import NewGroupFilterClass`.
- Before implementing your group filter's functionality, some changes are required in the `name`, `type`, `channel`, and `default_config` static functions.
  - `name`: change this function to return a unique name for your group filter as it will appear in the frontend. The name needs to be unique because it is used to identify the group filter in the backend. The naming convention is all caps without spaces.
  - `type`: change this to SESSION, TEST or NONE. A SESSION group filter is enabled both in the lobby room and on the main experiment page once the experiment is started (most likely you are implementing a SESSION group filter). A TEST group filter only runs on the lobby page.
  - `channel`: change the `channel` attribute to "video", "audio", or "both", depending on which channel your group filter processes in order to analyze or manipulate the stream.
  - `default_config`: if your group filter needs parameters, options or inputs when it is applied to an experimenter, you can create input fields through this config. Currently we offer a dropdown selection of strings and a number/integer text input. After linking and defining the default configs here, the frontend elements will be created automatically. This function is also responsible for sending the whole group filter structure (i.e. name, type, groupFilter, channel, config) to be displayed and included in the frontend.
- Import your group filter class in `backend/group_filters/__init__.py`, e.g. place `from .new_file_name import NewGroupFilterClass` after the imports of all other subclasses of group filter in this `__init__.py` file.
- Celebrate! You have just added a group filter you can use in experiments to the experimental-hub! You can now implement your group filter's logic in the `process_individual_frame`, `align_data` and `aggregate` functions, along with the `data_len_per_participant` and `num_participants_in_aggregation` attributes.
  - The `process_individual_frame` function is responsible for the individual data extraction step before the aggregation. It takes a media frame from a participant and extracts the data from the frame.
  - The `align_data` function is a static function which is called by the group filter aggregator in the internal group filter runtime mechanism of the experimental-hub. It aligns the data of the participants in the aggregation before the aggregation is performed.
  - The `aggregate` function is also a static function which is called by the group filter aggregator. It performs the aggregation step after the data has been aligned for the participants who joined the aggregation.
  - The `data_len_per_participant` attribute defines the number of data points per participant required in the aggregation. 0 means all historical data is going to be used in the aggregation.
  - The `num_participants_in_aggregation` attribute defines the number of participants joining the aggregation at a time. This is also the minimum number of participants required to perform the aggregation. "all" means all the participants in an experiment will join the aggregation; otherwise num_participants_in_aggregation-wise combinations are calculated and the aggregation is performed for each combination.
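To illustrate the last point, the sketch below lists the groups that would be aggregated for a given `num_participants_in_aggregation`. It only uses `itertools.combinations` from the standard library. The function `aggregation_groups` and the participant ids are hypothetical, and the hub's internal aggregator may order or schedule the groups differently.

```python
from itertools import combinations


def aggregation_groups(participant_ids: list[str], num_participants_in_aggregation) -> list[tuple[str, ...]]:
    """Return the participant groups for which an aggregation would be performed."""
    if num_participants_in_aggregation == "all":
        # One aggregation over every participant in the experiment.
        return [tuple(participant_ids)]
    # One aggregation per combination of the requested size. With fewer
    # participants than requested, there are no combinations at all.
    return list(combinations(participant_ids, num_participants_in_aggregation))


print(aggregation_groups(["a", "b", "c"], 2))
# [('a', 'b'), ('a', 'c'), ('b', 'c')]
```

With three participants and a value of 2, the aggregation therefore runs three times, once per pair.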
If you would like to contribute your group filter to the main branch, please see our guidelines.
If the experimenter wants to dynamically change values of a group filter from the frontend when applying a group filter, optional variables can be set in the `default_config` to dynamically add a string dropdown or integer text field. If no inputs are required for your group filter, skip this section. For more details, please check Custom Config under the Filters section of this page.
A simple filter does not need to access outside data, including other filters. It simply processes the frames it receives without additional data.
As an example, we will take a filter that adds a simple text to a video track using OpenCV.
```python
# ... Imports
import cv2
import numpy

from filters import Filter


class SimpleExampleFilter(Filter):
    """A simple example filter printing `Hello World` on a video Track."""

    # Static methods take no `self` argument.
    @staticmethod
    def name() -> str:
        return "SIMPLE_FILTER"

    @staticmethod
    def filter_type() -> str:
        return "NONE"

    @staticmethod
    def channel() -> str:
        return "video"

    async def process(self, _, ndarray: numpy.ndarray) -> numpy.ndarray:
        # Parameters for cv2.putText
        origin = (50, 50)
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_size = 1
        color = (0, 0, 0)

        # Put text on image
        ndarray = cv2.putText(ndarray, "Hello World", origin, font, font_size, color)

        # Return modified frame
        return ndarray
```
We could extend this filter by adding variables to the filter config that specify the position, size and contents of the text. These variables can then be dynamically configured by the experimenter.
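A sketch of what such a config could look like, using only the two input types the frontend currently offers (so the text is picked from a dropdown rather than typed freely). The variable names `text`, `x`, `y` and `size`, their ranges, and the helper `text_parameters` are hypothetical. The `["config"][name]["value"]` lookup assumes the same layout that the `complete_setup` example on this page reads from `self._config`.

```python
def default_config() -> dict:
    """Hypothetical config for the text filter: one dropdown, three number inputs."""
    return {
        "text": {
            "defaultValue": ["Hello World", "Recording"],
            "value": "Hello World",
            "requiresOtherFilter": False,
        },
        "x": {"min": 0, "max": 1920, "step": 1, "value": 50, "defaultValue": 50},
        "y": {"min": 0, "max": 1080, "step": 1, "value": 50, "defaultValue": 50},
        "size": {"min": 0.5, "max": 5, "step": 0.5, "value": 1, "defaultValue": 1},
    }


def text_parameters(filter_config: dict) -> tuple:
    """Extract (text, origin, font_size) in the form cv2.putText expects."""
    values = {name: entry["value"] for name, entry in filter_config["config"].items()}
    return values["text"], (values["x"], values["y"]), values["size"]
```

Inside `process`, the three returned values would replace the hard-coded `"Hello World"`, `origin` and `font_size`.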
Filters can access other filters using the `audio_track_handler` and `video_track_handler` variables of the `Filter` base class. The variables contain a reference to the audio and video `TrackHandler` respectively.
For this example, we will use a simple buffer filter that buffers the last n frames and another filter that will access the buffer.
```python
# ... Imports
import collections

import numpy

from filters import Filter


class BufferFilter(Filter):
    """Filter saving the last 60 frames in `frame_buffer`."""

    frame_buffer: collections.deque

    def __init__(self, *args, **kwargs) -> None:
        # Forward all constructor arguments unchanged to the Filter base class.
        super().__init__(*args, **kwargs)
        self.frame_buffer = collections.deque(maxlen=60)
        # To run this filter in the muted state:
        self.run_if_muted = True

    @staticmethod
    def name() -> str:
        return "BUFFER"

    @staticmethod
    def filter_type() -> str:
        return "SESSION"

    @staticmethod
    def channel() -> str:
        return "audio"

    async def process(self, _, ndarray: numpy.ndarray) -> numpy.ndarray:
        # Add frame to buffer:
        self.frame_buffer.append(ndarray)

        # Return unmodified frame:
        return ndarray
```
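The buffer relies on `collections.deque` with `maxlen`: once the deque is full, appending a new item silently drops the oldest one. A standalone illustration, with a smaller buffer and integers in place of frames:

```python
import collections

# Buffer that keeps only the 3 most recent items.
frame_buffer = collections.deque(maxlen=3)

for frame_number in range(5):
    frame_buffer.append(frame_number)

print(list(frame_buffer))  # [2, 3, 4]
```

The filter therefore never holds more than 60 frames, no matter how long the session runs.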
To access this buffer filter, the second filter needs to know its id. Therefore we need to add a variable to its config; let's call it `filterId`.
For this example, we will assume that the buffer is a video buffer and neglect error handling.
```python
# ... Imports (VideoFrame, BufferFilter, ...)
import numpy

from filters import Filter


class AnotherExampleFilter(Filter):
    """Filter accessing BufferFilter to execute a secret algorithm."""

    _frame_buffer: BufferFilter | None = None

    async def complete_setup(self) -> None:
        # Get frame buffer
        buffer_filter_id = self._config["config"]["filterId"]["value"]
        buffer_filter = self.video_track_handler.filters[buffer_filter_id]
        self._frame_buffer = buffer_filter

    @staticmethod
    def name() -> str:
        return "ANOTHER"

    @staticmethod
    def filter_type() -> str:
        return "SESSION"

    @staticmethod
    def channel() -> str:
        return "video"

    @staticmethod
    def default_config() -> dict:
        return {
            "filterId": {
                "defaultValue": ["buffer"],
                "value": "buffer",  # <-- this will later be replaced by the frontend with the correct id
                "requiresOtherFilter": True,
            },
        }

    async def process(self, original: VideoFrame, ndarray: numpy.ndarray) -> numpy.ndarray:
        # Execute the algorithm that requires the data from the other filter.
        # `foo` is a placeholder for that algorithm.
        foo(frame_buffer=self._frame_buffer)
        return ndarray
```
This is a simple template group filter which:
- extracts the mean of video frames from each participant in the individual data extraction step,
- aligns individual data with 1D interpolation (`interp1d` with `kind="nearest"`),
- calculates and reports the standard deviation of the video frame means in the aggregation step.

`data_len_per_participant` is 1 since only one video frame is required at a time. `num_participants_in_aggregation` is 2 for demonstration purposes in this example group filter.
```python
# ... Imports (VideoFrame, AudioFrame, FilterDict, ...)
from typing import Any

import numpy as np
from scipy.interpolate import interp1d

from group_filters import GroupFilter


class TemplateGroupFilter(GroupFilter):
    """
    A simple template group filter which applies the following:
    - takes the mean of each frame on a video track of a participant at the individual
      frame processing step. This part runs in the track handler of the corresponding
      participant.
    - aligns the data collected at the aggregator for 2 participants before the
      aggregation step. This part runs in the corresponding aggregator.
    - takes the standard deviation of aligned data of 2 participants at the
      aggregation step. This part runs in the corresponding aggregator.

    Can be used as a template to copy when creating a new group filter.
    """

    data_len_per_participant = 1  # data required for aggregation
    num_participants_in_aggregation = 2  # number of participants joining in aggregation

    def __init__(self, config: FilterDict, participant_id: str):
        super().__init__(config, participant_id)

    @staticmethod
    def name() -> str:
        # TODO: Change this name to a unique name.
        return "TEMPLATE_GF"

    @staticmethod
    def type() -> str:
        # TODO: change this according to your filter type (SESSION, TEST or NONE)
        return "NONE"

    @staticmethod
    def channel() -> str:
        # TODO: change this according to your filter channel (video, audio, both)
        return "video"

    @staticmethod
    def default_config() -> dict:
        # TODO: change this according to your filter config
        return {
            # example of how a filter config can look like
            # add or delete this
            # This would show that there is a string variable (direction) which can have different
            # values and another int variable (size)
            # in the frontend, we would then have either a dropdown (direction) or input (size)
            # The values can be changed and sent back to the backend
            #
            #
            # "direction": {
            #     "defaultValue": ["clockwise", "anti-clockwise"],
            #     "value": "clockwise",
            #     "requiresOtherFilter": False,
            # },
            # "size": {
            #     "min": 1,
            #     "max": 60,
            #     "step": 1,
            #     "value": 45,
            #     "defaultValue": 45,
            # },
        }

    async def process_individual_frame(
        self, original: VideoFrame | AudioFrame, ndarray: np.ndarray
    ) -> Any:
        # TODO: Change this to implement the individual frame processing step.
        return ndarray.mean()

    @staticmethod
    def align_data(x: list, y: list, base_timeline: list) -> list:
        # TODO: Change this to implement an alignment function.
        # Needs to be implemented as a static method.
        interpolator = interp1d(x, y, kind="nearest", fill_value="extrapolate")
        return list(interpolator(base_timeline))

    @staticmethod
    def aggregate(data: list[list[Any]]) -> Any:
        # TODO: Change this to implement the aggregation step.
        # Needs to be implemented as a static method.
        np_data = np.array(data)
        return np_data.std(axis=0)[0]
```
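To see what the alignment and aggregation steps compute, here is a dependency-free sketch with made-up numbers. It assumes that `x` holds timestamps and `y` the values extracted at those timestamps. `align_nearest` is a hypothetical, simplified stand-in for the `interp1d(..., kind="nearest", fill_value="extrapolate")` call (its tie-breaking at exact midpoints may differ from SciPy), and `statistics.pstdev` computes the population standard deviation, which is also NumPy's default for `std`.

```python
import statistics


def align_nearest(x: list, y: list, base_timeline: list) -> list:
    """For each time in base_timeline, pick the y value whose x is closest."""
    aligned = []
    for t in base_timeline:
        nearest_index = min(range(len(x)), key=lambda i: abs(x[i] - t))
        aligned.append(y[nearest_index])
    return aligned


def aggregate(data: list) -> float:
    """Population standard deviation across participants at the first data point."""
    return statistics.pstdev([row[0] for row in data])


# Two participants with slightly shifted timestamps (made-up values).
base_timeline = [2.0]
participant_a = align_nearest([0.0, 1.0, 2.0], [10.0, 20.0, 30.0], base_timeline)
participant_b = align_nearest([0.1, 1.1, 2.1], [12.0, 22.0, 32.0], base_timeline)

print(participant_a, participant_b)               # [30.0] [32.0]
print(aggregate([participant_a, participant_b]))  # 1.0
```

Both participants are first mapped onto the same timeline, and only then are their values compared.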
This is the real-time synchrony score implementation as a group filter in the experimental-hub. The synchrony score is calculated with the OASIS algorithm. The UML diagram is available below, and the detailed implementation can be found under the `backend/group_filters/sync_score` folder.