[Flume Cookbook] Extracting/Validating File Channel data

Apache Flume’s File Channel prevents data loss when an agent shuts down. However, there have been cases where malformed events make the Sink or the downstream system throw an exception on every delivery attempt, so the events are never removed and the Channel clogs. The crude way of dealing with this is to delete the channel data and restart the agent, which loses data. The recently released File Channel Integrity tool enhancements provide a better way to handle the situation.

Validating the File Channel data

This need comes from real-world usage (see FLUME-2613), where data was stuck in the channel because of unexpected formatting. The downstream system always rejected the malformed data, so it could never be removed from the channel.

The Flume 1.6 release enhances the File Channel Integrity tool with support for a custom validator. We supply our validator to the tool, and whenever an event doesn’t conform to our validation rules, the validator simply returns false and the Event is dropped from the Channel.

Let’s quickly look at the Event validator interface:

/**
 * Event Validator interface to be used for validating Events
 * per custom logic
 */
public interface EventValidator {

  /**
   * Validate the Event in an application specific manner
   *
   * @param event Flume Event
   * @return  true if Event is valid as per App Logic
   */
  boolean validateEvent(Event event);

  EventValidator NOOP_VALIDATOR = new EventValidator() {
    @Override
    public boolean validateEvent(Event event) {
      return true;
    }
  };

  interface Builder extends Configurable {
    EventValidator build();
  }
}

The Event Validator interface ships with a NOOP implementation, NOOP_VALIDATOR, which doesn’t validate the Event at all. By default, all events pass the validation.
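To make the contract concrete, here is a minimal sketch of what "true keeps the event, false drops it" means. It is not the tool's actual code: `BodyValidator`, `retainValid` and the non-empty rule are hypothetical stand-ins, reduced to raw event bodies so the example compiles without Flume on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flume.tools.EventValidator,
// working on raw bodies so that no Flume dependency is needed.
interface BodyValidator {
  boolean validate(byte[] body);

  // Mirrors the idea of NOOP_VALIDATOR: every event passes.
  BodyValidator NOOP = body -> true;
}

class ValidatorContractSketch {

  // Hypothetical helper: keeps the bodies the validator accepts, drops the rest.
  static List<byte[]> retainValid(List<byte[]> bodies, BodyValidator validator) {
    List<byte[]> kept = new ArrayList<>();
    for (byte[] body : bodies) {
      if (validator.validate(body)) {
        kept.add(body);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<byte[]> bodies = new ArrayList<>();
    bodies.add("hello".getBytes(StandardCharsets.UTF_8));
    bodies.add(new byte[0]);

    // Hypothetical rule: reject events with an empty body.
    BodyValidator nonEmpty = body -> body != null && body.length > 0;

    System.out.println(retainValid(bodies, BodyValidator.NOOP).size()); // prints 2
    System.out.println(retainValid(bodies, nonEmpty).size());           // prints 1
  }
}
```

With the NOOP validator nothing is dropped. With the custom rule only the event that passes survives.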

Now let’s look at a practical scenario. We have JSON Events and need to verify that the body of every Event is valid JSON. The same approach works for any custom validation, such as XML validation or a specific data-level check. Let’s quickly look at a JSON validator.

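package org.apache.flume.tools;

import java.nio.charset.StandardCharsets;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.flume.Context;
import org.apache.flume.Event;

public class JsonEventVerifier implements EventValidator {

  // Gson instances are thread-safe, so a single shared one is enough
  private static final Gson gson = new Gson();

  @Override
  public boolean validateEvent(Event event) {
    byte[] data = event.getBody();

    // Just a simple way: the Event is valid if Gson can parse its body
    try {
      // Decode explicitly as UTF-8 instead of relying on the platform default
      gson.fromJson(new String(data, StandardCharsets.UTF_8), Object.class);
      return true;
    } catch (JsonSyntaxException ex) {
      return false;
    }
  }

  public static class Builder implements EventValidator.Builder {

    @Override
    public EventValidator build() {
      return new JsonEventVerifier();
    }

    @Override
    public void configure(Context context) {
      // We have nothing to do here, leaving it as is
    }
  }
}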

Compile the code, package it as a JAR and put it in the Flume lib folder. Then run the File Channel Integrity tool from the command line:

$ bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.tools.JsonEventVerifier

Once you run this, only the Events that conform to the verifier remain in the Channel; all the others are dropped.
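The same pattern extends to the XML validation mentioned earlier. Below is a minimal sketch of a well-formedness check that uses only the JDK's built-in parser. The class and method names (`XmlBodyCheck`, `isWellFormedXml`) are made up for illustration, and the check is deliberately strict: bodies that carry a DOCTYPE are rejected too.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

// Hypothetical helper, not part of Flume.
class XmlBodyCheck {

  static boolean isWellFormedXml(byte[] body) {
    if (body == null) {
      return false;
    }
    try {
      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
      // Refuse DOCTYPE declarations so untrusted bodies cannot pull in
      // external entities; such bodies are reported as invalid.
      factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
      DocumentBuilder builder = factory.newDocumentBuilder();
      // Replace the default error handler, which prints to stderr.
      // DefaultHandler still rethrows fatal (well-formedness) errors.
      builder.setErrorHandler(new DefaultHandler());
      builder.parse(new ByteArrayInputStream(body));
      return true;
    } catch (SAXException | IOException | ParserConfigurationException ex) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isWellFormedXml("<a><b/></a>".getBytes(StandardCharsets.UTF_8))); // prints true
    System.out.println(isWellFormedXml("<a><b></a>".getBytes(StandardCharsets.UTF_8)));  // prints false
  }
}
```

Inside a real validator, validateEvent() would simply return isWellFormedXml(event.getBody()). Since a false result drops the Event for good, it is worth trying any new check on a copy of the data directory first.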

Extracting Data from File Channel

Looking at the example above, we can also implement the EventValidator interface with the side effect of dumping the data to a file or any other medium or format. If we add a log statement in the validateEvent() method, we can dump the content using a logger. This is not limited to logging: we can use whatever format we need to extract and dump the content of the channel. The interface gives us a handle to the Channel data.
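As a rough sketch of that idea, the validator below writes every Event body to a Writer and always returns true, so nothing is dropped. The two small interfaces are stand-ins for the Flume types shown earlier, declared only so the example compiles without Flume on the classpath. `DumpingEventValidator` and `DumpDemo` are hypothetical names, and decoding bodies as UTF-8 text is an assumption that won't hold for binary payloads.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Stand-ins for org.apache.flume.Event and org.apache.flume.tools.EventValidator.
interface Event {
  byte[] getBody();
}

interface EventValidator {
  boolean validateEvent(Event event);
}

// Hypothetical extractor: dumps each body as one line of text.
class DumpingEventValidator implements EventValidator {

  private final Writer out;

  DumpingEventValidator(Writer out) {
    this.out = out;
  }

  @Override
  public boolean validateEvent(Event event) {
    try {
      // Assumes the body is UTF-8 text.
      out.write(new String(event.getBody(), StandardCharsets.UTF_8));
      out.write("\n");
      out.flush();
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
    // Always true: this validator only observes, it never drops an Event.
    return true;
  }
}

class DumpDemo {
  public static void main(String[] args) {
    StringWriter sink = new StringWriter();
    EventValidator dumper = new DumpingEventValidator(sink);
    dumper.validateEvent(() -> "{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
    System.out.print(sink); // prints {"id": 1}
  }
}
```

In a real validator the Writer would typically be opened on a file by the Builder, which could read the target path in configure(Context).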
