Crunching Data with Apache Crunch – Part 5 – Inverted Index

In this post we look at creating an inverted index using Crunch. Please refer to the earlier posts in the series for the basics.

This example is an extension of the Word Count example. There are various examples of building an inverted index with Hadoop on the net.
Here is the logical sequence we need:
1. Split the sentences into words and output each word along with the name of the file it came from
2. Aggregate each word with all of its file names and write out the index (a small sample follows)
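
To make this concrete, suppose (purely for illustration, the file names and contents are made up) that we index two files, doc1.txt containing "crunch is fun" and doc2.txt containing "crunch is fast". The index we are after maps each word to the comma-separated list of files it appears in:

crunch -> doc1.txt,doc2.txt
fast   -> doc2.txt
fun    -> doc1.txt
is     -> doc1.txt,doc2.txt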

Let's see the code (eliminating the glue code for simplicity):

protected PTable<String, String> getDocumentIndex(PCollection<String> forIndexing) {
    PTypeFamily tf = forIndexing.getTypeFamily();
    return forIndexing.parallelDo(new DoFn<String, Pair<String, String>>() {
        private String fileName = null;

        @Override
        public void initialize() {
            super.initialize();
            // Each input split is processed by a single mapper, so we can
            // resolve the file name once instead of once per record.
            if (getContext() instanceof MapContext) {
                InputSplit inputSplit = ((MapContext) getContext()).getInputSplit();
                if (inputSplit instanceof FileSplit) {
                    fileName = ((FileSplit) inputSplit).getPath().getName();
                }
            }
        }

        @Override
        public void process(String input, Emitter<Pair<String, String>> emitter) {
            // Emit a (word, fileName) pair for every word on the line.
            for (String word : input.split("\\s+")) {
                if (fileName != null && !Strings.isNullOrEmpty(word)) {
                    emitter.emit(Pair.of(word, fileName));
                }
            }
        }
    }, tf.tableOf(tf.strings(), tf.strings()));
}

Here we get the file name from the InputSplit during initialization. We do this in initialize() because each input split is processed by a single mapper, so the file name only needs to be resolved once rather than for every record. The process() method is simple: it splits each line into words and emits a Pair of the word and the file name. In the sample above, the line "crunch is fun" from doc1.txt would emit the pairs (crunch, doc1.txt), (is, doc1.txt) and (fun, doc1.txt).

Let's see the remaining code:

Pipeline pipeline = new MRPipeline(InvertedIndexExample.class, getConf());
PCollection<String> docToBeIndexed = pipeline.readTextFile(args[0]);

PTable<String, String> index = getDocumentIndex(docToBeIndexed);

PTable<String, String> groupedTable = index.groupByKey().combineValues(new CombineFn<String, String>() {
    @Override
    public void process(Pair<String, Iterable<String>> input, Emitter<Pair<String, String>> emitter) {
        // Collect the file names in a Set to eliminate duplicates.
        Set<String> fileNames = new HashSet<String>();

        for (String value : input.second()) {
            // A value may already be a comma-separated list if this
            // function ran map-side as a combiner, so split it first.
            for (String fileName : value.split(",")) {
                fileNames.add(fileName);
            }
        }

        // Join the distinct file names into a comma-separated list.
        String files = Joiner.on(",").join(fileNames);

        emitter.emit(Pair.of(input.first(), files));
    }
});

pipeline.writeTextFile(groupedTable, args[1]);
// Execute the pipeline as a MapReduce job.
PipelineResult result = pipeline.done();

return result.succeeded() ? 0 : 1;

Here we first group the table by key, so that each word arrives together with all of its file names; this is very similar to what a Reducer sees. The CombineFn then deduplicates the file names with a Set and joins them into a comma-separated list. Because Crunch can also run a CombineFn map-side as a combiner, the values it receives may themselves already be comma-separated lists, which is why the code splits each incoming value on "," before adding it to the Set. After joining the file names, we write the output to a text file.
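
For completeness, here is a minimal sketch of the glue code that was left out above. The class layout is my assumption, though InvertedIndexExample is the class the MRPipeline is constructed with, and the pipeline snippet is assumed to live in its run() method (which the getConf() call and the final return suggest):

import java.util.HashSet;
import java.util.Set;

import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;

public class InvertedIndexExample extends Configured implements Tool {

    // getDocumentIndex(...) as defined earlier goes here.

    @Override
    public int run(String[] args) throws Exception {
        // The pipeline code shown above goes here: read args[0],
        // build the index, write it to args[1], and
        // return result.succeeded() ? 0 : 1;
        return 0; // placeholder
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new InvertedIndexExample(), args);
        System.exit(exitCode);
    }
}

With this in place the job can be launched like any other Hadoop tool, for example: hadoop jar inverted-index-example.jar InvertedIndexExample <input-dir> <output-dir> (the jar name is a placeholder).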
