Introduction
Apache Spark now has support to fully integrate GridDB into its workflow. For those unaware, Spark is free and open-source software that saw its initial release in 2014. Since then, it has quickly established itself as an important piece of Big Data processing and analysis. This blog post gives instructions on how to install Spark on your GridDB machine and also goes over some brief queries to provide a tangible look at its usage.
As briefly explained above, Apache Spark is a parallel data processing framework meant to provide fast data analytics. Using the GridDB connector allows a GridDB database to serve as an input source for Spark queries and analytics. Spark's interactive shell lets data scientists and developers perform ad-hoc queries quickly and easily, and the same functionality can be built into user-facing business applications. Installation is a fairly simple process.
This blog assumes your machine already has a GridDB server, the GridDB Java client, and the GridDB Hadoop MapReduce connector installed. Each of these items has its own set of dependencies, so a full list is posted below. Please note, if you have any issues installing any of these items, leave a comment below or post on the forums for help.
Full list of dependencies:
- OS: CentOS 6.7 (x64)
- Maven: apache-maven-3.3.9
- Java: JDK 1.8.0_101
- Apache Hadoop: Version 2.6.5
- Apache Spark: Version 2.1.0
- Scala: Version 2.11.8
- GridDB server and Java client: 3.0 CE
- GridDB connector for Apache Hadoop MapReduce: 1.0
If beginning from scratch, I recommend ensuring all of these items are installed and configured. This tutorial also assumes that your Hadoop, Spark, and connector are all installed in the [INSTALL_FOLDER] directory (I used /opt).
Installation
Once verified, please proceed with the steps outlined below:
We start this process off by adding the following environment variables to .bashrc:
$ nano ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/[JDK folder]
export HADOOP_HOME=[INSTALL_FOLDER]/hadoop-2.6.5
export SPARK_HOME=[INSTALL_FOLDER]/spark-2.1.0-bin-hadoop2.6
export GRIDDB_SPARK=[INSTALL_FOLDER]/griddb_spark
export GRIDDB_SPARK_PROPERTIES=$GRIDDB_SPARK/gd-config.xml
export PATH=$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$PATH
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"
$ source ~/.bashrc
Once those are added, modify the gd-config.xml file.
$ cd $GRIDDB_SPARK
$ nano gd-config.xml
<!-- GridDB properties -->
<property>
  <name>gs.user</name>
  <value>[GridDB user]</value>
</property>
<property>
  <name>gs.password</name>
  <value>[GridDB password]</value>
</property>
<property>
  <name>gs.cluster.name</name>
  <value>[GridDB cluster name]</value>
</property>
<!-- Define the address and port for the multicast method; leave these blank if using another connection method -->
<property>
  <name>gs.notification.address</name>
  <value>[GridDB notification address (default is 239.0.0.1)]</value>
</property>
<property>
  <name>gs.notification.port</name>
  <value>[GridDB notification port (default is 31999)]</value>
</property>
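For reference, a filled-in gd-config.xml might look like the example below. All of the values shown (the admin user and password, the Spark-Cluster cluster name, and the default multicast address and port) are placeholders chosen to match the credentials used later in Init.java; substitute your own cluster's settings.

<!-- Example values only; replace with your own cluster settings -->
<property>
  <name>gs.user</name>
  <value>admin</value>
</property>
<property>
  <name>gs.password</name>
  <value>hunter2</value>
</property>
<property>
  <name>gs.cluster.name</name>
  <value>Spark-Cluster</value>
</property>
<property>
  <name>gs.notification.address</name>
  <value>239.0.0.1</value>
</property>
<property>
  <name>gs.notification.port</name>
  <value>31999</value>
</property>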
Build The Connector + An Example
Next up, refer to this configuration page for a quick definition of each of the GridDB properties.
To build against the GridDB Java client and the GridDB connector for Hadoop MapReduce, place the following files under the $GRIDDB_SPARK/gs-spark-datasource/lib directory:
gridstore.jar
gs-hadoop-mapreduce-client-1.0.0.jar
(Note: these .jar files should have been created when you built your GridDB client and the GridDB MapReduce connector. You can find gridstore.jar in /usr/griddb-X.X.X/bin, for example.)
Once that’s complete, add the SPARK_CLASSPATH to “spark-env.sh”
$ cd $SPARK_HOME
$ nano conf/spark-env.sh
SPARK_CLASSPATH=.:$GRIDDB_SPARK/gs-spark-datasource/target/gs-spark-datasource.jar:$GRIDDB_SPARK/gs-spark-datasource/lib/gridstore.jar:$GRIDDB_SPARK/gs-spark-datasource/lib/gs-hadoop-mapreduce-client-1.0.0.jar
Now that we’ve got the prerequisites out of the way, we can continue on to build the connector and an example to ensure everything is working properly.
To begin, we will need to edit the Init.java file to add the correct authentication credentials.
$ cd $GRIDDB_SPARK/gs-spark-datasource-example/src/
$ nano Init.java
And add in your credentials:
// Connection settings for your GridDB cluster; these should match gd-config.xml
Properties props = new Properties();
props.setProperty("notificationAddress", "239.0.0.1");
props.setProperty("notificationPort", "31999");
props.setProperty("clusterName", "Spark-Cluster");
props.setProperty("user", "admin");
props.setProperty("password", "hunter2");
GridStore store = GridStoreFactory.getInstance().getGridStore(props);
And now we can run the mvn command like so:
$ cd $GRIDDB_SPARK
$ mvn package
which will create the following .jar files:
gs-spark-datasource/target/gs-spark-datasource.jar
gs-spark-datasource-example/target/example.jar
Now proceed with running the example program. First, start your GridDB cluster. Then put some data into the server with the GridDB Java client:
$ cd $GRIDDB_SPARK
$ java -cp ./gs-spark-datasource-example/target/example.jar:gs-spark-datasource/lib/gridstore.jar Init
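If you are curious what a loader class like Init does under the hood, here is a minimal sketch using the GridDB Java client. The SamplePut class, the people container, and the Person schema are made up for illustration; the bundled Init.java defines its own containers and rows.

import java.util.Properties;

import com.toshiba.mwcloud.gs.Collection;
import com.toshiba.mwcloud.gs.GridStore;
import com.toshiba.mwcloud.gs.GridStoreFactory;
import com.toshiba.mwcloud.gs.RowKey;

// Hypothetical loader sketch; not the exact contents of the bundled Init.java.
public class SamplePut {

    // Made-up row schema for illustration only.
    public static class Person {
        @RowKey public String name;
        public int age;
    }

    public static void main(String[] args) throws Exception {
        // Same connection properties as in Init.java above.
        Properties props = new Properties();
        props.setProperty("notificationAddress", "239.0.0.1");
        props.setProperty("notificationPort", "31999");
        props.setProperty("clusterName", "Spark-Cluster");
        props.setProperty("user", "admin");
        props.setProperty("password", "hunter2");
        GridStore store = GridStoreFactory.getInstance().getGridStore(props);

        // Create (or open) a collection container and insert a single row.
        Collection<String, Person> people = store.putCollection("people", Person.class);
        Person row = new Person();
        row.name = "alice";
        row.age = 30;
        people.put(row);

        store.close();
    }
}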
Queries
Now you can run queries with your GridDB connector for Spark:
$ spark-submit --class Query ./gs-spark-datasource-example/target/example.jar
We will go over some brief examples of Apache Spark’s API. Examples are pulled from the official page.
Spark’s defining feature is the RDD (Resilient Distributed Dataset) and its accompanying API. RDDs are immutable data structures whose operations can be run in parallel on commodity hardware; this is exactly what allows Spark to run its queries in parallel and outperform MapReduce. Here's a very basic example; it shows how to build an RDD of the numbers 1 through 5:
// Turn a local Java collection into a distributed dataset (RDD)
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
With this, you can now run that small array in parallel. Pretty cool, huh?
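To make that concrete, here is a small, self-contained sketch that builds the same RDD and reduces it in parallel. The RddSum class name and the local[*] master are placeholders for illustration; in practice you would point the master at your cluster and submit the jar with spark-submit.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RddSum {
    public static void main(String[] args) {
        // Run locally on all cores here; point the master at your cluster in practice.
        SparkConf conf = new SparkConf().setAppName("RddSum").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Distribute the local list as an RDD, then reduce it in parallel.
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> distData = sc.parallelize(data);
        int sum = distData.reduce((a, b) -> a + b);

        System.out.println("sum = " + sum); // prints 15

        sc.stop();
    }
}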
Command Line Query
A “must-run” query in the Big Data scene is a word count, so here's what it looks like on Spark. For this example, let's try using the shell (example taken from here). To run this, be sure to place a text file named input.txt in your $GRIDDB_SPARK directory. Fill it with whatever text you like; I used the opening chapter of Moby Dick. Now fire up the Spark shell:
$ spark-shell
scala> val inputfile = sc.textFile("input.txt")
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:26

scala> counts.saveAsTextFile("output")
And now if you head back into $GRIDDB_SPARK, you should find the output directory. Run a simple cat on the file in there to retrieve the word count results of your text file.
$ cd $GRIDDB_SPARK
$ cd output
$ cat part-00000
(Ah!,1)
(Let,1)
(dreamiest,,1)
(dotings,1)
(cooled,1)
(spar,1)
(previous,2)
(street,,1)
(old,6)
(left,,1)
(order,2)
(told,1)
(marvellous,,1)
(Now,,1)
(virtue,1)
(Take,1)
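If you would rather run the same word count from a compiled application instead of the shell, a minimal Java sketch looks like the following. The WordCount class name and the output-java directory are placeholders for illustration; the pipeline itself mirrors the shell session above.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Same pipeline as the shell session: split lines into words,
        // pair each word with 1, then sum the counts per word.
        JavaRDD<String> lines = sc.textFile("input.txt");
        JavaPairRDD<String, Integer> counts = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        counts.saveAsTextFile("output-java");
        sc.stop();
    }
}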
TS Query
Of course, Spark is also capable of handling much more complex queries. Because GridDB deals primarily in time-series (TS) data, let's take a look at a TS query. Here's a sample query taken from here:
val tsRdd: TimeSeriesRDD = ...

// Find a sub-slice between two dates
val zone = ZoneId.systemDefault()
val subslice = tsRdd.slice(
  ZonedDateTime.of(LocalDateTime.parse("2015-04-10T00:00:00"), zone),
  ZonedDateTime.of(LocalDateTime.parse("2015-04-14T00:00:00"), zone))

// Fill in missing values based on linear interpolation
val filled = subslice.fill("linear")

// Use an AR(1) model to remove serial correlations
val residuals = filled.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series))
Using Spark with GridDB as an input source is very easy and should prove very useful. We hope the combination of the two services yields lots of productive analysis work.
If you have any questions about the blog, please create a Stack Overflow post at https://stackoverflow.com/questions/ask?tags=griddb.
Make sure that you use the “griddb” tag so our engineers can quickly reply to your questions.