Continuous flows - leonoel/missionary GitHub Wiki

From reference types

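Use watch on an atom or any other reference type that supports add-watch. The resulting flow reflects the current state of the reference.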

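;; missionary.core is aliased as m in every snippet on this page
;; (in ClojureScript, declare the alias in the ns :require form instead).
(require '[missionary.core :as m])

(def !input (atom 0))

(def input-value (m/watch !input))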

Integrated from discrete events

Use reductions with an appropriate seed and reducing function, then relieve with {} so that only the most recent value is kept when the consumer falls behind.

(def click-events
  (m/observe
   (fn [!]
     (.addEventListener js/window "click" !)
     #(.removeEventListener js/window "click" !))))

(def click-count
  (->> click-events
    (m/reductions (fn [r _] (inc r)) 0)
    (m/relieve {})))
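
The choice of {} as the relieving function can look cryptic. A Clojure map called with two arguments looks up the first and falls back to the second, so the empty map always returns its second argument. The sketch below is plain Clojure with no missionary involved, and assumes relieve calls its function with the pending value first and the incoming value second. The name keep-latest is only for illustration.

```clojure
;; An empty map used as a function: the lookup always misses,
;; so the "not found" argument is returned.
(def keep-latest {})

(keep-latest :old :new)        ;; => :new

;; Folding a burst of values with it leaves only the last one.
(reduce keep-latest [1 2 3])   ;; => 3

;; For comparison, + would accumulate the burst instead of discarding it.
(reduce + [1 2 3])             ;; => 6
```

Any other two-argument function can be passed to relieve when overflowed values should be merged rather than dropped.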

Derived from other continuous flows

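Use latest, which combines the current values of several continuous flows with a function and recomputes the result whenever any of them changes.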

(m/latest vector input-value click-count)

Snapshots

Flows don't implement deref, but you can derive another flow to extract the value at subscription time.

(def current-click-count (m/eduction (take 1) click-count))
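
As a rough illustration of "at subscription time", the following sketch reads a snapshot from an atom-backed flow. It assumes a JVM REPL, where m/? may block the calling thread outside a coroutine. The names !n, snapshot and result are hypothetical.

```clojure
(require '[missionary.core :as m])

;; Hypothetical atom standing in for any continuous source.
(def !n (atom 41))

;; Same shape as current-click-count: a flow that ends after one value.
(def snapshot (m/eduction (take 1) (m/watch !n)))

;; Nothing has been read yet; the flow is only a description.
(swap! !n inc)

;; Running the reduce task subscribes to the flow and collects its
;; single value into a vector. Blocking with m/? is JVM-only.
(def result (m/? (m/reduce conj [] snapshot)))
```

Here result should reflect the state of the atom when the task was run, not the 41 it held when snapshot was defined.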

Sharing

Use signal! to share a continuous flow between several consumers. It must be called in a reactor context.

(m/reactor
  (let [>x (m/signal! input-value)
        >twice-x (m/signal! (m/latest + >x >x))]
    ,,,))

Get successive changes

Pass the flow to any operator expecting a discrete flow.

(m/reduce (fn [_ c] (println (str "clicked " c " times."))) nil click-count)
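
Note that m/reduce returns a task, so the expression above prints nothing until that task is run. The sketch below shows one way to run such a task, using an atom-backed flow so that it does not depend on a browser. The names !n, log-changes and cancel are hypothetical.

```clojure
(require '[missionary.core :as m])

;; Hypothetical source, standing in for click-count.
(def !n (atom 0))

;; A task that prints every value it manages to observe.
(def log-changes
  (m/reduce (fn [_ n] (println "value is now" n)) nil (m/watch !n)))

;; A task is a function of a success callback and a failure callback.
;; Calling it starts the process and returns a function that cancels it.
(def cancel
  (log-changes
    (fn [_] (println "flow terminated"))             ; called if the task succeeds
    (fn [e] (println "stopped:" (ex-message e)))))   ; called if the task fails

(swap! !n inc)

;; Stop observing the atom.
(cancel)
```

Since the flow is continuous, the consumer is not guaranteed to see every intermediate state, only the latest one available each time it is ready for a new value.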