[Kafka Cookbook] Simple Consumer

In the last post we saw the Simple Producer; now let's take a look at a Simple Consumer. Please refer to the Simple Producer post for the preconditions.

Let's jump straight in and look at the code.

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Simple Consumer
 */
public class KafkaMessageConsumer {
  public static void main(String[] args) {
    Properties config = new Properties();
    // The old high-level consumer discovers brokers through ZooKeeper,
    // so only ZooKeeper and consumer-group settings are needed here
    config.put("zookeeper.connect", "localhost:2181");
    config.put("group.id", "default");
    config.put("partition.assignment.strategy", "roundrobin");
    kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(config);

    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("testTopic", 1);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);

    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get("testTopic");

    KafkaStream<byte[], byte[]> stream = streamList.get(0);

    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    // hasNext() blocks until a message is available (consumer.timeout.ms is -1 by default)
    while (iterator.hasNext()) {
      System.out.println(new String(iterator.next().message()));
    }

  }

  // Helper written against the newer org.apache.kafka.clients.consumer API;
  // it is not used by main() above, which relies on the old high-level consumer.
  public static void processRecords(ConsumerRecords<String, String> records) {
    if (records.isEmpty()) {
      System.out.println("No messages");
      return;
    }
    for (ConsumerRecord<String, String> record : records.records("testTopic")) {
      System.out.println(record.value());
    }
  }
}

Let’s look at the configuration options. We won’t dig deeper into them for now.

Compared to the producer, we have to provide some additional information:

  • zookeeper.connect – the ZooKeeper connection string; the old high-level consumer discovers brokers through ZooKeeper
  • group.id – a user-specified consumer group name; we simply chose default
  • partition.assignment.strategy – how topic partitions are assigned to the consumers in a group (range or roundrobin)
With the above configuration options we create a ConsumerConfig. Before we can consume anything, we need to create the message streams. The logical flow is:

  • Create a ConsumerConnector
  • Create message streams by passing a map of the topics of interest (and the number of streams per topic)
  • The step above returns a list of KafkaStreams for each topic
  • Iterate over a KafkaStream to process the incoming messages
  • In our example we just print the messages
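The flow above uses the old ZooKeeper-based high-level consumer. On newer Kafka versions the same flow maps onto the org.apache.kafka.clients.consumer.KafkaConsumer API. The sketch below assumes a broker at localhost:9092, the kafka-clients jar on the classpath, and the same testTopic; it is not part of the original example:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiConsumerSketch {
  public static void main(String[] args) {
    Properties config = new Properties();
    // The new consumer talks to brokers directly, no ZooKeeper connection needed
    config.put("bootstrap.servers", "localhost:9092");
    config.put("group.id", "default");
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
      // subscribe() replaces createMessageStreams(); the group coordinator
      // handles partition assignment for us
      consumer.subscribe(Collections.singletonList("testTopic"));
      while (true) {
        // poll() replaces the blocking ConsumerIterator
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.value());
        }
      }
    }
  }
}
```

Note that poll() returns after the timeout even when no messages arrived, whereas the old ConsumerIterator blocks indefinitely by default.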

That’s it for now. The consumer deserves a deeper look, which we shall cover in subsequent posts.
