Spark: Use Spark-SQL on SQL Developer

[Screenshot]

I’m describing here how I set up SQL Developer to connect to / query my Spark cluster. I got it working on the following local environment:

  • Ubuntu precise 64 bits (1 master, 2 slaves)
  • Hadoop Hortonworks 2.4.0.2.1.5.0-695
  • Hive version 0.13.0.2.1.5.0-695, metastore hosted on MySQL database
  • Spark 1.1.0 prebuilt for Hadoop 2.4
  • SQL Developer 4.0.3.16

Note that I’ve successfully tested the same setup on a 20-node cluster on AWS (EMR).

Preparing your Spark environment

First of all, you will have to make sure Spark SQL is able to connect to the Hive metastore. Simply copy the hive-site.xml file into the Spark conf directory:

sudo cp /etc/hive/conf/hive-site.xml /usr/lib/spark/conf/

Open a hive shell, and create a dummy table

hive> CREATE TABLE dummy (foo STRING, bar STRING);
OK
Time taken: 2.555 seconds

Now open a spark-sql shell, and make sure you can find the newly created table. Note that --master local can be used here since we’re not running any job.

vagrant@vagrant:~$ /usr/lib/spark/bin/spark-sql --master local
../..
spark-sql> show tables;
../..
dummy
../..
Time taken: 3.452 seconds
../..

Great, looks like Spark-SQL is now connected to Hive metastore…

Install Hive JDBC drivers on SQL Developer

[Screenshot: Oracle SQL Developer]

SQL Developer is quite a nice tool, at least for Oracle databases. In order to make it work with Hive / Spark, we need to download the Hive connectors first. I found them on Cloudera’s website; note that I was not able to find the equivalent from Hortonworks. This could be an issue if I were using Hive natively, but it is worth giving it a try on Spark first.

Unzip the downloaded archive (I’m using the JDBC4 package) and open SQL Developer.
You will have to add the third-party connectors from Preferences -> Database -> Third Party JDBC Drivers. Add all the jars included in your archive:

~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/libthrift-0.9.0.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/HiveJDBC4.jar                                           
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/log4j-1.2.14.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/TCLIServiceClient.jar                                   
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/ql.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/hive_metastore.jar                                      
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/slf4j-api-1.5.8.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/hive_service.jar                                        
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/slf4j-log4j12-1.5.8.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/libfb303-0.9.0.jar

Once added, restart SQL Developer, and ohhh, awesome, a new tab “Hive” magically appeared 🙂

[Screenshot: the new Hive tab in SQL Developer]

Open this tab and create a new connection to your HiveServer2 (port 10000 by default) as follows:

[Screenshot: Hive connection settings in SQL Developer]

And test the connection! Assuming HiveServer2 and the Hive metastore are running, your connection should be established and a new Hive worksheet should pop up. At this stage, the connection from SQL Developer to Hive is up, but the SQL is executed on Hive, not Spark!

[Screenshot: Hive worksheet in SQL Developer]

Now we need to switch back to Spark !

The goal is now to use the Spark thrift interface in lieu of the vanilla HiveServer2. Stop the HiveServer2 daemon and start the Spark thrift interface as follows:

vagrant@vagrant:~$ /usr/lib/spark/sbin/start-thriftserver.sh --master yarn-client
Spark assembly has been built with Hive, including Datanucleus jars on classpath
../..
14/10/25 15:11:49 INFO service.AbstractService: Service:OperationManager is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service: SessionManager is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service: CLIService is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service:ThriftBinaryCLIService is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service: HiveServer2 is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service:OperationManager is started.
14/10/25 15:11:49 INFO service.AbstractService: Service:SessionManager is started.
14/10/25 15:11:49 INFO service.AbstractService: Service:CLIService is started.
14/10/25 15:11:49 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.56.101:9083
14/10/25 15:11:49 INFO hive.metastore: Waiting 1 seconds before next connection attempt.
14/10/25 15:11:50 INFO hive.metastore: Connected to metastore.
14/10/25 15:11:50 INFO service.AbstractService: Service:ThriftBinaryCLIService is started.
14/10/25 15:11:50 INFO service.AbstractService: Service:HiveServer2 is started.
14/10/25 15:11:50 INFO thriftserver.HiveThriftServer2: HiveThriftServer2 started
14/10/25 15:11:50 INFO thrift.ThriftCLIService: ThriftBinaryCLIService listening on 0.0.0.0/0.0.0.0:10000
../..

Looking at the application master log file, Spark is actually embedding its own HiveServer2 inside a Spark job. The connection will apparently be dropped if the Spark job ends, so make sure the Spark job is not killed!

I’m using yarn-client to handle my JDBC connections, but this should work with Spark standalone clusters as well.
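
As an aside, SQL Developer is only one possible client here: the thrift server speaks the regular HiveServer2 JDBC protocol, so any JDBC client can connect the same way. Below is a minimal sketch using the plain Apache Hive JDBC driver; the host and port are assumptions matching the logs above, so adjust them to your environment.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SparkThriftClient {

    public static void main(String[] args) throws Exception {

        // Standard Apache Hive JDBC driver (hive-jdbc artifact); the Cloudera
        // driver configured in SQL Developer talks to the very same endpoint
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // Host / port taken from the thrift server logs above (assumption)
        Connection conn = DriverManager.getConnection(
                "jdbc:hive2://192.168.56.101:10000/default", "", "");

        // The SQL below is executed by the Spark thrift server, not by Hive
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("show tables");
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }

        rs.close();
        stmt.close();
        conn.close();
    }
}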

Now get back to SQL Developer and reconnect to Hive (HiveServer2 is now embedded in a Spark job): you are now connected to Spark!

And voila !

For testing purposes, I’ve downloaded some ngrams data from S3 (link) and created a new ngrams table on Hive. I’m now able to run lightning-fast SQL queries on Spark from a user-friendly environment (SQL Developer).

[Screenshot: querying the ngrams table from SQL Developer]

As initially said, this is quite hacky (remember I used the Cloudera JDBC driver on a Hortonworks environment), but it seems to work, and it seems to be really fast! Please let me know which environment / distribution you’re using if you’re facing any issues.

Cheers!
Antoine


Spark / Hadoop: Processing GDELT data using Hadoop InputFormat and SparkSQL

[Screenshot]

GDELT

A quick overview of the GDELT public data set:
"The GDELT Project monitors the world’s broadcast, print, and web news from nearly every corner of every country in over 100 languages and identifies the people, locations, organisations, counts, themes, sources, and events driving our global society every second of every day, creating a free open platform for computing on the entire world."

Such a dataset is a perfect match for machine learning, and an excellent use case for getting started with Spark and the Hadoop InputFormat API. I won’t repeat the data format specification here, but have a look there. For this exercise I’m using the “GDELT Event Database” data set only, which fits into simple JSON events.

The goal of this exercise is to do the following:

  • Create a Hadoop GDELT -> JSON InputFormat
  • Read data from the Spark shell
  • Convert JSON objects into a Spark SQL table
  • Query the table and extract a result (Goldstein scale)

Well, quite a challenge indeed… However, if this is not your first visit to my blog, the concept of an InputFormat should be familiar. If not, have a look at my previous posts. Only Spark (and Spark SQL) is a new concept here.

Create a GDELT -> Json Parser

GDELT data are tab-separated values. We can easily parse each line and store each key / value pair as a JSON element. However, I want to group nested entities (such as eventGeo, actor1Geo, actor2Geo, actor1Code, actor2Code), as their keys will always be the same. My final JSON structure looks like the following:

{
    "isRoot": true,
    "eventGeo": {
        "countryCode": "US",
        .../...
    },
    "actor1Code": {
        "code": "BUS",
        .../...
    },
    "actor2Code": {
        "code": "BUS",
        .../...
    },
    "actor1Geo": {
        "countryCode": "US",
        .../...
    },
    "actor2Geo": {
        "countryCode": "US",
        .../...
    },
    "goldsteinScale": 6,
    "quadClass": 2,
    "month": 201409,
    "GlobalEventID": 313271341,
    .../...
}

To get from a bunch of raw files to structured JSON, I’m using nothing more than a standard InputFormat with a standard RecordReader. Whenever a new line is consumed, I send it to my JSON parser and output the result to the Hadoop framework.

        while (pos < end) {

            // Let Hadoop LineReader consume the line into tmp text...
            newSize = in.readLine(tmp, 
                    maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), 
                            maxLineLength));

            // ... Take the result and feed my parser
            // tmp is the raw data
            // value will be the expected JSON object
            value.set(parse(tmp.toString()));
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
        }

Have a look at the com.aamend.hadoop.format.GdeltRecordReader class on my GitHub account. Before you say anything, note that performance is out of the scope of this study 🙂 . I’m basically using Google’s json-simple as a third-party dependency (see the Maven dependency below). With the raw files stored on HDFS, any event read through GdeltInputFormat will actually be seen as a JSON object.

<dependency>
  <groupId>com.googlecode.json-simple</groupId>
  <artifactId>json-simple</artifactId>
  <version>1.1</version>
</dependency>

Do not forget to add these dependencies to your Spark job! I’m building a fat jar for that purpose (refer to previous post).
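
To give an idea of what the parser does (the real implementation lives in the GdeltRecordReader class on GitHub), here is a simplified sketch of such a parse method. The field positions and the subset of fields are purely illustrative; the actual GDELT specification defines far more columns.

import org.json.simple.JSONObject;

public class GdeltJsonParser {

    // Turn one tab-separated GDELT line into a JSON string.
    // Field positions below are illustrative only, not the real GDELT layout.
    @SuppressWarnings("unchecked")
    public static String parse(String line) {

        String[] fields = line.split("\t", -1);
        JSONObject event = new JSONObject();
        event.put("GlobalEventID", Long.parseLong(fields[0]));
        event.put("day", fields[1]);

        // Nested entities share the same key names (countryCode, lat, long...)
        // so they are grouped into sub-objects
        JSONObject actor1Geo = new JSONObject();
        actor1Geo.put("countryCode", fields[2]);
        event.put("actor1Geo", actor1Geo);

        JSONObject actor2Geo = new JSONObject();
        actor2Geo.put("countryCode", fields[3]);
        event.put("actor2Geo", actor2Geo);

        // .../... remaining fields follow the same pattern

        return event.toJSONString();
    }
}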

Read data from spark shell

Assuming you have a Spark distribution up and running, simply start a shell session as follows. Note the --jars option that adds all your dependencies to the classpath.

spark-shell --jars spark-1.0-SNAPSHOT-jar-with-dependencies.jar

Let’s dig deeper into that spark-shell session. I’m using newAPIHadoopFile from the SparkContext in order to read a Hadoop file with a custom InputFormat. I need to supply the input path and the InputFormat to be used, and I also need to provide the Spark context with the expected key / value classes of my InputFormat implementation.

import com.aamend.hadoop.format.GdeltInputFormat
import org.apache.hadoop.io.Text

// Read file from HDFS - Use GdeltInputFormat
val input = sc.newAPIHadoopFile(
   "hdfs://path/to/gdelt",
   classOf[GdeltInputFormat],
   classOf[Text],
   classOf[Text]
)

We now have a HadoopRDD (containing key / value pairs), but still not the bunch of Strings (JSON) we expected. The conversion is done as follows:

  // Get only the values from Hadoop Key / Values pair
  val values = input.map{ case (k,v) => v}

  // Need to deserialize Text
  val json = values.map(t => t.toString)

OK, we’re good to go. Spark is able to read our JSON GDELT files. Not convinced? Print out the first 3 lines as follows:

  values.take(3).foreach(println)

Convert JSON objects into a Spark SQL table

Spark SQL is used through the SQLContext. Although it does not physically create any table, it maps the supplied RDD to a SchemaRDD that can be queried with SQL.

  import org.apache.spark.sql.SQLContext
  
  // Access spark SQL context
  val sqlContext = new SQLContext(sc)

  // Load JSON (String) as a SQL JsonRDD
  val jsonRDD = sqlContext.jsonRDD(json)

We can make sure the inferred data structure is correct using the command below. Quite handy, isn’t it? Fields are typed as string, integer, double, etc. depending on their JSON value.

  jsonRDD.printSchema()
root
 |-- GlobalEventID: integer (nullable = true)
 |-- actor1Code: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- ethnicCode: string (nullable = true)
 |    |-- knownGroupCode: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- religion1Code: string (nullable = true)
 |    |-- religion2Code: string (nullable = true)
 |    |-- type1Code: string (nullable = true)
 |    |-- type2Code: string (nullable = true)
 |    |-- type3Code: string (nullable = true)
 |-- actor1Geo: struct (nullable = true)
 |    |-- adm1Code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- featureId: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: integer (nullable = true)
 |-- actor2Code: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- ethnicCode: string (nullable = true)
 |    |-- knownGroupCode: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- religion1Code: string (nullable = true)
 |    |-- religion2Code: string (nullable = true)
 |    |-- type1Code: string (nullable = true)
 |    |-- type2Code: string (nullable = true)
 |    |-- type3Code: string (nullable = true)
 |-- actor2Geo: struct (nullable = true)
 |    |-- adm1Code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- featureId: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: integer (nullable = true)
 |-- avgTone: double (nullable = true)
 |-- dateAdded: string (nullable = true)
 |-- day: string (nullable = true)
 |-- eventBaseCode: string (nullable = true)
 |-- eventCode: string (nullable = true)
 |-- eventGeo: struct (nullable = true)
 |    |-- adm1Code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- featureId: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: integer (nullable = true)
 |-- eventRootCode: string (nullable = true)
 |-- fracDate: double (nullable = true)
 |-- goldsteinScale: double (nullable = true)
 |-- isRoot: boolean (nullable = true)
 |-- month: integer (nullable = true)
 |-- numArticles: integer (nullable = true)
 |-- numMentions: integer (nullable = true)
 |-- numSources: integer (nullable = true)
 |-- quadClass: integer (nullable = true)
 |-- sourceUrl: string (nullable = true)
 |-- year: integer (nullable = true)

Now that the data structure is correct, we register this SchemaRDD as a Spark table. Let’s play with SQL!

  jsonRDD.registerAsTable("gdelt")

Query table and extract result (Goldstein Scale)

Definition of Goldstein scale taken from GDELT website: “Each CAMEO event code is assigned a numeric score from -10 to +10, capturing the theoretical potential impact that type of event will have on the stability of a country“. Ranks can be found there.

We will try to get the average Goldstein scale for any article involving France as the primary actor and the United Kingdom as the secondary one. Is France about to declare war on the UK based on the past few days’ events? Hopefully not, but let’s get some proof 🙂

  val relations = sqlContext.sql(
    "SELECT day, AVG(goldsteinScale) " +
      "FROM gdelt WHERE " +
      "actor1Geo.countryCode = 'FR' AND actor2Geo.countryCode = 'UK' " +
      "GROUP BY day")

Note the nested-entity syntax (actor2Geo.countryCode) allowed by my JSON structure. Such a nested structure makes the SQL even more readable.

We finally save the resulting RDD as text file(s) in HDFS… and…

  relations.map(row => row(0) + "\t" + row(1)).saveAsTextFile("hdfs://output/path")

… And plot data into a Graph

[Graph: average Goldstein scale per day for events involving France and the United Kingdom]

Well, it looks like I can stay in the UK for a while 🙂
Anyway, I’ve learned a lot from this exercise, and I hope you enjoyed reading too. Please find my code in my Spark project on GitHub, and should you have any questions, feel free to ask.

Cheers!
Antoine

Use case: Ukraine Russia relations over the past 30 years

Using such a record reader, you can build the following graph, reporting Ukraine – Russia diplomatic relations over the past 30 years.
Even better, you can smooth the Goldstein scale by using a moving average (5, 10 and 30 days in the example below). The moving average was built in Scala / Spark (see MovingAverageGoldstein.scala).

[Graph: Ukraine – Russia Goldstein scale with 5, 10 and 30-day moving averages]

Enjoy !

Spark: Create a simple Spark job

MapReduce is dead, long live Spark !

Following the latest big data trends, the logical next step for me is to start getting my head around Apache Spark. YARN is indeed Yet Another Resource Negotiator, but definitely not yet another big data application… Although you can still execute MapReduce on YARN, writing a YARN application is such a pain! Have a look at the article below; I reached the same conclusion after yet another headache!

http://www.edwardcapriolo.com/roller/edwardcapriolo/entry/yarn_either_it_is_really

Spark runs on top of YARN, but it does not require writing new Client, ApplicationMaster or Container classes. Anyway, let’s jump into Apache Spark and Scala. I’m describing here how to quickly install Spark / Scala on a YARN cluster and submit a Scala object that runs through the Spark client.

Install Spark client / Scala dependencies

Install Scala:

wget http://www.scala-lang.org/files/archive/scala-2.10.1.tgz
tar xvf scala-2.10.1.tgz
mv scala-2.10.1 /usr/local/lib/
ln -s /usr/local/lib/scala-2.10.1/ /usr/local/lib/scala

Install Spark

Download a pre-built spark distribution matching your hadoop distro. Alternatively, build your own from source.

wget http://www.apache.org/dyn/closer.cgi/spark/spark-1.1.0/spark-1.1.0-bin-hadoop2.4.tgz
tar xvf spark-1.1.0-bin-hadoop2.4.tgz
mv spark-1.1.0-bin-hadoop2.4 /usr/local/lib/
ln -s /usr/local/lib/spark-1.1.0-bin-hadoop2.4 /usr/local/lib/spark

Update environment

JAVA_HOME=/usr/lib/jvm/j2sdk1.6-oracle
HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
YARN_HOME=/usr/lib/hadoop-yarn
HADOOP_HDFS_HOME=/usr/lib/hadoop-hdfs
HADOOP_COMMON_HOME=/usr/lib/hadoop
HADOOP_CONF_DIR=/etc/hadoop/conf
YARN_CONF_DIR=/etc/hadoop/conf
ZOO_LOG_DIR=/var/log/zookeeper
SCALA_HOME=/usr/local/lib/scala

Create a maven Spark application

Here are the Maven dependencies I’m using. Simply replace hadoop.version with the right Hadoop distribution / version. Remember to add the right Maven repository to your settings.xml / Artifactory / Nexus should you want to use non-Apache distributions (such as Hortonworks or Cloudera).

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

Since this project is Scala-based, we need to tell Maven to compile Scala sources. This can be done using the plugin below:

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

The project structure is the following:

-src
-- main
--- scala
---- com
----- wordpress
------ hadoopi
------- spark
-------- SparkGrepScala.scala
-- test
- target

My Simple Spark job is as follows:

package com.wordpress.hadoopi.spark
import org.apache.spark.{SparkConf, SparkContext}

object SparkGrepScala extends App {

  val sc = new SparkContext(new SparkConf().setAppName("Grep"))

  // Retrieve the key words to look for, comma separated values
  val keyWords = args(2).split(",")

  // Open file(s) in that HDFS directory
  val input = sc.textFile(args(0))

  // Filter lines containing at least one of these keywords
  val filteredLines = input.filter(line => keyWords.exists( word => line.contains(word)))

  // Output matching line to HDFS
  filteredLines.saveAsTextFile(args(1))
}

What this simple app does is retrieve the command line arguments (keywords), read a set of input files from HDFS, keep any line matching at least one of these comma-separated keywords, and write the matching lines back to HDFS.

Execution

Package this project with mvn clean package and upload it to your Spark node (a Hadoop node with the Spark client installed).

Input data:

Create a new HDFS directory and put the Hadoop configuration files in it for testing purposes:

hadoop fs -mkdir input
hadoop fs -put /etc/hadoop/conf/*.xml input

Execute the Spark job using spark-submit:

/usr/lib/spark/bin/spark-submit --class com.wordpress.hadoopi.spark.SparkGrepScala spark-1.0-SNAPSHOT.jar input output dfs,yarn

This keeps any line matching either the dfs or the yarn keyword. Notice how fast Spark is compared to the MapReduce framework (even with a tiny dataset).

Output is as follows

hadoop fs -cat output/*

    <name>yarn.scheduler.capacity.maximum-applications</name>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
    <name>yarn.scheduler.capacity.root.queues</name>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <name>yarn.scheduler.capacity.root.default.state</name>
    <name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
    <name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
    <name>yarn.scheduler.capacity.node-locality-delay</name>
    <value>hdfs://vagrant:8020</value>
    <name>dfs.namenode.name.dir</name>
    <value>/hadoop/dfs/nn</value>
    <name>dfs.datanode.data.dir</name>
    <value>/hadoop/dfs/dn</value>
    <name>dfs.webhdfs.enabled</name>
    <name>dfs.replication</name>
    <value>yarn</value>
    <name>yarn.app.mapreduce.am.staging-dir</name>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <name>yarn.web-proxy.address</name>
    <name>yarn.resourcemanager.address</name>
    <name>yarn.resourcemanager.scheduler.address</name>
    <name>yarn.resourcemanager.admin.address</name>
    <name>yarn.resourcemanager.webapp.address</name>
    <name>yarn.nodemanager.aux-services</name>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <name>yarn.log-aggregation-enable</name>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/var/lib/hadoop-yarn/cache/${user.name}/nm-local-dir</value>
    <name>yarn.nodemanager.log-dirs</name>
    <value>/var/log/hadoop-yarn/containers</value>
    <name>yarn.nodemanager.remote-app-log-dir</name>
    <value>/var/log/hadoop-yarn/apps</value>
    <name>yarn.application.classpath</name>

And voila!

It’s fast, simple to install, and it appears to work 🙂 Now all I have to do is learn Scala in order to be as efficient as I used to be with standard MapReduce!
Great. So, if you followed this article, you have created a first Scala project that runs on Spark (from the command line).

Cheers,
Antoine

Hadoop: Test MapReduce using MRUnit

When you implement a new MapReduce job, it can be quite handy to test it locally before screwing up your production environment with unstable code… although I, too, like to live dangerously.


You can still package your code and submit your jar file to a running test / dev environment (or, even better, spin up a HadoopMiniCluster), but what if you could validate your mapper / reducer separately in a JUnit test? For that purpose, MRUnit is a perfect tool.

MRUnit

According to the Cloudera website, “MRUnit helps bridge the gap between MapReduce programs and JUnit by providing a set of interfaces and test harnesses, which allow MapReduce programs to be more easily tested using standard tools and practices.”
The Maven dependency is the following:

<dependency>
  <groupId>org.apache.mrunit</groupId>
  <artifactId>mrunit</artifactId>
  <version>1.0.0</version>
  <classifier>hadoop2</classifier>
  <scope>test</scope>
</dependency>

Pay attention to the classifier tag. I’m using version 2 of MapReduce. A common source of misunderstanding in Hadoop is the MapReduce version (basically 1 or 2) versus the framework used (MapReduce or YARN). That said, start with classifier = hadoop2, and should you encounter an error along the lines of “Expected an interface, got a class”, fall back to the hadoop1 classifier 🙂

Implementation

In your JUnit class, get an instance of MapDriver in order to test the mapper class only, ReduceDriver for the reducer class only, or MapReduceDriver for full end-to-end testing. Remember to use the mapred package for the old API or the mapreduce package for the new API.
Let’s jump into the code below:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

@RunWith(JUnit4.class)
public class WordCountTest {

    private Mapper mapper = new WordCount.WordCountMapper();
    private Reducer reducer = new WordCount.WordCountReducer();
    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver = MapDriver.newMapDriver(mapper);
    private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);

    @Test
    public void testMapOnly() throws IOException {
        mapDriver.getConfiguration().set("foo","bar");
        mapDriver.addAll(getInput());
        mapDriver.withAllOutput(getOutput("output/wordcount-output-mapper.txt"));
        mapDriver.withCounter("DATA","line", getInputLines());
        mapDriver.runTest();
    }

    @Test
    public void testMapReduce() throws IOException {
        mapReduceDriver.addAll(getInput());
        mapReduceDriver.withAllOutput(getOutput("output/wordcount-output.txt"));
        mapReduceDriver.withCounter("DATA","line", getInputLines());
        mapReduceDriver.withCounter("DATA","word", getOutputRecords());
        mapReduceDriver.runTest();
    }

The “tricky” part is to properly declare the input / output key / value classes for each driver, as follows:

MapReduceDriver<MapperInputKey.class, MapperInputValue.class, MapperOutputKey.class, MapperOutputValue.class, ReduceOutputKey.class, ReduceOutputValue.class> driver;

As you can see in this code snippet, you can still play with some Hadoop features (such as the configuration and counters).
FYI, I’m using the external files below for this test.

wordcount-input.txt

Hadoop mapreduce JUnit testing
Hadoop JUnit testing
JUnit
Hadoop hdfs mapreduce
Hadoop
testing
Hadoop

wordcount-output-mapper.txt

hadoop,1
mapreduce,1
junit,1
testing,1
hadoop,1
junit,1
testing,1
junit,1
hadoop,1
hdfs,1
mapreduce,1
hadoop,1
testing,1
hadoop,1

wordcount-output.txt

hadoop,5
hdfs,1
junit,3
mapreduce,2
testing,3

One file is read to feed the MapReduce job, and another one provides the expected output key / value pairs, using the static methods below.


    private static List<Pair<LongWritable, Text>> getInput(){
        Scanner scanner = new Scanner(WordCountTest.class.getResourceAsStream("input/wordcount-input.txt"));
        List<Pair<LongWritable, Text>> input = new ArrayList<Pair<LongWritable, Text>>();
        while(scanner.hasNext()){
            String line = scanner.nextLine();
            input.add(new Pair<LongWritable, Text>(new LongWritable(0), new Text(line)));
        }
        return input;
    }

    private static int getInputLines(){
        Scanner scanner = new Scanner(WordCountTest.class.getResourceAsStream("input/wordcount-input.txt"));
        int line = 0;
        while(scanner.hasNext()){
            scanner.nextLine();
            line++;
        }
        return line;
    }

    private static int getOutputRecords(){
        Scanner scanner = new Scanner(WordCountTest.class.getResourceAsStream("output/wordcount-output.txt"));
        int record = 0;
        while(scanner.hasNext()){
            scanner.nextLine();
            record++;
        }
        return record;
    }

    private static List<Pair<Text,IntWritable>> getOutput(String fileName){
        Scanner scanner = new Scanner(WordCountTest.class.getResourceAsStream(fileName));
        List<Pair<Text,IntWritable>> output = new ArrayList<Pair<Text, IntWritable>>();
        while(scanner.hasNext()){
            String keyValue[] = scanner.nextLine().split(",");
            String word = keyValue[0];
            String count = keyValue[1];
            output.add(new Pair<Text, IntWritable>(new Text(word), new IntWritable(Integer.parseInt(count))));
        }
        return output;
    }

Even though you can feed your MapReduce code with a single key / value pair using the add() method, I strongly recommend supplying several of them using the addAll() method. This ensures shuffling / partitioning is working well. When doing so, however, you need to build these key / value pairs in the exact same order you expect in the MapReduce output.
Anyway, too much blabla for such a simple tool. Now that you have a working recipe, simply stop testing your code in production 🙂

Cheers!
Antoine

Flume: Getting started with Interceptors

How do you create a Flume plugin that can listen to any incoming event and alter the event’s content on the fly? By using interceptors, as described in the Flume interceptors documentation.
But what is a Flume event, and what might an event interceptor look like?

Interceptor implementation

First, one needs to define what an event is. According to the Flume definition, “A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes. […]” Its JSON representation could be the following:

{"headers":{"k1":"v1"},"body":"this is my body"}

We wish to alter the incoming flow and enrich it with custom headers (custom attributes) as follows:

{"headers":{"k1":"v1","k2","v2"},"body":"this is my body"}

Create project and dependencies

Let’s create a simple Maven project with the following dependency. I am currently using Flume version 1.4.0-cdh4.6.0 (the Cloudera release), but the implementation should be the same regardless of the Flume version.

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>${flume.core.version}</version>
        </dependency>

Build interception class

I created a CustomHostInterceptor as described below. The goal of this interceptor is to enrich incoming data with the hostname. This is obviously already packaged in the Flume libraries (org.apache.flume.interceptor.HostInterceptor$Builder), but it makes a good example of how interception works.
Here is what the Interceptor interface looks like; it will be implemented by our CustomHostInterceptor class.


public interface Interceptor {

    void initialize();

    Event intercept(Event event);

    List<Event> intercept(List<Event> list);

    void close();

    static interface Builder extends Configurable {
        Interceptor build();
    }
}

The static Builder reads properties from the Flume context and initializes the Interceptor class accordingly. The interceptor logic itself happens in the intercept(Event event) method. Since Flume is able to handle batches of events, we also have to handle batches of interceptions (hence the List<Event> intercept(List<Event> list) method).

public class CustomHostInterceptor
        implements Interceptor {

    private String hostValue;
    private String hostHeader;

    public CustomHostInterceptor(String hostHeader){
        this.hostHeader = hostHeader;
    }

    @Override
    public void initialize() {
        // At interceptor start up
        try {
            hostValue =
                    InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new FlumeException("Cannot get Hostname", e);
        }
    }

    @Override
    public Event intercept(Event event) {

        // This is the event's body
        String body = new String(event.getBody());

        // These are the event's headers
        Map<String, String> headers = event.getHeaders();

        // Enrich header with hostname
        headers.put(hostHeader, hostValue);

        // Let the enriched event go
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        List<Event> interceptedEvents =
                new ArrayList<Event>(events.size());
        for (Event event : events) {
            // Intercept any event
            Event interceptedEvent = intercept(event);
            interceptedEvents.add(interceptedEvent);
        }

        return interceptedEvents;
    }

    @Override
    public void close() {
        // At interceptor shutdown
    }

    public static class Builder
            implements Interceptor.Builder {

        private String hostHeader;

        @Override
        public void configure(Context context) {
            // Retrieve property from flume conf
            hostHeader = context.getString("hostHeader");
        }

        @Override
        public Interceptor build() {
            return new CustomHostInterceptor(hostHeader);
        }
    }
}

No need to spend much time explaining what’s happening there, it is pretty self-explanatory. The hostname is written into the event’s headers under a custom key (supplied through the Flume configuration using the “hostHeader” property).

Package / Install Interceptor

In 3 words: mvn clean package 🙂
This command packages your interceptor into a single JAR file that is loaded at flume-agent startup. Your jar file must be uploaded to each server running a flume-agent. The default path for any Flume plugin is /usr/lib/flume-ng/lib/ (the path might be slightly different when using Cloudera Manager). The interceptor can then be used, but it needs to be activated first from the Flume configuration (/etc/flume-ng/conf/flume.conf).


# Describe sources
###########################

a1.sources = r1
# .../...
# Attached the interceptor to the source
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.aamend.hadoop.flume.interceptor.CustomHostInterceptor$Builder
a1.sources.r1.interceptors.i1.hostHeader = hostname

# Describe sinks
###########################

# .../...

# Describe channels
###########################

# .../...

Once properly configured, restart the flume-ng daemon and monitor the Flume log file (/var/log/flume-ng/flume.log).
Any incoming event should now be enriched with the hostname. With logging enabled as per the source code available on my GitHub account, you should see something like:

2014-06-11T13:04:48+0100  INFO .flume.interceptor.CustomHostInterceptor:  50 - Before interception : {"headers":{"k1":"v1"},"body":"this is my body"}
2014-06-11T13:04:48+0100  INFO .flume.interceptor.CustomHostInterceptor:  65 - After  interception : {"headers":{"hostname":"localhost.localdomain","k1":"v1"},"body":"this is my body"}

Congratulations, your interceptor works. You can now alter any incoming event, enrich its headers, or even redirect it to different channels based on header values (see multiplexing). The latter will be the subject of a future post.

Cheers,
Antoine

Hadoop: Add third-party libraries to MapReduce job

Anybody working with Hadoop has probably already faced the same common issue: how to add third-party libraries to a MapReduce job.

Add libjars option

The first solution, maybe the most common one, consists of adding libraries using the -libjars parameter on the CLI. To make it work, your class MyClass must use the GenericOptionsParser class. The easiest way is to implement the Hadoop Tool interface, as described in the post Hadoop: Implementing the Tool interface for MapReduce driver.

$ export LIBJARS=/path/jar1,/path/jar2
$ hadoop jar /path/to/my.jar com.wordpress.hadoopi.MyClass -libjars ${LIBJARS} value

This will obviously only work when playing with the CLI, so how the heck can we add such external jar files when not using the CLI?

Add jar files to Hadoop classpath

You could certainly upload the external jar files to each TaskTracker and update HADOOP_CLASSPATH accordingly, but are you really willing to bother the Ops team each time you need to add a new jar? It works well on a single node, but are you going to upload such jars across all of the 10, 100 or even more Hadoop nodes? This approach does not scale at all!

Create a fat jar

Another approach is to create a fat jar, which is a JAR that contains your classes as well as your third-party classes (see this Cloudera blog post for more details). Be aware that this jar will not only contain your classes, but might also include all your project dependencies (such as the Hadoop libraries) unless you explicitly exclude them (using the provided scope).
Here is an example of the Maven plugin you will need to set up:

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>
                             jar-with-dependencies
                        </descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

Following a “mvn clean package” command, your fat JAR will be located in the Maven project’s target directory as follows:

drwxr-xr-x  2 antoine  staff        68 Jun 10 09:30 archive-tmp
drwxr-xr-x  3 antoine  staff       102 Jun 10 09:29 classes
drwxr-xr-x  3 antoine  staff       102 Jun 10 09:29 generated-sources
drwxr-xr-x  3 antoine  staff       102 Jun 10 09:29 generated-test-sources
drwxr-xr-x  3 antoine  staff       102 Jun 10 09:29 maven-archiver
drwxr-xr-x  4 antoine  staff       136 Jun 10 09:29 myproject-1.0-SNAPSHOT
-rw-r--r--  1 antoine  staff  63880020 Jun 10 09:30 myproject-1.0-SNAPSHOT-jar-with-dependencies.jar
drwxr-xr-x  4 antoine  staff       136 Jun 10 09:29 surefire-reports
drwxr-xr-x  4 antoine  staff       136 Jun 10 09:29 test-classes

In the above example, note the actual size of the JAR file (around 61MB). Quite fat, isn’t it?
You can ensure all dependencies have been added by firing up the command below:

$ jar -tf myproject-1.0-SNAPSHOT-jar-with-dependencies.jar

META-INF/
META-INF/MANIFEST.MF
com/aamend/hadoop/allMyClasses.class
...
com/others/allMyDependencies.class
...

Use Distributed cache

I always follow this approach when using third-party libraries in my MapReduce jobs. One could say such an approach is not elegant, but I can work without annoying anyone from the Ops team :). I first create a “lib” directory in my HDFS home directory (“/user/hadoopi/”). You could even use “/tmp”, it does not matter. I then create a static method that:

  1. Locate the jar file that includes the class I need
  2. Upload this jar to Hadoop HDFS
  3. Add the uploaded jar file to Hadoop distributed cache

Simply add the following lines to some Utils class.

    private static void addJarToDistributedCache(
            Class classToAdd, Configuration conf)
        throws IOException {

        // Retrieve jar file for class2Add
        String jar = classToAdd.getProtectionDomain().
                getCodeSource().getLocation().
                getPath();
        File jarFile = new File(jar);

        // Declare new HDFS location
        Path hdfsJar = new Path("/user/hadoopi/lib/"
                + jarFile.getName());

        // Mount HDFS
        FileSystem hdfs = FileSystem.get(conf);

        // Copy (override) jar file to HDFS
        hdfs.copyFromLocalFile(false, true,
            new Path(jar), hdfsJar);

        // Add jar to distributed classPath
        DistributedCache.addFileToClassPath(hdfsJar, conf);
    }

The only thing you need to remember is to call this method prior to job submission…


    public static void main(String[] args) throws Exception {

        // Create Hadoop configuration
        Configuration conf = new Configuration();

        // Add 3rd-party libraries
        addJarToDistributedCache(MyFirstClass.class, conf);
        addJarToDistributedCache(MySecondClass.class, conf);

        // Create my job
        Job job = new Job(conf, "Hadoop-classpath");
        .../...
    }

Here you are: your MapReduce job is now able to use any external JAR file.

Cheers!
Antoine

Hadoop: Primitive Array Clustering

A Hadoop implementation of canopy clustering using the Levenshtein distance algorithm and other non-mathematical distance measures (such as the Jaccard coefficient).

Difference with Mahout

One of the major limitations of Mahout is that the clustering algorithms (K-Means or canopy clustering) use a mathematical approach in order to compute cluster centers. Each time a new point is added to a cluster, the Mahout framework recomputes the cluster’s center as an average of its data points.

NewCenter[i] = Sum(Vectors)[i] / observations

As a result, only purely mathematical DistanceMeasure can be used. But…

  • What if your data set is composed of non-mathematical data points ?
  • What if an average of points does not make any sense for your business ?
  • Or simply what if you wish to use a non (or less) mathematical distance measure ?

Motivations

I had to create canopies for sequences of IDs (Integer). Let’s take the following example with 2 vectors V1 and V2.

V1={0:123, 1:23, 2:55, 3:141, 4:22}
V2={0:23, 1:55, 2:141, 3:22}

These vectors are totally different using most of the standard mathematical measures Mahout provides (e.g. Euclidean). I could still change the way my vectors are created, but none of the solutions I tried treated my arrays as sequences of IDs, let alone sequences of IDs where the order matters. The Levenshtein metric (usually used for fuzzy string matching) is a perfect match, as it compares sequences of IDs and not only IDs as numbers.
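
For reference, the Levenshtein distance between two sequences is the minimum number of insertions, deletions and substitutions needed to transform one sequence into the other. A standalone sketch over arrays of IDs, for illustration only (this is not the project's actual LevenshteinDistance class):

public final class ArrayLevenshtein {

    // Classic dynamic-programming Levenshtein distance, applied to arrays
    // of IDs instead of characters
    public static int distance(int[] a, int[] b) {
        int[][] d = new int[a.length + 1][b.length + 1];
        for (int i = 0; i <= a.length; i++) d[i][0] = i;
        for (int j = 0; j <= b.length; j++) d[0][j] = j;
        for (int i = 1; i <= a.length; i++) {
            for (int j = 1; j <= b.length; j++) {
                int cost = (a[i - 1] == b[j - 1]) ? 0 : 1;
                d[i][j] = Math.min(Math.min(
                        d[i - 1][j] + 1,          // deletion
                        d[i][j - 1] + 1),         // insertion
                        d[i - 1][j - 1] + cost);  // substitution
            }
        }
        return d[a.length][b.length];
    }

    public static void main(String[] args) {
        int[] v1 = {123, 23, 55, 141, 22};
        int[] v2 = {23, 55, 141, 22};
        // Prints 1: V2 is V1 with the leading ID removed
        System.out.println(distance(v1, v2));
    }
}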

I had to create a new set of DistanceMeasures taking arrays as input parameters.
Besides, assuming both vectors belong to the same cluster, does a new cluster center V' (computed as an average of the points from V1 and V2) make any sense for sequence analysis?

V'={0:(23+123)/2, 1:(55+23)/2, 2:(141+55)/2, 3:(22+141)/2, 4:(0+22)/1}

I had to find a way to override Mahout’s cluster center computation. Instead of computing an average of data points, I pick the point Pi that minimizes the total distance to all of the cluster’s data points.
Pseudo code:

min_dist = Infinity
For each point Pi
  total_dist = 0
  For each point Pj
    total_dist += distance(Pi, Pj)
  If total_dist < min_dist
    min_dist = total_dist
    min_point = Pi

Center = min_point
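
In other words, the new center is the medoid of the cluster. A short sketch of that selection written against the project's DistanceMeasure abstraction; the distance(int[], int[]) signature is an assumption here:

import java.util.List;

public class CenterSelection {

    // Pick the existing point with the smallest total distance to every
    // other point of the cluster (the medoid).
    // DistanceMeasure stands for com.aamend.hadoop.clustering.distance.DistanceMeasure;
    // its exact method signature is assumed.
    public static int[] findCenter(List<int[]> points, DistanceMeasure measure) {
        int[] center = null;
        double minDist = Double.MAX_VALUE;
        for (int[] candidate : points) {
            double total = 0.0;
            for (int[] other : points) {
                total += measure.distance(candidate, other);
            }
            if (total < minDist) {
                minDist = total;
                center = candidate;
            }
        }
        return center;
    }
}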

Distance Measures

Supported distance measures are

  • com.aamend.hadoop.clustering.distance.LevenshteinDistance measure
  • com.aamend.hadoop.clustering.distance.TanimotoDistance measure
  • Any Measure implementing com.aamend.hadoop.clustering.distance.DistanceMeasure

Primitive Arrays

Only Integer.class is supported in version 1.0. It is planned, however, to support any of the Java primitive arrays (boolean[], char[], int[], double[], long[], float[]). I invite you to actively contribute to this project.

Dependencies

Even though the project is directly inspired by Mahout canopy clustering, it does not depend on any of the Mahout libraries. Instead of the Mahout Vector, I use arrays of integers, and instead of the Mahout VectorWritable, I use Hadoop’s ArrayPrimitiveWritable. Simply add the Maven dependency to your project; release versions should be available on Maven Central (synced from Sonatype). Even though this project (which does depend on the Hadoop libraries) has been built around the Hadoop CDH4 distribution, this can easily be overridden on the client side by using the Maven “exclusion” tag in order to use any Hadoop version / distribution.

    <dependency>
        <groupId>com.aamend.hadoop</groupId>
        <artifactId>hadoop-primitive-clustering</artifactId>
        <version>1.0</version>
        <!-- Should you need to override / exclude hadoop deps. -->
        <!--
        <exclusions>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
            </exclusion>
        </exclusions>
        -->
    </dependency>

Usage

Create canopies

Use the static buildClusters method from the com.aamend.hadoop.clustering.job.CanopyDriver class:

/**
* @param conf the Hadoop Configuration
* @param input the Path containing input PrimitiveArraysWritable
* @param output the final Path where clusters / data will be written to
* @param reducers the number of reducers to use (at least 1)
* @param measure the DistanceMeasure
* @param t1 the float CLUSTER_T1 distance metric
* @param t2 the float CLUSTER_T2 distance metric
* @param cf the minimum observations per cluster
* @return the number of created canopies
*/
public static long buildClusters(
    Configuration conf, Path input,
    Path output, int reducers,
    DistanceMeasure measure,
    float t1, float t2, long cf){
...
}

This builds canopies using several MapReduce jobs (at least 2, driven by the initial number of reducers): firstly because we need to keep track of each observed point per cluster in order to minimize the intra-cluster distance of data points (which obviously cannot fit in memory), and secondly because the measure used here might be fairly inefficient in a single map job (Levenshtein complexity is O(n*m)). In order to allow a smooth run without any hot spot, at each iteration the number of reducers is halved (until it reaches 1) while the {T1, T2} parameters get slightly larger (starting at half of the required size). The clustering algorithm is driven by the supplied DistanceMeasure (which can be a custom measure implementing DistanceMeasure, assuming it is available on the Hadoop classpath).

The input data should be in SequenceFile format, using any key class (implementing the WritableComparable interface) and ArrayPrimitiveWritable values (serializing integer arrays).

The output will be in SequenceFile format, using the cluster id as key (IntWritable) and com.aamend.hadoop.clustering.clusterCanopyWritable as value.

Cluster input data

Once the canopies are created, use the static clusterData method from the com.aamend.hadoop.clustering.job.CanopyDriver class:

/**
* @param conf the Configuration
* @param inputData the Path containing input arrays
* @param dataPath the final Path where data will be written to
* @param clusterPath the path where clusters have been written
* @param measure the DistanceMeasure
* @param minSimilarity the minimum similarity to cluster data
* @param reducers the number of reducers to use (at least 1)
*/
public static void clusterData(
    Configuration conf, Path inputData,
    Path dataPath, Path clusterPath,
    DistanceMeasure measure,
    float minSimilarity, int reducers){
...
}

This retrieves the most probable cluster each point should belong to. If a point is not 100% identical to a cluster’s center, it is still clustered as long as the similarity is greater than X% (minSimilarity). The canopies (created in the previous step) are added to the distributed cache.

The output will be in SequenceFile format, using the cluster id as key (IntWritable) and ObjectWritable as value (wrapping your initial WritableComparable key so that you can keep track of which point belongs to which cluster).
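
Putting the two steps together, a hypothetical driver could look like the following. Paths, threshold values and the LevenshteinDistance constructor are assumptions; only the two method signatures come from the documentation above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import com.aamend.hadoop.clustering.distance.DistanceMeasure;
import com.aamend.hadoop.clustering.distance.LevenshteinDistance;
import com.aamend.hadoop.clustering.job.CanopyDriver;

public class SequenceClusteringDriver {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Hypothetical HDFS locations
        Path input = new Path("/user/hadoopi/sequences/input");
        Path canopyPath = new Path("/user/hadoopi/sequences/canopies");
        Path clusteredPath = new Path("/user/hadoopi/sequences/clustered");

        // Constructor and threshold values are assumptions
        DistanceMeasure measure = new LevenshteinDistance();

        // Step 1: build the canopies (8 initial reducers, T1 / T2 thresholds,
        // keep only canopies with at least 50 observations)
        long canopies = CanopyDriver.buildClusters(
                conf, input, canopyPath, 8, measure, 0.3f, 0.1f, 50L);
        System.out.println("Created " + canopies + " canopies");

        // Step 2: assign each input array to its most similar canopy,
        // requiring at least 90% similarity
        CanopyDriver.clusterData(
                conf, input, clusteredPath, canopyPath, measure, 0.9f, 8);
    }
}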

 

Contribution

Source code is available on https://github.com/aamend/hadoop-primitive-clustering

Hadoop: Get a callback on MapReduce job completion

MapReduce jobs might take a long time to complete… That said, you might have to run your jobs in the background, right? You could have a look at the JobTracker URL (for MR v1) or the YARN ResourceManager (v2) in order to check job completion, but what if you could be notified once the job is completed?

A quick and dirty solution would be to poll the JobTracker every X minutes as follows:


user@hadoop ~ $ hadoop job -status job_201211261732_3134
Job: job_201211261732_3134
file: hdfs://user/lihdop/.staging/job_201211261732_3134/job.xml
tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201211261732_3134
map() completion: 0.0
reduce() completion: 0.0

Working in a support position, I just hate that approach. Cron jobs and daemons for that purpose are always a pain to troubleshoot, and it’s always a pain to understand where / why these damned processes did not wake up in time!

Getting a notification instead of polling? Definitely more elegant…

In your driver class, only 3 lines are needed to enable the callback feature of Hadoop:



public class CallBackMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CallBackMR callback = new CallBackMR();
        int res = ToolRunner.run(conf, callback, args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = this.getConf();

        // ==================================
        // Set the callback parameters
        conf.set("job.end.notification.url", "https://hadoopi.wordpress.com/api/hadoop/notification/$jobId?status=$jobStatus");
        conf.setInt("job.end.retry.attempts", 3);
        conf.setInt("job.end.retry.interval", 1000);
        // ==================================

        .../...

        // Submit the job in the background (do not wait for completion)
        job.submit();
        return 0;
    }

}


At job completion, an HTTP request is sent to the “job.end.notification.url” value. Both the job id and the job status can be retrieved from the notification URL.
Looking at the Hadoop server side (see the YARN logs below), a SUCCEEDED notification was sent every second, at most 10 times, before it officially failed (the URL I used here was obviously a fake one).


root@hadoopi:/usr/lib/hadoop/logs/userlogs/application_1379509275868_0002# find . -type f | xargs grep hadoopi
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:32,090 INFO [Thread-66] org.mortbay.log: Job end notification trying https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:32,864 WARN [Thread-66] org.mortbay.log: Job end notification to https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED failed with code: 404 and message "Not Found"
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:32,965 INFO [Thread-66] org.mortbay.log: Job end notification trying https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:33,871 WARN [Thread-66] org.mortbay.log: Job end notification to https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED failed with code: 404 and message "Not Found"
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:33,971 INFO [Thread-66] org.mortbay.log: Job end notification trying https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:34,804 WARN [Thread-66] org.mortbay.log: Job end notification to https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED failed with code: 404 and message "Not Found"
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:34,904 INFO [Thread-66] org.mortbay.log: Job end notification trying https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:35,584 WARN [Thread-66] org.mortbay.log: Job end notification to https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED failed with code: 404 and message "Not Found"
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:35,684 WARN [Thread-66] org.mortbay.log: Job end notification failed to notify : https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED


Note that the notification is triggered for the SUCCEEDED status but also for the KILLED or FAILED statuses, which might be quite useful too.
The next step is to implement a callback listener on the client side…
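
For what it is worth, a toy listener could be as simple as the following, using the JDK's built-in HTTP server. The port and context path are arbitrary and would have to match whatever you configure in job.end.notification.url.

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

public class JobNotificationListener {

    public static void main(String[] args) throws Exception {

        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/api/hadoop/notification", new HttpHandler() {
            @Override
            public void handle(HttpExchange exchange) throws IOException {

                // URL pattern: /api/hadoop/notification/<jobId>?status=<jobStatus>
                String path = exchange.getRequestURI().getPath();
                String jobId = path.substring(path.lastIndexOf('/') + 1);
                String status = exchange.getRequestURI().getQuery();
                System.out.println("Job " + jobId + " completed with " + status);

                // A 200 answer tells Hadoop the notification went through,
                // so it stops retrying
                byte[] body = "OK".getBytes();
                exchange.sendResponseHeaders(200, body.length);
                OutputStream os = exchange.getResponseBody();
                os.write(body);
                os.close();
            }
        });
        server.start();
        System.out.println("Waiting for job end notifications on port 8080...");
    }
}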

Cheers,
Antoine

Hadoop: Implementing the Tool interface for MapReduce driver

Most people create their MapReduce jobs using driver code that is executed through its static main method. The downside of such an implementation is that most of your specific configuration (if any) is usually hardcoded. Should you need to modify some of your configuration properties on the fly (such as changing the number of reducers), you would have to modify your code, rebuild your jar file and redeploy your application. This can be avoided by implementing the Tool interface in your MapReduce driver code.

Hadoop Configuration

By implementing the Tool interface and extending the Configured class, you can easily set your Hadoop Configuration object via the GenericOptionsParser, and thus through the command line interface. This makes your code definitely more portable (and slightly cleaner), as you no longer need to hardcode any specific configuration.

Let’s take a couple of examples, with and without the Tool interface.

Without Tool interface


public class ToolMapReduce {

	public static void main(String[] args) throws Exception {

		// Create configuration
		Configuration conf = new Configuration();

		// Create job
		Job job = new Job(conf, "Tool Job");
		job.setJarByClass(ToolMapReduce.class);

		// Setup MapReduce job
		job.setMapperClass(Mapper.class);
		job.setReducerClass(Reducer.class);

		// Set only 1 reduce task
		job.setNumReduceTasks(1);

		// Specify key / value
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);

		// Input
		FileInputFormat.addInputPath(job, new Path(args[0]));
		job.setInputFormatClass(TextInputFormat.class);

		// Output
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setOutputFormatClass(TextOutputFormat.class);

		// Execute job
		int code = job.waitForCompletion(true) ? 0 : 1;
		System.exit(code);
	}
}

Your MapReduce job is executed as follows. Only 2 arguments are expected here, inputPath and outputPath, located at index [0] and [1] respectively of your main method’s String array.

hadoop jar /path/to/My/jar.jar com.wordpress.hadoopi.ToolMapReduce /input/path /output/path

In that case, the number of reducers (1) is hardcoded (the setNumReduceTasks(1) call) and therefore cannot be modified on demand.

With Tool interface

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ToolMapReduce extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new ToolMapReduce(), args);
		System.exit(res);
	}

	@Override
	public int run(String[] args) throws Exception {

		// When implementing tool
		Configuration conf = this.getConf();

		// Create job
		Job job = new Job(conf, "Tool Job");
		job.setJarByClass(ToolMapReduce.class);

		// Setup MapReduce job
		// Do not specify the number of Reducer
		job.setMapperClass(Mapper.class);
		job.setReducerClass(Reducer.class);

		// Specify key / value
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);

		// Input
		FileInputFormat.addInputPath(job, new Path(args[0]));
		job.setInputFormatClass(TextInputFormat.class);

		// Output
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setOutputFormatClass(TextOutputFormat.class);

		// Execute job and return status
		return job.waitForCompletion(true) ? 0 : 1;
	}
}

ToolRunner executes your MapReduce job through its static run method.
In this example, the number of reducers no longer needs to be hardcoded, as it can be specified directly from the CLI (using the "-D" option).

hadoop jar /path/to/My/jar.jar com.wordpress.hadoopi.ToolMapReduce -D mapred.reduce.tasks=1 /input/path /output/path

Note that you still have to supply the inputPath and outputPath arguments. Basically, GenericOptionsParser separates the generic tool options from the actual job arguments. Whatever the number of generic options you supply, the inputPath and outputPath variables will still be located at index [0] and [1], but in your run method's String array (not in your main method's).
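For the curious, here is roughly what ToolRunner does behind the scenes. This is only a sketch using GenericOptionsParser directly (the class name ParserDemo is made up), not the actual ToolRunner source code.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class ParserDemo {

	public static void main(String[] args) throws Exception {

		// GenericOptionsParser consumes the generic options (-D, -fs, -jt,
		// -files, -libjars, -archives) and populates the Configuration
		Configuration conf = new Configuration();
		GenericOptionsParser parser = new GenericOptionsParser(conf, args);

		// Only the job specific arguments are left (e.g. inputPath, outputPath)
		String[] remainingArgs = parser.getRemainingArgs();
		System.out.println("Job arguments : " + Arrays.toString(remainingArgs));
		System.out.println("Reduce tasks  : " + conf.get("mapred.reduce.tasks"));
	}
}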

This -D option can be used for any “official” or custom property values.

conf.set("my.dummy.configuration","foobar");

becomes now…

-D my.dummy.configuration=foobar
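On the task side, the value set with -D is read back from the Configuration object as usual. Below is a quick sketch (the class DummyPropertyMapper is made up for illustration), reusing the my.dummy.configuration property from above.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DummyPropertyMapper extends Mapper<LongWritable, Text, Text, Text> {

	private String dummy;

	@Override
	protected void setup(Context context) {
		// Read back the property injected from the CLI with
		// -D my.dummy.configuration=foobar (second argument is the default value)
		dummy = context.getConfiguration().get("my.dummy.configuration", "defaultValue");
	}
}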

HDFS and JobTracker properties

When I need to submit a jar file to a remote Hadoop cluster, I have to specify the properties below in my driver code.

Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "myserver.com:8021");
conf.set("fs.default.name", "hdfs://myserver.com:8020");

With the Tool interface, this now comes out of the box, as you can supply both the -fs and -jt options from the CLI.

hadoop jar myjar.jar com.wordpress.hadoopi.ToolMapReduce -fs hdfs://myserver.com:8020 -jt myserver.com:8021

Thanks to this Tool implementation, my jar file is now 100% portable: it can be executed either locally or remotely without hardcoding any specific value.

Generic options supported

Some additional useful options can be supplied from CLI.

-conf specify an application configuration file
-D use value for given property
-fs specify a namenode
-jt specify a job tracker
-files specify comma separated files to be copied to the map reduce cluster
-libjars specify comma separated jar files to include in the classpath.
-archives specify comma separated archives to be unarchived on the compute machines.

Should you need to ship a specific library, archive or lookup file to your tasks, this Tool interface might be quite useful (see the -files sketch below).
As you can see, it is probably worth implementing this Tool interface in your driver code, as it brings added value without any additional complexity.
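To illustrate the -files case, here is a minimal sketch assuming a lookup.txt file shipped with "-files lookup.txt" (both the file name and the LookupMapper class are made up for this example). Files passed this way are placed in the distributed cache and symlinked into each task's working directory, so they can be opened with a plain relative path.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LookupMapper extends Mapper<LongWritable, Text, Text, Text> {

	private final Set<String> lookup = new HashSet<String>();

	@Override
	protected void setup(Context context) throws IOException {
		// "lookup.txt" was shipped with: hadoop jar ... -files lookup.txt ...
		// and is symlinked into the task's working directory
		BufferedReader reader = new BufferedReader(new FileReader("lookup.txt"));
		try {
			String line;
			while ((line = reader.readLine()) != null) {
				lookup.add(line.trim());
			}
		} finally {
			reader.close();
		}
	}
}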

Cheers!

Hadoop: Setup Maven project for MapReduce in 5mn

I am sure I am not the only one who has ever struggled with the Hadoop Eclipse plugin installation. This plugin strongly depends on your environment (Eclipse, Ant, JDK) as well as on your Hadoop distribution and version. Moreover, it only provides the old MapReduce API.
It is so simple to create a Maven project for Hadoop that wasting time trying to build this plugin is pointless. I describe in this article how to set up a first Maven Hadoop project for Cloudera CDH4 in Eclipse.

Prerequisite

maven 3
jdk 1.6
eclipse with m2eclipse plugin installed

Add Cloudera repository

Cloudera jar files are not available on the default Maven central repository. You need to explicitly add the Cloudera repo to your settings.xml (under ${HOME}/.m2/settings.xml).

<?xml version="1.0" encoding="UTF-8"?>
<settings>
    <profiles>
        <profile>
            <id>standard-extra-repos</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <repositories>
                <repository>
                    <!-- Central Repository -->
                    <id>central</id>
                    <url>http://repo1.maven.org/maven2/</url>
                    <releases>
                        <enabled>true</enabled>
                    </releases>
                    <snapshots>
                        <enabled>true</enabled>
                    </snapshots>
                </repository>
                <repository>
                    <!-- Cloudera Repository -->
                    <id>cloudera</id>
                    <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
                    <releases>
                        <enabled>true</enabled>
                    </releases>
                    <snapshots>
                        <enabled>true</enabled>
                    </snapshots>
                </repository>
            </repositories>
        </profile>
    </profiles>
</settings>

Create Maven project

In Eclipse, create a new Maven project as follows

maven

maven2

maven3

Add Hadoop Nature

For the Cloudera distribution CDH4, open the pom.xml file and add the following dependencies


	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>jdk.tools</groupId>
				<artifactId>jdk.tools</artifactId>
				<version>1.6</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-hdfs</artifactId>
				<version>2.0.0-cdh4.0.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-auth</artifactId>
				<version>2.0.0-cdh4.0.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-common</artifactId>
				<version>2.0.0-cdh4.0.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-core</artifactId>
				<version>2.0.0-mr1-cdh4.0.1</version>
			</dependency>
			<dependency>
				<groupId>junit</groupId>
				<artifactId>junit-dep</artifactId>
				<version>4.8.2</version>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-auth</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-core</artifactId>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.1</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
		</plugins>
	</build>

Download dependencies

Now that you have added the Cloudera repository and created your project, download the dependencies. This is easily done by right-clicking on your Eclipse project and selecting "Update Maven dependencies".
All these dependencies should now be available in your .m2 repository.

[developer@localhost ~]$ find .m2/repository/org/apache/hadoop -name "*.jar" 
.m2/repository/org/apache/hadoop/hadoop-tools/1.0.4/hadoop-tools-1.0.4.jar
.m2/repository/org/apache/hadoop/hadoop-common/2.0.0-cdh4.0.0/hadoop-common-2.0.0-cdh4.0.0-sources.jar
.m2/repository/org/apache/hadoop/hadoop-common/2.0.0-cdh4.0.0/hadoop-common-2.0.0-cdh4.0.0.jar
.m2/repository/org/apache/hadoop/hadoop-core/2.0.0-mr1-cdh4.0.1/hadoop-core-2.0.0-mr1-cdh4.0.1-sources.jar
.m2/repository/org/apache/hadoop/hadoop-core/2.0.0-mr1-cdh4.0.1/hadoop-core-2.0.0-mr1-cdh4.0.1.jar
.m2/repository/org/apache/hadoop/hadoop-hdfs/2.0.0-cdh4.0.0/hadoop-hdfs-2.0.0-cdh4.0.0.jar
.m2/repository/org/apache/hadoop/hadoop-streaming/1.0.4/hadoop-streaming-1.0.4.jar
.m2/repository/org/apache/hadoop/hadoop-auth/2.0.0-cdh4.0.0/hadoop-auth-2.0.0-cdh4.0.0.jar
[developer@localhost ~]$ 

Create WordCount example

Create your driver code

package com.aamend.hadoop.MapReduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {

		Path inputPath = new Path(args[0]);
		Path outputDir = new Path(args[1]);

		// Create configuration
		Configuration conf = new Configuration(true);

		// Create job
		Job job = new Job(conf, "WordCount");
		job.setJarByClass(WordCountMapper.class);

		// Setup MapReduce
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		job.setNumReduceTasks(1);

		// Specify key / value
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Input
		FileInputFormat.addInputPath(job, inputPath);
		job.setInputFormatClass(TextInputFormat.class);

		// Output
		FileOutputFormat.setOutputPath(job, outputDir);
		job.setOutputFormatClass(TextOutputFormat.class);

		// Delete output if exists
		FileSystem hdfs = FileSystem.get(conf);
		if (hdfs.exists(outputDir))
			hdfs.delete(outputDir, true);

		// Execute job
		int code = job.waitForCompletion(true) ? 0 : 1;
		System.exit(code);

	}

}

Create Mapper class

package com.aamend.hadoop.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends
		Mapper<Object, Text, Text, IntWritable> {

	private final IntWritable ONE = new IntWritable(1);
	private Text word = new Text();

	public void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {

		String[] csv = value.toString().split(",");
		for (String str : csv) {
			word.set(str);
			context.write(word, ONE);
		}
	}
}

Create your Reducer class

package com.aamend.hadoop.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text text, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		context.write(text, new IntWritable(sum));
	}
}

Build project

Building the jar file comes out of the box with Maven. Execute the following command

mvn clean install

You should see an output similar to the one below

.../...

[INFO] 
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ MapReduce ---
[INFO] Building jar: /home/developer/Workspace/hadoop/MapReduce/target/MapReduce-0.0.1-SNAPSHOT.jar
[INFO] 
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ MapReduce ---
[INFO] Installing /home/developer/Workspace/hadoop/MapReduce/target/MapReduce-0.0.1-SNAPSHOT.jar to /home/developer/.m2/repository/com/aamend/hadoop/MapReduce/0.0.1-SNAPSHOT/MapReduce-0.0.1-SNAPSHOT.jar
[INFO] Installing /home/developer/Workspace/hadoop/MapReduce/pom.xml to /home/developer/.m2/repository/com/aamend/hadoop/MapReduce/0.0.1-SNAPSHOT/MapReduce-0.0.1-SNAPSHOT.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.159s
[INFO] Finished at: Sat May 25 00:35:56 GMT+02:00 2013
[INFO] Final Memory: 16M/212M
[INFO] ------------------------------------------------------------------------

Your jar file should now be available in the project's target directory (and also in your ${HOME}/.m2 local repository).

maven5

This jar is ready to be executed on your Hadoop environment.

hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.aamend.hadoop.MapReduce.WordCount input output

Each time I need to create a new Hadoop project, I simply copy the pom.xml template described above, and that's it.