[Learning Spark with Examples] Line Count With Filtering

In the last post we saw the Line Count example; now let's add filtering to the example, to filter out empty lines.

The full code can be found in LineCountWithFiltering.java

Let's look at the code

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class LineCountWithFiltering {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("Line Count With Filtering");
    JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

    // Read the source file
    JavaRDD<String> input = sparkContext.textFile(args[0]);

    // RDDs are immutable, so we create a new RDD that excludes empty lines.
    // The function must return true for the records that are to be kept.
    JavaRDD<String> nonEmptyLines = input.filter(new Function<String, Boolean>() {
      @Override
      public Boolean call(String s) throws Exception {
        return s != null && s.trim().length() > 0;
      }
    });

    long count = nonEmptyLines.count();

    System.out.println(String.format("Total lines in %s is %d", args[0], count));

    sparkContext.stop();
  }
}

The code is the same as the Line Count example. The additional part is the filter API call, which takes a function that returns a Boolean. The function has to return true for the records that need to be kept; any record for which it returns false is dropped.
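The keep-if-true contract of filter is easy to try out without a Spark cluster. The sketch below (class name FilterSketch and helper isNonEmpty are made up for illustration) applies the same predicate our Spark job uses, but with plain Java streams:

```java
import java.util.Arrays;
import java.util.List;

public class FilterSketch {
  // Same predicate as the Spark filter: keep non-null, non-blank lines.
  static boolean isNonEmpty(String s) {
    return s != null && s.trim().length() > 0;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("first", "", "   ", "second");
    // Stream filter keeps elements for which the predicate returns true,
    // exactly like JavaRDD.filter keeps records for which call() returns true.
    long count = lines.stream().filter(FilterSketch::isNonEmpty).count();
    System.out.println(count); // prints 2
  }
}
```

If you are on Java 8, the anonymous Function in the Spark code can likewise be written as a lambda: `input.filter(s -> s != null && s.trim().length() > 0)`.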

Let's compile the program

$mvn clean package

Once the build is successful, run the program as follows (from the directory containing pom.xml)

$~/cots/spark-1.2.0-bin-hadoop2.4/bin/spark-submit --class org.learningspark.simple.LineCountWithFiltering --master local[1] target/learningspark-1.0-SNAPSHOT.jar /Users/ashishpaliwal/open-spource/flume/trunk/CHANGELOG 
