Flume: Getting started with Interceptors

How do you create a Flume plugin that listens to every incoming event and alters its content on the fly? By using interceptors, as described in Flume-interceptors.
But what is a Flume event, and what does an event interceptor look like?

Interceptor implementation

First, let’s define what an event is. According to the Flume documentation, “A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes.[…]”. Its JSON representation could look like this:

{"headers":{"k1":"v1"},"body":"this is my body"}

We want to alter the incoming flow and enrich each event with custom headers (custom attributes), as follows:

{"headers":{"k1":"v1","k2":"v2"},"body":"this is my body"}
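To make this concrete, here is a minimal Java sketch of that data model and of the enrichment step. DemoEvent and toJson are hypothetical stand-ins, not Flume API, so that the snippet runs with only the JDK; in a real interceptor you work with org.apache.flume.Event instead. The JSON rendering is naive (no escaping) and only mirrors the notation used above.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class EventModelDemo {

    // Hypothetical stand-in for a Flume event: string headers plus a byte[] payload
    static final class DemoEvent {
        final Map<String, String> headers = new LinkedHashMap<>();
        final byte[] body;

        DemoEvent(Map<String, String> headers, byte[] body) {
            this.headers.putAll(headers);
            this.body = body;
        }
    }

    // Renders the event in the JSON-like notation of this post (no escaping)
    static String toJson(DemoEvent event) {
        StringBuilder sb = new StringBuilder("{\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : event.headers.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":\"")
              .append(e.getValue()).append('"');
            first = false;
        }
        sb.append("},\"body\":\"")
          .append(new String(event.body, StandardCharsets.UTF_8))
          .append("\"}");
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("k1", "v1");
        DemoEvent event = new DemoEvent(headers,
                "this is my body".getBytes(StandardCharsets.UTF_8));
        System.out.println(toJson(event));
        // {"headers":{"k1":"v1"},"body":"this is my body"}

        // Enrichment: add a header, leave the body untouched
        event.headers.put("k2", "v2");
        System.out.println(toJson(event));
        // {"headers":{"k1":"v1","k2":"v2"},"body":"this is my body"}
    }
}
```

A LinkedHashMap keeps insertion order so the output matches the examples above; Flume itself makes no ordering promise for headers.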

Create project and dependencies

Let’s create a simple Maven project with the following dependency, where ${flume.core.version} is a POM property set to the Flume version you run. I am currently using version 1.4.0-cdh4.6.0 (Cloudera release) of Flume, but the implementation should be the same regardless of the Flume version.

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>${flume.core.version}</version>
        </dependency>

Build interception class

I created a CustomHostInterceptor as described below. Its goal is to enrich incoming data with the hostname. This is, obviously, already packaged in the Flume libraries (org.apache.flume.interceptor.HostInterceptor$Builder), but it makes a good example of how interception works.
Here is what the Interceptor interface looks like; our CustomHostInterceptor class will implement it.


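package org.apache.flume.interceptor;

import java.util.List;

import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;

public interface Interceptor {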

    void initialize();

    Event intercept(Event event);

    List<Event> intercept(List<Event> list);

    void close();

    static interface Builder extends Configurable {
        Interceptor build();
    }
}

The static Builder reads properties from the Flume Context and builds the Interceptor accordingly. The interception logic itself lives in the intercept(Event event) method. Since Flume handles events in batches, we also have to support batch interception (hence the List<Event> intercept(List<Event> events) method).
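The contract of the two intercept methods can be sketched in plain Java. As I read Flume’s Interceptor javadoc, intercept(Event) may return null to drop (filter out) an event, and the batch method must return a list no larger than its input, never null. DemoEvent, DemoInterceptor, interceptBatch and TAG_OR_DROP are hypothetical names for this sketch, not Flume classes. The CustomHostInterceptor below never drops events, so its batch method can skip the null check; a filtering interceptor cannot.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchInterceptDemo {

    // Hypothetical stand-in for org.apache.flume.Event
    static final class DemoEvent {
        final Map<String, String> headers = new LinkedHashMap<>();
        final String body;

        DemoEvent(String body) {
            this.body = body;
        }
    }

    // Hypothetical stand-in for the single-event half of Flume's Interceptor
    interface DemoInterceptor {
        // Returns the (possibly modified) event, or null to drop it
        DemoEvent intercept(DemoEvent event);
    }

    // Batch interception built on the single-event method:
    // dropped (null) events are skipped, so the output is never larger than the input
    static List<DemoEvent> interceptBatch(DemoInterceptor interceptor, List<DemoEvent> events) {
        List<DemoEvent> out = new ArrayList<>(events.size());
        for (DemoEvent event : events) {
            DemoEvent result = interceptor.intercept(event);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    // Example interceptor: drops events with an empty body, tags the others with k2=v2
    static final DemoInterceptor TAG_OR_DROP = event -> {
        if (event.body.isEmpty()) {
            return null;
        }
        event.headers.put("k2", "v2");
        return event;
    };

    public static void main(String[] args) {
        List<DemoEvent> batch = new ArrayList<>();
        batch.add(new DemoEvent("first"));
        batch.add(new DemoEvent(""));
        batch.add(new DemoEvent("third"));

        List<DemoEvent> out = interceptBatch(TAG_OR_DROP, batch);
        System.out.println(out.size());          // 2
        System.out.println(out.get(0).headers);  // {k2=v2}
    }
}
```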

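package com.aamend.hadoop.flume.interceptor;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.interceptor.Interceptor;

public class CustomHostInterceptor
        implements Interceptor {

    private String hostValue;
    private String hostHeader;

    public CustomHostInterceptor(String hostHeader) {
        this.hostHeader = hostHeader;
    }

    @Override
    public void initialize() {
        // At interceptor start up
        try {
            hostValue =
                    InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new FlumeException("Cannot get Hostname", e);
        }
    }

    @Override
    public Event intercept(Event event) {

        // This is the event's body (not needed here, decoded for illustration)
        String body = new String(event.getBody(), StandardCharsets.UTF_8);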

        // These are the event's headers
        Map<String, String> headers = event.getHeaders();

        // Enrich header with hostname
        headers.put(hostHeader, hostValue);

        // Let the enriched event go
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        List<Event> interceptedEvents =
                new ArrayList<Event>(events.size());
        for (Event event : events) {
            // Intercept any event
            Event interceptedEvent = intercept(event);
            interceptedEvents.add(interceptedEvent);
        }

        return interceptedEvents;
    }

    @Override
    public void close() {
        // At interceptor shutdown
    }

    public static class Builder
            implements Interceptor.Builder {

        private String hostHeader;

        @Override
        public void configure(Context context) {
            // Retrieve property from flume conf
            // (falls back to "hostname" so a missing property never yields a null header key)
            hostHeader = context.getString("hostHeader", "hostname");
        }

        @Override
        public Interceptor build() {
            return new CustomHostInterceptor(hostHeader);
        }
    }
}

The code is fairly self-explanatory. The hostname is resolved once, in initialize(), and then written into each event’s headers under a custom key, supplied through the Flume configuration with the property “hostHeader”.
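To show when each method runs, here is a simplified, JDK-only model of the lifecycle. It assumes the agent instantiates the Builder named in the configuration, calls configure(Context) and build(), then initialize() before events flow, intercept(...) for each event or batch, and close() at shutdown. DemoContext, DemoBuilder and DemoInterceptor are hypothetical stand-ins, and the fixed "demo-host" value replaces the real hostname lookup so that the output is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LifecycleDemo {

    // Records lifecycle calls so their order is visible
    static final List<String> CALLS = new ArrayList<>();

    // Hypothetical stand-in for org.apache.flume.Context
    static final class DemoContext {
        private final Map<String, String> props;

        DemoContext(Map<String, String> props) {
            this.props = new HashMap<>(props);
        }

        String getString(String key, String defaultValue) {
            return props.getOrDefault(key, defaultValue);
        }
    }

    static final class DemoInterceptor {
        private final String hostHeader;
        private String hostValue;

        DemoInterceptor(String hostHeader) {
            this.hostHeader = hostHeader;
        }

        void initialize() {
            CALLS.add("initialize");
            hostValue = "demo-host"; // the real interceptor resolves the local hostname here
        }

        Map<String, String> intercept(Map<String, String> headers) {
            CALLS.add("intercept");
            headers.put(hostHeader, hostValue);
            return headers;
        }

        void close() {
            CALLS.add("close");
        }
    }

    static final class DemoBuilder {
        private String hostHeader;

        void configure(DemoContext context) {
            CALLS.add("configure");
            hostHeader = context.getString("hostHeader", "hostname"); // default chosen for this sketch
        }

        DemoInterceptor build() {
            CALLS.add("build");
            return new DemoInterceptor(hostHeader);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("hostHeader", "hostname");

        DemoBuilder builder = new DemoBuilder();
        builder.configure(new DemoContext(props));
        DemoInterceptor interceptor = builder.build();
        interceptor.initialize();

        Map<String, String> headers = new HashMap<>();
        headers.put("k1", "v1");
        interceptor.intercept(headers);
        interceptor.close();

        System.out.println(CALLS);                    // [configure, build, initialize, intercept, close]
        System.out.println(headers.get("hostname"));  // demo-host
    }
}
```

Resolving the hostname in initialize() rather than in intercept() means the lookup happens once per agent start, not once per event.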

Package / Install Interceptor

In three words: mvn clean package 🙂
This command packages your interceptor into a single JAR file that the Flume agent can load at startup. The JAR must be copied to each server running a Flume agent. The default path for Flume plugins is /usr/lib/flume-ng/lib/; it may differ slightly when using Cloudera Manager. The interceptor is then available, but it still has to be activated in the Flume configuration (/etc/flume-ng/conf/flume.conf).


# Describe sources
###########################

a1.sources = r1
# .../...
# Attach the interceptor to the source
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.aamend.hadoop.flume.interceptor.CustomHostInterceptor$Builder
a1.sources.r1.interceptors.i1.hostHeader = hostname

# Describe sinks
###########################

# .../...

# Describe channels
###########################

# .../...
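Two details of this configuration can be checked with plain Java. First, the properties under a1.sources.r1.interceptors.i1. reach the Builder with that prefix removed, which is why the Builder asks for plain “hostHeader”; subProperties below is a simplified model of that behavior, not Flume’s own parser. Second, Flume resolves a custom type by its binary class name, and a nested class such as the Builder is named with a $ (Outer$Builder), not a dot. HypotheticalInterceptor is an illustrative class, and loadable simply wraps Class.forName.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class InterceptorConfigDemo {

    // Hypothetical class whose nested Builder has the same shape as CustomHostInterceptor$Builder
    static class HypotheticalInterceptor {
        static class Builder { }
    }

    // Keys under `prefix`, with the prefix stripped (simplified model of Flume's sub-properties)
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    // True if the JVM can load a class under this exact name
    static boolean loadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        String conf = String.join("\n",
                "a1.sources = r1",
                "a1.sources.r1.interceptors = i1",
                "a1.sources.r1.interceptors.i1.type = com.aamend.hadoop.flume.interceptor.CustomHostInterceptor$Builder",
                "a1.sources.r1.interceptors.i1.hostHeader = hostname");
        Properties props = new Properties();
        props.load(new StringReader(conf));

        // What the Builder would see in its Context
        Map<String, String> builderProps = subProperties(props, "a1.sources.r1.interceptors.i1.");
        System.out.println(builderProps.get("hostHeader")); // hostname

        // Nested classes are loaded by their binary name, which uses '$'
        String binaryName = HypotheticalInterceptor.Builder.class.getName();
        String dottedName = HypotheticalInterceptor.Builder.class.getCanonicalName();
        System.out.println(loadable(binaryName)); // true
        System.out.println(loadable(dottedName)); // false
    }
}
```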

Once the agent is configured, restart the flume-ng daemon and monitor the Flume log file (/var/log/flume-ng/flume.log).
Every incoming event should now be enriched with the hostname. With logging enabled, as in the source code available on my GitHub account, you should see something like:

2014-06-11T13:04:48+0100  INFO .flume.interceptor.CustomHostInterceptor:  50 - Before interception : {"headers":{"k1":"v1"},"body":"this is my body"}
2014-06-11T13:04:48+0100  INFO .flume.interceptor.CustomHostInterceptor:  65 - After  interception : {"headers":{"hostname":"localhost.localdomain","k1":"v1"},"body":"this is my body"}

Congratulations, your interceptor works. From here you can alter any incoming event, enrich its headers, or even route events to different channels based on header values (see multiplexing). The latter will be the subject of a future post.

Cheers,
Antoine


9 thoughts on “Flume: Getting started with Interceptors”

  1. Thank you for sharing this! It worked for me! What I would like to do now is the following. I have an HTTP Flume source that is receiving events with different HTTP headers. I would like to do the same thing as you did in your tutorial, but transform the existing header values.

  2. Sweet write-up. Here are the import statements required to compile the example interceptor; my IDE wasn’t much help with them:

    import org.apache.flume.*;
    import org.apache.flume.interceptor.*;

    import java.util.List;
    import java.util.Map;
    import java.util.ArrayList;
    import java.net.InetAddress;
    import java.net.UnknownHostException;

  3. Hi,
    I compiled the code successfully but I’m getting the error:

    java.lang.ClassNotFoundException: com.aamend.hadoop.flume.interceptor.CustomHostInterceptor$Builder

    I put the jar file in $FLUME_HOME/lib and added the lib directory to the classpath.
    I also put it into $FLUME_HOME/plugin.d/interceptor1/lib

    What am I doing wrong?
    Thx
    Volker
