Filters & Group Filters - TUMFARSynchrony/SynthARium GitHub Wiki

This page gives an overview of what we refer to as Filters and Group Filters in our platform, a technical overview of their implementation, and instructions on how to implement your own.

Filter Definition & Capabilities

Filters are audio or video manipulators and/or analyzers. They extend the abstract Filter class and implement its process function, which receives each frame transmitted by a client. Filters are instantiated according to the Filter entries in Participant Data.

Capabilities

A single filter can:

  • Edit and/or analyze audio or video
    • A filter is either an audio or a video filter. When a filter concept requires both audio and video input, two filters are required. This can be as simple as one filter buffering the frames and the other filter using that buffer. See Accessing other Filters for this example.
    • The modification and analysis are not limited in any specific way. They can e.g. include ML, libraries like OpenFace, ...
  • Store data
    • E.g. frame counter, analysis results, frame buffer, ...
  • Access data from other filters
  • Use the FilterAPI to access data and functionality from any other part of the server, including:
    • Sending data to the client
    • Functionality that is not yet implemented could include: accessing and modifying session data, interacting with users (kicking, banning, muting, chatting, ...) or experiments (creating, starting, stopping, ...)

Pipeline

Each frame received from a participant is passed through the filter pipeline. The pipeline executes the filters in order, so the next filter always receives the output frame of the previous one. The order in which filters are configured can therefore affect the output. Finally, the resulting frame is returned and distributed to the clients.

Simplified Filter Pipeline

If the track is muted, only filters with the run_if_muted variable set to True are executed. This saves resources: filters that only modify the frame have no effect on a muted frame and can therefore be skipped, while analysis of the input frame is still possible.
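
Conceptually, the muting check and the pipeline loop reduce to something like the following sketch (hypothetical pseudocode of the TrackHandler's behavior, not the actual implementation):

async def run_pipeline(filters, original, ndarray, muted: bool):
    # Each filter receives the previous filter's output ndarray; on muted
    # tracks, only filters that opted in via run_if_muted are executed.
    for f in filters:
        if not muted or f.run_if_muted:
            ndarray = await f.process(original, ndarray)
    return ndarray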

Detailed Filter Pipeline

Detailed Filter Pipeline

Filter Class

Each filter must extend the Filter class located in backend/filters/filter.py.

This base class provides basic functionality; only the process, name, filter_type, channel and get_filter_data methods must be implemented. However, some further methods are intended to be overridden for more extensive filters.

The following list gives a short overview of important attributes and functions. Please take a look at the code documentation for more details; a minimal example filter follows the list.

  • audio_track_handler and video_track_handler variables: TrackHandlers for the stream a Filter is part of. If the filter is used for video tracks, it is handled by video_track_handler, otherwise audio_track_handler. Both can be used for direct access to other audio or video filters for the same stream / user.
  • run_if_muted variable: bool indicating whether the TrackHandler should execute this filter even if the track is muted. Defaults to False.
  • async def set_config(config: FilterDict) -> None: setter for config, called when a SET_FILTERS request updates an existing filter (same id and type). Can be overridden in case the filter should react to config changes.
  • async def complete_setup(self) -> None: called after setup of all filters in the pipeline is complete. Can be used to access other filters (other filters may not be initialized yet when this filter itself is initialized) or to complete asynchronous setup tasks.
  • async def cleanup(self) -> None: cleanup function called if a filter is no longer needed. If the filter created asyncio Tasks, they should be stopped and awaited in cleanup.
  • async def get_filter_data(self) -> None | FilterDataDict: called via the API. Returns the data of a filter, e.g. whether a test has passed or values of interest to an experimenter
  • @staticmethod def name() -> str: provide name of filter class
  • @staticmethod def id() -> str: provide id of filter
  • @staticmethod def filter_type() -> str: provide type of the filter (e.g. TEST or SESSION)
  • @staticmethod def channel() -> str: provide channel of filter (video, audio or both)
  • @staticmethod def default_config() -> dict: provide the default config (by default: empty dictionary)
  • async def process(self, original: VideoFrame | AudioFrame, ndarray: numpy.ndarray) -> numpy.ndarray: process incoming audio or video frames, depending on the filter type. It receives the original input frame for reference and metadata, and a parsed numpy ndarray. The TrackHandler parses the input frame into an ndarray for easy access in the filters and passes it through the pipeline.
  • @staticmethod def validate_dict(data) -> TypeGuard[FilterDict]: validate if data is of type FilterDict, or, in special cases, a subclass of FilterDict
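
To make this concrete, below is a minimal sketch of a custom video filter. The import paths and the use of av's VideoFrame are assumptions based on the signatures above; treat it as a template rather than the exact hub API:

import numpy
from av import AudioFrame, VideoFrame

from filters.filter import Filter  # assumed import path


class GrayscaleFilter(Filter):
    """Hypothetical example filter that converts video frames to grayscale."""

    @staticmethod
    def name() -> str:
        return "GRAYSCALE"

    @staticmethod
    def id() -> str:
        return "grayscale"

    @staticmethod
    def filter_type() -> str:
        return "SESSION"

    @staticmethod
    def channel() -> str:
        return "video"

    async def process(
        self, original: VideoFrame | AudioFrame, ndarray: numpy.ndarray
    ) -> numpy.ndarray:
        # Average the color channels and broadcast the result back to the
        # original shape, so the TrackHandler receives a valid video ndarray.
        gray = ndarray.mean(axis=2, keepdims=True).astype(ndarray.dtype)
        return numpy.repeat(gray, ndarray.shape[2], axis=2)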

Filter Architecture (Dynamic Filters)

Please make sure to add all required functions to a Filter Class as described in Implementing a Filter.

The filter configurations are automatically fetched and stored in a JSON object. The frontend can request this JSON via the GET_FILTERS_CONFIG API call. The steps are as follows:

  1. Start Backend
  2. Create new experiment
  3. Filter Data is fetched via API
  4. In the backend: it creates the filter_data JSON with two sub-fields (SESSION and TEST) and returns the data
  5. In detail: it loops over all Filter subclasses and
    1. checks what type the filter is -> SESSION, TEST or NONE
      1. NONE -> ignored
      2. TEST/SESSION -> adds the filter to the corresponding field
    2. gets the filter_json of each filter by collecting the required values from it
    3. validates that filter_json is correct
  6. Experimenter can add new participants with dynamic filters
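
The collection logic in steps 4 and 5 can be sketched as follows. The function name build_filter_data and the use of Filter.__subclasses__() are illustrative assumptions, not the backend's actual code:

from filters.filter import Filter  # assumed import path


def build_filter_data() -> dict:
    filter_data = {"TEST": [], "SESSION": []}
    for cls in Filter.__subclasses__():
        filter_type = cls.filter_type()
        if filter_type == "NONE":
            continue  # NONE filters are ignored
        filter_json = {
            "name": cls.name(),
            "id": cls.id(),
            "channel": cls.channel(),
            "groupFilter": False,
            "config": cls.default_config(),
        }
        # A real implementation would validate filter_json here,
        # e.g. with a check like cls.validate_dict(...).
        filter_data[filter_type].append(filter_json)
    return filter_data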

The filter_data JSON looks like:

{
  "TEST": [],
  "SESSION": [
    {
      "name": "ROTATION",
      "id": "rotation",
      "channel": "video",
      "groupFilter": false,
      "config": {
        "direction": {
          "defaultValue": ["clockwise", "anti-clockwise"],  --> rendered as a dropdown list
          "value": "clockwise",
          "requiresOtherFilter": false
        },
        "angle": {
          "min": 1,
          "max": 180,
          "step": 1,
          "value": 45,
          "defaultValue": 45  --> rendered as an input field
        }
      }
    }
  ]
}
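
For the hypothetical rotation filter above, the corresponding default_config could look like the following sketch (field names are taken directly from the JSON above; the class and import path are illustrative):

from filters.filter import Filter  # assumed import path


class RotationFilter(Filter):
    """Hypothetical filter matching the JSON entry above."""

    @staticmethod
    def default_config() -> dict:
        # Mirrors the "config" object in the filter_data JSON above.
        return {
            "direction": {
                "defaultValue": ["clockwise", "anti-clockwise"],  # dropdown options
                "value": "clockwise",
                "requiresOtherFilter": False,
            },
            "angle": {
                "min": 1,
                "max": 180,
                "step": 1,
                "value": 45,
                "defaultValue": 45,  # numeric input
            },
        }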

Existing Filters

The following filters are currently available in the experimental-hub:

  • Delay
  • Edge outline
  • Glass detection
  • Mute
  • OpenFace AU
  • Rotate
  • Speaking time

Group Filter Definition & Capabilities

Group Filters are the extended version of filters, used to analyze and/or manipulate multiple audio/video streams from different participants simultaneously. Group filters extend the abstract GroupFilter base class and provide implementations for the process_individual_frame, align_data, and aggregate functions. The process_individual_frame function is responsible for extracting data from each participant's individual media frames, and the align_data function aligns the extracted data in time before the aggregation step is executed with the aggregate function. Group filters are instantiated according to the GroupFilter entries in Participant Data.

Group Filter Capabilities

A single group filter can:

  • Edit and/or analyze audio or video of each participant individually and at the group level
    • A group filter is either an audio or a video group filter. When a group filter concept requires both audio and video input, two filters are required.
    • The modification and analysis are not limited in any specific way. They can e.g. include ML, libraries like OpenFace, ...
  • Store data
    • E.g. frame counter, analysis results, frame buffer, ...
  • Access data from other filters
  • Use the FilterAPI to access data and functionality from any other part of the server, including:
    • Sending data to the client

Group Filter Pipeline

Each frame received from a participant is passed through the group filter pipeline. The pipeline executes the group filter steps in order. The individual data extraction step extracts data per participant and shares it with the aggregator, so that the aggregation step can be performed after the participants' data has been aligned with the user-defined function. Finally, the group filter output is returned and distributed to the clients.

Group Filter Pipeline

Group Filter UML

The group filter implementation consists of two main modules, as shown in the UML diagrams: the Group Filter and the Group Filter Aggregator.

Simple Group Filter UML

Group Filter UML

Group Filter Class

Each group filter must extend the GroupFilter class located in backend/group_filters/group_filter.py.

This base class provides basic functionality; only the process_individual_frame, align_data, aggregate, name, type, and channel methods must be implemented. However, some further methods are intended to be overridden for more extensive group filters.

The following list gives a short overview of important attributes and functions. Please take a look at the code documentation for more details; a minimal example group filter follows the list.

  • _config: FilterDict: Group filter configuration
  • _logger: logging.Logger: Logger
  • _context: zmq.Context | None: ZeroMQ context for socket connections
  • _socket: zmq.Socket | None: ZeroMQ socket to send data to the aggregator
  • is_socket_connected: bool: Attribute to keep track of ZeroMQ connection on the data socket with the aggregator
  • _result_socket: zmq.Socket | None: ZeroMQ socket to receive data from the aggregator
  • is_result_socket_connected: bool: Attribute to keep track of ZeroMQ connection on the result socket with the aggregator
  • __aggregation_results: dict[str, Any]: Aggregation results
  • __line_writer: SimpleLineWriter: Line writer to print the aggregation results on the frame
  • data_len_per_participant: int = 0: Minimum number of data points required per participant to start the aggregation step
  • num_participants_in_aggregation: int = 2: Number of participants joining the aggregation step. Use "all" to include all participants in the aggregation
  • connect_aggregator(self, data_port: int, result_port: int) -> None: called during group filter initialization to connect to the aggregator with ZeroMQ sockets for both data sharing and result gathering
  • set_config(config: FilterDict) -> None: setter for config, called when a SET_GROUP_FILTERS request updates an existing group filter (same name and type). Can be overridden in case the filter should react to config changes.
  • async complete_setup(self) -> None: called after setup of all group filters in the pipeline is complete. Can be used to complete asynchronous setup tasks
  • async cleanup(self) -> None: cleanup function called if a group filter is no longer needed. If the group filter created asyncio Tasks, they should be stopped and awaited in cleanup.
  • async get_filter_data(self) -> None | FilterDataDict: called via the API. Returns the data of a filter, e.g. whether a test has passed or values of interest to an experimenter
  • @staticmethod name() -> str: provide name of group filter class
  • @staticmethod type() -> str: provide type of group filter (e.g. TEST or SESSION)
  • @staticmethod channel() -> str: provide channel of group filter (video, audio or both)
  • @staticmethod default_config() -> dict: provide the default config (by default: empty dictionary)
  • @staticmethod validate_dict(data) -> TypeGuard[FilterDict]: validate if data is of type FilterDict, or, in special cases, a subclass of FilterDict
  • async process_individual_frame_and_send_data_to_aggregator(self, original: VideoFrame | AudioFrame, ndarray: numpy.ndarray, ts: float) -> numpy.ndarray: used by the internal runtime logic for group filters, DO NOT override. Calls the process_individual_frame function of the group filter to perform individual data extraction step and sends the individual data to the aggregator.
  • async process_individual_frame(self, original: VideoFrame | AudioFrame, ndarray: numpy.ndarray) -> numpy.ndarray: process incoming audio or video frames, depending on the type. It receives the original input frame for reference and metadata and a parsed numpy ndarray, and performs the individual data extraction step per participant.
  • @staticmethod async align_data(x: list, y: list, base_timeline: list) -> list: Aligns the individual data collected from the participants that joined the aggregation. Called by the aggregator.
  • @staticmethod async aggregate(data: list[list[Any]]) -> Any: Performs the aggregation for the participants that joined the aggregation step. Called by the aggregator.
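
Below is a minimal, hypothetical sketch of a group filter that averages frame brightness across participants. The import path, the assumption that process_individual_frame's return value is the data forwarded to the aggregator, and the interpolation-based alignment are all illustrative, not the actual hub implementation:

from typing import Any

import numpy
from av import AudioFrame, VideoFrame

from group_filters.group_filter import GroupFilter  # assumed import path


class BrightnessGroupFilter(GroupFilter):
    """Hypothetical group filter: average frame brightness across participants."""

    data_len_per_participant = 1  # aggregate once one data point per participant exists

    @staticmethod
    def name() -> str:
        return "BRIGHTNESS"

    @staticmethod
    def channel() -> str:
        return "video"

    async def process_individual_frame(
        self, original: VideoFrame | AudioFrame, ndarray: numpy.ndarray
    ) -> numpy.ndarray:
        # Individual data extraction step: reduce the frame to its mean
        # brightness (returned as a 0-d ndarray).
        return numpy.asarray(ndarray.mean())

    @staticmethod
    async def align_data(x: list, y: list, base_timeline: list) -> list:
        # Interpolate the values y (sampled at times x) onto the shared
        # base timeline so all participants' data becomes comparable.
        return list(numpy.interp(base_timeline, x, y))

    @staticmethod
    async def aggregate(data: list[list[Any]]) -> Any:
        # Average the aligned brightness values across all participants.
        return float(numpy.mean([numpy.mean(d) for d in data]))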

Group Filter Aggregator Class

The group filter aggregator is used by the internal runtime logic of the experimental-hub. It is not meant to be overridden; please DO NOT override it.

The following list gives a short overview of important attributes and functions. Please take a look at the code documentation for more details.

  • _logger: logging.Logger: Logger
  • _task: asyncio.Task: asyncio task running the aggregator
  • _context: zmq.Context: ZeroMQ context for socket connections
  • _socket: zmq.Socket: ZeroMQ socket to receive data from group filters
  • is_socket_connected: bool: Attribute to keep track of ZeroMQ connection on the data socket with group filters
  • _result_socket: zmq.Socket: ZeroMQ socket to send aggregation result to group filters
  • is_result_socket_connected: bool: Attribute to keep track of ZeroMQ connection on the result socket with group filters
  • _kind: Literal["video", "audio"]: Channel type of the aggregator
  • _group_filter: GroupFilter: Group filter associated with the aggregator
  • _data: dict[str, Queue[tuple[float, Any]]]: Individual data collected from participants to perform aggregation
  • set_task(self, task: asyncio.Task) -> None: Setter for the _task attribute for self-destruction later
  • async cleanup(self) -> None: cleanup function called if a group filter is no longer needed
  • delete_data(self) -> None: Deletes data from _data to reset the individual data collected
  • add_data(self, participant_id: str, t: float, data: Any) -> None: Adds data to _data after receiving it from the group filters
  • align_data(self, participant_ids: tuple[str], data: dict[str, list[tuple[float, Any]]]) -> list[list[Any]] | None: Aligns the data by calling the static GroupFilter.align_data function
  • run(self) -> None: Runs the aggregator in an asyncio task with an infinite loop. It collects the individual data from the group filters and performs the aggregation step by calling the static GroupFilter.aggregate function after the data has been aligned

Existing Group Filters

The following group filters are currently available in the experimental-hub:

  • Synchrony Score