[Learning Spark with Examples] Inner Join

In the last post, we saw the famous Word Count example. Let's move ahead and look at joins in Spark. Before looking into the join, let's look at the data we shall use for joining. The data is fictitious and kept simple. There are two inputs; sample rows follow the list.

  • Ad Input - CSV file with a unique Ad Served Id and Ad provider
  • Impressions - CSV file with UUID of Ad and publisher information
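To make the inputs concrete, a row in each file could look something like this (the UUID and values here are lifted from the join output shown further down):

Ad Input:     00832901-21a6-4888-b06b-1f43b9d1acac,AdProvider1
Impressions:  00832901-21a6-4888-b06b-1f43b9d1acac,Publisher1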

We shall perform an inner join, which keeps only the records present in both data sets. Let's look at the code.

The complete code can be found at InnerJoin.java
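The snippets below assume a JavaSparkContext named sparkContext has already been created. A minimal sketch of that boilerplate, with an assumed application name, would be:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Create the Spark context used by the snippets below
SparkConf conf = new SparkConf().setAppName("InnerJoin");
JavaSparkContext sparkContext = new JavaSparkContext(conf);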

Let's read the input data set

// Read the Ads source file
JavaRDD<String> adInput = sparkContext.textFile(args[0]);

// Parse each CSV record and emit a (adId, adProvider) pair, keyed by the Ad's UUID
JavaPairRDD<String, String> adsRDD = adInput.mapToPair(new PairFunction<String, String, String>() {
  @Override
  public Tuple2<String, String> call(String s) {
    CSVReader csvReader = new CSVReader(new StringReader(s));
    // let's skip error handling here for simplicity
    try {
      String[] adDetails = csvReader.readNext();
      return new Tuple2<String, String>(adDetails[0], adDetails[1]);
    } catch (IOException e) {
      e.printStackTrace();
      // noop
    }
    // Placeholder pair for malformed records - need to explore error handling further
    return new Tuple2<String, String>("-1", "1");
  }
});

Here we read the Ads file, use a CSV parser to split each record, and emit a tuple keyed by the UUID of the Ad. The mapToPair() API is used for this transformation.
The Impressions file is processed in a similar manner, as sketched below.
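For completeness, here is a minimal sketch of the Impressions side. The variable name impressionsRDD matches the join snippet below; reading from args[1] and the column order (Ad UUID first, publisher second) are assumptions, not taken from InnerJoin.java.

// Read the impressions file - assumed to be the second program argument
JavaRDD<String> impressionsInput = sparkContext.textFile(args[1]);

// Emit (adUUID, publisher) pairs, keyed the same way as adsRDD
JavaPairRDD<String, String> impressionsRDD = impressionsInput.mapToPair(new PairFunction<String, String, String>() {
  @Override
  public Tuple2<String, String> call(String s) {
    CSVReader csvReader = new CSVReader(new StringReader(s));
    try {
      String[] impressionDetails = csvReader.readNext();
      return new Tuple2<String, String>(impressionDetails[0], impressionDetails[1]);
    } catch (IOException e) {
      e.printStackTrace();
    }
    // Placeholder pair for malformed records, mirroring the Ads snippet
    return new Tuple2<String, String>("-1", "1");
  }
});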

Joins are supported on pair RDDs in Apache Spark out of the box; no additional work needs to be done. Let's look at it.

// Let's go for an inner join, to keep data only for Ads which received an impression
JavaPairRDD<String, Tuple2<String, String>> joinedData = adsRDD.join(impressionsRDD);

joinedData.saveAsTextFile("./output");

The output looks something like this; it contains only the Ads for which impressions were received.

(00832901-21a6-4888-b06b-1f43b9d1acac,(AdProvider1,Publisher1))
(9a1786e1-ab21-43e3-b4b2-4193f572acbc,(AdProvider1,Publisher1))
(aca88cd0-fe50-40eb-8bda-81965b377827,(AdProvider1,Publisher1))
(611cf585-a8cf-43e9-9914-c9d1dc30dab5,(AdProvider1,Publisher1))
(940c138a-88d3-4248-911a-7dbe6a074d9f,(AdProvider3,Publisher3))
(983bb5e5-6d5b-4489-85b3-00e1d62f6a3a,(AdProvider3,Publisher3))
(5de3ae82-d56a-4f70-8738-7e787172c018,(AdProvider1,Publisher1))
(f1b6c6f4-8221-443d-812e-de857b77b2f4,(AdProvider2,Publisher2))
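
If you prefer a flatter, CSV-style output instead of the tuple form above, you could map the joined pairs before saving. This is just an illustrative sketch and not part of InnerJoin.java:

// Flatten each (adId, (adProvider, publisher)) record into a single CSV line
JavaRDD<String> flattened = joinedData.map(new Function<Tuple2<String, Tuple2<String, String>>, String>() {
  @Override
  public String call(Tuple2<String, Tuple2<String, String>> record) {
    return record._1() + "," + record._2()._1() + "," + record._2()._2();
  }
});
flattened.saveAsTextFile("./output-csv");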

