KafkaとGridDBの連携

はじめに

今日のアプリケーションやIoTデバイスは、多くのビッグデータを生成しています。このデータの大半は非構造化データです。非常に長い間、構造化されたデータを保存するために、リレーショナルデータベース管理システムが使用されてきました。これらのデータベース管理システムは、データを行と列で構成されるテーブルに整理しています。しかし、非構造化データは行と列でうまく整理できないため、リレーショナルデータベースは非構造化データの保存に最適な選択肢ではありません。NoSQLデータベースは、リレーショナルデータベースで生じたギャップを埋めるために開発されたので非構造化データの保存に使いやすくなっています。GridDBは、NoSQLデータベースの良い例であり、IoTやビッグデータ向けに最適化されています。GridDBを使用する場合、データのストリームインとアウトが必要になります。Apache Kafkaは、これを実現するためのデータストリーミングプラットフォームです。今回は、KafkaとGridDBデータベースを使用する際に、ソフトウェアシステムのさまざまな要素間の通信をどのように整理するかについて説明します。

Apache Kafkaとは?

Apache Kafkaは、データ統合、データパイプライン、およびストリーミング分析に使用される分散イベントストリーミングプラットフォームです。ストリーミングデータをリアルタイムに取り込み、処理することができます。ストリーミングデータとは、データソースによって継続的に生成され、同時にデータレコードを送信するデータのことを指します。Kafkaは、ユーザーに以下の機能を提供します。

  • レコードストリームのパブリッシュとサブスクライブ
  • レコードストリームを生成された順番で効率的に保存する
  • レコードストリームのリアルタイム処理

Kafkaは、ストレージ、メッセージング、ストリーム処理を組み合わせて、リアルタイムおよびヒストリカルデータの分析を容易にします。

GridDBとは?

GridDBは、IoTやビッグデータの保存に最適化された時系列NoSQLデータベースです。キーコンテナモデルを採用し、時系列データとそれに対応するメタデータを扱います。また、GridDBはDiskとIn-Memoryのアーキテクチャを組み合わせることで、最大限のパフォーマンスを発揮します。並列処理を用いて大規模なデータセットを複数のノードに分散させるため、高いパフォーマンスを発揮することができます。また、コモディティハードウェア上で、直線的にも水平的にもうまくスケールして、優れたパフォーマンスを提供します。他のNoSQLデータベースとは異なり、GridDBはコンテナ内で強力な一貫性を提供し、トランザクションのACID特性を遵守しています。

JavaアプリケーションとGridDB、Kafka間の通信をどのように構成するか?

ここでは、CSVファイルからGridDBへ、さらにKafkaを介してコンシューマへデータを移動させる方法を紹介します。このプロセスは、以下のステップを踏みます。

ステップ1: JavaアプリケーションがCSVファイルからデータを読み込み、GridDBに書き込みます。ステップ2: ソースコネクタはGridDBデータベースからデータを読み込みます。ステップ3: ソースコネクタは、データをKafkaにフィードします。ステップ4: Kafkaはコンソールにデータを出力します。

インストール

始める前に、まず必要なツールをすべてインストールしましょう。Kafkaのセットアップ、ZooKeeperサーバの起動、そして最後にKafkaサーバの起動が必要です。Kafkaは以下のページからダウンロードすることができます。また、お使いのコンピュータにJava 1.8がインストールされていることを確認してください。ダウンロードが完了したら、以下のコマンドでuntarしてください。

tar xzvf kafka_2.12-2.5.0.tgz

解凍したフォルダにディレクトリを変更し、binフォルダのパスを設定します。

cd kafka_2.12-2.5.0
export PATH=$PATH:/path/to/kafka_2.12-2.5.0/bin

次に、ZooKeeperとKafkaのサーバを起動します。

zookeeper-server-start.sh  --daemon config/zookeeper.properties
kafka-server-start.sh --daemon config/server.properties

その後、mvn パッケージを使用して、Git から Kafka-connect-GridDB JDBC コネクタをビルドします。

.jarファイルをビルドした後、kafka/libsディレクトリに移動します。また、GridDB JDBCドライバを同ディレクトリに追加します。

CSVデータをGridDBに書き込む

Shopifyストアのデータをorders.csvというCSVファイルで保存しています。このデータは、Shopifyの店舗で行われた特定の商品に対するさまざまな注文の詳細を示しています。

Javaを使ってCSVファイルからデータを読み込み、GridDBに格納します。まず、使用するJavaパッケージをインポートしましょう。


import java.io.IOException;
import java.util.Properties;
import java.util.Collection;
import java.util.Scanner;
import java.io.File;

import com.toshiba.mwcloud.gs.Collection;
import com.toshiba.mwcloud.gs.GridStore;
import com.toshiba.mwcloud.gs.GSException;
import com.toshiba.mwcloud.gs.GridStoreFactory;
import com.toshiba.mwcloud.gs.RowKey;
import com.toshiba.mwcloud.gs.Query;
import com.toshiba.mwcloud.gs.RowSet;
//The data will be written into a GridDB container, so let’s create a static class that represents the container:
public static class KafkaGridDB{
    
         @RowKey String name;
         String title;
         String price;
         String quantity;
    }

上のクラスをGridDBコンテナ、変数をカラムとして見てください。それでは、GridDBインスタンスへの接続を確立してみましょう。GridDBインスタンスを、GridDBをインストールした際の認証情報で作成します。


Properties pp = new Properties();
pp.setProperty("notificationAddress", "239.0.0.1");
pp.setProperty("notificationPort", "31999");
pp.setProperty("clusterName", "defaultCluster");
pp.setProperty("user", "admin");
pp.setProperty("password", "mypassword");
GridStore store = GridStoreFactory.getInstance().getGridStore(pp);
//Let us select the KafkaGridDB container:
Collection<String, KafkaGridDB> cl = store.putCollection("col01", KafkaGridDB.class);
//We have selected the KafkaGridDB container and created its instance named cl. We can now use this instance to read data from the orders.csv file:
File myfile = new File("orders.csv");
                Scanner scn = new Scanner(myfile);
                String data = scn.next();
 
                while (scn.hasNext()){
                        String scData = scn.next();
                        String list[] = scData.split(",");
                        String name = list[0];
                        String title = list[1];
                        String price = list[2];
                        String quantity = list[3];
                        
                        KafkaGridDB kafkagriddb = new KafkaGridDB();
    
                        kafkagriddb.name = name;
                        kafkagriddb.title = title;
                        kafkagriddb.price = price;
                        kafkagriddb.quantity = quantity;
                        cl.append(kafkagriddb);
                 }

GridDB コンテナに追加される kafkagriddb オブジェクトを作成しました。

GridDBソースコネクタの設定

いよいよGridDBシンクコネクタの設定ファイルを作成します。設定ファイルには、KafkaとGridDB間の通信を円滑にするためのパラメータが記述されます。config/connect-jdbc.properties ファイルに以下の内容を追加します。

bootstrap.servers=localhost:9092
name=griddb-sources
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
key.converter.schemas.enable=true
value.converter.schemas.enable=true
batch.size=1
mode=bulk
topic.prefix=gridstore-03-
table.whitelist="KafkaGridDB"
connection.url=jdbc:gs://239.0.0.1:41999/defaultCluster/public
connection.user=admin
connection.password=mypassword
auto.create=true
transforms=TimestampConverter
transforms.TimestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.format=yyyy-MM-dd hh:mm:ss
transforms.TimestampConverter.field=datetime
transforms.TimestampConverter.target.type=Timestamp

次に、GridDBからデータを読み込む方法について説明します。

GridDBのデータを読み込む

ソースコネクタを使って、GridDBからデータを問い合わせ、Kafkaに送り込みます。まずは、ソースコネクタを起動してみましょう。Kafkaのディレクトリから以下のコマンドを実行します。

./bin/connect-standalone.sh config/connect-standalone.properties config/connect-jdbc.properties

このコマンドはコネクタを起動し、ホワイトリストに登録されたテーブルの検索を開始します。もし見つからなければ、エラーを返します。

データが入力された後、コネクタは以下の出力を返します。

[2021-11-06 09:07:12,132] INFO WorkerSourceTask{id=griddb-sources-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:419)
[2021-11-06  09:07:12,148] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:237)
[2021-11-06  09:07:12,148] INFO WorkerSourceTask{id=griddb-sources-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:215)
[2021-11-06 09:07:12,149] INFO Begin using SQL query: SELECT * FROM "KafkaGridDB" (io.confluent.connect.jdbc.source.TableQuerier:144)

上記のメッセージは、データベースへの接続が成功したことを明確に示しています。

次に、Kafkaからデータを読み込んでいきます。

Kafkaからデータを読み込む

Kafkaの内容をターミナルに表示させたいと思います。以下のコマンドを実行するだけです。

bin/kafka-console-consumer.sh --topic gridstore-03-KafkaGridDB --from-beginning --bootstrap-server localhost:9092

このコマンドは、GridDBコンテナの内容をKafkaメッセージとして返す必要があります。

このように、CSVデータをGridDBに書き込み、GridDBのデータをクエリし、Kafkaに送り込むことができました。

終わりに

本記事では、Javaアプリケーション、GridDB、Kafkaを使用する際の通信を整理する手順を説明しました。CSVデータをGridDBデータに書き込んで、Apache Kafkaに送り込む方法を学ぶことの一助になれば幸いです。

Leave a Reply

Your email address will not be published.