15 May 2012

Hadoop Recipe – Implementing Custom Writable



This Recipe is about implementing a custom Writable to be used in MapReduce code.

Hadoop provides many implementations of Writable out-of-the-box, and these suffice for most cases. At times, however, we need to pass custom objects of our own, and these must implement Hadoop's Writable interface. Let's see how to implement one.
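
For instance, a classic word-count style mapper gets by entirely on stock Writables. The TokenMapper below is a hypothetical illustration, not part of this recipe's code:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Uses only built-in Writables: LongWritable offsets and Text lines in,
// Text words and IntWritable counts out
public class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        for (String token : line.toString().split("\\s+")) {
            word.set(token);
            context.write(word, ONE);
        }
    }
}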

Use Case:

We want to pass Request Information as a whole, consisting of a request id, a request type and a timestamp. We can use it as a key, or just pass it as a value for a key.

NOTE: This is only a custom Writable and does not implement WritableComparable, which we shall cover in a different post.

Let's see the code:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * A custom Writable implementation for Request information.
 *
 * This is a simple custom Writable; it does not implement Comparable or RawComparator.
 */
public class RequestInfo implements Writable {

    // Request ID as a String
    private Text requestId;

    // Request Type
    private Text requestType;

    // Request timestamp
    private LongWritable timestamp;

    // Hadoop requires a no-argument constructor: the framework instantiates
    // Writables reflectively and then populates them via readFields()
    public RequestInfo() {
        this.requestId = new Text();
        this.requestType = new Text();
        this.timestamp = new LongWritable();
    }

    public RequestInfo(Text requestId, Text requestType, LongWritable timestamp) {
        this.requestId = requestId;
        this.requestType = requestType;
        this.timestamp = timestamp;
    }

    public RequestInfo(String requestId, String requestType, long timestamp) {
        this.requestId = new Text(requestId);
        this.requestType = new Text(requestType);
        this.timestamp = new LongWritable(timestamp);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // Serialize the fields in a fixed order; readFields() must mirror it
        requestId.write(dataOutput);
        requestType.write(dataOutput);
        timestamp.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Deserialize the fields in exactly the order write() produced them
        requestId.readFields(dataInput);
        requestType.readFields(dataInput);
        timestamp.readFields(dataInput);
    }

    public Text getRequestId() {
        return requestId;
    }

    public Text getRequestType() {
        return requestType;
    }

    public LongWritable getTimestamp() {
        return timestamp;
    }

    public void setRequestId(Text requestId) {
        this.requestId = requestId;
    }

    public void setRequestType(Text requestType) {
        this.requestType = requestType;
    }

    public void setTimestamp(LongWritable timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public int hashCode() {
        // Used by the default HashPartitioner, so implement it to suit your needs;
        // this one hashes based on the request id only
        return requestId.hashCode();
    }
}


The code is fairly simple. We implement the Writable interface and provide the logic in the write() and readFields() methods. In write() we dump the current state of the object, and in readFields() we read it back. The only contract to respect is that readFields() must consume the fields in exactly the order write() produced them.
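
To make the mechanics concrete, here is a minimal round-trip sketch (the RequestInfoRoundTrip class and the sample values are made up for illustration) that serializes a RequestInfo to a byte array and reads it back:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RequestInfoRoundTrip {

    public static void main(String[] args) throws IOException {
        RequestInfo original = new RequestInfo("req-42", "GET", 1337000000L);

        // write() dumps the current state of the object to the stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // readFields() populates a fresh instance from the same bytes
        RequestInfo copy = new RequestInfo();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getRequestId());   // req-42
        System.out.println(copy.getRequestType()); // GET
        System.out.println(copy.getTimestamp());   // 1337000000
    }
}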

Hadoop handles Strings in its own way, which is why we use the Text class instead. For a more authoritative discussion on this topic, please refer to Hadoop: The Definitive Guide, Chapter 4 - Hadoop I/O.

Note about hashCode()

The current implementation uses requestId's hashCode(), but you should implement this method carefully if you plan to use the object as a key in your MapReduce code, since the default HashPartitioner uses it to partition the keys.
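
For illustration, if you wanted partitioning to take all three fields into account rather than just the request id, a sketch of such a hashCode() could look like this (31 is simply the conventional Java multiplier):

    @Override
    public int hashCode() {
        // HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,
        // so every field folded in here influences the partition
        int result = requestId.hashCode();
        result = 31 * result + requestType.hashCode();
        result = 31 * result + timestamp.hashCode();
        return result;
    }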

Also, the equals() method has been left for the reader to implement; a sketch of one possibility follows.
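
A minimal sketch of an equals() that compares all three fields, kept consistent with the hashCode() contract (equal objects must yield equal hash codes):

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RequestInfo)) return false;
        RequestInfo other = (RequestInfo) o;
        return requestId.equals(other.requestId)
                && requestType.equals(other.requestType)
                && timestamp.equals(other.timestamp);
    }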


References:

Hadoop: The Definitive Guide by Tom White, Chapter 4 - Hadoop I/O. A must-have if you want to know Hadoop in detail.
