Hi! Welcome...

Thread.currentThread().join() I am Ashish. I am a member of the Apache MINA PMC, an ASF committer and an avid code hacker.

17 May 2012 ~ 0 Comments

Hadoop Recipe – Implementing Custom Partitioner


This recipe is about implementing a custom Partitioner.

A Partitioner in the MapReduce world partitions the key space. The partitioner derives the partition to which a key-value pair belongs, and the number of partitions equals the number of reduce tasks for the job. It is responsible for bringing records with the same key to the same partition, so that they can be processed together by a reducer.
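To make the "same key, same partition" contract concrete, here is a small plain-Java model of the partitioning step. It does not use Hadoop: the names KeyPartitioner and assign() are hypothetical, and the hash rule passed in main() only imitates the default partitioner's idea. Whatever rule is plugged in, every occurrence of a key lands in one bucket, and a result outside [0, numPartitions) is treated as an error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of partitioning; KeyPartitioner and assign() are hypothetical names.
public class PartitionGroupingDemo {

    // Stand-in for Partitioner.getPartition: maps a key to a partition in [0, numPartitions).
    interface KeyPartitioner {
        int partitionOf(String key, int numPartitions);
    }

    // Puts every key into the bucket chosen by the partitioner (one bucket per reducer).
    static List<List<String>> assign(List<String> keys, int numPartitions, KeyPartitioner partitioner) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<String>());
        }
        for (String key : keys) {
            int partition = partitioner.partitionOf(key, numPartitions);
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalStateException("Illegal partition " + partition + " for key " + key);
            }
            buckets.get(partition).add(key);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("apple", "kiwi", "apple", "fig", "kiwi", "apple");
        // Hash-based rule, the same idea as the default hash partitioning
        List<List<String>> buckets =
                assign(keys, 3, (key, n) -> (key.hashCode() & Integer.MAX_VALUE) % n);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("partition " + i + ": " + buckets.get(i));
        }
    }
}
```

Because the rule depends only on the key, all three "apple" records end up in the same bucket, which is what lets a single reducer see them together.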

To implement a custom Partitioner, we need to extend the Partitioner class.
Let's look at the code for the Partitioner class:

public abstract class Partitioner<KEY, VALUE> {

  /**
   * Get the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}

The default partitioner is HashPartitioner, which finds the partition based on the hash code of the key:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
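The `& Integer.MAX_VALUE` mask is easy to overlook. The plain-Java sketch below (no Hadoop classes; the class and method names are illustrative) reproduces the same arithmetic and contrasts it with a bare modulus and with Math.abs, both of which can yield a negative, and therefore invalid, partition number.

```java
// Plain-Java sketch of HashPartitioner's arithmetic; it does not use Hadoop classes.
public class HashPartitionDemo {

    // Same expression as HashPartitioner.getPartition: clear the sign bit, then take the modulus.
    static int partitionForHash(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    // The partition HashPartitioner would pick for a key, based on its hashCode().
    static int partitionForKey(Object key, int numReduceTasks) {
        return partitionForHash(key.hashCode(), numReduceTasks);
    }

    public static void main(String[] args) {
        int reducers = 10;
        // A plain modulus keeps the sign of a negative hash, which is not a valid partition.
        System.out.println("-1 % 10 = " + (-1 % reducers));
        System.out.println("masked partition for hash -1 = " + partitionForHash(-1, reducers));
        // Math.abs does not help either: Math.abs(Integer.MIN_VALUE) is still negative.
        System.out.println("Math.abs(MIN_VALUE) % 10 = " + (Math.abs(Integer.MIN_VALUE) % reducers));
        System.out.println("masked partition for MIN_VALUE = " + partitionForHash(Integer.MIN_VALUE, reducers));
        System.out.println("partition for \"hadoop\" = " + partitionForKey("hadoop", reducers));
    }
}
```

The unmasked lines print -1 and -8, while the masked versions print 7 and 0, both inside [0, 10).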

OK, now let's implement a custom partitioner. Assume that we have a Text key, and we want to use its first character as the deciding factor for determining the partition.

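import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstCharTextPartitioner extends Partitioner<Text, Text> {

  @Override
  public int getPartition(Text key, Text value, int numReduceTasks) {
    // Empty keys have no first character, so send them all to partition 0
    if (key.getLength() == 0) {
      return 0;
    }
    // char values are never negative, so the result is always in [0, numReduceTasks)
    return key.toString().charAt(0) % numReduceTasks;
  }
}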

// Set this on the Job in the driver code
job.setPartitionerClass(FirstCharTextPartitioner.class);

The code takes the first character of the key as the deciding factor for determining the partition. So all keys that begin with 'A' shall be processed by the same reducer. The next step is to set our custom class on the Job, and the framework shall use our custom partitioning logic.
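Before wiring the partitioner into a job, the rule itself can be tried in plain Java. The sketch below mirrors FirstCharTextPartitioner's logic on String keys (no Hadoop classes; the class name and the "empty key goes to partition 0" choice are assumptions of this sketch) and prints which partition each sample key gets with 10 reducers.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java version of the first-character rule, so it can be tried without a cluster.
public class FirstCharPartitionDemo {

    // Mirrors FirstCharTextPartitioner.getPartition; empty keys go to partition 0 (an assumption).
    static int firstCharPartition(String key, int numReduceTasks) {
        if (key.isEmpty()) {
            return 0;
        }
        // char values are never negative, so no sign masking is needed here
        return key.charAt(0) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] keys = {"Apple", "Avocado", "Kiwi", "apple", "Banana"};
        Map<String, Integer> assignment = new TreeMap<>();
        for (String key : keys) {
            assignment.put(key, firstCharPartition(key, 10));
        }
        // prints {Apple=5, Avocado=5, Banana=6, Kiwi=5, apple=7}
        System.out.println(assignment);
    }
}
```

Two things show up in the output: the rule is case-sensitive ('A' and 'a' land in different partitions), and a reducer is not dedicated to one letter, since 'A' (65) and 'K' (75) both map to partition 5 when there are 10 reducers.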

Based on the needs we can choose our own way of partitioning the key space.

Before implementing a custom Partitioner, it's best to check the following Partitioners provided by Hadoop:

  1. BinaryPartitioner
  2. HashPartitioner
  3. KeyFieldBasedPartitioner
  4. TotalOrderPartitioner



15 May 2012 ~ 0 Comments

Hadoop Recipe – Implementing Custom Writable


This Recipe is about implementing a custom Writable to be used in MapReduce code.

Hadoop provides many Writable implementations out of the box, which suffice for most cases. However, at times we need to pass custom objects, which must implement Hadoop's Writable interface. Let's see how to implement one.

Use Case:

We want to pass request information as a whole, consisting of a request ID, a request type and a timestamp. We can use it as a key or just pass it as the value for a key.

NOTE: This is only a custom writable and does not implement WritableComparable, which we shall cover in a different post.

Let's see the code

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * A custom Writable implementation for Request information.
 *
 * This is a simple custom Writable, and does not implement Comparable or RawComparator
 */
public class RequestInfo implements Writable {

    // Request ID as a String
    private Text requestId;

    // Request Type
    private Text requestType;

    // request timestamp
    private LongWritable timestamp;

    public RequestInfo() {
        this.requestId = new Text();
        this.requestType = new Text();
        this.timestamp = new LongWritable();
    }

    public RequestInfo(Text requestId, Text requestType, LongWritable timestamp) {
        this.requestId = requestId;
        this.requestType = requestType;
        this.timestamp = timestamp;
    }

    public RequestInfo(String requestId, String requestType, long timestamp) {
        this.requestId = new Text(requestId);
        this.requestType = new Text(requestType);
        this.timestamp = new LongWritable(timestamp);
    }

    public void write(DataOutput dataOutput) throws IOException {
        requestId.write(dataOutput);
        requestType.write(dataOutput);
        timestamp.write(dataOutput);
    }

    public void readFields(DataInput dataInput) throws IOException {
        requestId.readFields(dataInput);
        requestType.readFields(dataInput);
        timestamp.readFields(dataInput);
    }

    public Text getRequestId() {
        return requestId;
    }

    public Text getRequestType() {
        return requestType;
    }

    public LongWritable getTimestamp() {
        return timestamp;
    }

    public void setRequestId(Text requestId) {
        this.requestId = requestId;
    }

    public void setRequestType(Text requestType) {
        this.requestType = requestType;
    }

    public void setTimestamp(LongWritable timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public int hashCode() {
        // This is used by HashPartitioner, so implement it as per need
        // this one shall hash based on request id
        return requestId.hashCode();
    }
}

The code is fairly simple. We implement the Writable interface and put the logic in the readFields() and write() methods. In write(), we dump the current state of the object, and in readFields() we read it back in the same order.

Hadoop treats Strings differently, so I decided to use the Text class. For a more authoritative discussion on this topic, please refer to Hadoop: The Definitive Guide, Chapter 4 - Hadoop I/O
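The write()/readFields() contract can be exercised without a cluster. The sketch below is a hypothetical plain-Java mirror of RequestInfo that uses String and long fields, with DataOutput.writeUTF/readUTF standing in for Text's own serialization (Text uses a different on-disk encoding, so this is not byte-compatible with it). It serializes a record to bytes and reads it into a fresh instance created with the empty constructor, which is why a Writable needs one: the framework instantiates the object reflectively and then calls readFields().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Plain-Java sketch of the Writable contract; writeUTF/readUTF stand in for Text's own encoding.
public class RoundTripDemo {

    // Hypothetical mirror of RequestInfo using String/long instead of Text/LongWritable.
    static final class RequestRecord {
        String requestId;
        String requestType;
        long timestamp;

        RequestRecord() {
            this("", "", 0L);
        }

        RequestRecord(String requestId, String requestType, long timestamp) {
            this.requestId = requestId;
            this.requestType = requestType;
            this.timestamp = timestamp;
        }

        // Serialize the fields in a fixed order...
        void write(DataOutput out) throws IOException {
            out.writeUTF(requestId);
            out.writeUTF(requestType);
            out.writeLong(timestamp);
        }

        // ...and read them back in exactly the same order.
        void readFields(DataInput in) throws IOException {
            requestId = in.readUTF();
            requestType = in.readUTF();
            timestamp = in.readLong();
        }
    }

    // Writes a record to bytes, then reads the bytes into a new empty instance.
    static RequestRecord roundTrip(RequestRecord original) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        original.write(out);
        out.flush();
        RequestRecord copy = new RequestRecord();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        RequestRecord copy = roundTrip(new RequestRecord("req-1", "GET", 1337270400000L));
        System.out.println(copy.requestId + " " + copy.requestType + " " + copy.timestamp);
    }
}
```

Swapping the order of two reads in readFields() would silently corrupt the fields, so keeping both methods in lockstep is the main thing to get right.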

Note about hashCode()

The current implementation uses requestId's hashCode(). Implement this method carefully if you plan to use the object as a key in your MapReduce code, because the default HashPartitioner uses it to partition the keys.

Also, equals() method has been left for the users to implement :)
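One way to fill that gap, sketched on a hypothetical plain-Java class RequestKey with the same three fields as RequestInfo: equals() compares all fields, while hashCode() keeps RequestInfo's choice of hashing on requestId only. That pairing is consistent, since objects that are equal always share a requestId and therefore a hash code. Text and LongWritable implement equals() and hashCode() themselves, so the same approach should carry over to RequestInfo by comparing every Writable field with equals() (including timestamp, which is an object there rather than a long).

```java
import java.util.Objects;

// Hypothetical plain-Java key with RequestInfo's fields, showing equals() consistent with hashCode().
public class RequestKey {
    private final String requestId;
    private final String requestType;
    private final long timestamp;

    public RequestKey(String requestId, String requestType, long timestamp) {
        this.requestId = requestId;
        this.requestType = requestType;
        this.timestamp = timestamp;
    }

    // Two keys are equal only when all three fields match.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof RequestKey)) {
            return false;
        }
        RequestKey that = (RequestKey) other;
        return timestamp == that.timestamp
                && Objects.equals(requestId, that.requestId)
                && Objects.equals(requestType, that.requestType);
    }

    // Same choice as RequestInfo: hash on requestId only; equal keys always share a requestId.
    @Override
    public int hashCode() {
        return Objects.hashCode(requestId);
    }

    public static void main(String[] args) {
        RequestKey a = new RequestKey("r1", "GET", 1L);
        RequestKey b = new RequestKey("r1", "GET", 1L);
        System.out.println(a.equals(b) + " " + (a.hashCode() == b.hashCode()));
    }
}
```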


11 May 2012 ~ 0 Comments

Hadoop Recipe – Using Custom Java Counters


This post starts the Hadoop Recipe series, in which I shall pick a topic and provide sample code around it. Each recipe shall be small and concise, and provide ready-to-use hints on the topic covered.

This post covers the usage of custom counters in Java in the Hadoop world. Counters are very helpful in MapReduce: we can use them to watch progress, or as a means of indirect debugging or validation. For example, you may want a specific counter to know how many records of a specific type were processed from the complete data, like how many requests in an Apache access log contained a specific keyword. There can be many other similar use cases.

Hadoop provides some built-in counters that are always present, like the number of map input records, the number of bytes processed, etc.

Let's see how we can use a custom counter in Java code.

Define the counter

public static enum COUNTERS {
    ERROR_COUNT,
    MISSING_FIELDS_RECORD_COUNT
}

Defining counters is very simple: we define an enum listing all the counters we want to use.

Using the Counters

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // mapper code here; it decides the two placeholder conditions below
    boolean error = false;
    boolean missingRecords = false;

    // if error condition, increment the error counter
    if (error) {
        context.getCounter(COUNTERS.ERROR_COUNT).increment(1);
    }

    // if missing fields condition, increment the missing-fields counter
    if (missingRecords) {
        context.getCounter(COUNTERS.MISSING_FIELDS_RECORD_COUNT).increment(1);
    }
}

Using counters is equally simple: when a given condition holds, increment the counter. In the example we increment by one, but you can increment by larger values as well.
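Since the counters are only as good as the conditions that drive them, it can help to pull those conditions out of map() into a plain method that is testable without a cluster. The sketch below assumes a hypothetical record format (three tab-separated fields, the third a numeric timestamp) and tallies the same two counter names in an EnumMap; inside the real mapper, each tally.merge(...) call would become the corresponding context.getCounter(...).increment(1).

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch: the counting conditions pulled out of map() so they can be unit-tested.
public class CounterTallyDemo {

    // Same counter names as the enum in the post
    enum COUNTERS { ERROR_COUNT, MISSING_FIELDS_RECORD_COUNT }

    // Hypothetical record format: three tab-separated fields, the third a numeric timestamp.
    static void countRecord(String line, Map<COUNTERS, Long> tally) {
        String[] fields = line.split("\t", -1);
        if (fields.length < 3) {
            tally.merge(COUNTERS.MISSING_FIELDS_RECORD_COUNT, 1L, Long::sum);
            return;
        }
        try {
            Long.parseLong(fields[2]);
        } catch (NumberFormatException e) {
            tally.merge(COUNTERS.ERROR_COUNT, 1L, Long::sum);
        }
    }

    // Runs every line through countRecord, starting all counters at zero.
    static Map<COUNTERS, Long> tally(List<String> lines) {
        Map<COUNTERS, Long> tally = new EnumMap<>(COUNTERS.class);
        for (COUNTERS counter : COUNTERS.values()) {
            tally.put(counter, 0L);
        }
        for (String line : lines) {
            countRecord(line, tally);
        }
        return tally;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("r1\tGET\t1000", "r2\tGET", "r3\tPOST\tnot-a-number");
        // prints {ERROR_COUNT=1, MISSING_FIELDS_RECORD_COUNT=1}
        System.out.println(tally(lines));
    }
}
```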


Viewing the Counter values

You can view the counter values in the JobTracker UI, or programmatically. You can print all the counters or a specific one. The following example shows how to print the value of a specific counter:


// Code in the Job Driver Class
Counter errorCounter = job.getCounters().findCounter(COUNTERS.ERROR_COUNT);
System.out.println("Error Counter = "+errorCounter.getValue());


23 April 2012 ~ 0 Comments

Playing with JClouds transient Blobstore


The JClouds BlobStore API provides a portable way of managing key-value providers like Amazon S3. This post is a getting-started guide to the API. We shall explore the API a bit and create a simple program that uses it.

Before we begin, let's go over some concepts.
Service - It refers to the provider where we host our key-value data, like Amazon S3

Containers - They are namespaces for the data, where we store our Blobs. For example, in Amazon S3, Containers == buckets

Blob - This is the unstructured data that we store inside Containers

There are some more details about the API, but they are not covered here to keep things simple.

Let's briefly look at the steps needed to use the API:

  • We create a BlobStoreContext. In simple words, it's our handle to the service provider
  • We get BlobStore handle from BlobStoreContext
  • We create a Container
  • We add our data/Blobs to the Container
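To make the Container/Blob model concrete before touching JClouds, here is a toy in-memory store that follows the same steps. It is not the JClouds API: ToyBlobStore and all its methods are hypothetical names, and a single object plays the role of both BlobStoreContext and BlobStore. It only illustrates the idea that a container is a namespace of named, unstructured payloads, roughly what an in-memory provider keeps for us.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy in-memory model of the BlobStore concepts; this is NOT the JClouds API, all names are hypothetical.
public class ToyBlobStore {

    // container name -> (blob name -> payload bytes)
    private final Map<String, Map<String, byte[]>> containers = new LinkedHashMap<>();

    // Creates a namespace for blobs; returns false if it already exists.
    public boolean createContainer(String containerName) {
        if (containers.containsKey(containerName)) {
            return false;
        }
        containers.put(containerName, new LinkedHashMap<String, byte[]>());
        return true;
    }

    // Stores an unstructured payload under a name inside an existing container.
    public void putBlob(String containerName, String blobName, byte[] payload) {
        Map<String, byte[]> blobs = containers.get(containerName);
        if (blobs == null) {
            throw new IllegalArgumentException("No such container: " + containerName);
        }
        blobs.put(blobName, payload.clone());
    }

    // Returns a copy of the payload, or null if the container or blob does not exist.
    public byte[] getBlob(String containerName, String blobName) {
        Map<String, byte[]> blobs = containers.get(containerName);
        byte[] payload = (blobs == null) ? null : blobs.get(blobName);
        return (payload == null) ? null : payload.clone();
    }

    // Number of blobs in a container (0 if it does not exist).
    public int countBlobs(String containerName) {
        Map<String, byte[]> blobs = containers.get(containerName);
        return (blobs == null) ? 0 : blobs.size();
    }

    public static void main(String[] args) {
        // The store object stands in for both the context and the BlobStore handle here
        ToyBlobStore store = new ToyBlobStore();
        store.createContainer("dummybase");
        store.putBlob("dummybase", "test", "testdata".getBytes(StandardCharsets.UTF_8));
        store.putBlob("dummybase", "test1", "testdata1".getBytes(StandardCharsets.UTF_8));
        System.out.println("dummybase: " + store.countBlobs("dummybase") + " entries");
    }
}
```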

Let's look at the code for each of these steps.

The best way to play with the JClouds API is to clone the examples GitHub repo (https://github.com/jclouds/jclouds-examples) and modify the code to fit your needs; this is what I have done.

NOTE: We shall use the JClouds in-memory BlobStore to experiment, so that we don't have to pay the usage charges of a BlobStore provider.

Step 1: Creating the BlobStoreContext

 String provider = "transient";
 String identity = "Unused";
 String credential = Optional.absent().toString();

 // Init
 BlobStoreContext context = ContextBuilder.newBuilder(provider)
                .credentials(identity, credential)
                .build(BlobStoreContext.class);

The code is simple enough. The "transient" provider is the in-memory BlobStore provider, and the identity and credential values are just placeholders for it. If we need to use S3, we replace the provider and supply real credentials.

Step 2: Get the handle to BlobStore

BlobStore blobStore = context.getBlobStore();

From the BlobStoreContext, we get the handle to the BlobStore, which we shall use to perform subsequent operations.

Step 3: Creating a Container

String containerName = "dummybase";
blobStore.createContainerInLocation(null, containerName);

This shall create a Container with name "dummybase"

Step 4: Adding Blob to Container

// Add Blob
Blob blob = blobStore.blobBuilder("test").payload("testdata").build();
blobStore.putBlob(containerName, blob);

// Add Blob
Blob blob2 = blobStore.blobBuilder("test1").payload("testdata1").build();
blobStore.putBlob(containerName, blob2);

We create two blobs with String payloads and add them to the container. Other payload variants accept, for example, a File or an InputStream.

That's it: we have added the blobs to the transient BlobStore provider.

Let's list the contents

for (StorageMetadata resourceMd : blobStore.list()) {
    if (resourceMd.getType() == StorageType.CONTAINER || resourceMd.getType() == StorageType.FOLDER) {
        // Use Map API
        Map<String, InputStream> containerMap = context.createInputStreamMap(resourceMd.getName());
        System.out.printf("  %s: %s entries%n", resourceMd.getName(), containerMap.size());
    }
}


The complete code together

public static void main(String[] args) throws IOException {
    String provider = "transient";
    String identity = "Unused";
    String credential = Optional.absent().toString();
    String containerName = "dummybase";

    // Init
    BlobStoreContext context = ContextBuilder.newBuilder(provider)
                .credentials(identity, credential)
                .build(BlobStoreContext.class);

    try {
        // Create Container
        BlobStore blobStore = context.getBlobStore();
        blobStore.createContainerInLocation(null, containerName);

        // Add Blob
        Blob blob = blobStore.blobBuilder("test").payload("testdata").build();
        blobStore.putBlob(containerName, blob);

        // Add Blob
        Blob blob2 = blobStore.blobBuilder("test1").payload("testdata1").build();
        blobStore.putBlob(containerName, blob2);

        // List Container
        for (StorageMetadata resourceMd : blobStore.list()) {
            if (resourceMd.getType() == StorageType.CONTAINER || resourceMd.getType() == StorageType.FOLDER) {
                // Use Map API
                Map<String, InputStream> containerMap = context.createInputStreamMap(resourceMd.getName());
                System.out.printf("  %s: %s entries%n", resourceMd.getName(), containerMap.size());
             }
         }
    } finally {
        // Close connection
        context.close();
        System.exit(0);
    }
}


17 April 2012 ~ 3 Comments

Using Hadoop Distributed Cache


Hadoop has a distributed cache mechanism that makes files needed by Map/Reduce jobs available locally on each node. This post tries to expand a bit on the information provided by the javadoc of DistributedCache.

Use Case

Let's understand our use case in a bit more detail, so that we can follow the code snippets.
We have a key-value file that we need to use in our map tasks. For simplicity, let's say we need to replace every keyword we encounter during parsing with some other value.

So what we need is

  • A key-value file (let's use a Properties file)
  • The Mapper code that uses the file

Step 1

Place the key-values file on the HDFS

hadoop fs -put ./keyvalues.properties cache/keyvalues.properties

This path is relative to the user's home folder on HDFS

Step 2

Write the Mapper code that uses it

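import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {

    private final Properties cache = new Properties();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());

        if (localCacheFiles != null) {
            // expecting only a single file here; every cached file is loaded into the same Properties
            for (Path localCacheFile : localCacheFiles) {
                // try-with-resources closes the reader even if load() fails
                try (Reader reader = new FileReader(localCacheFile.toString())) {
                    cache.load(reader);
                }
            }
        } else {
            // do your error handling here
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // use the cache here
        // if value contains some attribute, cache.getProperty(<value>)
        // do some action or replace with something else
    }

}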

The Mapper code is simple enough. During the setup phase, we read the file and populate the Properties object. Inside map(), we use the cache to look up certain keys and replace them if they are present.
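The body of map() is left as comments above. A minimal plain-Java sketch of the lookup it describes, assuming keywords are space-separated tokens (an assumption of this sketch, not something the post specifies): each token found in the Properties is replaced by its value, and everything else passes through unchanged. In the mapper, `cache` would be the object loaded in setup(), and the result could be emitted with context.write(...) using whatever output key the job needs.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Plain-Java sketch of the lookup the map() comments describe; the token rule is an assumption.
public class KeywordReplaceDemo {

    // Replaces every space-separated token that is a key in the cache; other tokens pass through.
    static String replaceKeywords(String line, Properties cache) {
        // limit -1 keeps empty tokens, so runs of spaces survive the round trip
        String[] tokens = line.split(" ", -1);
        for (int i = 0; i < tokens.length; i++) {
            tokens[i] = cache.getProperty(tokens[i], tokens[i]);
        }
        return String.join(" ", tokens);
    }

    public static void main(String[] args) throws IOException {
        // Same format as keyvalues.properties; in the mapper this comes from setup()
        Properties cache = new Properties();
        cache.load(new StringReader("color=colour\ncenter=centre\n"));
        // prints "the colour of the centre"
        System.out.println(replaceKeywords("the color of the center", cache));
    }
}
```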

Step 3

Add the properties file to your driver code

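// Driver code (new API); needs java.net.URI, org.apache.hadoop.conf.Configuration,
// org.apache.hadoop.filecache.DistributedCache and org.apache.hadoop.mapreduce.Job
Configuration conf = new Configuration();
// set the cache file before creating the Job, because the Job works on a copy of conf
DistributedCache.addCacheFile(new URI("cache/keyvalues.properties#keyvalues.properties"), conf);
Job job = new Job(conf);
// set job properties (mapper class, input/output paths, etc.)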