Basic Walkthrough: Tasks & Flows - leonoel/missionary GitHub Wiki

Basic walkthrough: Tasks & Flows

Introduction

This walkthrough will introduce the Missionary API concepts of task and flow.

Follow along by starting a repl:

clj

clj -Sdeps '{:deps {missionary/missionary {:mvn/version "b.31"}}}'

cljs

clj -Sdeps '{:deps {org.clojure/clojurescript {:mvn/version "1.11.60"} missionary/missionary {:mvn/version "b.31"}}}' -M -m cljs.main

In a repl session, require the Missionary library:

(require '[missionary.core :as m])

Task

A Missionary task is a value representing an action to be performed.

Use various API functions to create tasks.

(m/sleep 800) ; a sleep task
(m/timeout (m/sleep 1000) 800) ; a timeout task

Use a sequential process block to create a task from a body of forms.

(m/sp (println "one") :two)

Compose tasks, inside a process block, by running them and waiting on their values with ?.

(m/sp (println "Let's take a nap...")
      (str (m/? (m/sleep 900 "Hi "))
           (m/? (m/sleep 100 "there!"))))

A task is not executed when it is created; it must be started explicitly. Because a task has no identity, the same task can be run an arbitrary number of times. Each run performs the underlying action again and may produce a different result.

A Missionary task is implemented as described at leonoel/task. It is a function which can be asynchronously executed when provided with a success continuation function and an error continuation function. It then returns a function which can be used to cancel the task execution.

Pseudo-code description:

task is (fn [success-continuation error-continuation]) which returns cancel-fn
continuation is (fn [value] ...)
cancel-fn is (fn [])
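
As a rough illustration of the protocol described above, a task can be written by hand as a plain function. This is a minimal sketch: `constant-task` and `result` are hypothetical names introduced for the example and are not part of Missionary.

```clojure
(require '[missionary.core :as m])

;; `constant-task` (hypothetical helper) returns a task: a function of
;; two continuations that returns a zero-argument cancel function.
(defn constant-task [value]
  (fn [success _failure]
    (success value)   ; complete immediately with `value`
    (fn [] nil)))     ; already completed, so cancelling does nothing

(def result (atom nil))

;; Start the task by calling it with both continuations.
((constant-task 42) #(reset! result %) #(println :KO %))
@result ; => 42

;; Because it follows the protocol, it can also be awaited in a process block.
((m/sp (inc (m/? (constant-task 42)))) #(reset! result %) #(println :KO %))
@result ; => 43
```

In practice the API functions and process blocks build tasks for you; writing one by hand is mostly useful for wrapping a callback-based API.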

Asynchronously run a task by invoking it and use continuation functions to process a successful or failing result.

((m/sp "world") #(println "Hello" %)
                #(println :KO %))  ; (on stdout) Hello world
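
To illustrate that a task has no identity, the sketch below starts the same task value twice. The names `counter` and `count-task` are assumptions made up for this example; the point is only that the body runs once per start, not at definition time.

```clojure
(require '[missionary.core :as m])

(def counter (atom 0))

;; Defining the task does not run its body.
(def count-task (m/sp (swap! counter inc)))

@counter ; => 0

;; Each start performs the action again.
(count-task #(println :first-run %) #(println :KO %))  ; (on stdout) :first-run 1
(count-task #(println :second-run %) #(println :KO %)) ; (on stdout) :second-run 2
```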

Cancel a task using its cancel function.

(def a-task (m/sleep 15000 :done))
(def cancel (a-task #(println :ok %) (fn [_] (println :KO))))
(cancel)  ; (on stdout) :KO
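
The failure continuation receives a value describing why the task failed. The sketch below stores it in an atom instead of discarding it, so that it can be inspected after cancellation. The names `failure` and `cancel-nap` are hypothetical, and the concrete type of the error is left for you to check at the repl.

```clojure
(require '[missionary.core :as m])

(def failure (atom nil))

(def cancel-nap
  ((m/sleep 15000 :done)
   #(println :ok %)
   #(reset! failure %))) ; keep the error instead of ignoring it

(cancel-nap)

@failure        ; the error passed to the failure continuation
(type @failure) ; its type
```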

Alternatively, if the host platform supports blocking threads, a task can be executed using the ? function outside of any process block. The call blocks the calling thread until the task completes, then returns its result.

clj only

(m/? (m/sp :hello)) ; => :hello
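
Blocking with ? is a convenient way to try the tasks from the earlier sections at the repl (clj only). The sketch below names the composed task `nap`, a hypothetical name, runs it, then wraps it with timeout. It assumes the three-argument form of timeout, where the third argument is the value returned when the delay elapses first.

```clojure
(require '[missionary.core :as m])

(def nap
  (m/sp (str (m/? (m/sleep 900 "Hi "))
             (m/? (m/sleep 100 "there!")))))

(m/? nap) ; => "Hi there!" (after about one second)

;; The nap needs about 1000 ms, so a 500 ms timeout wins.
(m/? (m/timeout nap 500 :too-slow))  ; => :too-slow

;; With a 2000 ms budget the nap completes first.
(m/? (m/timeout nap 2000 :too-slow)) ; => "Hi there!"
```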

clj only

A task that blocks on IO or takes a lot of CPU time can be created with via, which runs its body on a thread of the given executor: m/blk for blocking operations, m/cpu for CPU-bound computations.

(m/via m/blk (Thread/sleep 5000) :done)
(m/via m/cpu (+ 1 1))
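
One way to see that the body of via runs away from the calling thread is to compare thread names (clj only). This is a small sketch; `thread-name` is a hypothetical helper, and the actual names depend on your environment.

```clojure
(require '[missionary.core :as m])

(defn thread-name []
  (.getName (Thread/currentThread)))

(m/? (m/via m/blk (Thread/sleep 100) :done)) ; => :done

;; The body is evaluated on an executor thread, not on the caller's thread.
(not= (thread-name) (m/? (m/via m/blk (thread-name)))) ; => true
(not= (thread-name) (m/? (m/via m/cpu (thread-name)))) ; => true
```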

Task Examples

Create two tasks and read their values sequentially.

clj

(let [v1 (m/? (m/sp "hi"))
      v2 (m/? (m/sp "there"))]
  (printf "Read %s from %s%n" v1 v2))

Create two tasks, run them concurrently with join, and read both values.

clj

(let [[v1 v2] (m/? (m/join vector
                           (m/sp "hi")
                           (m/sp "there")))]
  (printf "Read %s from %s%n" v1 v2))
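
The difference between the two examples becomes visible once the tasks take time. The sketch below (clj only) measures both styles with sleep tasks; `elapsed-ms` is a hypothetical helper written for this comparison, and the timings in the comments are approximate.

```clojure
(require '[missionary.core :as m])

;; `elapsed-ms` (hypothetical helper) runs a task and returns how long it took.
(defn elapsed-ms [task]
  (let [begin (System/currentTimeMillis)]
    (m/? task)
    (- (System/currentTimeMillis) begin)))

;; Sequential: the second sleep starts after the first one completes.
(elapsed-ms (m/sp [(m/? (m/sleep 300 :a))
                   (m/? (m/sleep 300 :b))])) ; roughly 600

;; Concurrent: join starts both sleeps at once.
(elapsed-ms (m/join vector
                    (m/sleep 300 :a)
                    (m/sleep 300 :b)))       ; roughly 300
```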

Flow

A Missionary flow is a value representing a process able to produce an arbitrary number of values, at any point in time, before terminating.

Use various API functions to create, compose or transform flows.

(m/seed [1 2 3])
(m/zip vector (m/seed (range 3)) (m/seed [:a :b :c]))
(m/eduction (map inc) (m/seed [1 2 3]))

Use an ambiguous process block to create a flow from a body of forms.

In an ambiguous process block, the ?> and ?< operators fork the process each time a new value arrives from a flow. See their documentation for details.

(m/ap (println (m/?> (m/seed [1 2]))))
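
To make forking more concrete, the sketch below collects the values produced by two ambiguous process blocks, using reduce and a blocking ? (clj only). The names `tens` and `pairs` are hypothetical.

```clojure
(require '[missionary.core :as m])

;; Each value of the seed flow forks the block; every branch yields one result.
(def tens
  (m/ap (let [x (m/?> (m/seed [1 2 3]))]
          (* x 10))))

(m/? (m/reduce conj tens)) ; => [10 20 30]

;; Two forks in a row behave like nested iteration.
(def pairs
  (m/ap (let [x (m/?> (m/seed [1 2]))
              y (m/?> (m/seed [:a :b]))]
          [x y])))

(m/? (m/reduce conj pairs)) ; => [[1 :a] [1 :b] [2 :a] [2 :b]]
```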

A flow is not run when it is created. To consume it, you must run a task derived from it. Use the reduce API function to define such a task from a flow.

clj

(let [a-flow (m/seed (range 4))
      a-task (m/reduce conj a-flow)]
  (m/? a-task)) ; => [0 1 2 3]
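
The same pattern can be used to look at what the flows from the start of this section produce. A short sketch, clj only because it blocks with ?:

```clojure
(require '[missionary.core :as m])

(m/? (m/reduce conj (m/seed [1 2 3])))
;; => [1 2 3]

(m/? (m/reduce conj (m/zip vector (m/seed (range 3)) (m/seed [:a :b :c]))))
;; => [[0 :a] [1 :b] [2 :c]]

(m/? (m/reduce conj (m/eduction (map inc) (m/seed [1 2 3]))))
;; => [2 3 4]

;; reduce also accepts an initial value.
(m/? (m/reduce + 0 (m/seed (range 4))))
;; => 6
```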

Tip: If a flow generates side-effects, drain it with reduce and (constantly nil).

clj

(m/? (m/reduce
      (constantly nil)
      (m/ap (println "Hi" (m/?> ##Inf (m/seed (range 20)))))))
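
The ##Inf argument in the tip above asks ?> to run its branches concurrently instead of one at a time. The sketch below (clj only) contrasts the two forms with sleep tasks of different durations; `delays` is a hypothetical name, and the order of the concurrent result is left for you to observe rather than stated here.

```clojure
(require '[missionary.core :as m])

(def delays [300 100 200])

;; Sequential fork: one branch at a time, so results keep the input order.
(m/? (m/reduce conj
               (m/ap (let [ms (m/?> (m/seed delays))]
                       (m/? (m/sleep ms ms))))))
;; => [300 100 200]

;; Concurrent fork: all branches are started without waiting for each other.
;; The result holds the same three values; compare its order with the one above.
(m/? (m/reduce conj
               (m/ap (let [ms (m/?> ##Inf (m/seed delays))]
                       (m/? (m/sleep ms ms))))))
```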

Flow Examples

Produce 1000 values asynchronously and read them as soon as they are available.

clj

(let [begin (System/currentTimeMillis)
      ;; create a flow of values generated by asynchronous tasks
      inputs (repeat 1000 (m/via m/cpu "hi")) ;; a task has no identity, it can be reused
      values (m/ap
               (let [flow (m/seed inputs)     ;; create a flow of tasks to execute
                     task (m/?> ##Inf flow)]  ;; from here, fork on every task in **parallel**
                 (m/? task)))                 ;; get a forked task value when available

      ;; drain the flow of values and count them
      n (m/? ;; the tasks are executed and the flow is consumed here
         (m/reduce (fn [acc v]
                     (assert (= "hi" v))
                     (inc acc))
                   0 values))]
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))