[Flume Cookbook] Implementing Custom Source

Flume ships with many bundled Sources. However, there are situations where we need to implement a custom Source. Let's look at a sample implementation.

The Source we implement is based on a distributed queue. Applications push events to the queue, and our custom Source reads events from the queue and passes them into the Flume flow. We use a Hazelcast distributed queue for this, although the same can be achieved with the Terracotta Toolkit. Let's look at a schematic.

Hazelcast Queue Source

Data Flow

  • A Hazelcast distributed queue is created
  • The application creates a HazelcastClient and connects to the server
  • The application puts data into the queue
  • Our custom Flume Source pulls data out of the queue and writes it to the channel for further processing through Flume's flow
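
The data flow above can be tried out without a Hazelcast cluster. The following is a minimal sketch, assuming a local LinkedBlockingQueue as a stand-in for the distributed queue (the Source in this post also treats the Hazelcast queue as a BlockingQueue). The class and method names are hypothetical and are not part of the original source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a local queue stands in for the Hazelcast distributed queue.
class QueueFlowSketch {

    // Application side: push each message onto the queue.
    static void produce(BlockingQueue<String> queue, String... messages) {
        for (String message : messages) {
            queue.offer(message);
        }
    }

    // Source side: keep pulling until a timed poll finds the queue empty.
    static List<String> consume(BlockingQueue<String> queue, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        try {
            String message;
            while ((message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                received.add(message);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was read so far.
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        produce(queue, "event-1", "event-2");
        System.out.println(consume(queue, 100));
    }
}
```

In the real setup the producer and the consumer run in different JVMs and only share the queue name; the local queue here just makes the push and pull steps visible in one place.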


The complete source is available at HazelcastQueueSource.java

Let's quickly see the code

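import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hazelcast 2.x package names (assumed from the HazelcastClient field type);
// later versions moved ClientConfig to com.hazelcast.client.config
import com.hazelcast.client.ClientConfig;
import com.hazelcast.client.HazelcastClient;

public class HazelcastQueueSource extends AbstractSource implements Configurable, PollableSource {

    public static final Logger LOGGER = LoggerFactory.getLogger(HazelcastQueueSource.class);

    // for simplicity use only string message to start with
    private BlockingQueue<String> distributedQueue;

    // Hazelcast client
    private HazelcastClient hazelcastClient;

    // Properties for Hazelcast
    private String queueName;
    private String serverIP;
    private String userName;
    private String userPwd;

    @Override
    public void configure(Context context) {
        // Get Hazelcast properties here
        queueName = context.getString("queueName");
        serverIP = context.getString("servers");
        userName = context.getString("user");
        userPwd = context.getString("password");
    }

    @Override
    public synchronized void start() {
        // The server address and credentials read in configure() should be applied
        // to clientConfig here; the setters differ between Hazelcast versions.
        ClientConfig clientConfig = new ClientConfig();
        hazelcastClient = HazelcastClient.newHazelcastClient(clientConfig);
        distributedQueue = hazelcastClient.getQueue(queueName);
        super.start();
    }

    @Override
    public synchronized void stop() {
        // Disconnect from the server; the queued data stays available
        hazelcastClient.shutdown();
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            String msg = distributedQueue.poll(1000, TimeUnit.MILLISECONDS);
            if (msg == null) {
                // nothing arrived within the timeout
                return Status.BACKOFF;
            }
            // not using charset here; getBytes() falls back to the platform default
            Event event = EventBuilder.withBody(msg.getBytes());
            // hand the event to the configured channel(s)
            getChannelProcessor().processEvent(event);
            return Status.READY;
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while polling queue " + queueName, e);
            Thread.currentThread().interrupt();
            return Status.BACKOFF;
        }
    }
}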


We need some configuration, such as the Hazelcast connection details and the queue name. These parameters are defined in the Agent config file and are retrieved via the Context (as shown in configure()).
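
To make the parameter handling concrete, here is a minimal sketch that validates the same keys outside of Flume. It assumes a plain Map as a stand-in for Flume's Context and assumes that "servers" may hold a comma-separated list of addresses; neither assumption comes from the original source, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a Map stands in for Flume's Context.
class SourceSettingsSketch {
    final String queueName;
    final List<String> servers;

    private SourceSettingsSketch(String queueName, List<String> servers) {
        this.queueName = queueName;
        this.servers = servers;
    }

    static SourceSettingsSketch from(Map<String, String> properties) {
        // The queue name is required; fail early if it is missing or blank.
        String queueName = properties.get("queueName");
        if (queueName == null || queueName.trim().isEmpty()) {
            throw new IllegalArgumentException("queueName is required");
        }
        // Assumption: "servers" is a comma-separated list of host:port entries.
        List<String> servers = new ArrayList<>();
        String raw = properties.get("servers");
        if (raw != null) {
            for (String part : raw.split(",")) {
                String address = part.trim();
                if (!address.isEmpty()) {
                    servers.add(address);
                }
            }
        }
        return new SourceSettingsSketch(queueName.trim(), Collections.unmodifiableList(servers));
    }
}
```

Failing fast on a missing queue name surfaces a bad Agent config when the Source is configured, and not later as a NullPointerException in start().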


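In start(), we create a Hazelcast client, which connects to the Hazelcast server, and retrieve the queue by name. The listing uses a default ClientConfig; the connection details and credentials read in configure() still need to be applied to it.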


In stop(), we call shutdown() on the Hazelcast client reference. This only disconnects the Source from the server; the data in the queue remains available.


The logic of taking a message from the queue and creating an Event lives in process(). We do a timed poll on the queue; if a message is available, we create an Event from it and send it to the channel for processing. If no message arrives within the timeout, we return Status.BACKOFF.
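
The timed-poll decision can be exercised in isolation. The sketch below is a stand-alone approximation, not the Source itself: it assumes a local Status enum in place of Flume's PollableSource.Status and a Consumer<byte[]> in place of the channel, and it encodes the body as UTF-8 explicitly, which the original code does not do. All names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the decision made in process().
class TimedPollSketch {

    // Local stand-in for Flume's PollableSource.Status.
    enum Status { READY, BACKOFF }

    static Status pollOnce(BlockingQueue<String> queue, long timeoutMillis, Consumer<byte[]> channel) {
        try {
            String msg = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (msg == null) {
                // Nothing arrived within the timeout: ask the caller to back off.
                return Status.BACKOFF;
            }
            // Explicit charset, so the bytes do not depend on the platform default.
            channel.accept(msg.getBytes(StandardCharsets.UTF_8));
            return Status.READY;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can react to it.
            Thread.currentThread().interrupt();
            return Status.BACKOFF;
        }
    }
}
```

Returning BACKOFF on an empty poll tells the caller to wait before trying again, so an idle queue does not keep the polling thread spinning.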

NOTE: The custom source implementation needs further refinement to be production ready.

Build the project, then copy the Source jar along with its Hazelcast dependencies into Flume's lib folder.

Configuring the Source

Following is the configuration for the custom Source
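
A minimal sketch of what such a configuration might look like, assuming an agent named "agent", a memory channel, and a hypothetical package for the Source class. Only the four property keys (queueName, servers, user, password) come from configure() above; every name and value here is a placeholder, and the sink definition is omitted.

```properties
# Hypothetical agent, source and channel names
agent.sources = hazelcastSource
agent.channels = memoryChannel

# A custom Source is referenced by its fully qualified class name;
# the package com.example.flume.source is an assumption
agent.sources.hazelcastSource.type = com.example.flume.source.HazelcastQueueSource
agent.sources.hazelcastSource.channels = memoryChannel

# Keys read in configure(); values are placeholders
agent.sources.hazelcastSource.queueName = events
agent.sources.hazelcastSource.servers = 127.0.0.1:5701
agent.sources.hazelcastSource.user = someUser
agent.sources.hazelcastSource.password = somePassword

agent.channels.memoryChannel.type = memory
```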


The complete configuration file can be downloaded from single-node-custom-source.properties

Happy Fluming!
