Spark: Connect Tableau Desktop to SparkSQL

Last (but not least) post of 2014, and a new Hacking challenge. Based on the work I’ve done on SQLDeveloper (https://hadoopi.wordpress.com/2014/10/25/use-spark-sql-on-sql-developer/), I was wondering how to connect Tableau Desktop to my SparkSQL cluster.

Install Tableau Desktop

I’m quite new to Tableau, but it’s worth giving it a try. Spending $999 just for a challenge isn’t worth it, though; luckily, the free edition offers a 14-day trial (this implementation took me only a couple of days :)).

I’m using the MacOS edition, which – I believe – is not as stable as the Windows version (this could explain some of the bugs / limitations mentioned hereafter). Don’t hesitate to let me know how it behaves on a Windows environment.

Anyway, installation is quite straightforward, and there is nothing to configure in Tableau to make it work with SparkSQL; the main logic sits on the Spark side. Simply make sure Hive is an available connection option on Tableau’s connection page (see the screenshot below).

Screenshot 2014-12-31 17.10.28

Create a Hive Table

Spark SQL (although it works in standalone mode) uses the Hive metastore. For that purpose, create a dummy Hive table and insert some data (alternatively, use a Hive external table). The data set I’m using is the UFO survey available here, which contains around 60,000 documented reports of unidentified flying objects. My Hive table is as follows:

col_name data_type comment 
-------------- --------- ------- 
date_observed string 
date_reported string 
location_city string 
location_state string 
shape string 
duration int

With the following first 10 lines:

1995/10/09 1995/10/09 Iowa City IA NA
1995/10/10 1995/10/11 Milwaukee WI 120
1995/01/01 1995/01/03 Shelton WA NA
1995/05/10 1995/05/10 Columbia MO 120
1995/06/11 1995/06/14 Seattle WA NA
1995/10/25 1995/10/24 Brunswick County ND 1800
1995/04/20 1995/04/19 Fargo ND 120
1995/09/11 1995/09/11 Las Vegas NV NA
1995/01/15 1995/02/14 Morton WA NA
1995/09/15 1995/09/15 Redmond WA 360

Getting this exact table required some pre-processing of the original CSV, which is out of scope for this article; Tableau will work with any kind of flat structure such as CSV anyway.
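
For reference, a minimal external table matching this layout could look like the statement below. This is a sketch only: I assume the cleansed file is tab-separated and uploaded to HDFS under /data/ufo (a made-up path, adjust it to your own setup). It can be run from either a hive or a spark-sql shell.

CREATE EXTERNAL TABLE ufo (
  date_observed  STRING,
  date_reported  STRING,
  location_city  STRING,
  location_state STRING,
  shape          STRING,
  duration       INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/data/ufo';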

Start Spark Thrift interface

Assuming the Hive metastore is up and running, you need to copy the Hive configuration file to the Spark configuration directory.

sudo cp /etc/hive/hive-site.xml /usr/lib/spark/conf/

Now make sure Spark can access your Hive tables (note that your Spark distribution should have been built with Hive support enabled).

antoine@dataphi:~ $ spark-sql --master local
Spark assembly has been built with Hive, including Datanucleus jars on classpath
spark-sql> show tables;
ufo
ufo_date
ufo_tmp
Time taken: 1.563 seconds
spark-sql>

Now that Spark can connect to the Hive metastore, let’s start the Spark Thrift service as follows:

/usr/lib/spark/sbin/start-thriftserver.sh --master ${MASTER}

I’m using MASTER=spark://localhost:7077, but it should also work with yarn-client. Once done, make sure a Spark job is running (from the Spark UI).

Screenshot 2014-12-31 17.26.25


Note that Spark uses the same port as HiveServer2. Should you have any issue with HiveServer2 running on the same box, change the Spark port with the option --hiveconf hive.server2.thrift.port=10001

At this stage, any SQL client with the right Hive driver should be able to connect to SparkSQL through the Spark Thrift service you’ve just started.
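
For a quick sanity check, something like the beeline session below should list the Hive tables through Spark (the beeline path and username are from my setup, so adjust them to yours, and use port 10001 if you changed it above):

/usr/lib/spark/bin/beeline
beeline> !connect jdbc:hive2://localhost:10000 antoine ""
0: jdbc:hive2://localhost:10000> show tables;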

Install Hive ODBC package

On my MacOS, I got the ODBC package from the Hortonworks website. In true demo fashion, the download page is currently unavailable, but trust me, I made it work using the package available there. I could delay this post by a few days to give you the exact URL, but I’m quite in a rush if I want to publish this one while it’s still 2014 🙂

Have fun with Tableau

Now that your environment is ready, let’s get into Tableau Desktop. The connection to use is the “Hive Connection for Hortonworks”. Simply add the host / port of your Thrift interface, and select HiveServer2. Authentication on Spark is disabled by default; however, I’m adding a username only so that Tableau acts on behalf of my user with regard to the Hive connection.

Screenshot 2014-12-31 17.38.50

Simply click Connect and, assuming you did it right (mainly that you’ve found the right driver), you should get access to your Hive databases / tables.

Screenshot 2014-12-31 17.39.19

Go to your worksheet, and start playing with visualisations. Note how fast it is compared to a vanilla Hive connection when playing with numbers / strings (see the errors with dates below). Here is an example of UFO sightings in the US over the past 30 years.

Screenshot 2014-12-31 17.46.44


Limitations

The main limitation I’ve found is around the date format. I get the errors below any time I play with a date hierarchy (year, quarter, month, etc.), and I’m not brave enough to investigate any further today.

Screenshot 2014-12-31 17.58.21
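
My guess (not investigated any further) is that this comes from date_observed / date_reported being stored as plain strings in yyyy/MM/dd format rather than proper dates. A possible workaround would be to convert them on the Hive / Spark side before exposing the table to Tableau, something along these lines:

spark-sql> SELECT from_unixtime(unix_timestamp(date_observed, 'yyyy/MM/dd'), 'yyyy-MM-dd') AS date_observed FROM ufo LIMIT 3;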

I made it work, great, but it is far from stable enough to be considered a supported Tableau connection. However, an official connector will be released soon (http://www.tableausoftware.com/about/blog/2014/10/tableau-spark-sql-big-data-just-got-even-more-supercharged-33799), and I’ve requested access to the Tableau beta program.

So far, getting an awesome tool like Tableau on top of a great big data stack sounds truly exciting, so once again, 2015 will be SPARKling!

Wishing you all the best for the new year!

Antoine


Spark: Use Spark-SQL on SQL Developer

Screen Shot 2014-10-25 at 14.29.50

I’m describing here how I set up SQL Developer to connect to / query my Spark cluster. I made it work on my local environment below:

  • Ubuntu Precise 64-bit (1 master, 2 slaves)
  • Hadoop Hortonworks 2.4.0.2.1.5.0-695
  • Hive version 0.13.0.2.1.5.0-695, metastore hosted on MySQL database
  • Spark 1.1.0 prebuilt for Hadoop 2.4
  • SQL Developer 4.0.3.16

Note that I’ve successfully tested the same setup on a 20-node cluster on AWS (EMR).

Preparing your Spark environment

First of all, you have to make sure Spark SQL is able to connect to the Hive metastore. Simply copy the hive-site.xml file into the Spark conf directory:

sudo cp /etc/hive/conf/hive-site.xml /usr/lib/spark/conf/

Open a Hive shell, and create a dummy table:

hive> CREATE TABLE dummy (foo STRING, bar STRING);
OK
Time taken: 2.555 seconds

Now open a spark-sql shell, and make sure you can find the newly created table. Note that --master local can be used here since we’re not running any job.

vagrant@vagrant:~$ /usr/lib/spark/bin/spark-sql --master local
../..
spark-sql> show tables;
../..
dummy
../..
Time taken: 3.452 seconds
../..

Great, looks like Spark-SQL is now connected to the Hive metastore…

Install Hive JDBC drivers on SQL Developer

oraSqlD01

SQL Developer is quite a nice tool – at least for Oracle databases. In order to make it work with Hive / Spark, we need to download the Hive connectors first. I found them on the Cloudera website; I was not able to find the equivalent from Hortonworks. This could be an issue if I were using Hive natively, but it’s worth giving it a try on Spark first.

Unzip the downloaded archive (I’m using the JDBC4 package), and open SQL Developer.
You will have to add third-party connectors from Preferences -> Database -> Third Party JDBC Drivers. Add all the jars included in your archive:

~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/libthrift-0.9.0.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/HiveJDBC4.jar                                           
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/log4j-1.2.14.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/TCLIServiceClient.jar                                   
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/ql.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/hive_metastore.jar                                      
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/slf4j-api-1.5.8.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/hive_service.jar                                        
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/slf4j-log4j12-1.5.8.jar
~/Downloads/Cloudera_HiveJDBC_2.5.4.1006/Cloudera_HiveJDBC4_2.5.4.1006/libfb303-0.9.0.jar

Once added, restart SQL Developer, and ohhh, awesome, a new “Hive” tab magically appears 🙂

Screen Shot 2014-10-25 at 15.07.30

Open this tab and create a new connection to your HiveServer2 as follows

Screen Shot 2014-10-25 at 14.22.29
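
For reference, the settings behind this screenshot boil down to something like the values below (the connection name is arbitrary, and the host / username are the ones from my vagrant box, so yours will obviously differ):

Connection name : spark-sql
Host            : 192.168.56.101
Port            : 10000
Database        : default
Username        : vagrant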

And test the connection! Assuming HiveServer2 and the Hive metastore are running, the connection should be established and a new Hive worksheet should pop up. At this stage, the connection from SQL Developer to Hive is up, but SQL is executed on Hive, not Spark!

Screen Shot 2014-10-25 at 15.15.43

Now we need to switch back to Spark!

The goal is now to use the Spark Thrift interface in lieu of the vanilla HiveServer2. Stop the HiveServer2 daemon, and start the Spark Thrift interface as follows:

vagrant@vagrant:~$ /usr/lib/spark/sbin/start-thriftserver.sh --master yarn-client
Spark assembly has been built with Hive, including Datanucleus jars on classpath
../..
14/10/25 15:11:49 INFO service.AbstractService: Service:OperationManager is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service: SessionManager is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service: CLIService is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service:ThriftBinaryCLIService is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service: HiveServer2 is inited.
14/10/25 15:11:49 INFO service.AbstractService: Service:OperationManager is started.
14/10/25 15:11:49 INFO service.AbstractService: Service:SessionManager is started.
14/10/25 15:11:49 INFO service.AbstractService: Service:CLIService is started.
14/10/25 15:11:49 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.56.101:9083
14/10/25 15:11:49 INFO hive.metastore: Waiting 1 seconds before next connection attempt.
14/10/25 15:11:50 INFO hive.metastore: Connected to metastore.
14/10/25 15:11:50 INFO service.AbstractService: Service:ThriftBinaryCLIService is started.
14/10/25 15:11:50 INFO service.AbstractService: Service:HiveServer2 is started.
14/10/25 15:11:50 INFO thriftserver.HiveThriftServer2: HiveThriftServer2 started
14/10/25 15:11:50 INFO thrift.ThriftCLIService: ThriftBinaryCLIService listening on 0.0.0.0/0.0.0.0:10000
../..

Looking at the application master log file, Spark is actually embedding its own HiveServer2 into a Spark job. The connection will apparently be dropped if the Spark job ends, so make sure the Spark job is not killed!

I’m using yarn-client to handle my JDBC connections, but this should work with Spark standalone clusters as well.
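
For a standalone cluster, the only change should be the master URL (the host name below is made up):

/usr/lib/spark/sbin/start-thriftserver.sh --master spark://master-host:7077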

Now get back to SQL Developer, restore the connection to Hive (HiveServer2 is now embedded into a Spark job), and you’re now connected to Spark!

And voilà!

For testing purposes, I’ve downloaded some ngrams data from S3 (link), and created a new ngrams table in Hive. I’m now able to run lightning-fast SQL queries on Spark from a user-friendly environment (SQL Developer).

Screen Shot 2014-10-25 at 14.21.35

As initially said, this is quite hacky (remember I used a Cloudera JDBC driver on a Hortonworks environment), but it seems to work, and it seems to be really fast! Please let me know which environment / distribution you’re using if you’re facing any issues.

Cheers!
Antoine

Spark / Hadoop: Processing GDELT data using Hadoop InputFormat and SparkSQL

Screen Shot 2014-09-24 at 20.55.34

GDELT

A quick overview of the GDELT public data set:
The GDELT Project monitors the world’s broadcast, print, and web news from nearly every corner of every country in over 100 languages and identifies the people, locations, organisations, counts, themes, sources, and events driving our global society every second of every day, creating a free open platform for computing on the entire world.

Such a dataset is a perfect match for machine learning, and an excellent use case for getting started with Spark and Hadoop InputFormats. I won’t report the data format specification here, but have a look there. For this exercise I’m using the “GDELT Event Database” data set only, which fits into simple JSON events.

The goal of this exercise is to do the following:

  • Create an Hadoop GDELT -> Json InputFormat
  • Read data from Spark shell
  • Convert Json object as Spark SQL table
  • Query table and extract result (Goldstein Scale)

Well, quite a challenge indeed… However, if this is not your first visit to my blog, the concept of an InputFormat should be familiar. If not, have a look at my previous posts. Only Spark (and Spark SQL) is a new concept here.

Create a GDELT -> Json Parser

GDELT data are tab-separated values. We can easily parse the data and store each key / value pair as a JSON element. However, I want to group nested entities (such as eventGeo, actor1Geo, actor2Geo, actor1Code, actor2Code) since their keys will always be the same. My final JSON structure looks like the following:

{
    "isRoot": true,
    "eventGeo": {
        "countryCode": "US",
        .../...
    },
    "actor1Code": {
        "code": "BUS",
        .../...
    },
    "actor2Code": {
        "code": "BUS",
        .../...
    },
    "actor1Geo": {
        "countryCode": "US",
        .../...
    },
    "actor2Geo": {
        "countryCode": "US",
        .../...
    },
    "goldsteinScale": 6,
    "quadClass": 2,
    "month": 201409,
    "GlobalEventID": 313271341,
    .../...
}
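
Conceptually, turning one tab-separated GDELT line into such a nested object boils down to the sketch below. This is illustrative only (the column indices are made up and should be taken from the GDELT specification); the real logic lives in GdeltRecordReader on Github.

    // Illustrative sketch only, not the actual GdeltRecordReader implementation
    @SuppressWarnings("unchecked")
    private String parse(String line) {
        // GDELT events are tab-separated values
        String[] columns = line.split("\t");
        org.json.simple.JSONObject event = new org.json.simple.JSONObject();
        event.put("GlobalEventID", Integer.parseInt(columns[0]));
        // Group related columns into nested objects (indices below are illustrative)
        org.json.simple.JSONObject actor1Geo = new org.json.simple.JSONObject();
        actor1Geo.put("countryCode", columns[37]);
        event.put("actor1Geo", actor1Geo);
        // ../.. same pattern applies to actor2Geo, eventGeo, actor1Code and actor2Code
        return event.toJSONString();
    }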

To get from a bunch of raw files to structured JSON, I’m using nothing more than a standard InputFormat with a standard RecordReader. Whenever a new line is consumed, I send it to my JSON parser and output the result to the Hadoop framework.

        while (pos < end) {

            // Let Hadoop LineReader consume the line into tmp text...
            newSize = in.readLine(tmp,
                    maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));

            // ... Take the result and feed my parser
            // tmp is the raw data
            // value will be the expected JSON object
            value.set(parse(tmp.toString()));
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
        }

Have a look at the com.aamend.hadoop.format.GdeltRecordReader class on my Github account. Before you say anything, note that performance is out of the scope of this study 🙂. I’m basically using Google’s json-simple as a third-party dependency (see the Maven dependency below). With the raw files stored on HDFS, any event read through my GdeltInputFormat class will actually be seen as a JSON object.

<dependency>
  <groupId>com.googlecode.json-simple</groupId>
  <artifactId>json-simple</artifactId>
  <version>1.1</version>
</dependency>

Do not forget to add these dependencies to your Spark job! I’m building a fat jar for that purpose (refer to my previous post).
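
For what it’s worth, the fat jar is nothing more than the standard jar-with-dependencies assembly. Assuming you use the maven-assembly-plugin (which matches the jar name used in the next section), a minimal configuration looks like this:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>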

Read data from spark shell

Assuming you have a Spark distribution up and running, simply start a shell session as follows. Note the --jars option, which adds all your dependencies to the classpath.

spark-shell --jars spark-1.0-SNAPSHOT-jar-with-dependencies.jar

Let’s get deeper into that spark-shell session. I’m using newAPIHadoopFile from the SparkContext in order to read a Hadoop file with a custom InputFormat. I need to supply the input path and the InputFormat to be used, and I also need to provide the Spark context with the expected key / value classes of my InputFormat implementation.

import com.aamend.hadoop.format.GdeltInputFormat
import org.apache.hadoop.io.Text

// Read file from HDFS - Use GdeltInputFormat
val input = sc.newAPIHadoopFile(
   "hdfs://path/to/gdelt",
   classOf[GdeltInputFormat],
   classOf[Text],
   classOf[Text]
)

We now have a HadoopRDD (containing key / value pairs), but still not the plain JSON strings we expected. The conversion is done as follows:

  // Get only the values from Hadoop Key / Values pair
  val values = input.map{ case (k,v) => v}

  // Need to deserialize Text
  val json = values.map(t => t.toString)

OK, we’re good to go. Spark is able to read our JSON GDELT files. Not convinced? Print out the first 3 lines as follows:

  values.take(3).foreach(println)

Convert Json object as Spark SQL table

Spark SQL can be used through the Spark SQLContext. Although it does not physically create any table, it maps the supplied RDD into a SchemaRDD that can be queried with SQL.

  import org.apache.spark.sql.SQLContext
  
  // Access spark SQL context
  val sqlContext = new SQLContext(sc)

  // Load JSON (String) as a SQL JsonRDD
  val jsonRDD = sqlContext.jsonRDD(json)

We can make sure the extracted data structure is correct using the command below. Quite handy, isn’t it? Fields will be treated as string, integer, double, etc., depending on their JSON value.

  jsonRDD.printSchema()
root
 |-- GlobalEventID: integer (nullable = true)
 |-- actor1Code: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- ethnicCode: string (nullable = true)
 |    |-- knownGroupCode: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- religion1Code: string (nullable = true)
 |    |-- religion2Code: string (nullable = true)
 |    |-- type1Code: string (nullable = true)
 |    |-- type2Code: string (nullable = true)
 |    |-- type3Code: string (nullable = true)
 |-- actor1Geo: struct (nullable = true)
 |    |-- adm1Code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- featureId: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: integer (nullable = true)
 |-- actor2Code: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- ethnicCode: string (nullable = true)
 |    |-- knownGroupCode: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- religion1Code: string (nullable = true)
 |    |-- religion2Code: string (nullable = true)
 |    |-- type1Code: string (nullable = true)
 |    |-- type2Code: string (nullable = true)
 |    |-- type3Code: string (nullable = true)
 |-- actor2Geo: struct (nullable = true)
 |    |-- adm1Code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- featureId: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: integer (nullable = true)
 |-- avgTone: double (nullable = true)
 |-- dateAdded: string (nullable = true)
 |-- day: string (nullable = true)
 |-- eventBaseCode: string (nullable = true)
 |-- eventCode: string (nullable = true)
 |-- eventGeo: struct (nullable = true)
 |    |-- adm1Code: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- featureId: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: integer (nullable = true)
 |-- eventRootCode: string (nullable = true)
 |-- fracDate: double (nullable = true)
 |-- goldsteinScale: double (nullable = true)
 |-- isRoot: boolean (nullable = true)
 |-- month: integer (nullable = true)
 |-- numArticles: integer (nullable = true)
 |-- numMentions: integer (nullable = true)
 |-- numSources: integer (nullable = true)
 |-- quadClass: integer (nullable = true)
 |-- sourceUrl: string (nullable = true)
 |-- year: integer (nullable = true)

Assuming the data structure is now correct, we register this SchemaRDD as a Spark table. Let’s now play with SQL!

  jsonRDD.registerAsTable("gdelt")
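
As a quick sanity check before the real query, something like the line below should return the number of events we’ve just loaded:

  sqlContext.sql("SELECT COUNT(1) FROM gdelt").collect().foreach(println)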

Query table and extract result (Goldstein Scale)

The definition of the Goldstein scale, taken from the GDELT website: “Each CAMEO event code is assigned a numeric score from -10 to +10, capturing the theoretical potential impact that type of event will have on the stability of a country”. Ranks can be found there.

We will try to get the average Goldstein scale for any event involving France as the primary actor and the United Kingdom as the secondary one. Is France about to declare war on the UK based on the past few days’ events? Hopefully not, but let’s get some proof 🙂

  val relations = sqlContext.sql(
    "SELECT day, AVG(goldsteinScale) " +
      "FROM gdelt WHERE " +
      "actor1Geo.countryCode = 'FR' AND actor2Geo.countryCode = 'UK' " +
      "GROUP BY day")

Note the nested-entity query (actor2Geo.countryCode) made possible by my JSON structure. Such a nested structure makes my SQL even more readable.

We finally save the resulting RDD as text file(s) in HDFS… and…

  relations.map(row => row(0) + "\t" + row(1)).saveAsTextFile("hdfs://output/path")

… and plot the data into a graph

Screen Shot 2014-09-24 at 23.51.55

Well, it looks like I can stay in the UK for a while 🙂
Anyway, I’ve learned a lot from this exercise, and I hope you enjoyed reading it too. Please find my code in my Spark project on Github (Github), and should you have any questions, feel free to ask.

Cheers!
Antoine

Use case: Ukraine – Russia relations over the past 30 years

Using such a record reader, you can build the following graph; I’ve plotted the Ukraine – Russia diplomatic relations over the past 30 years.
Even better, you can smooth the Goldstein scale by using a moving average (5, 10 and 30 days in the example below). The moving average has been built in Scala / Spark (see MovingAverageGoldstein.scala).
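
For the record, a rough sketch of the idea (not the actual MovingAverageGoldstein.scala) could be as simple as smoothing the (day, average Goldstein) pairs computed earlier over a sliding window, with the country codes in the SQL swapped accordingly:

  // Rough sketch only, see MovingAverageGoldstein.scala on Github for the real implementation
  def movingAverage(daily: Seq[(String, Double)], window: Int): Seq[(String, Double)] =
    daily.sliding(window).map(w => (w.last._1, w.map(_._2).sum / w.size)).toSeq

  // e.g. smooth the daily series over a 5 day window
  val series = relations.collect().map(row => (row(0).toString, row(1).toString.toDouble)).sortBy(_._1)
  val smoothed = movingAverage(series, 5)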

MovingAverageGoldstein

Enjoy!