Flume ships with several built-in interceptors, which are sufficient for most real-world use cases.
The available interceptors include the Timestamp, Host, Static, UUID, Regex Filtering, and Regex Extractor interceptors.
In some situations, however, we need to write a custom interceptor.
To implement an interceptor, we need to take care of the following aspects:
- Interceptor code
- Configuration
- Interceptor Builder
As an example, we shall implement a variation of the HostInterceptor that ships with Flume: a HostTimeInterceptor that adds the host IP and the timestamp at which the event was processed on the agent.
Interceptor Code
To implement an interceptor, we need to implement the Interceptor interface.
To simplify batch event processing, we have created a base class, AbstractFlumeInterceptor:
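import java.util.Iterator;
import java.util.List;

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public abstract class AbstractFlumeInterceptor implements Interceptor {

    // Applies intercept(Event) to each event and drops those for which it returns null.
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Iterator<Event> iterator = events.iterator(); iterator.hasNext(); ) {
            Event next = intercept(iterator.next());
            if (next == null) {
                // remove the dropped event from the batch
                iterator.remove();
            }
        }
        return events;
    }
}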
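To see what the batch method does in isolation, here is a minimal sketch that mirrors the loop above without depending on Flume. The names BatchFilterSketch and filterInPlace are hypothetical, and plain strings stand in for Flume events.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the batch loop; strings play the role of events.
class BatchFilterSketch {

    // Applies the per-item function and removes items for which it returns null.
    static <T> List<T> filterInPlace(List<T> items, UnaryOperator<T> single) {
        for (Iterator<T> it = items.iterator(); it.hasNext(); ) {
            if (single.apply(it.next()) == null) {
                it.remove();
            }
        }
        return items;
    }

    public static void main(String[] args) {
        List<String> batch = new ArrayList<>(Arrays.asList("keep-1", "drop-2", "keep-3"));
        filterInPlace(batch, s -> s.startsWith("drop") ? null : s);
        System.out.println(batch); // [keep-1, keep-3]
    }
}
```

Because the loop removes through the iterator, the list passed in must support removal; a list that does not (for example one created with List.of) would throw UnsupportedOperationException on the first dropped item.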
Now let's look at the actual interceptor code:
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HostTimeInterceptor extends AbstractFlumeInterceptor {

    public static final Logger LOGGER = LoggerFactory.getLogger(HostTimeInterceptor.class);

    public static final String DEFAULT_SEPARATOR = ",";
    public static final String DEFAULT_KEYVAL_SEPARATOR = ":";

    private final String header;
    private String host;

    /**
     * Private constructor, so that instances can only be created through the Builder.
     *
     * @param headerKey key of the event header to write to
     */
    private HostTimeInterceptor(String headerKey) {
        header = headerKey;
        try {
            host = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            // host stays null here, so the header value will contain the text "null"
            LOGGER.warn("Unable to get Host address", e);
        }
    }

    @Override
    public void initialize() {
        // NOOP
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String headerValue = headers.get(header);
        if (headerValue != null) {
            // append to any value already stored under this header
            headerValue += DEFAULT_SEPARATOR;
        } else {
            headerValue = "";
        }
        headerValue += host + DEFAULT_KEYVAL_SEPARATOR + System.currentTimeMillis();
        headers.put(header, headerValue);
        return event;
    }

    @Override
    public void close() {
        // NOOP
    }

    // The nested Builder class is shown in the Interceptor Builder section.
}
The constructor is private so that the interceptor can be built only via the Builder. The constructor takes the key, which becomes the key in the Flume Event header, and looks up the host IP address. The lookup could have been done in initialize() as well.
Now let's look at the intercept API. The Event is passed to us; we check whether the header is already present and append the host IP along with the timestamp. Once the Event is modified, we return it.
To drop an Event in an interceptor, just return null from the API.
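As a worked example of the header format, the sketch below reproduces the append logic of intercept(Event) on a plain map, so it runs without Flume. The class HostTimeHeaderSketch, the method stamp, and the host addresses and timestamps are all made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the header logic of intercept(Event).
class HostTimeHeaderSketch {
    static final String SEPARATOR = ",";
    static final String KEYVAL_SEPARATOR = ":";

    // Appends "host:timestamp" to the header value stored under key.
    static void stamp(Map<String, String> headers, String key, String host, long timestampMillis) {
        String entry = host + KEYVAL_SEPARATOR + timestampMillis;
        String existing = headers.get(key);
        headers.put(key, existing == null ? entry : existing + SEPARATOR + entry);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        stamp(headers, "HostTime", "10.0.0.1", 1000L); // first pass through an interceptor
        stamp(headers, "HostTime", "10.0.0.2", 1500L); // second pass, appended after a comma
        System.out.println(headers.get("HostTime")); // 10.0.0.1:1000,10.0.0.2:1500
    }
}
```

One thing to keep in mind with this format: an IPv6 address itself contains colons, so a consumer that splits each entry on ":" would need extra care if the host resolves to an IPv6 address.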
Configuration
Let's look at the configuration needed for this interceptor.
a1.sources.src1.interceptors = hostTimeInterceptor
a1.sources.src1.interceptors.hostTimeInterceptor.type=com.ashishpaliwal.flume.interceptors.HostTimeInterceptor$Builder
a1.sources.src1.interceptors.hostTimeInterceptor.key=HostTime
This snippet is specific to the interceptor configuration. For the complete configuration, refer to single-node-custom-interceptor.properties.
The first line declares the interceptor chain. The second line tells Flume how to initialize our interceptor by naming its Builder class; we shall see the implementation in a moment. The third line specifies the header key to be used, which is passed to our interceptor on creation.
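For orientation, here is a rough sketch of how these three lines might sit inside a complete single-node agent configuration. It is not the content of single-node-custom-interceptor.properties; the netcat source, memory channel, logger sink, port, and the names ch1 and sink1 are assumptions in the style of the Flume User Guide's single-node example.

```properties
# Components of agent a1 (ch1 and sink1 are assumed names)
a1.sources = src1
a1.channels = ch1
a1.sinks = sink1

# Assumed source: netcat listening on localhost:44444
a1.sources.src1.type = netcat
a1.sources.src1.bind = localhost
a1.sources.src1.port = 44444
a1.sources.src1.channels = ch1

# Interceptor chain from the snippet above
a1.sources.src1.interceptors = hostTimeInterceptor
a1.sources.src1.interceptors.hostTimeInterceptor.type = com.ashishpaliwal.flume.interceptors.HostTimeInterceptor$Builder
a1.sources.src1.interceptors.hostTimeInterceptor.key = HostTime

# Assumed channel and sink: in-memory channel, events logged to the console
a1.channels.ch1.type = memory
a1.sinks.sink1.type = logger
a1.sinks.sink1.channel = ch1
```

With a logger sink, the event headers are printed along with the body, which makes it easy to check that the HostTime header is being set.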
Interceptor Builder
Before we can use the interceptor, we have to provide an implementation of Interceptor.Builder.
Let's look at the code. This is a nested class within the interceptor.
    public static class Builder implements Interceptor.Builder {

        private String headerkey = "HostTime";

        @Override
        public Interceptor build() {
            return new HostTimeInterceptor(headerkey);
        }

        @Override
        public void configure(Context context) {
            // keep the default when "key" is not set in the agent configuration
            headerkey = context.getString("key", headerkey);
        }
    }
The properties that we provide in the agent configuration file are passed to configure(). We read the key value there and pass it to the interceptor in build(). Flume calls the Builder implementation while creating the interceptor chain.
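The configure-then-build order can be illustrated with a small sketch that runs without Flume. Everything here is a stand-in: BuilderLifecycleSketch is a hypothetical class, a plain Map takes the place of Flume's Context, build() returns the key instead of an interceptor, and the key value AgentTrace is made up. The sketch also falls back to the default key when the property is missing, which is one way to avoid handing a null key to the interceptor.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the builder lifecycle; a Map replaces Flume's Context.
class BuilderLifecycleSketch {

    static class Builder {
        private String headerKey = "HostTime"; // default when "key" is not configured

        // Called first, with the properties from the agent configuration.
        void configure(Map<String, String> properties) {
            headerKey = properties.getOrDefault("key", headerKey);
        }

        // Called second; the real builder would return the interceptor here.
        String build() {
            return headerKey;
        }
    }

    public static void main(String[] args) {
        Map<String, String> configured = new HashMap<>();
        configured.put("key", "AgentTrace");

        Builder withKey = new Builder();
        withKey.configure(configured);
        System.out.println(withKey.build()); // AgentTrace

        Builder withoutKey = new Builder();
        withoutKey.configure(Collections.emptyMap());
        System.out.println(withoutKey.build()); // HostTime
    }
}
```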
That's pretty much it from the coding perspective.
Using the custom Interceptor
Create the jar and add it to the Flume agent's classpath. Add the configuration and start the agent.