[Learning Apache Spark with Examples] Simple Aggregation

In the last post we looked at the Left Outer join. Now let's look at a simple aggregation. Enhancing our Ad example, we would like to see how many Ads we served from each Ad provider. This is a straightforward aggregation scenario, very much like the one we already saw in the Word Count example.

The code can be found at SimpleAggregation.java

Let's look at the code:

// Map each CSV line to an (adProvider, 1) pair
JavaPairRDD<String, Integer> adsRDD = adInput.mapToPair(new PairFunction<String, String, Integer>() {
  @Override
  public Tuple2<String, Integer> call(String s) {
    CSVReader csvReader = new CSVReader(new StringReader(s));
    // let's skip error handling here for simplicity
    try {
      String[] adDetails = csvReader.readNext();
      return new Tuple2<String, Integer>(adDetails[1], 1);
    } catch (IOException e) {
      e.printStackTrace();
      // noop
    }
    // Need to explore more on error handling
    return new Tuple2<String, Integer>("-1", 1);
  }
});

// Sum up the 1s per Ad provider
JavaPairRDD<String, Integer> adsAggregated = adsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
  @Override
  public Integer call(Integer count1, Integer count2) throws Exception {
    return count1 + count2;
  }
});

adsAggregated.saveAsTextFile("./output/ads-aggregated-provider");

The code has only minor changes from the join example. Here we make the Ad Provider the key and emit 1 as the value, very similar to the Word Count example. To generate the aggregates, we simply call the reduceByKey API.
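Conceptually, reduceByKey groups the pairs by key and folds the values together with the supplied function. Outside Spark, the same per-key summation can be sketched in plain Java (the provider values below are made up just for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class ReduceByKeySketch {
  public static void main(String[] args) {
    // (provider, 1) pairs as they would come out of the mapToPair step
    String[] providers = {"AdProvider1", "AdProvider3", "AdProvider1",
                          "AdProvider3", "AdProvider1"};

    // Folding the 1s per key with (a, b) -> a + b, like reduceByKey does
    Map<String, Integer> counts = new HashMap<String, Integer>();
    for (String p : providers) {
      Integer current = counts.get(p);
      counts.put(p, current == null ? 1 : current + 1);
    }

    System.out.println(counts.get("AdProvider1")); // 3
    System.out.println(counts.get("AdProvider3")); // 2
  }
}
```

The difference in Spark is that this folding happens in parallel across partitions, with partial sums combined on each node before being merged.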

Compile and run

$mvn clean package
$~/cots/spark-1.2.0-bin-hadoop2.4/bin/spark-submit --class org.learningspark.simple.SimpleAggregation --master local[1] target/learningspark-1.0-SNAPSHOT.jar ./src/main/resources/ads.csv

For the sample data, we get output like this:

(AdProvider3,2)
(AdProvider4,1)
(AdProvider1,5)
(AdProvider5,1)
(AdProvider2,1)
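Note that the output lines are not sorted; reduceByKey makes no ordering guarantee across partitions. If you wanted the providers ranked by count, one option (sketched here in plain Java on the collected pairs, rather than with Spark's sort APIs) is to sort the entries by descending value:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SortByCountSketch {
  public static void main(String[] args) {
    // Aggregated (provider, count) pairs from the sample output above
    Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
    counts.put("AdProvider3", 2);
    counts.put("AdProvider4", 1);
    counts.put("AdProvider1", 5);
    counts.put("AdProvider5", 1);
    counts.put("AdProvider2", 1);

    // Sort the entries by descending count
    List<Map.Entry<String, Integer>> entries =
        new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
      public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
        return b.getValue() - a.getValue();
      }
    });

    System.out.println(entries.get(0)); // AdProvider1=5
  }
}
```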
