Flume: Getting started with Interceptors

How do you create a Flume plugin that can listen to any incoming event and alter its content on the fly? By using interceptors, as described in Flume-interceptors.
But what is a Flume event, and what might an event interceptor look like?

Interceptor implementation

First, let's define what an event is. According to the Flume definition, “A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes.[…]”. Its JSON representation could be the following:

{"headers":{"k1":"v1"},"body":"this is my body"}

We want to alter the incoming flow and enrich each event with custom headers (custom attributes) as follows:

{"headers":{"k1":"v1","k2":"v2"},"body":"this is my body"}
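To make this before/after concrete, here is a minimal JDK-only stand-in: an event is modeled as a map of string headers plus a byte[] body, mirroring the Flume definition quoted above. The class EventSketch and its enrich method are hypothetical illustrations, not Flume classes; a real interceptor works on org.apache.flume.Event instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a Flume event: string headers plus a byte payload
class EventSketch {

    final Map<String, String> headers;
    final byte[] body;

    EventSketch(Map<String, String> headers, byte[] body) {
        // Copy the headers so enrichment does not modify the caller's map
        this.headers = new LinkedHashMap<>(headers);
        this.body = body;
    }

    // Adds one header and returns the same event; the body is left untouched
    EventSketch enrich(String key, String value) {
        headers.put(key, value);
        return this;
    }

    public static void main(String[] args) {
        Map<String, String> initial = new LinkedHashMap<>();
        initial.put("k1", "v1");
        EventSketch event = new EventSketch(
                initial, "this is my body".getBytes(StandardCharsets.UTF_8));

        event.enrich("k2", "v2");
        System.out.println(event.headers); // {k1=v1, k2=v2}
        System.out.println(new String(event.body, StandardCharsets.UTF_8));
    }
}
```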

Create project and dependencies

Let’s create a simple Maven project with the following dependency (the flume.core.version property is expected to be defined in the POM). I am currently using version 1.4.0-cdh4.6.0 (Cloudera release) of Flume, but the implementation should be the same regardless of the Flume version.

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>${flume.core.version}</version>
        </dependency>

Build interception class

I created a CustomHostInterceptor as described below. The goal of this interceptor is to enrich incoming data with the hostname value. This is of course already packaged in the Flume libraries (org.apache.flume.interceptor.HostInterceptor$Builder), but it gives a good example of how interception works.
Here is what the Interceptor interface looks like. It will be implemented by our CustomHostInterceptor class.


public interface Interceptor {

    void initialize();

    Event intercept(Event event);

    List<Event> intercept(List<Event> list);

    void close();

    static interface Builder extends Configurable {
        Interceptor build();
    }
}

The static Builder reads properties from the Flume context and initializes the Interceptor class accordingly. The interceptor logic itself lives in the intercept(Event event) method. Since Flume can handle batches of events, we also have to handle batches of interceptions (hence the List<Event> intercept(List<Event> events) method).
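The lifecycle described above (configure the Builder from properties, build the interceptor, initialize it, feed it events, then close it) can be sketched with the JDK alone. Everything here is a hypothetical stand-in: a plain Map<String, String> plays the role of Flume's Context, events are reduced to header maps, and the MiniInterceptor/MiniBuilder types only mirror the shape of org.apache.flume.interceptor.Interceptor; they are not Flume classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LifecycleSketch {

    // Hypothetical mirror of Flume's Interceptor; an event is just a header map here
    interface MiniInterceptor {
        void initialize();
        Map<String, String> intercept(Map<String, String> event);
        List<Map<String, String>> intercept(List<Map<String, String>> events);
        void close();
    }

    // Hypothetical mirror of Interceptor.Builder; a Map stands in for Flume's Context
    interface MiniBuilder {
        void configure(Map<String, String> context);
        MiniInterceptor build();
    }

    static class StaticHeaderBuilder implements MiniBuilder {
        private String key;
        private String value;

        @Override
        public void configure(Map<String, String> context) {
            // Settings are read before the interceptor instance exists
            key = context.get("key");
            value = context.get("value");
        }

        @Override
        public MiniInterceptor build() {
            final String k = key;
            final String v = value;
            return new MiniInterceptor() {
                public void initialize() { /* acquire resources here */ }

                public Map<String, String> intercept(Map<String, String> event) {
                    event.put(k, v); // single-event logic
                    return event;
                }

                public List<Map<String, String>> intercept(List<Map<String, String>> events) {
                    // The batch version delegates to the single-event method
                    List<Map<String, String>> out = new ArrayList<>(events.size());
                    for (Map<String, String> e : events) {
                        out.add(intercept(e));
                    }
                    return out;
                }

                public void close() { /* release resources here */ }
            };
        }
    }

    static List<Map<String, String>> run(Map<String, String> conf,
                                         List<Map<String, String>> batch) {
        MiniBuilder builder = new StaticHeaderBuilder();
        builder.configure(conf);                        // 1. configure from properties
        MiniInterceptor interceptor = builder.build();  // 2. build
        interceptor.initialize();                       // 3. start up
        try {
            return interceptor.intercept(batch);        // 4. intercept a batch
        } finally {
            interceptor.close();                        // 5. shut down
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("key", "k2");
        conf.put("value", "v2");
        Map<String, String> event = new HashMap<>();
        event.put("k1", "v1");
        List<Map<String, String>> batch = new ArrayList<>();
        batch.add(event);
        System.out.println(run(conf, batch).get(0).get("k2")); // v2
    }
}
```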

package com.aamend.hadoop.flume.interceptor;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.interceptor.Interceptor;

public class CustomHostInterceptor
        implements Interceptor {

    private String hostValue;
    private final String hostHeader;

    public CustomHostInterceptor(String hostHeader){
        this.hostHeader = hostHeader;
    }

    @Override
    public void initialize() {
        // At interceptor start-up: resolve the hostname once
        try {
            hostValue =
                    InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new FlumeException("Cannot get Hostname", e);
        }
    }

    @Override
    public Event intercept(Event event) {

        // This is the event's body (not modified here, shown for reference)
        String body = new String(event.getBody(), StandardCharsets.UTF_8);

        // These are the event's headers
        Map<String, String> headers = event.getHeaders();

        // Enrich header with hostname
        headers.put(hostHeader, hostValue);

        // Let the enriched event go
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        List<Event> interceptedEvents =
                new ArrayList<Event>(events.size());
        for (Event event : events) {
            // Intercept each event; a null result means the event is dropped
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }

        return interceptedEvents;
    }

    @Override
    public void close() {
        // At interceptor shutdown
    }

    public static class Builder
            implements Interceptor.Builder {

        private String hostHeader;

        @Override
        public void configure(Context context) {
            // Retrieve property from flume conf
            hostHeader = context.getString("hostHeader");
        }

        @Override
        public Interceptor build() {
            return new CustomHostInterceptor(hostHeader);
        }
    }
}

The code is fairly self-explanatory: the hostname is resolved once in initialize(), then written into each event's headers under a custom key, supplied through the Flume configuration property “hostHeader”.
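To preview which value initialize() will write into the header on a given server, you can run the same JDK lookup on its own. This is a small hypothetical helper (HostnameCheck is not part of Flume or of the interceptor above); unlike the interceptor, which throws a FlumeException, it returns a caller-supplied fallback when the lookup fails.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Standalone check of the hostname lookup used in CustomHostInterceptor.initialize()
class HostnameCheck {

    // Same JDK call as the interceptor; returns the fallback instead of throwing
    static String resolveHostname(String fallback) {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Prints the value that would land under the configured header key
        System.out.println("hostname = " + resolveHostname("unknown-host"));
    }
}
```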

Package / Install Interceptor

In three words: mvn clean package 🙂
The command above packages your interceptor into a single JAR file that is loaded when the Flume agent starts. The JAR must be copied to every server running a Flume agent. The default path for Flume plugins is /usr/lib/flume-ng/lib/; the path may differ slightly when using Cloudera Manager. Once installed, the interceptor still needs to be activated in the Flume configuration (/etc/flume-ng/conf/flume.conf).


# Describe sources
###########################

a1.sources = r1
# .../...
# Attach the interceptor to the source
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.aamend.hadoop.flume.interceptor.CustomHostInterceptor$Builder
a1.sources.r1.interceptors.i1.hostHeader = hostname

# Describe sinks
###########################

# .../...

# Describe channels
###########################

# .../...
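flume.conf uses the Java properties format, and Flume hands each interceptor Builder a Context holding only the properties under that interceptor's own prefix, with the prefix stripped (Flume's Context class exposes a getSubProperties method for this). That is why the Builder above reads plain "hostHeader" rather than the full key. The JDK-only sketch below imitates that prefix stripping with java.util.Properties; the class and its subProperties/load helpers are hypothetical illustrations, not Flume's configuration code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class ConfPrefixSketch {

    // Hypothetical helper: keep keys that start with prefix, with the prefix removed
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    // Parses properties-format text; wraps the (unlikely) IOException
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Same keys as the flume.conf excerpt above
        String conf = String.join("\n",
                "a1.sources = r1",
                "a1.sources.r1.interceptors = i1",
                "a1.sources.r1.interceptors.i1.type = "
                        + "com.aamend.hadoop.flume.interceptor.CustomHostInterceptor$Builder",
                "a1.sources.r1.interceptors.i1.hostHeader = hostname");

        Map<String, String> i1 =
                subProperties(load(conf), "a1.sources.r1.interceptors.i1.");
        System.out.println(i1.get("hostHeader")); // hostname
        System.out.println(i1.keySet());          // [hostHeader, type]
    }
}
```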

Once everything is configured, restart the flume-ng daemon and monitor the Flume log file (/var/log/flume-ng/flume.log).
Every incoming event should now be enriched with the hostname. With logging enabled, as in the source code available on my GitHub account, you should see something like:

2014-06-11T13:04:48+0100  INFO .flume.interceptor.CustomHostInterceptor:  50 - Before interception : {"headers":{"k1":"v1"},"body":"this is my body"}
2014-06-11T13:04:48+0100  INFO .flume.interceptor.CustomHostInterceptor:  65 - After  interception : {"headers":{"hostname":"localhost.localdomain","k1":"v1"},"body":"this is my body"}
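The log lines above print each event in the JSON form used at the start of this post. A helper along these lines could produce such a string; it is a hypothetical JDK-only sketch (toJson is not a Flume API and not necessarily what the GitHub source does). It sorts headers by key so the output is deterministic and only escapes quotes and backslashes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

class EventJsonSketch {

    // Escapes quotes and backslashes only; control characters are not handled here
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Hypothetical helper: render headers (sorted by key) and a UTF-8 body as JSON
    static String toJson(Map<String, String> headers, byte[] body) {
        StringBuilder sb = new StringBuilder("{\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : new TreeMap<>(headers).entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        sb.append("},\"body\":").append(quote(new String(body, StandardCharsets.UTF_8)));
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new TreeMap<>();
        headers.put("k1", "v1");
        headers.put("hostname", "localhost.localdomain");
        System.out.println(toJson(headers,
                "this is my body".getBytes(StandardCharsets.UTF_8)));
        // {"headers":{"hostname":"localhost.localdomain","k1":"v1"},"body":"this is my body"}
    }
}
```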

Congratulations, your interceptor works. You can now alter any incoming event, enrich its headers, or even route events to different channels based on header values (see multiplexing). The latter will be the subject of a future post.

Cheers,
Antoine