7.3 Hadoop and Spark Connector
Overview
The Hadoop MapReduce GridDB connector is a Java library for using GridDB as an input source and output destination for Hadoop MapReduce jobs. It lets MapReduce jobs take direct advantage of GridDB's in-memory processing performance.
Operating Environment
Building the library and running the sample programs have been verified in the environment below.
- OS: CentOS6.7(x64)
- Java: JDK 1.8.0_60
- Maven: apache-maven-3.3.9
- Hadoop: CDH5.7.1(YARN)
QuickStart
Preparations
Build a GridDB Java client and place the created gridstore.jar under the lib directory.
Build
Run the mvn command as follows to create the jar files listed below:
$ mvn package
gs-hadoop-mapreduce-client/target/gs-hadoop-mapreduce-client-1.0.0.jar
gs-hadoop-mapreduce-examples/target/gs-hadoop-mapreduce-examples-1.0.0.jar
Running The Sample Program
An example of running the WordCount program with GridDB is shown below. GridDB and Hadoop (HDFS and YARN) must be started in advance. Run the following in an environment where both are running and the hadoop command is available.
$ cd gs-hadoop-mapreduce-examples
$ ./exec-example.sh \
> --job wordcount \
> --define notificationAddress= \
> --define notificationPort= \
> --define clusterName= \
> --define user= \
> --define password= \
> pom.xml 2> /dev/null | sort -r
5
5
3 org.apache.hadoop
3 com.toshiba.mwcloud.gs.hadoop
...
The first number is the number of occurrences, and the right side is the corresponding word from the file (pom.xml) specified as the processing target. See gs-hadoop-mapreduce-examples/README.md for details about the sample programs.
Spark Connector
The Spark Connector can be downloaded from our downloads page.
Full list of dependencies:
- OS: CentOS6.7(x64)
- Maven: apache-maven-3.3.9
- Java: JDK 1.8.0_101
- Apache Hadoop: Version 2.6.5
- Apache Spark: Version 2.1.0
- Scala: Version 2.11.8
- GridDB server and Java client: 3.0 CE
- GridDB connector for Apache Hadoop MapReduce: 1.0
If beginning from scratch, I recommend ensuring all of these items are installed and configured. This tutorial also assumes that your Hadoop, Spark, and Connector are all installed in the [INSTALL_FOLDER] directory (I used /opt).
Installation
Once verified, please proceed with the steps outlined below:
We start this process off with adding the following environment variables to .bashrc
$ nano ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/[JDK folder]
export HADOOP_HOME=[INSTALL_FOLDER]/hadoop-2.6.5
export SPARK_HOME=[INSTALL_FOLDER]/spark-2.1.0-bin-hadoop2.6
export GRIDDB_SPARK=[INSTALL_FOLDER]/griddb_spark
export GRIDDB_SPARK_PROPERTIES=$GRIDDB_SPARK/gd-config.xml
export PATH=$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$PATH
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"
$ source ~/.bashrc
Once those are added, modify the gd-config.xml file.
$ cd $GRIDDB_SPARK
$ nano gd-config.xml
<!-- GridDB properties -->
<property>
    <name>gs.user</name>
    <value>[GridDB user]</value>
</property>
<property>
    <name>gs.password</name>
    <value>[GridDB password]</value>
</property>
<property>
    <name>gs.cluster.name</name>
    <value>[GridDB cluster name]</value>
</property>
<!-- Define address and port for multicast method, leave it blank if using other method -->
<property>
    <name>gs.notification.address</name>
    <value>[GridDB notification address (default is 239.0.0.1)]</value>
</property>
<property>
    <name>gs.notification.port</name>
    <value>[GridDB notification port (default is 31999)]</value>
</property>
Build The Connector + An Example
Next up, refer to this configuration page for a quick definition of each of the GridDB properties.
Build a GridDB Java client and a GridDB connector for Hadoop MapReduce, then place the following files under the $GRIDDB_SPARK/gs-spark-datasource/lib directory.
gridstore.jar
gs-hadoop-mapreduce-client-1.0.0.jar
(Note: these .jar files should have been created when you built your GridDB client and the GridDB MapReduce connector. You can find gridstore.jar in /usr/griddb-X.X.X/bin, for example.)
Once that's complete, add the SPARK_CLASSPATH to "spark-env.sh"
$ cd $SPARK_HOME
$ nano conf/spark-env.sh
SPARK_CLASSPATH=.:$GRIDDB_SPARK/gs-spark-datasource/target/gs-spark-datasource.jar:$GRIDDB_SPARK/gs-spark-datasource/lib/gridstore.jar:$GRIDDB_SPARK/gs-spark-datasource/lib/gs-hadoop-mapreduce-client-1.0.0.jar
Now that we've got the prerequisites out of the way, we can continue on to build the connector and an example to ensure everything is working properly.
To begin, we will need to edit our Init.java file to add the correct authentication credentials.
$ cd $GRIDDB_SPARK/gs-spark-datasource-example/src/
$ nano Init.java
And add in your credentials:
Properties props = new Properties();
props.setProperty("notificationAddress", "239.0.0.1");
props.setProperty("notificationPort", "31999");
props.setProperty("clusterName", "Spark-Cluster");
props.setProperty("user", "admin");
props.setProperty("password", "hunter2");
GridStore store = GridStoreFactory.getInstance().getGridStore(props);
And now we can run the mvn command like so:
$ cd $GRIDDB_SPARK
$ mvn package
which will create the following .jar files:
gs-spark-datasource/target/gs-spark-datasource.jar
gs-spark-datasource-example/target/example.jar
Now proceed with running the example program. First start your GridDB cluster, and then put some data into the server with the GridDB Java client:
$ cd $GRIDDB_SPARK
$ java -cp ./gs-spark-datasource-example/target/example.jar:gs-spark-datasource/lib/gridstore.jar Init
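If you're curious what an Init-style loader looks like under the hood, here is a minimal sketch using the GridDB Java client API. The container name ("people") and the row schema are assumptions for illustration only; the bundled Init.java defines its own container and data.

import java.util.Properties;
import com.toshiba.mwcloud.gs.Collection;
import com.toshiba.mwcloud.gs.GridStore;
import com.toshiba.mwcloud.gs.GridStoreFactory;
import com.toshiba.mwcloud.gs.RowKey;

public class InitSketch {
    // Simple row schema; the field annotated with @RowKey becomes the row key.
    static class Person {
        @RowKey String name;
        int age;
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("notificationAddress", "239.0.0.1");
        props.setProperty("notificationPort", "31999");
        props.setProperty("clusterName", "Spark-Cluster");
        props.setProperty("user", "admin");
        props.setProperty("password", "hunter2");
        GridStore store = GridStoreFactory.getInstance().getGridStore(props);

        // Create (or open) a collection container and insert a couple of rows.
        Collection<String, Person> people = store.putCollection("people", Person.class);
        Person row = new Person();
        row.name = "alice";
        row.age = 30;
        people.put(row);
        row = new Person();
        row.name = "bob";
        row.age = 25;
        people.put(row);

        store.close();
    }
}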
Queries
Now you can run queries with your GridDB connector for Spark:
$ spark-submit --class Query ./gs-spark-datasource-example/target/example.jar
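To give a rough idea of what a Query class does with the connector, here is a hedged sketch of the standard Spark SQL datasource pattern in Java. The format string ("com.example.griddb.datasource") and the "container" option are placeholders I made up for illustration; check the source under gs-spark-datasource-example/src for the real provider name and options.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class QuerySketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("griddb-spark-query")
                .getOrCreate();

        // Load a GridDB container as a DataFrame. The format and option names
        // below are hypothetical placeholders, not the connector's actual API.
        Dataset<Row> df = spark.read()
                .format("com.example.griddb.datasource")
                .option("container", "people")
                .load();

        // Register the DataFrame as a temporary view and query it with Spark SQL.
        df.createOrReplaceTempView("people");
        spark.sql("SELECT name, age FROM people WHERE age > 20").show();

        spark.stop();
    }
}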
We will go over some brief examples of Apache Spark's API. Examples are pulled from the official page.
Spark's defining feature is its RDD (Resilient Distributed Dataset) and the accompanying API. RDDs are immutable data structures that can be operated on in parallel across commodity hardware -- essentially, this is exactly what allows Spark to run its queries in parallel and outperform MapReduce. Here's a very basic example; it showcases how to build an RDD of the numbers 1 - 5:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
With this, you can now run that small array in parallel. Pretty cool, huh?
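To actually kick off the parallel computation you call an action on the RDD. Borrowing the follow-up from Spark's programming guide, summing the elements looks like this:

// Actions force evaluation of the RDD; reduce sums the five elements in parallel.
int sum = distData.reduce((a, b) -> a + b);
System.out.println("sum = " + sum); // 15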
Command Line Query
A "must-run" query in the Big Data scene is running a word count, so here's what it looks like on Spark. For this example, let's try using the shell (example taken from: here). To run this, please be sure you place a text file input.txt
into your $GRIDDB_SPARK
directory. Fill it with whatever text you like; I used the opening chapter of Moby Dick . Now fire up the spark shell:
$ spark-shell
scala> val inputfile = sc.textFile("input.txt")
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:26
scala> counts.saveAsTextFile("output")
And now if you head back into $GRIDDB_SPARK, you should find the output dir. Now just run a simple cat on the file in there to retrieve the word count results of your text file.
$ cd $GRIDDB_SPARK
$ cd output
$ cat part-00000
(Ah!,1)
(Let,1)
(dreamiest,,1)
(dotings,1)
(cooled,1)
(spar,1)
(previous,2)
(street,,1)
(old,6)
(left,,1)
(order,2)
(told,1)
(marvellous,,1)
(Now,,1)
(virtue,1)
(Take,1)
TS Query
Of course, Spark is also capable of handling much more complex queries. Because GridDB is particularly well suited to TimeSeries (TS) data, how about we take a look at a TS query? Here's a sample query taken from here:
val tsRdd: TimeSeriesRDD = ...
// Find a sub-slice between two dates
val zone = ZoneId.systemDefault()
val subslice = tsRdd.slice(
  ZonedDateTime.of(LocalDateTime.parse("2015-04-10T00:00:00"), zone),
  ZonedDateTime.of(LocalDateTime.parse("2015-04-14T00:00:00"), zone))
// Fill in missing values based on linear interpolation
val filled = subslice.fill("linear")
// Use an AR(1) model to remove serial correlations
val residuals = filled.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series))
License
The Hadoop MapReduce GridDB connector source license is Apache License, version 2.0.