Tag: フレームワーク

  • Apache SparkのためのGridDBコネクタ

    Apache SparkのためのGridDBコネクタ

    Apache Spark は、GridDBをワークフローに統合するためのサポート機能を追加しました。Sparkは以前は FOSS として知られ、2014年に最初にリリースされて以降、ビッグデータの処理と分析において重要な役割を担っています。このブログでは、GridDBマシンにSparkをインストールする方法と、いくつかの簡単なクエリを用いて具体的な使い方の一例をご紹介します。

    Apache Sparkは、高速データ分析を提供するためのパラレルデータ処理フレームワークです。 GridDBコネクタを使用すると、GridDBデータベースをSparkのクエリと分析の入力ソースとして使用できます。そのインタラクティブシェルは、データ科学者/開発者によるアドホッククエリを素早く簡単に実行するために使用できます。また、ユーザー指向のビジネスアプリケーションに組み込むこともできます。インストールの手順は簡単です。

    このブログでは、あなたのマシンに GridDBサーバー、GridDB Javaクライアント、 GridDB Hadoop Mapreduce Connector があることを前提としています。これらのソフトウェアにはそれぞれ独自の依存関係がありますので、以下では完全なリストを記載します。また、これらのソフトウェアをインストールする際に問題が発生した場合は、下にコメントを残すか、フォーラムに投稿してください。

    依存関係の完全なリスト:

    • OS: CentOS6.7(x64)
    • Maven: apache-maven-3.3.9
    • Java: JDK 1.8.0_101
    • Apache Hadoop: Version 2.6.5
    • Apache Spark: Version 2.1.0
    • Scala: Version 2.11.8
    • GridDB server and Java client: 3.0 CE
    • GridDB connector for Apache Hadoop MapReduce: 1.0

    最初に、これらのソフトウェアがすべてインストールされ、設定されていることを確認しておくことをお勧めします。 このチュートリアルでは、Hadoop、Spark、およびConnectorがすべて[INSTALL_FOLDER] ディレクトリにインストールされていることを前提としています。(私は /opt ディレクトリを使いました)

    インストール

    確認が終わったら、次の手順を実行してください。

    まず、次の環境変数を .bashrc に追加します。

    $ nano ~/.bashrc
     export JAVA_HOME=/usr/lib/jvm/[JDK folder]
     export HADOOP_HOME=[INSTALL_FOLDER]/hadoop-2.6.5
     export SPARK_HOME=[INSTALL_FOLDER]/spark-2.1.0-bin-hadoop2.6
     export GRIDDB_SPARK=[INSTALL_FOLDER]/griddb_spark
     export GRIDDB_SPARK_PROPERTIES=$GRIDDB_SPARK/gd-config.xml
     
     export PATH=$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$PATH
     
     export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
     export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"
    
    $ source ~/.bashrc

    追加できたら、 gd-config.xml ファイルを変更します。

    $ cd $GRIDDB_SPARK
    $ nano gd-config.xml
     <!-- GridDB properties -->
     <property>
     	 <name>gs.user </name>
     	 <value>[GridDB user] </value>
     </property>
     <property>
     	 <name>gs.password </name>
     	 <value>[GridDB password] </value>
     </property>
     <property>
     	 <name>gs.cluster.name </name>
     	 <value>[GridDB cluster name] </value>
     </property>
      <!-- Define address and port for multicast method, leave it blank if using other method -->
     <property>
     	 <name>gs.notification.address </name>
     	 <value>[GridDB notification address(default is 239.0.0.1)] </value>
     </property>
     <property>
     	 <name>gs.notification.port </name>
     	 <value>[GridDB notification port(default is 31999)] </value>
     </property>
    

    コネクタの構築と例

    次に、各GridDBプロパティの簡単な定義については、この設定ページを参照してください。

    Hadoop MapReduce用GridDB JavaクライアントとGridDBコネクタを構築するには、 $ GRIDDB_SPARK/gs-spark-datasource/libディレクトリの下に次のファイルを配置します。

    gridstore.jar
    gs-hadoop-mapreduce-client-1.0.0.jar
    

    gridstore.jar は、/usr/libディレクトリにあります(これらの .jar ファイルは、GridDBクライアントとGridDB Mapreduce Connectorをビルドしたときに作成されているはずです) griddb-XXX / bin など)
    完了したら、 “spark-env.sh”にSPARK_CLASSPATHを追加します。

    $ cd $SPARK_HOME
    $ nano conf/spark-env.sh
     SPARK_CLASSPATH=.:$GRIDDB_SPARK/gs-spark-datasource/target/gs-spark-datasource.jar:$GRIDDB_SPARK/gs-spark-datasource/lib/gridstore.jar:$GRIDDB_SPARK/gs-spark-datasource/lib/gs-hadoop-mapreduce-client-1.0.0.jar
    

    これで前提条件が揃ったので、コネクタと、すべてが正しく動作することを確認するための例を作成します。

    まず、正しい認証クレデンシャルを追加するために、 Init.java ファイルを編集する必要があります。

    $ cd $SPARK_HOME/gs-spark-datasource-example/src/
    $ nano Init.java
    

    あなたの資格情報を追加してください。

    Properties props = new Properties();
    props.setProperty("notificationAddress", "239.0.0.1");
    props.setProperty("notificationPort", "31999");
    props.setProperty("clusterName", "Spark-Cluster");
    props.setProperty("user", "admin");
    props.setProperty("password", "hunter2");
    GridStore store = GridStoreFactory.getInstance().getGridStore(props);
    

    これでmvnコマンドを次のように実行できます。

    $ cd $GRIDDB_SPARK
    $ mvn package
    

    次の .jar ファイルが作成されます。

    gs-spark-datasource/target/gs-spark-datasource.jar
    gs-spark-datasource-example/target/example.jar
    

    次に、サンプルプログラムを実行します。 まず、GridDBクラスタを起動します。 その後、GridDB Javaクライアントを使用していくつかのデータをサーバーに入れます。

    $ cd $GRIDDB_SPARK
    $ java -cp ./gs-spark-datasource-example/target/example.jar:gs-spark-datasource/lib/gridstore.jar Init
    

    クエリ

    これで、SparkのGridDBコネクタでクエリを実行できます。

    $ spark-submit --class Query ./gs-spark-datasource-example/target/example.jar
    

    Apache SparkのAPIの簡単な例を紹介します。 例は公式ページから抜粋しています。

    Sparkの特徴は、RDD(Resilient Distributed Datasets)とこれに付属するAPIです。 RDDはコモディティハードウェア上で並行して実行できる不変のデータ構造です。本質的に、Sparkがクエリを並行して実行し、MapReduceより優れた性能を発揮するものです。 ここで簡単な例として、1から5までの数字のRDDを構築する方法を紹介します。

    List data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD distData = sc.parallelize(data);
    

    これで、その小さな配列を並行して実行できるようになりました。

    コマンドラインクエリ

    ビッグデータの世界で必須のクエリといえば、単語数のカウントがあります。Spark上でどのように単語数カウントができるかを見てみましょう。ここでは、(例:こちらにあるShellを使って考えます。 これを実行するには、テキストファイル input.txt $ GRIDDB_SPARK ディレクトリに配置してください。 何か適当に文章を入力してみてください。 私は Moby Dick の小説の第一章の文章を引用しました。

    $ spark-shell 

    scala> val inputfile = sc.textFile ("input.txt")
    inputfile: org.apache.spark.rdd.RDD[String] = input.txt MapPartitionsRDD[1] at textFile at :24
    
    scala> val counts = inputfile.flatMap (line => line.split (" " )).map (word => (word, 1)).reduceByKey(_+_)
    counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :26
    
    scala> counts.saveAsTextFile ("output")
    

    $ GRIDDB_SPARK に戻ると、出力ディレクトリが見つかります。 ファイル内でcatを実行すると、テキストファイルの単語数の結果を取得することができます。

    $ cd $GRIDDB_SPARK
    $ cd output
    $ cat part-00000 
    (Ah!,1)
    (Let,1)
    (dreamiest,,1)
    (dotings,1)
    (cooled,1)
    (spar,1)
    (previous,2)
    (street,,1)
    (old,6)
    (left,,1)
    (order,2)
    (told,1)
    (marvellous,,1)
    (Now,,1)
    (virtue,1)
    (Take,1)

    TSクエリ

    もちろん、Sparkはもっと複雑なクエリを処理することもできます。 GridDBはTimeSeries(TS)データを扱うのに最も適しているので、次はTSクエリを見てみましょう。 こちらから取得したサンプルクエリは次のとおりです。

    val tsRdd: TimeSeriesRDD = ...
    
    // Find a sub-slice between two dates 
    val zone = ZoneId.systemDefault()
    val subslice = tsRdd.slice(
      ZonedDateTime.of(LocalDateTime.parse("2015-04-10T00:00:00"), zone)
      ZonedDateTime.of(LocalDateTime.parse("2015-04-14T00:00:00"), zone))
    
    // Fill in missing values based on linear interpolation
    val filled = subslice.fill("linear")
    
    // Use an AR(1) model to remove serial correlations
    val residuals = filled.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series))
    

    これまでの内容から、Sparkを入力ソースとしてGridDBと使用することがとても簡単で役立つことがお分かりいただけたでしょうか。SparkとGridDBを一緒にお使いいただくことで生産性の高い分析作業が可能となります。ぜひお試しください。