Clojure for Big Data Processing

A functional take on Hadoop

Created by Ronen Narkis / @narkisr


  • Introducing Clojure
  • Clojure Hadoop
  • Clojure on EMR
  • Cascalog


A Lisp dialect for the JVM, created by Rich Hickey (first released in 2007)

Hello world

(println "hello")

Defining a function

(defn say [arg] (println arg))

(say "hello world")


Data structures

[1 2 3]     ; vector
{:a 1 :b 2} ; map
'(1 2 3)    ; list
#{1 2}      ; set

Basic Iteration

(doseq [i [1 2 3]] (println i))

; 1 2 3

Higher-order functions

(filter #(> % 1) [1 2 3])

; (2 3)

Java interop

(.length (String. "hello"))

; 5

Clojure Hadoop

Java word count

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class WordCount {
  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer tokenizer = new StringTokenizer(value.toString());
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }
}
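
Clojure word count

(import '(java.util StringTokenizer)
        '(org.apache.hadoop.io IntWritable LongWritable Text)
        '(org.apache.hadoop.mapreduce MapContext ReduceContext))

(defn tokenize [s] (StringTokenizer. ^String s))

;; emit a (word, 1) pair for every token in the input line
(defn mapper-map [this ^LongWritable key ^Text value ^MapContext context]
  (doseq [t (enumeration-seq (tokenize (str value)))]
    (.write context (Text. ^String t) (IntWritable. 1))))

;; sum all the counts emitted for one word
(defn reducer-reduce [this ^Text key ints ^ReduceContext context]
  (.write context key (IntWritable. (int (reduce + (map #(.get ^IntWritable %) ints))))))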

About a 3x reduction in code size compared with the Java version!

; my-map and my-reduce: plain Clojure functions (not shown on the slide)
(defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :reduce my-reduce
  :input-format :text
  :input "README.txt"
  :output "tmp/out4"
  :replace true)
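The defjob above refers to my-map and my-reduce, which the slide does not define. Below is a minimal sketch of what such functions could look like, under an assumption that is not taken from the slides: the :map function receives a key and a line of text and returns a seq of [key value] pairs, and the :reduce function receives a key plus a no-argument function that returns that key's values. The run-local helper is hypothetical; it mimics map, shuffle (group by key) and reduce in memory so the two functions can be checked at the REPL without a cluster.

```clojure
(import '(java.util StringTokenizer))

;; Assumed contract: (my-map key line) -> seq of [word 1] pairs
(defn my-map [key value]
  (map (fn [token] [token 1])
       (enumeration-seq (StringTokenizer. ^String value))))

;; Assumed contract: (my-reduce key values-fn) -> seq of [word total] pairs
(defn my-reduce [key values-fn]
  [[key (reduce + (values-fn))]])

;; Hypothetical helper: map -> shuffle (group by key) -> reduce, all in memory
(defn run-local [lines]
  (let [pairs   (mapcat (fn [[i line]] (my-map i line))
                        (map-indexed vector lines))
        grouped (group-by first pairs)]
    (into {} (mapcat (fn [[k kvs]] (my-reduce k (fn [] (map second kvs))))
                     grouped))))

(run-local ["the cat" "the hat"])
;; => {"the" 2, "cat" 1, "hat" 1}
```

Because both functions are plain Clojure, they can be unit-tested this way before packaging the job jar.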

Compile workflow

Amazon EMR

Launching jobs

(defcluster demo-cluster
  :num-instances 2
  :slave-instance-type "m1.small"
  :master-instance-type "m1.small"
  :ami-version "2.1.1"
  :hadoop-version "0.20.205" 
  :jar-src-path "/home/ronen/code/code-0.1.0-standalone.jar"
  :runtime-jar "s3://clj-demo/jar/code-0.1.0-standalone.jar"
  :bucket "clj-demo"
  :keypair "ec2-keypair")

(defstep demo-step
  :main-class "code.core"
  :args.input "s3n://clj-demo/tom-sawyer.txt"
  :args.output "s3n://clj-demo/${uuid8}")

(fire! demo-cluster demo-step)
(Screenshots: job start, EC2 instances up, job running, shutdown, result in S3, output opened in gedit)


Monitoring with Ganglia

(defcluster demo-cluster
  :ganglia {:num-instances 2
            :bootstrap-action.1 "s3://elasticmapreduce/bootstrap-actions/install-ganglia"
            :keep-alive? true
            ; ...
            })

Performance tuning

(ns demo
  (:use perforate.core code.core))

(defgoal tokenization "Testing tokenization speed")

(defcase tokenization :small
  [] (tokenize "I love food!"))

% lein perforate
Benchmarking profiles:  nil
Goal:  Testing tokenization speed
Case:  :small
Evaluation count             : 25347480
  Execution time mean : 2.357080 us  95.0% CI: (2.356621 us, 2.357631 us)
  Execution time std-deviation : 44.229907 us  95.0% CI: (43.761714 us, 44.722213 us)
  Execution time lower ci : 2.290371 us  95.0% CI: (2.290371 us, 2.290371 us)
  Execution time upper ci : 2.524936 us  95.0% CI: (2.524936 us, 2.525048 us)

Found 5 outliers in 60 samples (8.3333 %)
  low-severe	 3 (5.0000 %)
  low-mild	 2 (3.3333 %)
 Variance from outliers : 15.7988 % Variance is moderately inflated by outliers
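The report above is in Criterium's format (perforate runs Criterium benchmarks from Leiningen). Note that (tokenize ...) as defined earlier only constructs a StringTokenizer; no tokens are produced until they are consumed. As a rough REPL sanity check, the sketch below forces the tokens and times them with System/nanoTime. The helper mean-time-ns is hypothetical and does no JIT warm-up or statistical analysis, so treat its numbers as indicative only and rely on perforate for real measurements.

```clojure
(import '(java.util StringTokenizer))

(defn tokenize [s] (StringTokenizer. ^String s))

;; Hypothetical helper: average wall-clock nanoseconds per call of f over n calls.
;; No warm-up and no outlier detection, so far cruder than Criterium.
(defn mean-time-ns [f n]
  (let [start (System/nanoTime)]
    (dotimes [_ n] (f))
    (/ (double (- (System/nanoTime) start)) n)))

;; doall forces the lazy token seq, so the actual tokenization is timed
(mean-time-ns #(doall (enumeration-seq (tokenize "I love food!"))) 100000)
```

The same forcing trick (doall over the token seq) can be used inside a defcase body to benchmark tokenization rather than tokenizer construction.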


Cascalog: Declarative Hadoop Jobs


  • A tap (data source or sink)
  • Result variables
  • Predicates
  • Operates on tuples

Defining a query <-

(def people [["ben" 35] ["jerry" 41]])		

(<- [?name ?age]
   (people ?name ?age)
   (< ?age 40))
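To build intuition, the same query can be read as an ordinary Clojure for comprehension over the in-memory people vector. This is only an analogy, and the function name young-people is made up for illustration: Cascalog does not evaluate queries this way, it compiles them into Cascading flows that can run on Hadoop.

```clojure
(def people [["ben" 35] ["jerry" 41]])

;; Destructuring each tuple plays the role of binding ?name and ?age,
;; and :when plays the role of the (< ?age 40) predicate.
(defn young-people [people]
  (for [[name age] people
        :when (< age 40)]
    [name age]))

(young-people people)
;; => (["ben" 35])
```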

Query execution ?-

(?- (stdout)
  (<- [?name ?age]
    (people ?name ?age)
    (< ?age 40)))		

In one fell swoop: ?<- defines and executes the query

(?<- (stdout)
  [?name ?age]
  (people ?name ?age)
  (< ?age 40))	


Generators (tuple source)

(def generator-seq [["a" 1] 
                    ["b" 2]])

(?<- (stdout) [?a ?b] (generator-seq :> ?a ?b))
; ["a" 1]
; ["b" 2]

Operations (filter or manipulation)

(defmapcatop split [sentence]
  (seq (.split sentence "\\s+")))

; sentence: a generator of one-field tuples holding lines of text
(?<- (stdout) [?word] (sentence ?s)
     (split ?s :> ?word))
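A defmapcatop turns one input tuple into zero or more output tuples. On a plain Clojure seq the closest analogue is mapcat; the sketch below applies the same .split logic to a couple of sample sentences. The sample data and the name split-words are illustrative only and not part of Cascalog.

```clojure
;; Same splitting logic as the split operation above
(defn split-words [^String sentence]
  (seq (.split sentence "\\s+")))

(def sentences ["the quick fox" "the lazy dog"])

;; mapcat flattens the per-sentence word seqs into one seq of words,
;; much like each word becoming its own ?word tuple
(mapcat split-words sentences)
;; => ("the" "quick" "fox" "the" "lazy" "dog")
```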

Aggregators (sum, min, count)

; c: alias for cascalog.ops, e.g. (require '[cascalog.ops :as c])
(?<- (stdout) [?word ?count] (sentence ?s)
  (split ?s :> ?word) (c/count ?count))
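In this query Cascalog groups the tuples by ?word (the output variable that is not produced by the aggregator) before c/count runs on each group. Making that implicit grouping explicit in plain Clojure gives a group-by followed by a count per group. The name word-counts is made up, and this in-memory version only illustrates the semantics.

```clojure
(defn split-words [^String sentence]
  (seq (.split sentence "\\s+")))

;; group-by collects identical words (the implicit grouping on ?word);
;; counting each group plays the role of the c/count aggregator.
(defn word-counts [sentences]
  (into {}
        (for [[word occurrences] (group-by identity (mapcat split-words sentences))]
          [word (count occurrences)])))

(word-counts ["the quick fox" "the lazy dog"])
;; => {"the" 2, "quick" 1, "fox" 1, "lazy" 1, "dog" 1}
```

Clojure's built-in frequencies computes the same map in one call; the explicit group-by is spelled out here only to mirror the grouping step.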


  • Clojure is fun and productive
  • EMR + Clojure are fast and agile
  • Be declarative with Cascalog
  • The ecosystem is there!
