Task interop - leonoel/missionary GitHub Wiki

Blocking

Block the current thread

Just pass a task to ?.

(m/? (m/sleep 200 :ok))   ;; blocks the thread for 200ms and returns :ok

Turn a blocking call into a task

Assign an executor to the blocking call using via. Two executors are provided: cpu for CPU-bound work and blk for IO-bound (blocking) work.

(m/via m/blk (slurp "https://clojure.org"))
(m/via m/cpu (mine-crypto))
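As a minimal JVM sketch of the round trip, the snippet below wraps a blocking function in a task with via and then blocks on that task with ?. The function slow-lookup is a hypothetical stand-in for real blocking I/O and is not part of missionary.

```clojure
(require '[missionary.core :as m])

;; Hypothetical blocking function standing in for real I/O.
(defn slow-lookup [id]
  (Thread/sleep 50)
  {:id id})

;; Wrap the blocking call in a task that runs on the blk executor.
(def lookup-task (m/via m/blk (slow-lookup 42)))

;; Nothing runs until the task is started; here we block on it with ?.
(m/? lookup-task)   ;; returns {:id 42}
```

The task is only a description of the work, so running lookup-task again performs the lookup again.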

Non-blocking

Tasks to callbacks

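The task protocol is callback-based, so you can call any task as a function to interop with a callback-based API. A task takes a success callback and a failure callback, starts the process, and returns a function that cancels it.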
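A minimal sketch of calling a task directly: the two callbacks here only print, and the name cancel! is chosen for illustration.

```clojure
(require '[missionary.core :as m])

;; Invoking a task with a success and a failure callback starts the process
;; and returns a zero-argument function that cancels it.
(def cancel!
  ((m/sleep 1000 :done)
   (fn [result] (println "success:" result))   ; called if the task succeeds
   (fn [error]  (println "failure:" error))))  ; called if the task fails

;; Optional: cancelling before completion makes the sleep fail,
;; so the failure callback is invoked instead of the success one.
(cancel!)
```

Exactly one of the two callbacks is called, once, whether the task completes on its own or is cancelled.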

Futures, Promises

The JavaScript Promise constructor is compatible with tasks. Pass the task as the argument: the task runs, then resolves the promise on success or rejects it on failure.

(defn promise! "Runs given task and returns a promise completing with the result of this task" [t]
  (js/Promise. t))

Java's CompletableFuture requires a bit more glue code:

(defn cf! "Runs given task and returns a CompletableFuture completing with the result of this task."
  [t]
  (let [cf (java.util.concurrent.CompletableFuture.)]
    (t #(.complete cf %) #(.completeExceptionally cf %))
    cf))

Keep in mind that both wrappers discard the canceller returned by the task, so you can't cancel the task process after it's been wrapped in a future!

core.async channels

The pattern is the same, but core.async channels have no error semantics, so you'll need to wrap the result in a value that can carry either outcome. Here it is a thunk that returns the value or throws the error.

(require '[clojure.core.async :as a])

(defn chan! "Runs given task and returns a core.async channel eventually filled with a thunk of the result." [t]
  (let [c (a/chan)]
    (t #(a/put! c (fn [] %)) #(a/put! c (fn [] (throw %))))
    c))
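On the consuming side, the value taken from the channel is a thunk, so it has to be called to get the result or rethrow the error. The following is a JVM sketch using a blocking take; chan! is repeated without its docstring so the snippet runs on its own.

```clojure
(require '[clojure.core.async :as a]
         '[missionary.core :as m])

;; chan! as defined above, repeated so this snippet is self-contained.
(defn chan! [t]
  (let [c (a/chan)]
    (t #(a/put! c (fn [] %)) #(a/put! c (fn [] (throw %))))
    c))

;; Success: take the thunk, then call it to get the value.
((a/<!! (chan! (m/sleep 100 :ok))))   ;; returns :ok

;; Failure: calling the thunk rethrows the error raised by the task.
(try
  ((a/<!! (chan! (m/sp (throw (ex-info "boom" {}))))))
  (catch Exception e
    (ex-message e)))                  ;; returns "boom"
```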

Callbacks to tasks

If an API expects a one-shot callback, in many cases you can pass it a dfv and use it as a task to await the result.
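A minimal JVM sketch of this pattern, assuming a hypothetical callback-based function fetch-async that calls its callback exactly once from another thread. Both fetch-async and fetch-task are illustrative names, not part of missionary.

```clojure
(require '[missionary.core :as m])

;; Hypothetical one-shot callback API: computes on another thread
;; and calls cb exactly once with the result.
(defn fetch-async [x cb]
  (.start (Thread. ^Runnable (fn [] (Thread/sleep 50) (cb (inc x)))))
  nil)

(defn fetch-task
  "Starts fetch-async and returns a task completing with its result."
  [x]
  ;; The dfv is passed as the callback (1-arity assigns the value)
  ;; and returned as the task (2-arity awaits the value).
  (doto (m/dfv) (->> (fetch-async x))))

(m/? (fetch-task 1))   ;; returns 2
```

Note that in this sketch the underlying call starts as soon as fetch-task is invoked, not when the returned task is run. If the call should be deferred until the task runs, one option is to wrap it as (m/sp (m/? (fetch-task x))).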

core.async channels

(require '[clojure.core.async :as a])

(defn >! "Puts given value on given channel, returns a task completing with true when put is accepted, or false if port was closed."
  [c x] (doto (m/dfv) (->> (a/put! c x))))

(defn <! "Takes from given channel, returns a task completing with value when take is accepted, or nil if port was closed."
  [c] (doto (m/dfv) (->> (a/take! c))))
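These two helpers can then be awaited like any other task. The sketch below, for the JVM, puts a value and takes it back inside an sp block; the channel c and its buffer size are chosen for illustration, and the helpers are repeated without docstrings so the snippet runs on its own.

```clojure
(require '[clojure.core.async :as a]
         '[missionary.core :as m])

;; >! and <! as defined above, repeated so this snippet is self-contained.
(defn >! [c x] (doto (m/dfv) (->> (a/put! c x))))
(defn <! [c] (doto (m/dfv) (->> (a/take! c))))

;; Illustrative channel with a buffer of 1, so the put is accepted at once.
(def c (a/chan 1))

(m/? (m/sp
       (m/? (>! c :hello))   ; parks until the put is accepted
       (m/? (<! c))))        ; parks until a value is available
;; returns :hello
```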

Futures, Promises

To handle error status, you can wrap the result in a thunk and use absolve to redirect errors.

(defn await-promise "Returns a task completing with the result of given promise"
  [p]
  (let [v (m/dfv)]
    (.then p #(v (fn [] %)) #(v (fn [] (throw %))))
    (m/absolve v)))

(defn await-cf "Returns a task completing with the result of given CompletableFuture."
  [^java.util.concurrent.CompletableFuture cf]
  (let [v (m/dfv)]
    (.handleAsync cf
      (reify java.util.function.BiFunction
        (apply [_ r e]
          (v #(if (some? e) (throw e) r)))))
    (m/absolve v)))