Spark: Create a simple Spark job

MapReduce is dead, long live Spark!

Following the latest big data trends, the logical next step for me is to get my head around Apache Spark. YARN is indeed Yet Another Resource Negotiator, but it is definitely not yet another big data application… Although you can still run MapReduce on YARN, writing a native YARN application is such a pain! Have a look at the article below; I reached the same conclusion after yet another headache!

http://www.edwardcapriolo.com/roller/edwardcapriolo/entry/yarn_either_it_is_really

Spark runs on top of YARN, but it does not require you to write new Client, ApplicationMaster or Container classes. Anyway, let’s jump into Apache Spark and Scala. I describe here how to quickly install the Spark client and Scala on a YARN cluster, and how to submit a simple Scala job through that client.

Install Spark client / Scala dependencies

Install Scala:

wget http://www.scala-lang.org/files/archive/scala-2.10.1.tgz
tar xvf scala-2.10.1.tgz
mv scala-2.10.1 /usr/local/lib/
ln -s /usr/local/lib/scala-2.10.1/ /usr/local/lib/scala
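Optionally, add the Scala binaries to your PATH and verify the install. A minimal sketch, assuming the symlink created above:

export PATH=$PATH:/usr/local/lib/scala/bin
scala -version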

Install Spark:

Download a pre-built Spark distribution matching your Hadoop distro. Alternatively, build your own from source.

wget http://www.apache.org/dyn/closer.cgi/spark/spark-1.1.0/spark-1.1.0-bin-hadoop2.4.tgz
tar xvf spark-1.1.0-bin-hadoop2.4.tgz
mv spark-1.1.0-bin-hadoop2.4 /usr/local/lib/
ln -s /usr/local/lib/spark-1.1.0-bin-hadoop2.4 /usr/local/lib/spark
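As a quick sanity check, you can launch a local Spark shell (no YARN involved yet). A minimal sketch, assuming the symlink created above:

/usr/local/lib/spark/bin/spark-shell --master local[2]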

Update environment

JAVA_HOME=/usr/lib/jvm/j2sdk1.6-oracle
HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
YARN_HOME=/usr/lib/hadoop-yarn
HADOOP_HDFS_HOME=/usr/lib/hadoop-hdfs
HADOOP_COMMON_HOME=/usr/lib/hadoop
HADOOP_CONF_DIR=/etc/hadoop/conf
YARN_CONF_DIR=/etc/hadoop/conf
ZOO_LOG_DIR=/var/log/zookeeper
SCALA_HOME=/usr/local/lib/scala
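Depending on your setup, you may also want to export SPARK_HOME and put the Spark binaries on the PATH (e.g. in ~/.bashrc or /etc/profile.d). A minimal sketch, assuming the install locations used above:

export SPARK_HOME=/usr/local/lib/spark
export PATH=$PATH:$SPARK_HOME/bin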

Create a Maven Spark application

Here are the Maven dependencies I’m using. Simply set the hadoop.version property to the right Hadoop distribution / version (see the sketch after the dependencies block below). Remember to add the right Maven repository to your settings.xml / Artifactory / Nexus should you want to use a non-Apache distribution (such as Hortonworks or Cloudera).

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
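For instance, on a cluster running plain Apache Hadoop 2.4, the property could look like the snippet below. The exact version string is an assumption here; set it to whatever your cluster actually runs.

    <properties>
        <!-- Assumed value: set this to match your Hadoop distribution / version -->
        <hadoop.version>2.4.0</hadoop.version>
    </properties>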

Since this project is Scala based, we need to tell Maven how to compile Scala sources. This can be done with the plugin below.

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

The project structure is as follows:

-src
-- main
--- scala
---- com
----- wordpress
------ hadoopi
------- spark
-------- SparkGrepScala.scala
-- test
- target

My simple Spark job, SparkGrepScala, is as follows:

package com.wordpress.hadoopi.spark
import org.apache.spark.{SparkConf, SparkContext}

object SparkGrepScala extends App {

  val sc = new SparkContext(new SparkConf().setAppName("Grep"))

  // Retrieve the key words to look for, comma separated values
  val keyWords = args(2).split(",")

  // Open file(s) in that HDFS directory
  val input = sc.textFile(args(0))

  // Filter lines containing at least one of these keywords
  val filteredLines = input.filter(line => keyWords.exists( word => line.contains(word)))

  // Output the matching lines to HDFS
  filteredLines.saveAsTextFile(args(1))

  sc.stop()
}

This simple app retrieves its command line arguments (an input path, an output path and a list of keywords), reads the input files from HDFS, keeps only the lines containing at least one of the comma-separated keywords, and writes those lines back to HDFS.
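Before packaging the jar, you can prototype the same logic interactively in spark-shell, where the SparkContext is already available as sc. A minimal sketch, assuming the input directory created in the next section:

// Inside spark-shell: sc is provided by the shell, no SparkContext to create
val keyWords = "dfs,yarn".split(",")
val input = sc.textFile("input")

// Print a few matching lines instead of writing them back to HDFS
input.filter(line => keyWords.exists(word => line.contains(word)))
  .take(5)
  .foreach(println)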

Execution

Package the project with mvn clean package and upload the resulting jar to your Spark node (a Hadoop node with the Spark client installed).

Input data:

Create a new HDFS directory and put the Hadoop configuration files in it for testing purposes:

hadoop fs -mkdir input
hadoop fs -put /etc/hadoop/conf/*.xml input

Execute the Spark job using spark-submit:

/usr/lib/spark/bin/spark-submit --class com.wordpress.hadoopi.spark.SparkGrepScala spark-1.0-SNAPSHOT.jar input output dfs,yarn

This keeps any line containing either the dfs or the yarn keyword. Notice how fast Spark is compared to the MapReduce framework (even with such a tiny dataset).
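Note that depending on your spark-defaults.conf, the command above may well run locally on the Spark client rather than on the cluster. To explicitly submit the job to YARN, Spark 1.x accepts --master yarn-client (or yarn-cluster). A minimal sketch, with the jar and arguments unchanged:

/usr/lib/spark/bin/spark-submit --master yarn-client --class com.wordpress.hadoopi.spark.SparkGrepScala spark-1.0-SNAPSHOT.jar input output dfs,yarn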

The output is as follows:

hadoop fs -cat output/*

    <name>yarn.scheduler.capacity.maximum-applications</name>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
    <name>yarn.scheduler.capacity.root.queues</name>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <name>yarn.scheduler.capacity.root.default.state</name>
    <name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
    <name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
    <name>yarn.scheduler.capacity.node-locality-delay</name>
    <value>hdfs://vagrant:8020</value>
    <name>dfs.namenode.name.dir</name>
    <value>/hadoop/dfs/nn</value>
    <name>dfs.datanode.data.dir</name>
    <value>/hadoop/dfs/dn</value>
    <name>dfs.webhdfs.enabled</name>
    <name>dfs.replication</name>
    <value>yarn</value>
    <name>yarn.app.mapreduce.am.staging-dir</name>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <name>yarn.web-proxy.address</name>
    <name>yarn.resourcemanager.address</name>
    <name>yarn.resourcemanager.scheduler.address</name>
    <name>yarn.resourcemanager.admin.address</name>
    <name>yarn.resourcemanager.webapp.address</name>
    <name>yarn.nodemanager.aux-services</name>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <name>yarn.log-aggregation-enable</name>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/var/lib/hadoop-yarn/cache/${user.name}/nm-local-dir</value>
    <name>yarn.nodemanager.log-dirs</name>
    <value>/var/log/hadoop-yarn/containers</value>
    <name>yarn.nodemanager.remote-app-log-dir</name>
    <value>/var/log/hadoop-yarn/apps</value>
    <name>yarn.application.classpath</name>

And voilà!

It’s fast, it’s simple to install, and it appears to work 🙂 Now all I have to do is learn Scala properly in order to be as efficient as I used to be with standard MapReduce! So if you followed this article, you have created your first Scala project running on Spark, straight from the command line.

Cheers,
Antoine
