CellProfiler Concurrency Model - CellProfiler/CellProfiler GitHub Wiki

Multiprocessing and Threading in CellProfiler

CellProfiler and PyZMQ

CellProfiler uses the zeromq library (via PyZMQ) to manage communication between workers (sub processes) and threads when in Analysis mode.

This page documents how the analysis interface works. This is primarily intended for developers, so some knowledge of Python and software engineering is assumed.

Test Mode

When a pipeline runs in test mode, all the work is carried out on the local thread. This system is completely separate from the Analysis mode execution setup. The latter instead sends work to dedicated processes called workers so that jobs can run in parallel. It's important to validate workflows in both Test and Analysis mode when developing for CellProfiler.

Analysis Mode

CellProfiler uses a number of threads to manage analysis mode and it's associated worker processes. The general structure is as follows:

Main Process (1)

Analysis()
└─Runner()
  ├─[Queue] work_queue
  │         (Jobs to do)
  ├─[Queue] in_process_queue
  │         (Jobs running)
  ├─[Queue] finished_queue
  │         (Jobs completed)
  ├─[Queue] recieved_measurements_queue
  │         (Data returned from workers)
  ├─[Thread] boundary.spin
  │         > Handles zmq message i/o
  │         > Translates worker request messages into request queue objects
  │         > Sends reply messages to workers
  │         > Tells workers to keep running
  ├─[Thread] interface
  │         > Manages list of jobs to do. (Runner's queues)
  │         > Collates returned measurements
  │         > Determines completion progress of analysis
  │         > Logs progress
  ├─[Thread] jobserver
  │         └─ [Queue] request_queue
  │                    (Worker requests)
  │         > Handles worker requests e.g. send pipeline
  │         > Prepares replies
  │         > Sends replies to boundary for transmission
  └─[Thread] run_logger (1 per worker)
            > Monitors worker STDOUT
            > Copies messages to native process logger

Worker Process (n)

Main()
└─Worker()
  ├─[Thread] monitor_keepalive
  │         > Checks for periodic keepalive signal.
  │         > Shuts down worker on STOP signal
  │         > Shuts down worker after 15s no signal.
  ├─[Thread] worker.run
  │         > Runs analysis jobs
  │         > Executes pipeline on a single image set at a time
  │         > Each request also checks for STOP signal.
  └─[Thread] KnimeBridgeServer
            > Supposedly handles knime requests
            > Actually doesn't ever run.
            > Update and restore functionality someday?

ZMQ Interface

As the schema above might suggest, there's a lot more back and forth than merely "do this image set and send back the results". CellProfiler has to handle exceptions, interactive modules, initial measurements and a bunch of other things. The key takeaway is that Boundary acts as the server for handling actual inter-process comms. Yes, it's surprisingly NOT jobserver - that thing's role is to do the legwork to respond to messages and then give any reply back to boundary. This is intended to keep the boundary.spin thread as free as possible, which is very important. You could say that jobserver is responsible for dishing out jobs via the Work request object, but transmission of the job is done on boundary.spin.

While inter-thread communication is quite straightforward, inter-process communication is complicated by the fact that workers are started as totally independent processes via subprocess.Popen. The MultiProcessing module could potentially make things cleaner and define child processes, but when this was last tried (CP3?) it didn't play well with PyInstaller's packed executables.

The ZMQ module allows us to send byte messages between processes and/or threads. CellProfiler has a dedicated Communicable class designed to package requests and replies into a transmissable format with the required addressing to reach a specific worker process. This is achieved using a series of sockets for different types of communication.

The uninitiated will probably want to read some of the ZMQ docs because it's a huge rabbit hole, but the basics are as follows: PUB sockets broadcast messages which SUB sockets collect without confirmation. REQ sockets send requests and expect a reply, the ROUTER socket (a fancy REP) collects requests and sends replies. We use ROUTER so that the replies can be asynchronous rather than being answered in order. PAIR is a direct connection between two locations. When you see POLLER, this is a wrapper that allows us to check for messages on a bunch of sockets at once. Calling socket.recv would normally block until something arrives and return the message, but calling poller.poll instead tells us which wrapped socket has a message waiting to be received.

Another thing - boundary.spin is more complicated than you might assume. It's constantly looping but has two phases to each loop: It first polls for messages from the ZMQ system and then converts them into queue Request objects for the other threads. Each time ZMQ messages are detected the spin thread also checks the outgoing message queue and sends anything in there. This means that the spin thread always has to be sent a message before it'll send anything in the queue (as it'll block on the ZMQ poll stage until something arrives). The workaround for this was to have a seperate message socket on the main thread which is used to poke the bear whenever those queues have something added to them. What the message says is irrelevant (it's actually "WAKE UP!"), it's only function is to trigger the rest of spin's loop to run.

Below is a map of the ZMQ sockets used by CellProfiler. As a rule a socket should only be used by one thread, which is why some names appear twice. We're going to pretend that Knime bridge isn't a thing because it doesn't actually do anything right now.

Main Process (1)

Runner()
└─Boundary()
   ├─[zmq.PUB] keepalive_socket
   │           > Sends periodic GO signals to workers.
   │           > Sends STOP message to shut down workers.
   ├─[zmq.SUB] selfnotify_socket (spin thread)
   │           > Wakes up .spin when there's work to do.
   ├─[zmq.PUB] selfnotify_socket (main thread)
   │           > Used when the main program added to a queue.
   │           > Pokes the spin thread out of polling mode
   └─[zmq.ROUTER] request_socket
               > Captures request messages from workers.

Worker Process (n)

Main()
  │└─[zmq.PAIR] deadman_start
  │            > Establishes that the monitor started OK.
  ├─monitor_thread
  │ ├─[zmq.PAIR] deadman_start
  │ │          > Notifies main thread when monitor is set up.
  │ └─[zmq.SUB] keepalive_socket
  │            > Listens for GO/STOP pulse from main process.
  └─worker_run
    ├─[zmq.REQ] work_socket
    │          > Sends requests to main process.
    └─[zmq.SUB] keepalive_socket (filtered)
               > Listens only for STOP pulse from main process.
   

There actually used to be much more complexity. In CP5 a ZMQ Context (socket system) is established for each analysis and terminates at the end of a run. This makes sure that analysis runs don't conflict with eachother and sockets close cleanly. On creation of a new Runner the comms ports are randomised, so you should be able to run multiple copies of CellProfiler without problems.

Schematic

The following is a diagram attempting to provide an overview of the components (objects, threads, processes), and communication structure (zmq sockets, native python synchronization primatives) described above. It is recent as of March 1, 2024, or commit SHA: ea6a2e6d001b10983301c10994e319abef41e618.

CP_Concurrency_Model