20 June 2013

[Flume Cookbook] Implementing Custom Source



Flume ships with a lot of bundled Sources. However, there are situations in which we may need to implement a custom Source. Let's look at a sample implementation.

The Source that we shall implement is based on a distributed queue. Applications push events to the queue, and our custom Source reads events from the queue and passes them on to the Flume flow. We shall use a Hazelcast distributed queue to achieve this; the same could be achieved using the Terracotta Toolkit as well. Let's look at a schematic.

Hazelcast Queue Source

Data Flow

  • A Hazelcast distributed queue is created
  • The application creates a HazelcastClient and connects to the server
  • The application puts data into the queue
  • Our custom Flume Source pulls data out of the queue and writes it to the channel for further processing through Flume's flow
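The data flow above can be simulated with plain java.util.concurrent types, without a Hazelcast cluster. This is a minimal sketch, not Hazelcast code: a LinkedBlockingQueue stands in for the Hazelcast distributed queue (Hazelcast's IQueue extends BlockingQueue, which is why the Source can treat it as one), a producer thread plays the application, and a List named channel (hypothetical) plays the Flume channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueFlowSketch {

    // Runs the producer/consumer flow for `count` messages and returns what reached the "channel"
    static List<String> run(int count) {
        // Stand-in for the Hazelcast distributed queue (assumption: local and in-memory)
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Stand-in for the Flume channel (hypothetical)
        List<String> channel = new ArrayList<>();

        // The "application": pushes events into the queue
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                queue.add("event-" + i);
            }
        });
        producer.start();

        try {
            // The "custom Source": timed poll, then hand the message on to the channel
            while (channel.size() < count) {
                String msg = queue.poll(1000, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    channel.add(msg);
                }
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return channel;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [event-0, event-1, event-2]
    }
}
```

With a single producer the queue preserves FIFO order, so events reach the channel in the order the application pushed them.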

Implementation

The complete source is available at HazelcastQueueSource.java

Let's quickly look at the code:

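package com.ashishpaliwal.flume.source;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hazelcast 2.x client API (package names differ in Hazelcast 3.x)
import com.hazelcast.client.ClientConfig;
import com.hazelcast.client.HazelcastClient;

public class HazelcastQueueSource extends AbstractSource implements Configurable, PollableSource {

    private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastQueueSource.class);

    // for simplicity, only String messages are supported to start with
    private BlockingQueue<String> distributedQueue;

    // Hazelcast client
    private HazelcastClient hazelcastClient;

    // Properties for Hazelcast, read from the agent configuration
    private String queueName;
    private String serverIP;
    private String userName;
    private String userPwd;

    @Override
    public void configure(Context context) {
        // Get Hazelcast properties from the agent config (a1.sources.src1.*)
        queueName = context.getString("queueName");
        serverIP = context.getString("servers");
        userName = context.getString("user");
        userPwd = context.getString("password");
    }

    @Override
    public synchronized void start() {
        // Connect to the Hazelcast cluster using the group credentials
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getGroupConfig().setName(userName).setPassword(userPwd);
        clientConfig.addAddress(serverIP);
        hazelcastClient = HazelcastClient.newHazelcastClient(clientConfig);
        distributedQueue = hazelcastClient.getQueue(queueName);
        super.start(); // mark the Source as started
    }

    @Override
    public synchronized void stop() {
        // Disconnect from the cluster; the queued data stays on the server
        if (hazelcastClient != null) {
            hazelcastClient.shutdown();
        }
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // Wait up to one second for a message
            String msg = distributedQueue.poll(1000, TimeUnit.MILLISECONDS);
            if (msg == null) {
                // Queue is empty; ask the runner to back off
                return Status.BACKOFF;
            }
            // Encode explicitly instead of relying on the platform default charset
            Event event = EventBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8));
            // NOTE: if the channel rejects the event, the message has already
            // been removed from the queue and is lost
            getChannelProcessor().processEvent(event);
            return Status.READY;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the runner can shut down cleanly
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while polling queue " + queueName, e);
            return Status.BACKOFF;
        }
    }

    // Declared by PollableSource from Flume 1.7 onwards; no @Override so the
    // class also compiles against older Flume versions
    public long getBackOffSleepIncrement() {
        return 1000L;
    }

    public long getMaxBackOffSleepInterval() {
        return 5000L;
    }
}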

Configuration

The Source needs some configuration, such as the Hazelcast connection details and the queue name. These parameters are defined in the agent configuration file and retrieved via the Context, as shown in configure().
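As a hardening step, configure() could fail fast when a required key is missing instead of passing null on to Hazelcast at start(). The sketch below uses a plain Map<String, String> as a stand-in for Flume's Context so it runs without Flume on the classpath; the requireString helper and the HazelcastSettings holder are hypothetical names, and only the four keys come from this post.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigureSketch {

    // Hypothetical holder for the values configure() reads
    static final class HazelcastSettings {
        final String queueName, servers, user, password;

        HazelcastSettings(String queueName, String servers, String user, String password) {
            this.queueName = queueName;
            this.servers = servers;
            this.user = user;
            this.password = password;
        }
    }

    // Hypothetical helper: rejects a missing or blank property with a clear message
    static String requireString(Map<String, String> context, String key) {
        String value = context.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value;
    }

    // Mirrors configure(Context); the Map stands in for Flume's Context (assumption)
    static HazelcastSettings configure(Map<String, String> context) {
        return new HazelcastSettings(
                requireString(context, "queueName"),
                requireString(context, "servers"),
                requireString(context, "user"),
                requireString(context, "password"));
    }

    public static void main(String[] args) {
        Map<String, String> ctx = new HashMap<>();
        ctx.put("queueName", "app-flume-src");
        ctx.put("servers", "127.0.0.1");
        ctx.put("user", "dev");
        ctx.put("password", "dev-pass");
        System.out.println(configure(ctx).queueName); // app-flume-src
    }
}
```

Failing in configure() surfaces a typo in the agent file at startup, rather than as a connection error later.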

Startup

In start(), we use the Hazelcast credentials to connect to the Hazelcast server and retrieve the queue.

Shutdown

In stop(), we call shutdown() on the Hazelcast client. This only disconnects the Source from the server; the data in the queue remains available.
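The point that disconnecting leaves the data in place can be illustrated with a small simulation in which the queue outlives the consumer that reads from it. A shared LinkedBlockingQueue again stands in for the server-side Hazelcast queue, and the QueueConsumer class with its start/stop/pollOnce methods is hypothetical; it only mimics the Source's lifecycle.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ShutdownSketch {

    // Hypothetical consumer that mimics the Source's start()/stop() lifecycle
    static final class QueueConsumer {
        private final BlockingQueue<String> serverQueue;
        private BlockingQueue<String> connection; // null while disconnected

        QueueConsumer(BlockingQueue<String> serverQueue) {
            this.serverQueue = serverQueue;
        }

        synchronized void start() {
            connection = serverQueue; // "connect" and look up the queue
        }

        synchronized void stop() {
            connection = null;        // "disconnect"; the queue itself is untouched
        }

        synchronized String pollOnce() {
            return connection == null ? null : connection.poll();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> serverQueue = new LinkedBlockingQueue<>();
        serverQueue.add("m1");
        serverQueue.add("m2");

        QueueConsumer consumer = new QueueConsumer(serverQueue);
        consumer.start();
        System.out.println(consumer.pollOnce()); // m1
        consumer.stop();

        // m2 is still waiting for the next consumer
        System.out.println(serverQueue.size());  // 1
    }
}
```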

Process

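The logic of taking a message from the queue and creating an Event lives in process(). We do a timed poll on the queue (waiting up to one second); if a message is available, we create an Event from it and send it to the channel for processing. If no message arrives in time, process() returns BACKOFF so that Flume waits before polling again.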
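How the returned Status is used may be easier to see in a simplified model. Flume's runner calls process() repeatedly and, after a BACKOFF, sleeps for a time that grows with the number of consecutive backoffs, up to a cap. The sketch below approximates that with stdlib types only; the Status enum, the process() stand-in and the 1000 ms / 5000 ms values are assumptions for illustration, not Flume's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ProcessLoopSketch {

    enum Status { READY, BACKOFF }

    static final long BACKOFF_SLEEP_INCREMENT_MS = 1000; // assumed increment
    static final long MAX_BACKOFF_SLEEP_MS = 5000;       // assumed cap

    // Sleep after the n-th consecutive BACKOFF: grows linearly, capped at the maximum
    static long backoffSleepMs(int consecutiveBackoffs) {
        return Math.min(consecutiveBackoffs * BACKOFF_SLEEP_INCREMENT_MS, MAX_BACKOFF_SLEEP_MS);
    }

    // Stand-in for process(): timed poll, then "deliver" to the channel list
    static Status process(BlockingQueue<String> queue, List<String> channel, long timeoutMs) {
        try {
            String msg = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (msg == null) {
                return Status.BACKOFF; // nothing to read
            }
            channel.add(msg);
            return Status.READY;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return Status.BACKOFF;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello");
        List<String> channel = new ArrayList<>();

        System.out.println(process(queue, channel, 10)); // READY
        System.out.println(process(queue, channel, 10)); // BACKOFF (queue now empty)
        for (int n = 1; n <= 6; n++) {
            System.out.print(backoffSleepMs(n) + " ");   // 1000 2000 3000 4000 5000 5000
        }
        System.out.println();
    }
}
```

Returning BACKOFF on an empty queue keeps an idle Source from spinning, while READY lets the runner call process() again immediately.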

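NOTE: This custom Source implementation needs further refinement to be production ready; for example, a message that has already been polled from the queue is lost if the channel rejects the event.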
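One possible refinement along these lines: avoid losing messages when the channel rejects them. This sketch assumes the channel signals rejection by throwing (Flume's ChannelProcessor throws ChannelException; a RuntimeException stands in for it here). It drains up to a batch of messages, tries to deliver them, and pushes them back onto the queue on failure. The Channel interface, the deliverBatch method and the batch size are hypothetical. Re-queued messages go to the tail, so ordering is not preserved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RequeueOnFailureSketch {

    // Hypothetical channel: throws when it cannot accept the batch
    interface Channel {
        void acceptBatch(List<String> batch);
    }

    // Drains up to batchSize messages and delivers them; on failure, returns them to the queue.
    // Returns the number of messages delivered.
    static int deliverBatch(BlockingQueue<String> queue, Channel channel, int batchSize) {
        List<String> batch = new ArrayList<>(batchSize);
        queue.drainTo(batch, batchSize);
        if (batch.isEmpty()) {
            return 0;
        }
        try {
            channel.acceptBatch(batch);
            return batch.size();
        } catch (RuntimeException e) {
            // Put the messages back (at the tail) so they are not lost
            queue.addAll(batch);
            return 0;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("m" + i);
        }
        Channel full = b -> { throw new IllegalStateException("channel full"); };
        System.out.println(deliverBatch(queue, full, 3) + " delivered, " + queue.size() + " queued"); // 0 delivered, 5 queued

        List<String> sink = new ArrayList<>();
        Channel ok = sink::addAll;
        System.out.println(deliverBatch(queue, ok, 3) + " delivered, " + queue.size() + " queued"); // 3 delivered, 2 queued
    }
}
```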

Build the project and copy the Source jar, along with the Hazelcast client dependencies, into Flume's lib folder.

Configuring the Source

The following is the configuration for the custom Source:

a1.sources.src1.type=com.ashishpaliwal.flume.source.HazelcastQueueSource
a1.sources.src1.queueName=app-flume-src
a1.sources.src1.servers=127.0.0.1
a1.sources.src1.user=dev
a1.sources.src1.password=dev-pass
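Flume passes each component the properties under its own prefix (here a1.sources.src1.) with that prefix removed, which is why configure() can ask for plain keys such as queueName. The sketch below mimics that mapping with java.util.Properties; the contextFor helper is hypothetical and is not how Flume parses its configuration internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class AgentConfigSketch {

    // Hypothetical helper: collects the properties under `prefix` with the prefix removed
    static Map<String, String> contextFor(Properties props, String prefix) {
        Map<String, String> context = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                context.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return context;
    }

    // Loads properties from an in-memory string
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "a1.sources.src1.type=com.ashishpaliwal.flume.source.HazelcastQueueSource",
                "a1.sources.src1.queueName=app-flume-src",
                "a1.sources.src1.servers=127.0.0.1",
                "a1.sources.src1.user=dev",
                "a1.sources.src1.password=dev-pass");
        Map<String, String> ctx = contextFor(load(config), "a1.sources.src1.");
        System.out.println(ctx.get("queueName")); // app-flume-src
    }
}
```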

The complete configuration file can be downloaded from single-node-custom-source.properties

Happy Fluming !

