19 June 2013

[Flume Cookbook] Implementing Custom Interceptors



Flume ships with several built-in interceptors, which are sufficient for most real-world use cases.
These include the Timestamp, Host, Static, Regex Filtering and Regex Extractor interceptors.

In some situations, however, we need to write a custom interceptor.

To implement an interceptor, we need to take care of the following aspects:

  • Interceptor code
  • Configuration
  • Interceptor Builder

As an example, we shall implement a variation of the HostInterceptor that ships with Flume: a HostTimeInterceptor, which adds the host IP address and the timestamp at which the event was processed on the agent.

Interceptor Code

To implement an interceptor, we need to implement the Interceptor interface.

To simplify batch event processing, I have created a base class, AbstractFlumeInterceptor:

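import java.util.Iterator;
import java.util.List;

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Base class that implements batch interception on top of intercept(Event)
public abstract class AbstractFlumeInterceptor implements Interceptor {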
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Iterator<Event> iterator = events.iterator(); iterator.hasNext(); ) {
            Event next = intercept(iterator.next());
            if (next == null) {
                // intercept(Event) returned null: drop the event from the batch
                iterator.remove();
            }
        }
        return events;
    }
}
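
The batch method above relies on Iterator.remove() to drop every event for which the single-event intercept returns null. Here is a minimal, self-contained sketch of that pattern, using plain strings in place of Flume events so that it runs without Flume on the classpath. The names BatchFilterSketch, interceptAll and interceptOne are hypothetical and are not part of Flume. The sketch uses a ListIterator so that a replacement object returned by the per-element function is written back to the list, which the base class above does not do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import java.util.function.UnaryOperator;

final class BatchFilterSketch {

    /**
     * Applies interceptOne to every element in place.
     * A null result drops the element; a non-null result replaces it.
     */
    static <T> List<T> interceptAll(List<T> items, UnaryOperator<T> interceptOne) {
        for (ListIterator<T> it = items.listIterator(); it.hasNext(); ) {
            T result = interceptOne.apply(it.next());
            if (result == null) {
                it.remove();      // drop the element from the batch
            } else {
                it.set(result);   // keep whatever the function returned
            }
        }
        return items;
    }

    public static void main(String[] args) {
        // The list must be mutable, otherwise remove() and set() will throw
        List<String> batch = new ArrayList<>(Arrays.asList("keep-1", "drop-2", "keep-3"));
        interceptAll(batch, s -> s.startsWith("drop") ? null : s.toUpperCase());
        System.out.println(batch); // prints [KEEP-1, KEEP-3]
    }
}
```

Both this sketch and the base class modify the list they are given, so they only work when the list passed in is mutable. Collecting the surviving elements into a new list would avoid that assumption.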


Now let's look at the actual interceptor code:

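import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HostTimeInterceptor extends AbstractFlumeInterceptor {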

    public static final Logger LOGGER = LoggerFactory.getLogger(HostTimeInterceptor.class);

    private final String header;
    private String host;
    public static final String DEFAULT_SEPARATOR = ",";
    public static final String DEFAULT_KEYVAL_SEPARATOR = ":";


    /**
     * Private constructor, so that instances can only be created through the Builder.
     *
     * @param headerKey key of the event header to write to
     */
    private HostTimeInterceptor(String headerKey) {
        header = headerKey;
        try {
            host = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            LOGGER.warn("Unable to get Host address", e);
        }
    }

    @Override
    public void initialize() {
        // NOOP
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();

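        // Append to an existing value, if any, so that earlier entries are preserved
        String headerValue = headers.get(header);
        if (headerValue != null) {
            headerValue += DEFAULT_SEPARATOR;
        } else {
            headerValue = "";
        }
        // Entry format: <host IP>:<timestamp in milliseconds>
        headerValue += host + DEFAULT_KEYVAL_SEPARATOR + System.currentTimeMillis();
        headers.put(header, headerValue);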
        return event;
    }

    @Override
    public void close() {
        // NOOP
    }
}

The constructor is private so that the interceptor can be built only via the Builder. It takes the key under which the value is stored in the Flume Event headers, and it looks up the host IP address. The lookup could equally have been done in initialize().

Now let's look at intercept(Event). The Event is passed to us; we check whether the header is already present and append the host IP along with the timestamp. Once the Event is modified, we return it.
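
To make the resulting header format concrete, here is a small sketch of the same append logic applied to a plain Map, with the host and timestamp passed in as parameters so that the result is reproducible. HeaderAppendSketch and appendHostTime are hypothetical names used only for illustration; the separators and the HostTime key are the ones used in this post.

```java
import java.util.HashMap;
import java.util.Map;

final class HeaderAppendSketch {
    static final String SEPARATOR = ",";
    static final String KEYVAL_SEPARATOR = ":";

    /** Appends "host:timestamp" to the header stored under key, creating it if absent. */
    static void appendHostTime(Map<String, String> headers, String key,
                               String host, long timestampMillis) {
        String entry = host + KEYVAL_SEPARATOR + timestampMillis;
        String existing = headers.get(key);
        headers.put(key, existing == null ? entry : existing + SEPARATOR + entry);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        // Two calls simulate the event being stamped twice
        appendHostTime(headers, "HostTime", "10.0.0.1", 1000L);
        appendHostTime(headers, "HostTime", "10.0.0.2", 2000L);
        System.out.println(headers.get("HostTime")); // prints 10.0.0.1:1000,10.0.0.2:2000
    }
}
```

If an event passes through more than one agent configured with this interceptor, each hop would add one such entry. Note that ":" also appears in IPv6 addresses, so a consumer that splits on it is assuming IPv4.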

If an Event is to be dropped in an interceptor, just return null from intercept(Event).

Configuration

Let's look at the configuration that we need to do for this interceptor.

a1.sources.src1.interceptors = hostTimeInterceptor
a1.sources.src1.interceptors.hostTimeInterceptor.type=com.ashishpaliwal.flume.interceptors.HostTimeInterceptor$Builder
a1.sources.src1.interceptors.hostTimeInterceptor.key=HostTime

This snippet is specific to the interceptor configuration. For the complete configuration, refer to single-node-custom-interceptor.properties.

The first line declares the interceptor chain for the source. The second line tells Flume how to create our interceptor, by naming its Builder class; we shall see the implementation in a moment. The third line specifies the header key to use, which is passed to our interceptor when it is created.
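
The naming convention in these lines can be illustrated with a short sketch that loads the same three properties and keeps only those under the interceptor's prefix, with the prefix stripped. This is an illustration of the convention using java.util.Properties, not Flume's actual configuration code; InterceptorConfigSketch and subProperties are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class InterceptorConfigSketch {

    /** Returns the properties whose names start with prefix, keyed by the remainder of the name. */
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String config =
              "a1.sources.src1.interceptors = hostTimeInterceptor\n"
            + "a1.sources.src1.interceptors.hostTimeInterceptor.type="
            + "com.ashishpaliwal.flume.interceptors.HostTimeInterceptor$Builder\n"
            + "a1.sources.src1.interceptors.hostTimeInterceptor.key=HostTime\n";

        Properties props = new Properties();
        props.load(new StringReader(config));

        // Only "type" and "key" sit under the interceptor's own prefix
        Map<String, String> forInterceptor =
            subProperties(props, "a1.sources.src1.interceptors.hostTimeInterceptor.");
        System.out.println(forInterceptor.keySet()); // prints [key, type]
        System.out.println(forInterceptor.get("key")); // prints HostTime
    }
}
```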

Interceptor Builder

Before we can use the interceptor, we have to provide an implementation of Interceptor.Builder.

Let's look at the code. This is a nested class within the interceptor.

public static class Builder implements Interceptor.Builder {
    private String headerkey = "HostTime";

    @Override
    public Interceptor build() {
        return new HostTimeInterceptor(headerkey);
    }

    @Override
    public void configure(Context context) {
        // Fall back to the default if "key" is not set in the agent configuration
        headerkey = context.getString("key", headerkey);
    }
}

The properties that we provide in the agent configuration file are passed to configure(). We read the key value there and pass it to the interceptor in build(). Flume calls the Builder implementation while creating the interceptor chain.
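
The configure-then-build sequence can be sketched without Flume as follows. In this sketch a plain Map stands in for Flume's Context, and HeaderTagger stands in for the interceptor; both, along with the class name BuilderLifecycleSketch, are hypothetical. Falling back to a default key when none is configured is an assumption of the sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class BuilderLifecycleSketch {

    /** Stand-in for the interceptor: it only remembers the header key it was built with. */
    static final class HeaderTagger {
        final String headerKey;

        // Private, so that instances are created only through the Builder below
        private HeaderTagger(String headerKey) {
            this.headerKey = headerKey;
        }
    }

    /** Stand-in for Interceptor.Builder: configure() is called first, then build(). */
    static final class Builder {
        private String headerKey = "HostTime";

        void configure(Map<String, String> context) {
            // Keep the default when "key" is not configured
            headerKey = context.getOrDefault("key", headerKey);
        }

        HeaderTagger build() {
            return new HeaderTagger(headerKey);
        }
    }

    public static void main(String[] args) {
        Map<String, String> context = new HashMap<>();
        context.put("key", "AgentTrail");

        Builder configured = new Builder();
        configured.configure(context);
        System.out.println(configured.build().headerKey); // prints AgentTrail

        Builder unconfigured = new Builder();
        unconfigured.configure(Collections.<String, String>emptyMap());
        System.out.println(unconfigured.build().headerKey); // prints HostTime
    }
}
```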

That's pretty much it from the coding perspective.

Using the custom Interceptor

Create the jar and add it to the Flume agent's classpath. Add the configuration and start the agent :)

