Clojure for Big Data Processing

A functional take on Hadoop

Created by Ronen Narkis / @narkisr

Agenda

  • Introducing Clojure
  • Clojure Hadoop
  • Clojure on EMR
  • Cascalog

Clojure

A JVM Lisp dialect created by Rich Hickey, first released in 2007

Hello world

 
(println "hello")
		

Defining a function

 
(defn say [arg] (println arg))

(say "hello world")
		

Data structures

 
[1 2 3]     ; vector
{:a 1 :b 2} ; map
'(1 2 3)    ; list
#{1 2}      ; set
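
A minimal sketch of how these literals behave, using only core functions. The names `v`, `m`, `l` and `s` are illustrative. All four collections are immutable, so each "update" returns a new value and leaves the original untouched.

```clojure
(def v [1 2 3])
(def m {:a 1 :b 2})
(def l '(1 2 3))
(def s #{1 2})

(conj v 4)      ; vectors grow at the end     => [1 2 3 4]
(conj l 0)      ; lists grow at the front     => (0 1 2 3)
(assoc m :c 3)  ; returns a map with a new key
(:a m)          ; keywords act as lookup fns  => 1
(contains? s 2) ; set membership              => true
v               ; the original is unchanged   => [1 2 3]
```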
		

Basic Iteration


(doseq [i [1 2 3]] (println i))

; 1 2 3
		

Higher-order functions

 
(filter #(> % 1) [1 2 3])

; (2 3)
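
As a rough illustration of where this leads, `filter`, `map` and `reduce` compose into small pipelines, which is the same shape a MapReduce job takes. The function name `sum-odd-squares` is made up for this sketch.

```clojure
(defn sum-odd-squares
  "Keeps the odd numbers, squares them, then sums the results."
  [xs]
  (->> xs
       (filter odd?)      ; select
       (map #(* % %))     ; transform
       (reduce +)))       ; aggregate

(map inc [1 2 3])           ; => (2 3 4)
(sum-odd-squares [1 2 3])   ; 1 + 9 => 10
```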
		

Java interop


(.length (String. "hello"))

; 5
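
A slightly larger interop sketch, assuming nothing beyond the JDK. It wraps `java.util.StringTokenizer` (the class the word count slides rely on) in a Clojure sequence; the helper name `tokens` is hypothetical.

```clojure
(import 'java.util.StringTokenizer)

(defn tokens
  "Splits s on whitespace using the JDK tokenizer."
  [s]
  ;; StringTokenizer implements java.util.Enumeration, which
  ;; enumeration-seq adapts to a Clojure sequence.
  (enumeration-seq (StringTokenizer. s)))

(tokens "I love food!")                         ; => ("I" "love" "food!")
(map #(.toUpperCase ^String %) (tokens "a b"))  ; instance method => ("A" "B")
(Math/max 3 7)                                  ; static method   => 7
```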
		

Clojure Hadoop

Java Word count

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      context.write(word, one);
    }
  }
}

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    context.write(key, new IntWritable(sum));
  }
}
		
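Clojure Word count

(defn tokenize [s] (StringTokenizer. s))

; constant count emitted for every token
(def one (IntWritable. 1))

(defn mapper-map [this ^LongWritable key ^Text value ^MapContext context]
  (doseq [t (enumeration-seq (tokenize (str value)))]
    (.write context (Text. t) one)))

; sum the counts and wrap the total in a Writable before emitting
(defn reducer-reduce [this key ints context]
  (.write context key
          (IntWritable. (int (reduce + (map #(.get %) ints))))))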
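
To make the data flow concrete without a cluster, here is an in-memory sketch of the same word count in plain Clojure. It is not how Hadoop or clojure-hadoop run the job; `map-phase`, `shuffle-phase`, `reduce-phase` and `word-count` are hypothetical names that only mirror the three stages.

```clojure
(require '[clojure.string :as string])

(defn map-phase
  "Emits a [word 1] pair for every whitespace-separated token in line."
  [line]
  (for [w (string/split (string/trim line) #"\s+")
        :when (seq w)]               ; skip the empty token of a blank line
    [w 1]))

(defn shuffle-phase
  "Groups emitted counts by word, e.g. {\"a\" [1 1], \"b\" [1]}."
  [pairs]
  (reduce (fn [acc [w n]] (update acc w (fnil conj []) n)) {} pairs))

(defn reduce-phase
  "Sums the counts collected for a single word."
  [[w counts]]
  [w (reduce + counts)])

(defn word-count [lines]
  (->> lines
       (mapcat map-phase)
       shuffle-phase
       (map reduce-phase)
       (into {})))

(word-count ["a b a" "b c"]) ; => {"a" 2, "b" 2, "c" 1}
```

In a real job the framework does the shuffle between mapper and reducer; only the first and last functions correspond to code you write.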
		

3x reduction in code size!

 
(imp/import-conf)
(imp/import-io)
(imp/import-fs)
(imp/import-mapreduce)
(imp/import-mapreduce-lib)
		
 
(defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :reduce my-reduce
  :input-format :text
  :input "README.txt"
  :output "tmp/out4"
  :replace true)
		

Compile workflow

Amazon EMR

Launching jobs

(defcluster demo-cluster
  :num-instances 2
  :slave-instance-type "m1.small"
  :master-instance-type "m1.small"
  :ami-version "2.1.1"
  :hadoop-version "0.20.205" 
  :jar-src-path "/home/ronen/code/code-0.1.0-standalone.jar"
  :runtime-jar "s3://clj-demo/jar/code-0.1.0-standalone.jar"
  :bucket "clj-demo"
  :keypair "ec2-keypair")

(defstep demo-step
  :main-class "code.core"
  :args.input "s3n://clj-demo/tom-sawyer.txt"
  :args.output "s3n://clj-demo/${uuid8}")

(fire! demo-cluster demo-step)
		
[Screenshots: job start, EC2 up, running, shutdown, S3 result, gedit]

Monitoring


(defcluster demo-cluster
 :ganglia {
  :num-instances 2 
  :bootstrap-action.1 "s3://elasticmapreduce/bootstrap-actions/install-ganglia"
  :keep-alive? true
 }
 ; ...
) 

Performance tuning

(ns demo
  (:use perforate.core code.core ))

(defgoal tokenization "Testing tokenization speed")

(defcase tokenization :small
  [] (tokenize "I love food!"))		
 
% lein perforate 
Benchmarking profiles:  nil
======================
Goal:  Testing tokenization speed
-----
Case:  :small
Evaluation count             : 25347480
  Execution time mean : 2.357080 us  95.0% CI: (2.356621 us, 2.357631 us)
  Execution time std-deviation : 44.229907 us  95.0% CI: (43.761714 us, 44.722213 us)
  Execution time lower ci : 2.290371 us  95.0% CI: (2.290371 us, 2.290371 us)
  Execution time upper ci : 2.524936 us  95.0% CI: (2.524936 us, 2.525048 us)

Found 5 outliers in 60 samples (8.3333 %)
  low-severe	 3 (5.0000 %)
  low-mild	 2 (3.3333 %)
 Variance from outliers : 15.7988 % Variance is moderately inflated by outliers

Cascalog

Declarative Hadoop Jobs

Query

  • A Tap
  • Result variables
  • Predicates
  • Operates on tuples

Defining a query <-

 
(def people [["ben" 35] ["jerry" 41]])		

(<- [?name ?age]
   (people ?name ?age)
   (< ?age 40))
		

Query execution ?-

 
(?- (stdout)
  (<- [?name ?age]
    (people ?name ?age)
    (< ?age 40)))		

At one fell swoop

 
(?<- (stdout)
  [?name ?age]
  (people ?name ?age)
  (< ?age 40))	
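
For intuition only, the result of this query can be reproduced with a plain Clojure `for` comprehension. This is a sketch of what the query means, not of how Cascalog plans or executes it; `younger-than` is a hypothetical helper.

```clojure
(def people [["ben" 35] ["jerry" 41]])

(defn younger-than
  "Returns the [person age] tuples whose age is below limit."
  [limit tuples]
  ;; destructuring plays the role of binding ?name and ?age,
  ;; :when plays the role of the (< ?age 40) predicate
  (for [[person age] tuples
        :when (< age limit)]
    [person age]))

(younger-than 40 people) ; => (["ben" 35])
```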
		

Predicates

Generators (tuple source)

 
(def generator-seq [["a" 1] 
                    ["b" 2]])

(?<- (stdout) [?a ?b] (generator-seq :> ?a ?b))
; ["a" 1]
; ["b" 2]
		

Operations (filter or manipulation)


(defmapcatop split [sentence]
  (seq (.split sentence "\\s+")))	

(?<- (stdout) [?word] (sentence ?s)
     (split ?s :> ?word))
		

Aggregators (sum, min, count)

 
(?<- (stdout) [?word ?count] (sentence ?s)
  (split ?s :> ?word) (c/count ?count))
		

Summary

  • Clojure is fun and productive
  • EMR+Clojure are fast and agile
  • Be declarative with Cascalog
  • The ecosystem is there!

Me