[Flume Cookbook] Implementing Custom Interceptors

Flume ships with several built-in interceptors, such as the Timestamp, Host, Static, and Regex Filtering interceptors, which are sufficient for most real-world use cases.

In some situations, however, we need to write a custom interceptor.

To implement an interceptor, we need to take care of the following aspects:

  • Interceptor code
  • Configuration
  • Interceptor Builder

We shall implement a variation of the HostInterceptor that ships with Flume: a HostTimeInterceptor, which adds the host IP and the timestamp at which the event was processed on the agent.

Interceptor Code

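To implement an interceptor, we need to implement the Interceptor interface, which declares initialize(), intercept(Event), intercept(List<Event>), and close().

To simplify batch event processing, I have created a base class, AbstractFlumeInterceptor, which implements the batch method in terms of the single-event one: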

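import java.util.Iterator;
import java.util.List;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public abstract class AbstractFlumeInterceptor implements Interceptor {
    public List<Event> intercept(List<Event> events) {
        for (Iterator<Event> iterator = events.iterator(); iterator.hasNext(); ) {
            Event next = intercept(iterator.next());
            if (next == null) {
                iterator.remove(); // remove the element
            }
        }
        return events;
    }
}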

Now let's look at the actual interceptor code:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HostTimeInterceptor extends AbstractFlumeInterceptor {

    public static final Logger LOGGER = LoggerFactory.getLogger(HostTimeInterceptor.class);

    public static final String DEFAULT_SEPARATOR = ",";
    public static final String DEFAULT_KEYVAL_SEPARATOR = ":";

    private final String header;
    private String host;

    /**
     * Private constructor, so that instances are built only through the Builder.
     * @param headerKey Key for the header to be used
     */
    private HostTimeInterceptor(String headerKey) {
        header = headerKey;
        try {
            host = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            // host stays null here, so the header will carry the text "null"
            LOGGER.warn("Unable to get Host address", e);
        }
    }

    @Override
    public void initialize() {
        // NOOP
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();

        // Append to an existing value, or start a new one
        String headerValue = headers.get(header);
        if (headerValue != null) {
            headerValue += DEFAULT_SEPARATOR;
        } else {
            headerValue = "";
        }
        headerValue += host + DEFAULT_KEYVAL_SEPARATOR + System.currentTimeMillis();
        headers.put(header, headerValue);
        return event;
    }

    @Override
    public void close() {
        // NOOP
    }

    // The nested Builder class goes here (see the Interceptor Builder section).
}

The constructor is private so that the interceptor can be built only via the Builder interface. It takes the key as a parameter, which becomes the key in the Flume Event header. In the constructor we look up the host IP address. The same could have been done in the initialize() API as well.

Now let's look at the intercept API. The Event is passed to us; we check whether the header is already present and append the host IP along with the timestamp, separated from any existing value by a comma. Once the Event is modified, we return it.
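To make the resulting header format concrete, here is a minimal stand-alone sketch of the same append logic. It uses a plain Map instead of a Flume Event, and the class name, method name, host addresses, and timestamps are all made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch of the header logic; no Flume classes are needed to run it.
class HostTimeHeaderSketch {
    static final String SEPARATOR = ",";
    static final String KEYVAL_SEPARATOR = ":";

    // Appends "host:timestamp" to the value stored under headerKey,
    // comma-separating it from any value that is already present.
    static void appendHostTime(Map<String, String> headers, String headerKey,
                               String host, long timestampMillis) {
        String existing = headers.get(headerKey);
        String prefix = (existing != null) ? existing + SEPARATOR : "";
        headers.put(headerKey, prefix + host + KEYVAL_SEPARATOR + timestampMillis);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        // Hypothetical hosts and timestamps, as if two agents handled the event.
        appendHostTime(headers, "HostTime", "10.0.0.1", 1000L);
        appendHostTime(headers, "HostTime", "10.0.0.2", 2000L);
        System.out.println(headers.get("HostTime")); // 10.0.0.1:1000,10.0.0.2:2000
    }
}
```

If the event passes through several agents that use the same header key, the header ends up holding one host:timestamp pair per agent, in processing order.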

If an Event is to be dropped by the interceptor, just return null from intercept(Event); the batch method in the base class then removes it from the list.
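The drop-on-null convention can be sketched without Flume as follows. Strings stand in for events, and the class name, method name, and the "drop" rule are hypothetical; only the iterator-and-remove pattern is taken from the base class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;

// Stand-alone sketch: strings stand in for Flume events.
class DropOnNullSketch {
    // Mirrors the batch loop: run the single-item step and remove items it rejects.
    static <T> List<T> interceptAll(List<T> items, UnaryOperator<T> single) {
        for (Iterator<T> it = items.iterator(); it.hasNext(); ) {
            if (single.apply(it.next()) == null) {
                it.remove();
            }
        }
        return items;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>(Arrays.asList("keep-1", "drop-me", "keep-2"));
        // Hypothetical rule: reject anything starting with "drop".
        interceptAll(events, e -> e.startsWith("drop") ? null : e);
        System.out.println(events); // [keep-1, keep-2]
    }
}
```

Note that this pattern relies on the list supporting Iterator.remove(), so it would fail on an immutable list.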


Configuration

Let's look at the configuration needed for this interceptor.

a1.sources.src1.interceptors = hostTimeInterceptor

This snippet is specific to the interceptor configuration. For the complete configuration, refer to single-node-custom-interceptor.properties.

The first line declares the interceptor chain. On the second line, we tell Flume how to initialize our interceptor, by pointing it at the Builder; we shall see the implementation in a moment. On the third line, we specify the header key to be used, which is passed to our interceptor when it is created.
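Only the first of the three lines appears in the snippet above. As a rough sketch of what the full fragment could look like, assuming the class lives in a placeholder package com.example.flume and using HostTime as an illustrative key value (neither is given in this post):

```properties
# Interceptor chain for source src1 (from the snippet above)
a1.sources.src1.interceptors = hostTimeInterceptor
# ASSUMPTION: com.example.flume is a placeholder package; substitute the real one.
a1.sources.src1.interceptors.hostTimeInterceptor.type = com.example.flume.HostTimeInterceptor$Builder
# ASSUMPTION: HostTime is an illustrative value; "key" is the property the Builder reads.
a1.sources.src1.interceptors.hostTimeInterceptor.key = HostTime
```

For a custom interceptor, the type property takes the fully qualified name of the Builder class, which is why the nested class is written with a $ separator.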

Interceptor Builder

Before we can use the interceptor, we have to provide an implementation of Interceptor.Builder.

Let's look at the code. This is a nested class within the interceptor.

public static class Builder implements Interceptor.Builder {
    private String headerkey = "HostTime";

    @Override
    public Interceptor build() {
        return new HostTimeInterceptor(headerkey);
    }

    @Override
    public void configure(Context context) {
        // Keep the default when "key" is not set in the agent configuration
        headerkey = context.getString("key", headerkey);
    }
}

The properties that we provide in the agent configuration file are passed to configure(). We read the key value and pass it to the interceptor. Flume calls the Builder implementation while creating the interceptor chain.
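The configure-then-build flow can be tried out without a Flume installation using stand-ins. In the sketch below a plain Map plays the role of Flume's Context, and the class name, the AgentTrail value, and the headerKey() accessor are hypothetical; it is meant only to illustrate the pattern, not Flume's actual wiring.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch of the configure-then-build flow; a Map stands in for Flume's Context.
class BuilderFlowSketch {
    private final String headerKey;

    // Private: only the nested Builder can create instances.
    private BuilderFlowSketch(String headerKey) {
        this.headerKey = headerKey;
    }

    String headerKey() {
        return headerKey;
    }

    static class Builder {
        // Default used when "key" is absent from the properties.
        private String headerKey = "HostTime";

        void configure(Map<String, String> properties) {
            headerKey = properties.getOrDefault("key", headerKey);
        }

        BuilderFlowSketch build() {
            return new BuilderFlowSketch(headerKey);
        }
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("key", "AgentTrail"); // hypothetical configured value

        Builder builder = new Builder();
        builder.configure(properties);       // properties arrive first
        System.out.println(builder.build().headerKey()); // AgentTrail
    }
}
```

Keeping the constructor private and routing creation through the Builder means every instance has seen its configuration before it handles an event.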

That's pretty much it from the coding perspective.

Using the custom Interceptor

Create the jar and add it to the Flume agent's classpath, for example by placing it in Flume's lib directory or in a plugins.d plugin directory. Add the configuration and start the agent :)
