Using Hadoop Distributed Cache

Hadoop has a distributed cache mechanism to make files that Map/Reduce jobs need available locally on the task nodes. This post tries to expand a bit on the information provided by the javadoc of DistributedCache.

Use Case

Let's look at the use case in a bit more detail so that the code snippets are easier to follow.
We have a key-value file that we need to use in our Map tasks. For simplicity, let's say we need to replace every keyword we encounter during parsing with some other value.

So what we need is:

  • A key-value file (let's use a Properties file)
  • The Mapper code that uses the cached file

Step 1

Place the key-value file on HDFS

hadoop fs -put ./keyvalues.properties cache/keyvalues.properties

This path is relative to the user's home folder on HDFS
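
To confirm the upload, you can list the cache directory (the same relative path used above):

hadoop fs -ls cache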

Step 2

Write the Mapper code that uses it

import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {

    Properties cache;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // files added via DistributedCache.addCacheFile() are available as
        // local files on the task node
        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());

        if (localCacheFiles != null) {
            // expecting only a single file here
            for (Path localCacheFile : localCacheFiles) {
                cache = new Properties();
                FileReader reader = new FileReader(localCacheFile.toString());
                cache.load(reader);
                reader.close();
            }
        } else {
            // do your error handling here
        }

    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // use the cache here
        // if value contains some attribute, cache.get(<value>)
        // do some action or replace with something else
    }

}

The Mapper code is simple enough. During the setup phase, we read the file and populate the Properties object. Inside map() we use the cache to look up certain keys and replace them if they are present.
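As an illustration, the map() body could look like the sketch below. Splitting the line on whitespace and emitting the byte offset as the output key are assumptions made just for this example; adapt them to your actual input format.

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // assumption: split the line on whitespace and replace every token
        // that matches a key in the properties file with its mapped value
        StringBuilder rewritten = new StringBuilder();
        for (String token : value.toString().split("\\s+")) {
            String replacement = cache.getProperty(token);
            rewritten.append(replacement != null ? replacement : token).append(" ");
        }
        context.write(new Text(key.toString()), new Text(rewritten.toString().trim()));
    }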

Step 3

Add the properties file to the distributed cache in your driver code

JobConf jobConf = new JobConf();
// set job properties
// set the cache file
DistributedCache.addCacheFile(new URI("cache/keyvalues.properties#keyvalues.properties"), jobConf);
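
If your driver uses the newer Job API instead of JobConf (matching the Mapper above), a minimal sketch could look like the following. The class names, job name, and input/output paths are assumptions for this example; the key point is to call addCacheFile() with the configuration the Job actually uses (job.getConfiguration()), since the Job constructor takes its own copy of the Configuration passed to it.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "distributed cache example"); // hypothetical job name
        job.setJarByClass(DistributedCacheDriver.class);
        job.setMapperClass(DistributedCacheMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // register the cache file on the configuration the Job actually uses
        DistributedCache.addCacheFile(
                new URI("cache/keyvalues.properties#keyvalues.properties"),
                job.getConfiguration());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}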

4 thoughts on “Using Hadoop Distributed Cache”

  1. I’m using hadoop 0.22, and I’m using this code to add a jar to the DistributedCache:

    Job job = Job.getInstance(configuration);

    job.addArchiveToClassPath(new Path(JAR_DIR));

    but it doesn't work; I get a ClassNotFoundException in the map class when I call the external jar.

    Do I have to use DistributedCache instead of addArchiveToClassPath?

    • If it's a 3rd party jar that you use in your Map job, then this is how I use it with 0.20 (I use the Cloudera distro):

      ((JobConf) job.getConfiguration()).setJar("my_third_party.jar");

      You can add as many jars as you need; they will be distributed internally to all task nodes.

      HTH!

  2. The driver class in my program uses Job instead of JobConf.

    Configuration conf = new Configuration();
    Job job1 = new Job(conf, "distributed cache");
    // set job1 properties
    DistributedCache.addCacheFile(new URI("File path"), conf);

    But it does not seem to work. In the setup method of the Mapper code,
    DistributedCache.getLocalCacheFiles(context.getConfiguration());
    gives null as the value.

    Please help how to write the distributed cache code using Configuration and Job.
