Crunching Data with Apache Crunch – Part 1

Apache Crunch (incubating) is a Java library for writing, testing, and running MapReduce pipelines, based on Google's FlumeJava. Its goal is to make pipelines that are composed of many user-defined functions simple to write, easy to test, and efficient to run.

This multi-part series takes a deep dive into this upcoming tool. The first post is a getting-started guide, using the word count example provided with the release.

Prerequisites
Apache Crunch 0.4 (currently up for a release vote)
Hadoop 1.0.4

Hadoop can run in standalone, pseudo-distributed, or fully distributed mode.

We are going to use the WordCount example shipped with the Crunch release to get started; later posts will explore more detailed usage.

Word Count example - The input is a set of text documents and the program output is the total number of occurrences of each word.

Let's first recall the sequence of steps we would have followed in the raw Hadoop world (a rough sketch follows the list):

  • Write a Mapper that reads each line and splits it into words
  • The Mapper emits each word followed by a count of one
  • In the Reducer, sum up all the counts and emit each word with its total count
  • Set the input/output formats and submit the job
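
For comparison, here is a minimal sketch of that classic approach against the plain MapReduce API. The class names (RawWordCount, TokenizerMapper, SumReducer) are illustrative, not taken from any shipped example.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RawWordCount {

    // Mapper: split each line into words and emit (word, 1)
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }

    // Reducer: sum the counts for each word and emit (word, total)
    public static class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Driver: set the input/output paths and submit the job
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "raw word count");
        job.setJarByClass(RawWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(SumReducer.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

That is three nested classes plus driver boilerplate for what is conceptually a single transformation followed by an aggregation.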

Now let's move to Crunch and see how it provides a higher-level abstraction to achieve the same result.

Let's see the code (taken from the wordcount example shipped with Crunch):

// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

// Reference a given text file as a collection of Strings.
PCollection<String> lines = pipeline.readTextFile(args[0]);

// Define a function that splits each line in a PCollection of Strings into
// a PCollection made up of the individual words in the file.
PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
    public void process(String line, Emitter<String> emitter) {
       for (String word : line.split("\\s+")) {
           emitter.emit(word);
       }
    }
}, Writables.strings()); // Indicates the serialization format

// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
// Best of all, the count() function doesn't need to know anything about
// the kind of data stored in the input PCollection.
PTable<String, Long> counts = words.count();

// Instruct the pipeline to write the resulting counts to a text file.
pipeline.writeTextFile(counts, args[1]);
// Execute the pipeline as a MapReduce.
PipelineResult result = pipeline.done();

Step 1: We create an MRPipeline, passing in the jar class and the job configuration. This object coordinates pipeline creation and execution.
Step 2: We tell the pipeline where to read the input data from with readTextFile() (similar to setting the input paths in the job configuration). The returned PCollection does not hold all the data in memory; it is only a reference to the data.
Step 3: Next we define how we want to process the data by calling parallelDo() with a DoFn. This is like our Mapper function: in the DoFn we split each line and emit the individual words. The code is fairly similar to the WordCount Mapper code. The second parameter to parallelDo() specifies the serialization format of the emitted output.
Step 4: We call count() on the PCollection we created. Internally it aggregates the results and returns a PTable mapping each word to its count (a rough sketch of what this expands to follows these steps).
Step 5: Finally, we write the counts to a text file with writeTextFile() and call done() to execute the pipeline.
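
Step 4 is worth unpacking: count() is a convenience built from the same Crunch primitives used above. The sketch below is a rough, hedged equivalent; parallelDo(), Pair, groupByKey() and combineValues() are the Crunch constructs involved, but the summing helper shown here (CombineFn.SUM_LONGS()) is an assumption on my part and its name may differ across Crunch versions, so treat this as illustrative rather than the library's exact implementation.

// Rough equivalent of words.count(), reusing the PCollection<String> "words" from above.
// Assumed imports: org.apache.crunch.{CombineFn, MapFn, Pair, PTable}
// and org.apache.crunch.types.writable.Writables.
PTable<String, Long> counts = words
    .parallelDo(new MapFn<String, Pair<String, Long>>() {
        @Override
        public Pair<String, Long> map(String word) {
            // Pair each word with an initial count of one
            return Pair.of(word, 1L);
        }
    }, Writables.tableOf(Writables.strings(), Writables.longs()))
    .groupByKey()                                   // shuffle: group identical words together
    .combineValues(CombineFn.<String>SUM_LONGS());  // sum the ones for each word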

Crunch internally converts this pipeline into MapReduce jobs and runs them on the Hadoop cluster.
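
For completeness, here is a hedged sketch of how the snippet above is typically wrapped so it can be submitted like any other Hadoop job; the use of getConf() suggests a Configured/Tool-style driver. The argument handling below is illustrative and not copied verbatim from the Crunch distribution.

import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            return 1;
        }
        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
        // ... the parallelDo / count / writeTextFile code shown above goes here ...
        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic Hadoop options (-D, -conf, ...) before calling run()
        int exitCode = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(exitCode);
    }
}

With the class packaged into a jar, it can be launched with hadoop jar just like the raw MapReduce version.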

Running the example

Instructions for running this example are given here.
