Crunching Data with Apache Crunch – Part 4

So far we have looked at the basics of Crunch. In this post, let's look at Crunch's join feature. For the basics, please refer to the earlier posts in the series: Part 1, Part 2 and Part 3.

Let's set up some background on the data before we jump into the code.

For the join, I shall take an example from the world of online ads. Basically, we shall merge two data sets to find out which ads got clicks or impressions. The data has been cooked up for this post; a real-world application would have a more complex data set. We have the following data:

1. Ads Returned - This file contains a unique Ad Id and a timestamp (when the ad was returned). It could carry more data, such as a Publisher Id and other ad attributes.
2. Impressions - This file contains only the Ad Ids for which we received impressions.

The final outcome would be:

Ad Id and 1 if the Ad received an impression

The sample data was generated using the following program, with a UUID as the unique Ad Id.

public static void main(String[] args) throws Exception {
    File adsReturned = new File("adsReturned.txt");
    File impressions = new File("impressions.txt");

    BufferedWriter adsWriter = new BufferedWriter(new FileWriter(adsReturned));
    BufferedWriter impressionWriter = new BufferedWriter(new FileWriter(impressions));

    int count = 0;

    while(count++ < 1000) {
        UUID uuid = UUID.randomUUID();
        adsWriter.write(uuid.toString() + "," + System.currentTimeMillis());
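The listing above ends inside the loop. As a minimal sketch of how such a generator could be completed, the version below writes one adId,timestamp line per ad and records an impression for every third ad. The class name SampleDataGenerator, the method generate, and the every-third-ad rule are assumptions made for illustration, not the original program.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

class SampleDataGenerator {

    // Writes `total` ads to adsReturned and an impression for every third ad.
    // The every-third rule is an assumption; the original sampling rule is not shown.
    static void generate(Path adsReturned, Path impressions, int total) throws IOException {
        try (BufferedWriter adsWriter = Files.newBufferedWriter(adsReturned, StandardCharsets.UTF_8);
             BufferedWriter impressionWriter = Files.newBufferedWriter(impressions, StandardCharsets.UTF_8)) {
            for (int count = 1; count <= total; count++) {
                String adId = UUID.randomUUID().toString();
                // One "adId,timestamp" record per line.
                adsWriter.write(adId + "," + System.currentTimeMillis());
                adsWriter.newLine();
                if (count % 3 == 0) {
                    // The impressions file carries only the Ad Id.
                    impressionWriter.write(adId);
                    impressionWriter.newLine();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        generate(Paths.get("adsReturned.txt"), Paths.get("impressions.txt"), 1000);
    }
}
```

The try-with-resources block closes both writers even if a write fails, so the files are flushed before the Crunch job reads them.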

Let's look at our Crunch code for the join. For simplicity, I have removed the glue code, which is the same as in the previous posts.

// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(FullJoinExample.class, getConf());

PCollection<String> adsReturned = pipeline.readTextFile(args[0]);
PCollection<String> impressions = pipeline.readTextFile(args[1]);

PTable<String, Long> adsReturnedTable = getAdsReturnedAsPTable(adsReturned);

PTable<String, Long> impressionTable = impressions.parallelDo(new DoFn<String, Pair<String, Long>>() {
    @Override
    public void process(String input, Emitter<Pair<String, Long>> emitter) {
        // The impressions file holds only Ad Ids, so emit a constant 1 as the value.
        emitter.emit(Pair.of(input, 1L));
    }
}, impressions.getTypeFamily().tableOf(Writables.strings(), Writables.longs()));

PTable<String, Pair<Long, Long>> joinedData = Join.leftJoin(adsReturnedTable, impressionTable);

// Instruct the pipeline to write the joined data to a text file.
pipeline.writeTextFile(joinedData, args[2]);
// Execute the pipeline as a MapReduce job.
PipelineResult result = pipeline.done();

return result.succeeded() ? 0 : 1;

// Helper that parses each "adId,timestamp" line into an (adId, timestamp) table entry.
protected PTable<String, Long> getAdsReturnedAsPTable(PCollection<String> adsReturnedFiles) {
    PTypeFamily tf = adsReturnedFiles.getTypeFamily();
    return adsReturnedFiles.parallelDo(new DoFn<String, Pair<String, Long>>() {
        @Override
        public void process(String input, Emitter<Pair<String, Long>> emitter) {
            String[] parts = StringUtils.split(input, ',');
            emitter.emit(Pair.of(parts[0], Long.parseLong(parts[1])));
        }
    }, tf.tableOf(tf.strings(), tf.longs()));
}

Let's dissect the code:

  • We read both files as PCollections.
  • We then convert both into PTables. getAdsReturnedAsPTable() splits each Ads Returned line into the Ad Id and its timestamp. Since the Impressions file has no other data, the inline DoFn pairs each Ad Id with a hardcoded 1.
  • We join the two tables using the Join.leftJoin() API, so every returned ad appears in the result whether or not it received an impression.
  • We run the pipeline, and we have the joined output.
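
To make the semantics of the left join concrete, here is a small in-memory sketch in plain Java. It does not use Crunch: the class LeftJoinSketch and its leftJoin method are hypothetical names for illustration, the ad ids in main are made up, and it assumes at most one value per key on each side. Every key from the left table is kept, and the right-hand value is null when there is no match.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class LeftJoinSketch {

    // Keeps every key of `left` and pairs it with the matching value from `right`,
    // or with null when `right` has no entry for that key.
    static Map<String, Map.Entry<Long, Long>> leftJoin(Map<String, Long> left, Map<String, Long> right) {
        Map<String, Map.Entry<Long, Long>> joined = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : left.entrySet()) {
            Long rightValue = right.get(e.getKey());
            joined.put(e.getKey(), new SimpleEntry<Long, Long>(e.getValue(), rightValue));
        }
        return joined;
    }

    public static void main(String[] args) {
        // Made-up ids and timestamps, for illustration only.
        Map<String, Long> adsReturned = new LinkedHashMap<>();
        adsReturned.put("ad-1", 100L);
        adsReturned.put("ad-2", 200L);

        Map<String, Long> impressions = new HashMap<>();
        impressions.put("ad-1", 1L);

        // "ad-2" stays in the result with a null impression value.
        System.out.println(leftJoin(adsReturned, impressions));
    }
}
```

Keys that exist only on the right side are dropped, which is what separates a left join from a full join.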

The output looks something like this:

001dcff3-558e-4cda-926c-5bd6f0da7d4b    [1353062206451,1]
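
Ads without a matching impression still appear in a left join, with nothing on the impression side. As a hedged sketch of reducing each joined line to the "Ad Id and 1 if the ad received an impression" form, the plain-Java helper below parses one output line. It assumes the whitespace-separated adId [timestamp,flag] layout shown above and that a missing impression is printed as null; the class and method names are hypothetical.

```java
class JoinedLineParser {

    // Returns "adId,1" if the line carries an impression flag, otherwise "adId,0".
    // Assumes the layout: adId, whitespace, then "[timestamp,flag]".
    static String toImpressionFlag(String line) {
        String[] parts = line.trim().split("\\s+", 2);
        String adId = parts[0];
        String pair = parts[1].trim();
        // Strip the surrounding square brackets.
        String inner = pair.substring(1, pair.length() - 1);
        String[] values = inner.split(",", 2);
        // Assumption: an unmatched left-join row prints "null" as the second value.
        boolean hasImpression = !"null".equals(values[1].trim());
        return adId + "," + (hasImpression ? 1 : 0);
    }

    public static void main(String[] args) {
        System.out.println(toImpressionFlag("001dcff3-558e-4cda-926c-5bd6f0da7d4b    [1353062206451,1]"));
    }
}
```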

Crunch has simplified the join a lot. Next in the series we shall look at customizing the output.
