core.async

Clojure's take on async programming

Created by narkisr / @narkisr

  • Clojure intro
  • Why async?
  • core.async walkthrough
  • core.async on JS

Clojure

  • LISP dialect
  • Concurrency tamed
  • Meta programmable

Calling

 
; prefix notation
(+ 1 1) ; in infix: 1 + 1

; call nesting
(println (+ 1 2)) ; in infix: println(1 + 2);

; interop
(.length "abc")
	    

Data structures

 
; vector
[1 2 3]

; list 
'(1 2 3)

; map 
{1 2}
	     

Defining/Scoping

 
; define a value
(def a 2)

(defn -main [name]
  (println "hello " name))

(defn scope []
  (let [a 1 b 2]
    (println a b))) ; 1 2 

	    

Control structures

 
(while true
  (println 1))

(if (= a 2) 
   (println "its two")
   (println "not two"))

(when foo (println "im foo"))

(loop [i 0]
 (if (= i 10)
   (println "got to 10!")
   (recur (inc i))))



	     

Why async?

  • Better use of threads
  • We have to (JS)
  • Coordination

core.async

  • Same model used in Go
  • Just a library
  • Targets JVM/JS

Callbacks

	    
; many to many channel
; can serve many puts/takes
(def c (chan))

; take! returns nil right away; the callback waits since the channel is empty
(take! c (fn [v] (println v)))

; triggers the print 
(put! c "hello world")
            

But we are in callback hell!

Blocking put/take

 
(def c (chan))

; >!! means: > puts into the channel, !! blocks the thread and is not transaction safe
(future (println "done" (>!! c 42)))

; once delivered, prints "Got! 42"
(future (println "Got!" (<!! c)))

; same as future but returns a channel
(println "It works!" (<!! (thread 42)))
          

But!

A thread is still required

Go macro

  • A logical thread
  • Turns code to a state machine
  • Inspects channel operations
  • Parks blocking operations
  • Un-Parks when ready to run

Go

	    
; returns a many to many channel
(go 42)

(<!! (go 42))

; notice the use of <! vs <!!
(<!! (go (println "It works!" (<! (go 42)))))
         
	    

Park vs Block

  • <!! >!! block the calling (real) thread
  • <! >! park the go block, so the fixed thread pool is multiplexed across many logical threads
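A minimal sketch of the difference, assuming core.async is on the classpath. The names `work`, `results` and `total` are made up for this illustration. A thousand go blocks park on one channel at the same time, which would be impractical with a thousand blocked threads:

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!! >!!]])

(def work (chan))            ; unbuffered, every go block parks on it
(def results (chan 1000))    ; large enough that >! never has to park

; 1000 logical threads, all parked in <! until a value arrives
(dotimes [_ 1000]
  (go (>! results (inc (<! work)))))

; feed them from the calling thread (this side blocks, not parks)
(dotimes [i 1000]
  (>!! work i))

(def total (reduce + (repeatedly 1000 #(<!! results))))
; total => 500500
```

The count stays under 1024 on purpose, since a channel rejects more than 1024 pending takes.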

Buffered channels

  • Back pressure mechanism
  • Producer/Consumer sync

Buffered ..

 
(def fbc (chan 1)) 

(go (>! fbc 1)
    (println "done"))

(go (>! fbc 2)
    (println "done")) 

(<!! fbc)
(<!! fbc) 
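
Back pressure can also be observed without blocking. A small sketch using `offer!` and `poll!` from core.async (available in recent releases); the names `bc`, `accepted` and `first-out` are hypothetical:

```clojure
(require '[clojure.core.async :refer [chan offer! poll!]])

(def bc (chan 2))            ; fixed buffer of size 2

; offer! never blocks: it succeeds only while the buffer has room
(def accepted (mapv #(boolean (offer! bc %)) [1 2 3]))
; accepted => [true true false]

; poll! takes a value if one is immediately available
(def first-out (poll! bc))
; first-out => 1
```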
	    
	    

Dropping buffers

  • Don't block
  • Discard messages

Dropping ..

 
; dropping-buffer discards new puts when full (sliding-buffer drops the oldest instead)
(def fbc (chan (dropping-buffer 1)))

(go (>! fbc 1)
    (println "done"))

(go (>! fbc 2)
    (println "done")) 

; we will get only 1 (2 was thrown away)
(<!! fbc)
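
For comparison, a short sketch that puts the same values through both buffer kinds. The names `d`, `s` and `kept` are made up for this example:

```clojure
(require '[clojure.core.async :refer [chan >!! <!! dropping-buffer sliding-buffer]])

(def d (chan (dropping-buffer 1)))   ; keeps the oldest value
(def s (chan (sliding-buffer 1)))    ; keeps the newest value

; puts never block on either buffer kind
(doseq [v [1 2 3]]
  (>!! d v)
  (>!! s v))

(def kept [(<!! d) (<!! s)])
; kept => [1 3]
```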
	     

Closing

 
(def c (chan))

(close! c)
    
; we get back nil
(<!! c)
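
A sketch of what closing means for values already in the buffer; `cc` and `after-close` are hypothetical names. Buffered values can still be taken, later takes give nil, and puts are refused:

```clojure
(require '[clojure.core.async :refer [chan close! >!! <!!]])

(def cc (chan 2))
(>!! cc :a)
(close! cc)

(def after-close
  [(<!! cc)        ; :a, still in the buffer
   (<!! cc)        ; nil, closed and drained
   (>!! cc :b)])   ; false, puts on a closed channel fail
; after-close => [:a nil false]
```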
	    
	    

Real world example

Tinymasq

 
(def lookups (chan (dropping-buffer 100)))
(def answers (chan (dropping-buffer 100)))

(defn accept-loop []
  (go 
    (while true
      (let [pkt (packet (byte-array 1024))]
        (.receive @udp-server pkt)
        (>! lookups pkt)))))
	   

Processing

 
(defn process-loop []
  (go 
    (while true
      (let [pkt (<! lookups) 
            message (Message. (.getData pkt)) 
            record (.getQuestion message) 
            host (.toString (.getName record) false)
            ip (get-host (normalized-host host))]
        (when ip
          (.addRecord message (record-of host (into-bytes ip)) Section/ANSWER))
        (.setData pkt (.toWire message))
        (>! answers pkt)))))

        

Reply

 
(defn reply-loop []
  (go
    (while true
     (let [pkt (<! answers)] 
       (.send @udp-server pkt))))) 
           

Multiplexing

 
(def a (chan))
(def b (chan))

(put! a 42)                    

; will return [42 channel-with-response]
(alts!! [a b])  

; :default is an option, not a port; with nothing ready we get [:meh :default]
(alts!! [a b] :default :meh)
	     

Timeout

 
(<!! (timeout 1000))
  
; [nil timeout-channel] 
(alts!! [a (timeout 1000)]) 

; alts with a put: no takers, so we get [nil timeout-channel]
(alts!! [[a 42] (timeout 1000)]) 
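
The take-with-timeout pattern can be wrapped in a helper. This is only a sketch, and `take-or-timeout` is a made-up name, not part of core.async:

```clojure
(require '[clojure.core.async :refer [chan put! alts!! timeout]])

(defn take-or-timeout
  "Blocking take from ch that gives up after ms milliseconds.
   Returns the value taken, or :timed-out if the timeout won."
  [ch ms]
  (let [[v port] (alts!! [ch (timeout ms)])]
    (if (= port ch) v :timed-out)))

(def empty-ch (chan))
(def full-ch (chan 1))
(put! full-ch 42)

(def timeout-results
  [(take-or-timeout empty-ch 50)
   (take-or-timeout full-ch 50)])
; timeout-results => [:timed-out 42]
```

A closed channel still returns nil here, so nil and :timed-out stay distinguishable.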

	     

Alts ordering

 
(put! a 1)
(put! b 2)

; order is random (prevents starvation)
(alts!! [a b]) 

; we can have priority by ordering
(alts!! [a b] :priority true) 
	     

Mult and Tap

 
(def to-mult (chan 1))
(def m (mult to-mult))

(dotimes [n 4]
  (let [c (chan 1)]
    (tap m c)
    (go
      ; loop until the tap is closed (take returns nil)
      (loop []
        (when-let [v (<! c)]
          (println "Got! " v)
          (recur)))
      (println "Exiting!"))))

(>!! to-mult 42)
(>!! to-mult 43)

(close! to-mult)
 

Pubsub

 
(def to-pub (chan 1))
(def p (pub to-pub :type))

(let [c (chan 1)]
  (sub p :error c)
  (go
    ; stop once the subscription channel is closed
    (loop []
      (when-let [e (<! c)]
        (println "got an error" e)
        (recur)))))

(>!! to-pub {:type :error :msg "bad thing"})
(close! to-pub)
 

Clojurescript

  • Clojure compiled to JS
  • Not full Clojure but pretty close
  • core.async supports it
  • No threads, just go blocks

10k processes

 
(let [render (render-loop 40)]
  (loop [i 0]
    (when (< i (* width height))
      (go 
        (while true
          ; sleeping for a random time
          (<! (timeout (+ 1000 (rand-int 10000))))
          ; passing position and color 
          (>! render [(rand-int 10000) (rand-int 10)])))
      (recur (inc i)))))
       

Thank you!

@narkisr https://github.com/narkisr