Quick Start with AsyncHBase

Recently, while fixing a JIRA issue in Apache Flume, I came across the AsyncHBase library, and it's a lot of fun to work with. After fixing the issue, I spent some time using it to write directly to HBase, and this post is a summary of that.

This post is divided into three sections: initialization, a put request, and a get request.

To use AsyncHBase with Maven, add the following dependencies to pom.xml:

<dependency>
  <groupId>org.hbase</groupId>
  <artifactId>asynchbase</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase</artifactId>
  <version>0.92.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.0.1</version>
</dependency>

Let's start on the code.

First, a bit of skeleton code that just declares the fields we need.

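import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Charsets;      // Guava, pulled in by the hbase dependency
import com.stumbleupon.async.Callback;       // suasync, pulled in by asynchbase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.hbase.async.PutRequest;

public class AsyncHBaseQuickStart {
  public static final String DEFAULT_ZK_DIR = "/hbase";

  // HBase client, created in init()
  private HBaseClient hBaseClient;
  private String tableName;
  private String columnFamily;
  private String zkQuorum;

  public AsyncHBaseQuickStart(String tableName, String columnFamily, String zkQuorum) {
    this.tableName = tableName;
    this.columnFamily = columnFamily;
    this.zkQuorum = zkQuorum;
  }
  // init(), putData() and getData() shown below belong inside this class
}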

Now let's look at the initialization code, where we connect to HBase:

public void init() throws Exception {
  if (zkQuorum == null || zkQuorum.isEmpty()) {
    // No quorum given: fall back to the one in hbase-site.xml on the classpath
    Configuration conf = HBaseConfiguration.create();
    zkQuorum = ZKConfig.getZKQuorumServersString(conf);
  }
  hBaseClient = new HBaseClient(zkQuorum, DEFAULT_ZK_DIR, Executors.newCachedThreadPool());

  // Let's ensure that the table and column family exist
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicBoolean fail = new AtomicBoolean(false);
  hBaseClient.ensureTableFamilyExists(tableName, columnFamily).addCallbacks(
      new Callback<Object, Object>() {
        @Override
        public Object call(Object arg) throws Exception {
          // Success: table and column family exist
          latch.countDown();
          return null;
        }
      },
      new Callback<Object, Exception>() {
        @Override
        public Object call(Exception arg) throws Exception {
          // Failure, e.g. TableNotFoundException or NoSuchColumnFamilyException
          fail.set(true);
          latch.countDown();
          return null;
        }
      }
  );

  try {
    latch.await();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // preserve the interrupt status
    throw new Exception("Interrupted", e);
  }

  if (fail.get()) {
    throw new Exception("Table or Column Family doesn't exist");
  }
}

I picked up this recipe from Apache Flume. We create an instance of HBaseClient, passing the ZooKeeper quorum, the base znode, and an executor that supplies its I/O threads. I wrote this example as a playground; production code would need more configurability and proper error handling. Next, we verify that the specified table and column family exist. Each asynchronous call returns a Deferred, to which we attach two callbacks with addCallbacks(): the first runs on success and the second (the errback) runs if the call fails. HBaseClient invokes the appropriate callback as soon as the call completes, while the calling thread blocks on a CountDownLatch until one of the callbacks counts it down. There are other ways to achieve the same thing, such as Deferred.join(), which the get example below uses.
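Since init() and putData() repeat the same latch pattern, it can be factored into one helper. The sketch below is JDK-only (Java 8+) so it runs without an HBase cluster: AsyncCallback is a hypothetical stand-in for AsyncHBase's Callback, and startAsync is a hypothetical operation that, like a Deferred, reports its outcome to exactly one of two callbacks on another thread. Unlike the AtomicBoolean version, it keeps the actual exception as the cause and waits with a timeout instead of forever.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LatchWait {
  // Hypothetical stand-in for com.stumbleupon.async.Callback; not the real interface.
  interface AsyncCallback<T> {
    void call(T arg);
  }

  // Hypothetical async operation: runs `work` on a new thread and reports the
  // outcome through exactly one of the two callbacks, as a Deferred would.
  static void startAsync(Callable<Object> work,
                         AsyncCallback<Object> onSuccess,
                         AsyncCallback<Exception> onError) {
    Thread t = new Thread(() -> {
      Object result;
      try {
        result = work.call();
      } catch (Exception e) {
        onError.call(e);
        return;
      }
      onSuccess.call(result);
    });
    t.setDaemon(true); // don't keep the JVM alive for a pending operation
    t.start();
  }

  // Blocks until one callback has fired or the timeout expires, then either
  // returns the result or throws with the original failure as the cause.
  static Object await(Callable<Object> work, long timeoutMs, String what) throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<Object> result = new AtomicReference<>();
    final AtomicReference<Exception> error = new AtomicReference<>();
    startAsync(work,
        arg -> { result.set(arg); latch.countDown(); },
        err -> { error.set(err); latch.countDown(); });
    try {
      if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
        throw new Exception(what + " timed out after " + timeoutMs + " ms");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      throw new Exception("Interrupted while waiting for " + what, e);
    }
    if (error.get() != null) {
      throw new Exception(what + " failed", error.get());
    }
    return result.get();
  }
}
```

In the post's own code, the role of startAsync is played by calling addCallbacks on the Deferred returned by hBaseClient.put(putRequest) or ensureTableFamilyExists(...).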

Let's look at the put code:

public void putData(byte[] rowKey, String data) throws Exception {
  // Table, row key, column family, qualifier ("payload") and value, all as bytes
  PutRequest putRequest = new PutRequest(tableName.getBytes(Charsets.UTF_8), rowKey,
                                         columnFamily.getBytes(Charsets.UTF_8),
                                         "payload".getBytes(Charsets.UTF_8),
                                         data.getBytes(Charsets.UTF_8));
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicBoolean fail = new AtomicBoolean(false);
  hBaseClient.put(putRequest).addCallbacks(
      new Callback<Object, Object>() {
        @Override
        public Object call(Object arg) throws Exception {
          // Success: the write was acknowledged
          latch.countDown();
          return null;
        }
      },
      new Callback<Object, Exception>() {
        @Override
        public Object call(Exception arg) throws Exception {
          // Failure: record it and release the waiting thread
          fail.set(true);
          latch.countDown();
          return null;
        }
      }
  );
  try {
    latch.await();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // preserve the interrupt status
    throw new Exception("Interrupted", e);
  }

  if (fail.get()) {
    throw new Exception("put request failed");
  }
}

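The code is very similar to the initialization code and uses the same latch-based approach to wait for the operation to complete. The PutRequest takes the table name, row key, column family, column qualifier (hard-coded here to "payload"), and value, each as a byte array.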
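Because every PutRequest argument is a byte array, it can help to keep the encoding in one place. The sketch below uses a hypothetical PutArgs holder (not an AsyncHBase class) that encodes the five values in the order putData() passes them, using the JDK's StandardCharsets.UTF_8 in place of Guava's Charsets.UTF_8, plus the inverse decoding for values read back later.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical holder (not an AsyncHBase class) for the five byte[] arguments
// that putData() passes to PutRequest: table, row key, family, qualifier, value.
final class PutArgs {
  final byte[] table;
  final byte[] key;
  final byte[] family;
  final byte[] qualifier;
  final byte[] value;

  PutArgs(String table, byte[] key, String family, String qualifier, String value) {
    this.table = utf8(table);
    this.key = key;                 // row keys are already raw bytes in putData()
    this.family = utf8(family);
    this.qualifier = utf8(qualifier);
    this.value = utf8(value);
  }

  // Same encoding the post gets from Guava's Charsets.UTF_8.
  static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  // Inverse of utf8(): turns stored bytes (e.g. a value read back) into a String.
  static String fromUtf8(byte[] b) {
    return new String(b, StandardCharsets.UTF_8);
  }
}
```

With this holder, the call in putData() would read new PutRequest(a.table, a.key, a.family, a.qualifier, a.value), the same five-argument constructor the post already uses.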

Let's look at the get API:

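public byte[] getData(byte[] rowKey) throws Exception {
  GetRequest getRequest = new GetRequest(tableName, rowKey);
  // join() blocks until the Deferred completes and rethrows any failure
  ArrayList<KeyValue> kvs = hBaseClient.get(getRequest).join();
  if (kvs.isEmpty()) {
    return null; // row not found
  }
  return kvs.get(0).value();
}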

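To get data from HBase, we create a GetRequest with the table name and row key. The get() shows one more way of waiting for a result: join() blocks the calling thread until the Deferred completes, and throws the exception if the request failed. Once the result is available, we return the value of the first KeyValue in the list. Note that a get on a missing row completes with an empty list, so there is a first KeyValue only if the row has data.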
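Deferred.join() blocks until the result arrives, and suasync also offers a join(long timeout) overload for bounded waits. The sketch below mirrors that shape using the JDK's CompletableFuture as a stand-in (an assumption: it is not an AsyncHBase type), and checks the returned list before reading the first element, since a missing row yields an empty list. fetchRow is a hypothetical async lookup standing in for hBaseClient.get(getRequest).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class JoinGet {
  // Hypothetical async lookup standing in for hBaseClient.get(getRequest):
  // completes on another thread with the values stored for the row, possibly none.
  static CompletableFuture<List<byte[]>> fetchRow(List<byte[]> stored) {
    return CompletableFuture.supplyAsync(() -> stored);
  }

  // Blocks for at most timeoutMs (like Deferred.join(timeout)) and returns the
  // first value, or null when the row has no cells, instead of throwing.
  static byte[] firstValueOrNull(CompletableFuture<List<byte[]>> pending, long timeoutMs)
      throws Exception {
    List<byte[]> kvs = pending.get(timeoutMs, TimeUnit.MILLISECONDS);
    if (kvs == null || kvs.isEmpty()) {
      return null; // missing row: nothing to return
    }
    return kvs.get(0);
  }
}
```

Returning null keeps the "row not found" case cheap for callers; throwing a dedicated exception would be an equally reasonable design if a missing row is unexpected.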


