Spark / Hadoop: Processing GDELT data using Hadoop InputFormat and SparkSQL


GDELT

A quick overview of the GDELT public data set:
The GDELT Project “monitors the world’s broadcast, print, and web news from nearly every corner of every country in over 100 languages and identifies the people, locations, organisations, counts, themes, sources, and events driving our global society every second of every day, creating a free open platform for computing on the entire world”.

Such a dataset is a perfect match for machine learning, and an excellent use case for getting started with Spark and Hadoop InputFormat. I won’t reproduce the data format specification here; have a look at the GDELT documentation instead. For this exercise I’m using only the “GDELT Event Database” data set, whose records fit into simple JSON events.

The goal of this exercise is to do the following:

  • Create a Hadoop GDELT -> JSON InputFormat
  • Read data from the Spark shell
  • Convert JSON objects into a Spark SQL table
  • Query the table and extract the result (Goldstein Scale)

Well, quite a challenge indeed… However, if you’re not visiting my blog for the first time, the concept of InputFormat should be familiar. If not, have a look at my previous posts. Only Spark and Spark SQL are new concepts here.

Create a GDELT -> Json Parser

GDELT data are tab-separated values. We can easily parse each line and store every key / value pair as a JSON element. However, I want to group related fields into nested entities (eventGeo, actor1Geo, actor2Geo, actor1Code, actor2Code), as their keys will always be the same. My final JSON structure looks like the following:

{
    "isRoot": true,
    "eventGeo": {
        "countryCode": "US",
        .../...
    },
    "actor1Code": {
        "code": "BUS",
        .../...
    },
    "actor2Code": {
        "code": "BUS",
        .../...
    },
    "actor1Geo": {
        "countryCode": "US",
        .../...
    },
    "actor2Geo": {
        "countryCode": "US",
        .../...
    },
    "goldsteinScale": 6,
    "quadClass": 2,
    "month": 201409,
    "GlobalEventID": 313271341,
    .../...
}
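
To make the nesting idea concrete, here is a minimal plain-Java sketch. It is not the actual parser from my Github project: it assumes a hypothetical, shortened four-column layout in which dotted column names (such as actor1Geo.countryCode) mark nested fields, whereas the real column order comes from the GDELT format specification. The class and method names are made up for this illustration, and every value is written as an unescaped JSON string to keep things short.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NestedJsonSketch {

    // Hypothetical, shortened column layout (assumption): real GDELT rows
    // have many more columns, in the order given by the specification.
    static final String[] COLUMNS = {
        "GlobalEventID", "goldsteinScale", "actor1Geo.countryCode", "actor2Geo.countryCode"
    };

    // Split one tab-separated line and group "parent.child" columns under "parent".
    static Map<String, Object> parse(String line) {
        String[] fields = line.split("\t", -1);
        Map<String, Object> root = new LinkedHashMap<>();
        for (int i = 0; i < COLUMNS.length && i < fields.length; i++) {
            String[] path = COLUMNS[i].split("\\.");
            if (path.length == 1) {
                root.put(path[0], fields[i]);
            } else {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>)
                        root.computeIfAbsent(path[0], k -> new LinkedHashMap<String, Object>());
                child.put(path[1], fields[i]);
            }
        }
        return root;
    }

    // Render the nested map as JSON. No escaping and no numeric types:
    // this is only meant to show the nesting.
    static String toJson(Map<String, Object> map) {
        StringBuilder sb = new StringBuilder("{");
        String sep = "";
        for (Map.Entry<String, Object> e : map.entrySet()) {
            sb.append(sep).append('"').append(e.getKey()).append("\":");
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                sb.append(toJson(child));
            } else {
                sb.append('"').append(e.getValue()).append('"');
            }
            sep = ",";
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson(parse("313271341\t6\tFR\tUK")));
    }
}
```

A real parser would also convert numeric columns to numbers (which is what lets Spark SQL infer integer and double types later on) and escape special characters, something a JSON library such as json-simple takes care of.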

To get from a bunch of raw files to structured JSON, I’m using nothing more than a standard InputFormat with a standard RecordReader. Whenever a new line is consumed, I send it to my JSON parser and hand the result back to the Hadoop framework.

        while (pos < end) {

            // Let Hadoop LineReader consume the next line into tmp text...
            newSize = in.readLine(tmp, 
                    maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), 
                            maxLineLength));

            // Nothing was read: we reached the end of the split
            if (newSize == 0) {
                break;
            }
            pos += newSize;

            // ... Take the result and feed my parser
            // tmp is the raw data
            // value will be the expected JSON object
            if (newSize < maxLineLength) {
                value.set(parse(tmp.toString()));
                break;
            }
            // Otherwise the line was too long: skip it and read the next one
        }
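
For readers who do not have Hadoop at hand, the same read / skip / parse flow can be sketched with the standard library only. This is an analogy under my own assumptions rather than a RecordReader implementation: the class name, the nextValue / getCurrentValue methods and the pluggable parser function are all hypothetical, and a BufferedReader stands in for Hadoop's LineReader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.function.Function;

class LineToJsonReaderSketch {

    private final BufferedReader in;
    private final int maxLineLength;
    private final Function<String, String> parser; // raw line -> JSON string
    private String value;

    LineToJsonReaderSketch(String data, int maxLineLength, Function<String, String> parser) {
        this.in = new BufferedReader(new StringReader(data));
        this.maxLineLength = maxLineLength;
        this.parser = parser;
    }

    // Advance to the next record; returns false once the input is exhausted.
    boolean nextValue() {
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.length() < maxLineLength) {
                    // Line is short enough: parse it and expose it as the current value
                    value = parser.apply(line);
                    return true;
                }
                // Line too long: skip it and try the next one
            }
            value = null;
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    String getCurrentValue() {
        return value;
    }

    public static void main(String[] args) {
        // Toy parser (assumption): replaces tabs with commas instead of building JSON
        LineToJsonReaderSketch reader = new LineToJsonReaderSketch(
                "a\tb\nthis line is far too long\nc\td", 10, s -> s.replace('\t', ','));
        while (reader.nextValue()) {
            System.out.println(reader.getCurrentValue());
        }
    }
}
```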

Have a look at the com.aamend.hadoop.format.GdeltRecordReader class on my Github account. Before you say anything, note that performance is out of the scope of this study 🙂 . I’m simply using Google json-simple as a third-party dependency (see the maven dependency below). With the raw files stored on HDFS, any event read through my GdeltInputFormat.class will be seen as a JSON object.

<dependency>
  <groupId>com.googlecode.json-simple</groupId>
  <artifactId>json-simple</artifactId>
  <version>1.1</version>
</dependency>

Do not forget to add these dependencies to your Spark job! I’m building a fat jar for that purpose (refer to previous post).

Read data from spark shell

Assuming you have a Spark distribution up and running, simply start a shell session as follows. Note the “--jars” option, which adds all your dependencies to the classpath.

spark-shell --jars spark-1.0-SNAPSHOT-jar-with-dependencies.jar

Let’s dig deeper into that Spark shell session. I’m using newAPIHadoopFile from the SparkContext in order to read a Hadoop file with a custom InputFormat. I need to supply the input path and the InputFormat to be used, and I also need to tell the Spark context which key / value classes my InputFormat implementation produces.

import com.aamend.hadoop.format.GdeltInputFormat
import org.apache.hadoop.io.Text

// Read file from HDFS - Use GdeltInputFormat
val input = sc.newAPIHadoopFile(
   "hdfs://path/to/gdelt",
   classOf[GdeltInputFormat],
   classOf[Text],
   classOf[Text]
)

We now have an RDD of Hadoop key / value pairs (Text, Text), but still not the bunch of JSON Strings we expected. The conversion is done as follows

  // Get only the values from Hadoop Key / Values pair
  val values = input.map{ case (k,v) => v}

  // Need to deserialize Text
  val json = values.map(t => t.toString)

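OK, we’re good to go. Spark is able to read our JSON GDELT files. Not convinced? Print out the first 3 records as follows. Note that I’m using the json RDD of Strings here, since Hadoop Text objects are not serializable and cannot be shipped back to the shell as is.

  json.take(3).foreach(println)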

Convert JSON objects into a Spark SQL table

Spark SQL can be used through the Spark SQLContext. Although it does not physically create any table, it maps the supplied RDD to a SchemaRDD that can be queried with SQL.

  import org.apache.spark.sql.SQLContext
  
  // Access spark SQL context
  val sqlContext = new SQLContext(sc)

  // Load JSON (String) as a SQL JsonRDD
  val jsonRDD = sqlContext.jsonRDD(json)

We can make sure the extracted data structure is correct using the command below. Quite handy, isn’t it? Fields will be treated as String, Integer, Double, etc., depending on their JSON value.

  jsonRDD.printSchema()
root
 |-- GlobalEventID: integer (nullable = true)
 |-- actor1Code: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- ethnicCode: string (nullable = true)
 |    |-- knownGroupCode: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- religion1Code: string (nullable = true)
 |    |-- religion2Code: string (nullable = true)
 |    |-- type1Code: string (nullable = true)
 |    |-- type2Code: string (nullable = true)
 |    |-- type3Code: string (nullable = true)
 |-- actor1Geo: struct (nullable = true)
 |    |-- adm1Code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- featureId: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: integer (nullable = true)
 |-- actor2Code: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- ethnicCode: string (nullable = true)
 |    |-- knownGroupCode: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- religion1Code: string (nullable = true)
 |    |-- religion2Code: string (nullable = true)
 |    |-- type1Code: string (nullable = true)
 |    |-- type2Code: string (nullable = true)
 |    |-- type3Code: string (nullable = true)
 |-- actor2Geo: struct (nullable = true)
 |    |-- adm1Code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- featureId: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: integer (nullable = true)
 |-- avgTone: double (nullable = true)
 |-- dateAdded: string (nullable = true)
 |-- day: string (nullable = true)
 |-- eventBaseCode: string (nullable = true)
 |-- eventCode: string (nullable = true)
 |-- eventGeo: struct (nullable = true)
 |    |-- adm1Code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- featureId: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: integer (nullable = true)
 |-- eventRootCode: string (nullable = true)
 |-- fracDate: double (nullable = true)
 |-- goldsteinScale: double (nullable = true)
 |-- isRoot: boolean (nullable = true)
 |-- month: integer (nullable = true)
 |-- numArticles: integer (nullable = true)
 |-- numMentions: integer (nullable = true)
 |-- numSources: integer (nullable = true)
 |-- quadClass: integer (nullable = true)
 |-- sourceUrl: string (nullable = true)
 |-- year: integer (nullable = true)
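
The type inference can be pictured with a small plain-Java sketch. To be clear, this is not Spark's actual algorithm, only an illustration under my own assumptions: each scalar JSON token gets a type from its literal form, and the types seen for the same field across records are widened to a common one. That is my guess as to why goldsteinScale appears as 6 in the JSON sample earlier but is reported as double in the schema (other records presumably hold fractional values). Class and method names are hypothetical.

```java
class JsonTypeSketch {

    // Guess a type name from the literal form of a scalar JSON token.
    static String inferType(String token) {
        String t = token.trim();
        if (t.length() >= 2 && t.startsWith("\"") && t.endsWith("\"")) return "string";
        if (t.equals("true") || t.equals("false")) return "boolean";
        if (t.equals("null")) return "null";
        if (t.matches("-?\\d+")) return "integer";
        if (t.matches("-?\\d+(\\.\\d+)?([eE][+-]?\\d+)?")) return "double";
        return "unknown";
    }

    // Merge the types observed for one field in two different records.
    // Assumption: numeric types widen to double, anything else falls back to string.
    static String widen(String a, String b) {
        if (a.equals(b)) return a;
        if (a.equals("null")) return b;
        if (b.equals("null")) return a;
        boolean numeric = (a.equals("integer") || a.equals("double"))
                && (b.equals("integer") || b.equals("double"));
        return numeric ? "double" : "string";
    }

    public static void main(String[] args) {
        // goldsteinScale seen as 6 in one record and 6.5 in another
        System.out.println(widen(inferType("6"), inferType("6.5")));
    }
}
```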

Assuming the data structure is now correct, we register this SchemaRDD as a Spark table. Let’s now play with SQL !

  jsonRDD.registerAsTable("gdelt")

Query table and extract result (Goldstein Scale)

Definition of the Goldstein scale, taken from the GDELT website: “Each CAMEO event code is assigned a numeric score from -10 to +10, capturing the theoretical potential impact that type of event will have on the stability of a country”. The score assigned to each event code is listed in the GDELT documentation.

We will try to get the daily average Goldstein scale for all events involving France as the primary actor and the United Kingdom as the secondary one. Is France about to declare war on the UK, based on the events of the past few days? Hopefully not, but let’s get some evidence 🙂

  val relations = sqlContext.sql(
    "SELECT day, AVG(goldsteinScale) " +
      "FROM gdelt WHERE " +
      "actor1Geo.countryCode = 'FR' AND actor2Geo.countryCode = 'UK' " +
      "GROUP BY day")

Note the nested field reference (actor2Geo.countryCode), made possible by my JSON structure. Such a nested structure makes my SQL even more readable.
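
If SQL is not your thing, the logic of that query (filter on both country codes, group by day, average the Goldstein scale) can be sketched in plain Java on an in-memory list. This is only an illustration outside of Spark: the Event class, its field names and the sample values are made up for the example and are not real GDELT data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class DailyGoldsteinSketch {

    // Minimal event holder (hypothetical); fields mirror the JSON structure
    static class Event {
        final String day;
        final String actor1Country;
        final String actor2Country;
        final double goldsteinScale;

        Event(String day, String actor1Country, String actor2Country, double goldsteinScale) {
            this.day = day;
            this.actor1Country = actor1Country;
            this.actor2Country = actor2Country;
            this.goldsteinScale = goldsteinScale;
        }
    }

    // Equivalent of: SELECT day, AVG(goldsteinScale) ... WHERE ... GROUP BY day
    static Map<String, Double> averageByDay(List<Event> events, String country1, String country2) {
        return events.stream()
                .filter(e -> country1.equals(e.actor1Country) && country2.equals(e.actor2Country))
                .collect(Collectors.groupingBy(
                        e -> e.day,
                        TreeMap::new, // keeps the days sorted
                        Collectors.averagingDouble(e -> e.goldsteinScale)));
    }

    public static void main(String[] args) {
        // Made-up sample values, for illustration only
        List<Event> events = Arrays.asList(
                new Event("20140920", "FR", "UK", 2.0),
                new Event("20140920", "FR", "UK", 4.0),
                new Event("20140921", "FR", "UK", -1.0),
                new Event("20140921", "US", "UK", 10.0));
        System.out.println(averageByDay(events, "FR", "UK"));
        // prints {20140920=3.0, 20140921=-1.0}
    }
}
```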

Finally, we save the resulting RDD as text file(s) in HDFS, … and …

  relations.map(row => row(0) + "\t" + row(1)).saveAsTextFile("hdfs://output/path")

… and plot the data as a graph

[Figure: daily average Goldstein scale, France as primary actor and United Kingdom as secondary actor]

Well, it looks like I can stay in the UK for a while 🙂
Anyway, I’ve learned a lot from this exercise, and I hope you enjoyed reading it too. Please find my code on Github (Spark project), and should you have any questions, feel free to ask.

Cheers!
Antoine

Use case: Ukraine Russia relations over the past 30 years

Using such a record reader, you can build the following graph, which reports the Ukraine - Russia diplomatic relations over the past 30 years.
Even better, you can smooth the Goldstein scale by using a moving average (5, 10 and 30 days in the example below). This moving average has been built with Scala / Spark (see MovingAverageGoldstein.scala).

[Figure: MovingAverageGoldstein, the Ukraine / Russia Goldstein scale smoothed with 5, 10 and 30 day moving averages]
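
For reference, the smoothing itself is simple. Below is a minimal plain-Java sketch of a trailing moving average. It is not a port of MovingAverageGoldstein.scala, and it makes two assumptions of its own: the series has exactly one value per day with no gaps, and the first points are averaged over the values available so far.

```java
import java.util.Arrays;

class MovingAverageSketch {

    // Trailing simple moving average over the last `window` values.
    // For the first window-1 points, only the values seen so far are averaged.
    static double[] movingAverage(double[] values, int window) {
        if (window <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        double[] out = new double[values.length];
        double sum = 0.0;
        for (int i = 0; i < values.length; i++) {
            sum += values[i];
            if (i >= window) {
                sum -= values[i - window]; // drop the value leaving the window
            }
            out[i] = sum / Math.min(i + 1, window);
        }
        return out;
    }

    public static void main(String[] args) {
        double[] daily = {1, 2, 3, 4, 5};
        System.out.println(Arrays.toString(movingAverage(daily, 3)));
        // prints [1.0, 1.5, 2.0, 3.0, 4.0]
    }
}
```

On a distributed data set the values must first be sorted by day, and days without any event need to be handled explicitly, which is where most of the real work lies.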

Enjoy !

Spark: Create a simple Spark job

MapReduce is dead, long live Spark !

Following the latest big data trends, the logical next step for me is to start getting my head around Apache Spark. Yarn is yet another resource negotiator indeed, but definitely not yet another big data application… Although you can still execute MapReduce on Yarn, writing a Yarn application is such a pain! Have a look at the article below. I reached the same conclusion after yet another headache!

http://www.edwardcapriolo.com/roller/edwardcapriolo/entry/yarn_either_it_is_really

Spark runs on top of Yarn, but it does not require writing new Client, ApplicationMaster or Container classes. Anyway, let’s jump to Apache Spark and Scala. I’m describing here how to quickly install Spark / Scala on a Yarn cluster and how to submit a Scala object that runs from the Spark client.

Install Spark client / Scala dependencies

Install Scala:

wget http://www.scala-lang.org/files/archive/scala-2.10.1.tgz
tar xvf scala-2.10.1.tgz
mv scala-2.10.1 /usr/local/lib/
ln -s /usr/local/lib/scala-2.10.1/ /usr/local/lib/scala

Install Spark

Download a pre-built spark distribution matching your hadoop distro. Alternatively, build your own from source.

wget https://archive.apache.org/dist/spark/spark-1.1.0/spark-1.1.0-bin-hadoop2.4.tgz
tar xvf spark-1.1.0-bin-hadoop2.4.tgz
mv spark-1.1.0-bin-hadoop2.4 /usr/local/lib/
ln -s /usr/local/lib/spark-1.1.0-bin-hadoop2.4 /usr/local/lib/spark

Update environment

JAVA_HOME=/usr/lib/jvm/j2sdk1.6-oracle
HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
YARN_HOME=/usr/lib/hadoop-yarn
HADOOP_HDFS_HOME=/usr/lib/hadoop-hdfs
HADOOP_COMMON_HOME=/usr/lib/hadoop
HADOOP_CONF_DIR=/etc/hadoop/conf
YARN_CONF_DIR=/etc/hadoop/conf
ZOO_LOG_DIR=/var/log/zookeeper
SCALA_HOME=/usr/local/lib/scala

Create a maven Spark application

Here are the maven dependencies I’m using. Simply replace hadoop.version with the right Hadoop distribution / version. Remember to add the right maven repository to your settings.xml / Artifactory / Nexus should you want to use a non-Apache distribution (such as Hortonworks or Cloudera).

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

Since this project will be Scala based, we need to tell Maven to compile Scala sources. This can be done using the plugin below

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Project structure will be the following

-src
-- main
--- scala
---- com
----- wordpress
------ hadoopi
------- spark
-------- SparkGrepScala.scala
-- test
- target

My Simple Spark job is as follows:

package com.wordpress.hadoopi.spark

import org.apache.spark.{SparkConf, SparkContext}

object SparkGrepScala {

  // Usage: SparkGrepScala <input> <output> <keyword1,keyword2,...>
  // A main method is used rather than "extends App", whose delayed
  // initialisation can leave vals uninitialised inside Spark closures
  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setAppName("Grep"))

    // Retrieve the key words to look for, comma separated values
    val keyWords = args(2).split(",")

    // Open file(s) in that HDFS directory
    val input = sc.textFile(args(0))

    // Keep lines containing at least one of these keywords
    val filteredLines = input.filter(line => keyWords.exists(word => line.contains(word)))

    // Output matching lines to HDFS
    filteredLines.saveAsTextFile(args(1))

    sc.stop()
  }
}

What this simple app does is retrieve the command line arguments (input path, output path, keywords), read a set of input files from HDFS, keep only the lines containing at least one of the comma-separated keywords, and write these lines back to HDFS.
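
Stripped of Spark, the filtering logic boils down to a few lines. The plain-Java sketch below works on an in-memory list instead of HDFS files; the class and method names are hypothetical and only serve this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class GrepSketch {

    // Keep the lines containing at least one of the comma-separated keywords
    static List<String> grep(List<String> lines, String commaSeparatedKeywords) {
        String[] keywords = commaSeparatedKeywords.split(",");
        return lines.stream()
                .filter(line -> Arrays.stream(keywords).anyMatch(line::contains))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "<name>dfs.replication</name>",
                "<value>3</value>",
                "<name>yarn.web-proxy.address</name>");
        grep(lines, "dfs,yarn").forEach(System.out::println);
    }
}
```

The Spark version has the same shape, the difference being that the filter runs in parallel on the partitions of the input files.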

Execution

Package this project with mvn clean package and upload the resulting jar to your Spark node (a Hadoop node with the Spark client installed)

Input data:

Create a new HDFS directory and upload the Hadoop configuration files to it for testing purposes

hadoop fs -mkdir input
hadoop fs -put /etc/hadoop/conf/*.xml input

Execute spark job using spark-submit

/usr/local/lib/spark/bin/spark-submit --class com.wordpress.hadoopi.spark.SparkGrepScala spark-1.0-SNAPSHOT.jar input output dfs,yarn

This will keep only the lines containing either the dfs or the yarn keyword. Notice how fast Spark is compared to the MapReduce framework, even with a tiny dataset.

Output is as follows

hadoop fs -cat output/*

    <name>yarn.scheduler.capacity.maximum-applications</name>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
    <name>yarn.scheduler.capacity.root.queues</name>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <name>yarn.scheduler.capacity.root.default.state</name>
    <name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
    <name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
    <name>yarn.scheduler.capacity.node-locality-delay</name>
    <value>hdfs://vagrant:8020</value>
    <name>dfs.namenode.name.dir</name>
    <value>/hadoop/dfs/nn</value>
    <name>dfs.datanode.data.dir</name>
    <value>/hadoop/dfs/dn</value>
    <name>dfs.webhdfs.enabled</name>
    <name>dfs.replication</name>
    <value>yarn</value>
    <name>yarn.app.mapreduce.am.staging-dir</name>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <name>yarn.web-proxy.address</name>
    <name>yarn.resourcemanager.address</name>
    <name>yarn.resourcemanager.scheduler.address</name>
    <name>yarn.resourcemanager.admin.address</name>
    <name>yarn.resourcemanager.webapp.address</name>
    <name>yarn.nodemanager.aux-services</name>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <name>yarn.log-aggregation-enable</name>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/var/lib/hadoop-yarn/cache/${user.name}/nm-local-dir</value>
    <name>yarn.nodemanager.log-dirs</name>
    <value>/var/log/hadoop-yarn/containers</value>
    <name>yarn.nodemanager.remote-app-log-dir</name>
    <value>/var/log/hadoop-yarn/apps</value>
    <name>yarn.application.classpath</name>

And voila

It’s fast, simple to install, and it appears to work 🙂 Now, all I have to do is learn Scala in order to be as efficient as I used to be with standard MapReduce!
Great. So if you followed this article, you have created your first Scala project that runs on Spark (from the command line).

Cheers,
Antoine