GridDBとKafkaでデータをストリームする

Apache Kafkaは、「レコードストリームのリアルタイム処理」を可能にするツールです。これはどういうことかというと、センサーや様々なツールの一部からリアルタイムにデータを直接何かに送ることができ、この場合はGridDBに直接送ることができます。また、その逆も可能で、GridDBコンテナからデータを直接ロギング分析などの他のツールや様々なツールにストリームすることができます。

以前のブログで、Apache Kafkaを使って、JDBC接続でGridDBサーバに直接リアルタイムデータをロードする方法について説明しました。以下のリンクから以前のブログを読むことができます。KafkaとGridDBの連携JDBCでKafkaのソースとしてGridDBを使用する

今回は、新しくリリースされたGridDB Kafka Connectorを利用して、GridDBとKafkaを連携させる方法をご紹介します。前回はJDBCを介したGridDBとKafkaの連携に焦点を当てましたが、今回リリースされたコネクタでは、JDBCは不要となり、シンクコネクタとソースコネクタを通じてKafkaから直接GridDBとインタフェースできるようになりました。GridDB Kafkaシンクコネクタは、Apache Kafkaのトピックからデータをプッシュし、そのデータをGridDBデータベースのテーブルに永続化します。ソースコネクタはその逆で、GridDBからデータを取り出してKafkaのトピックに格納します。

これを説明するために、2種類のコネクタを使って、kafkaのトピックからGridDBサーバーに直接データをプッシュし(Sink)、その逆(Source)も行います。

この手順では最終的に5種類のターミナルを開くことになります。

  • ターミナル1:GridDB gs_shでGridDBの動作を確認する。
  • ターミナル2、3:Kafka Zookeper、Kafkaサーバーを実行する。
  • ターミナル4:GridDB Kafkaのシンクおよびソースコネクタを実行する。
  • ターミナル5:スクリプトの実行、読み込み、データチェックする。

環境・端末をインストール、セットアップする

このブログでは、すべての事前準備を解説し、適切なサーバ・スクリプトを実行し、Kafka “topics” を作成します。また、データをGridDBサーバ(Sink コネクタ)にプッシュして保存し、ライブショーケースを行い、最後にGridDBから直接Kafkaトピック(Source Connector)にデータを引き込みます。

事前準備

今後の手順をフォローするために必要となる以下のものをあらかじめ用意してください。

  • Java
  • Maven (GridDB Kafka Connectorをソースからビルドする場合のみ)
  • Kafka
  • GridDB

    また、https://github.com/griddbnet/Blogs/tree/kafka から、ソースコードを参照することができます。このレポでは、基本的な部分(設定ファイル、データ、bashスクリプト)にアクセスできますが、GridDB、Kafka、GridDB Kafkaコネクタをダウンロードする必要があります。

$ git clone https://github.com/griddbnet/Blogs.git --branch kafka

もうひとつ、kafkaのbinディレクトリへのパスをPATHに追加しておくことをお勧めします。ターミナルを開くたびにやってもいいですし、プロファイル$ vim ~/.bashrcに追加してもいいでしょう。そして、パスに追加してください。

export PATH=$PATH:/home/israel/kafka_project/kafka_2.13-3.2.1/bin

これで、新しいターミナルを開いたときにいつでもkafkaスクリプトを利用できるようになりました。また、最悪の場合、うまくいかない場合は、$ source ~/.bashrcで強制的に実行することができます。 

GridDB Kafkaコネクタをインストールする

Connectorのレポに移動して、クローンします。

$ git clone https://github.com/griddb/griddb-kafka-connect.git
$ cd griddb-kafka-connect/

Kafkaコネクタをダウンロードしたら、必要な.jarファイルをビルドするか、このブログで先に共有したリポジトリから直接取得し、できたファイルを適切な場所に移動させます。ビルドする方法は以下の通りです。

$ mvn clean install

自分でビルドするのが面倒な場合は、GitHubのリポジトリに同梱されているファイルのコピーを取得することもできます。

.jar ファイル(griddb-kafka-connector-0.5.jar)をkafkaディレクトリ である./libs ディレクトリにコピーしてください。

Kafkaをインストールする

kafkaをインストールします。

$ wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
$ tar xzvf kafka_2.12-3.2.0.tgz
$ cd kafka_2.12-3.2.0
$ export PATH=$PATH:$PWD/bin

PATH環境変数にkafkaを追加するのは、その特定のターミナルセッションでのみ行われます。2つ目(または3つ目など)のターミナルを開いた場合は、環境変数を再エクスポートするか、スクリプトへのエンターパスを手動で使用する必要があります。もちろん、ユーザーの .basrc.bash_profile ファイルに追加することもできます。

GridDBシンク設定ファイルを設定する

GridDBの設定を行うには、Blogsフォルダ(先ほど共有したGitHubリポジトリ)からgriddb-sink.propertiesをkafka/configにコピーします。この部分を手動で編集したい場合は、説明を読み進めてください。

設定ファイルを編集して、稼働中のGridDBのサーバーの認証情報と、Kafkaトピックで取り込むことを目的とするトピックを入力してください。ファイル名は griddb-sink.properties です。

そして、GridDBサーバーの情報を入力します。今回はFIXED_LISTモードなので、通知先メンバーを編集し、hostとportを削除します。最後に、GridDBサーバに取り込むトピックを追加します。

#host=239.0.0.1
#port=31999
cluster.name=myCluster
user=admin
password=admin
notification.member=127.0.0.1:10001
#notification.provider.url=

#topics.regex=csh(.*)
#topics.regex=topic.(.*)
topics=device7,device8,device9,device10

transforms=TimestampConverter
transforms.TimestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.format=yyyy-MM-dd hh:mm:ss.SSS
transforms.TimestampConverter.field=ts
transforms.TimestampConverter.target.type=Timestamp

ここでは、GridDB Sink コネクタにこれらのトピックを探すように明示的に指示しています。もちろん、より一般的にしたいのであれば、単に正規表現を使用することもできますが、今回のようなデモの目的では、これで問題ないでしょう。

また、最後にミリ秒を含めるためにタイムスタンプのフォーマットを変更し、タイムスタンプのカラム名(この場合は ts)を設定します。

このファイルを編集したら、コピーを kafka_2.13-3.2.1/config ディレクトリに作成してください。

GridDBソースコンフィグファイルを設定する

もし、先ほど説明したことの逆(保存したGridDBコンテナをKafkaにプッシュする)を行いたい場合は、代わりにGridDBソースコネクタを使用することになります。まず、設定ファイルを編集する必要があります。GRIDDB_KAFKA_CONNECTOR_FOLDER/config にある griddb-source.properties を編集します。GridDB接続の詳細とコンテナ・トピックを変更する必要があります。もちろん、前回の設定ファイルと同様に、今回使用したバージョンはGitHubのリポジトリに含まれています。

コンテナ編では、Kaggleから取り込んだこれらの大規模データセットに直接変更しましょう。

containers=device1,device2,device3,device4

この2つのファイル(シンクとソース)の大きな違いの1つは、ソースでは次のパラメータ(必須)が必要になることです。このパラメータには、タイムスタンプの行のキー(device7ではts)を設定します。

timestamp.column.name=ts
mode=timestamp

mode=batch を使うこともできます。この場合、 timestamp.column.name パラメータは使われず、データセットを何度もループして更新します。一方、前者のモード(timestamp)では、1回だけ(キューがなくなるまで)データを取得することになります。

そして、これでプロセス・サーバーを起動する準備が整いました。

ここでも、kafka_2.13-3.2.1.configディレクトリにコピーしておいてください。

必要なサーバー・システムを立ち上げる

これですべてが整ったので、現在のディレクトリ構造を簡単に振り返ってみましょう。というわけで、このまま進むと、3つのディレクトリが用意されているはずです。

/home/you/kafka_project/ ├─ kafka_2.13-3.2.1/ ├─ griddb-kafka-connect/ ├─ Blogs/

最初のディレクトリ (kafka_2.13-3.2.1) は、kafka のメインディレクトリです。2番目のディレクトリ(griddb-kafka-connect/)はGridDB kafkaコネクタのディレクトリで、このディレクトリにはGridDB固有の設定ファイル(手動で編集するかGitHubから直接取得したファイル、ただしこれらのファイルはkafkaディレクトリにコピーする必要があります)が入っています。3つ目のディレクトリ(Blogs)は、このブログのために作られたもので、Kafkaの設定が含まれており、該当する場合はKafkaディレクトリにコピーしておく必要があります。

それでは、これを動かしてみましょう。

GridDBサーバ(ターミナル1)を起動する

最初に、GridDBを実行します。以下のドキュメントに従って行います。https://docs.griddb.net/gettingstarted/using-apt/

インストールが完了したら、GridDBを起動します。

$ sudo systemctl start gridstore

このターミナルを開いておき、gs_shを実行することで、ターミナルを通してGridDBと対話することができるようになります。そうするためには、

$ sudo su gsadm
$ gs_sh
gs> 

Kafka の準備

次に、これを動作させるためには、Kafkaをインストールした最初のセクションで述べたように、kafkaディレクトリをパスに追加することも必要です。ここでもう一度、そのコマンドを紹介します。

$ export PATH=$PATH:/path/to/kafka_2.13-3.2.1/bin

Kafkaを起動する(ターミナル2、3)

ターミナル2でkafkaのzookeeperとサーバーを起動してみます。kafka_2.13-3.2.1 ディレクトリから、

$ zookeeper-server-start.sh config/zookeeper.properties

そして、別の端末ターミナル(3)で前と同じくkafka_2.13-3.2.1のディレクトリから、

$ kafka-server-start.sh config/server.properties

これでKafkaの準備はほぼ整いました。続けて、4つ目のターミナルを開いてください。

GridDB Kafka コネクタを実行する(ターミナル4)

それでは、いよいよGridDBのシンクコネクタとソースコネクタを同じターミナルで1つのコマンドで立ち上げてみましょう。

kafkaディレクトリから実行します。

 
$ connect-standalone.sh config/connect-standalone.properties PATH_TO_KAFKA/config/griddb-sink.properties PATH_TO_KAFKA/config/griddb-source.properties

GridDB Sink コネクタを使用する

先に説明したように、SINKコネクタはKafkaトピックから直接GridDBにデータを引き込みます。このセクションの動作を理解するために、このことを覚えておき、SOURCEコネクタのセクションでもう一度思い出してください。このセクションでは、バッチインジェストを行い、その後SINKを使用したライブインジェストを紹介する予定です。

バッチを取り込む(Sinkコネクタ)

さて、Sinkコネクタを起動したところで、バッチの使用状況を見てみましょう。まず、いくつかのデータを動かしてみます。

シミュレーションしたセンサデータを設定する

Kafkaはトピックをデータペイロードとして動作させるため、いくつかのサンプルトピックを用意し、何が起こっているのかを確実に把握できるようにしました。では、次に進む前に、同梱のスクリプトと.txtファイルを使用して、いくつかのトピックを作成してみましょう。

Kafkaトピックを作成する(スクリプトを使用)

この部分に含まれるファイルを使用するには、Blogsフォルダからscript_sink.shsimulate_sensor.txtをkafkaのルートフォルダにコピーしてください。

実際の例では、センサーが個別にKafkaトピックを設定し、Kafkaサーバーがそれを拾ってGridDBに送信するのですが、今回はデモなので、bashスクリプトと.txtファイルを使ってトピック生成部分を簡単にシミュレートすることにします。

以下は、simulate_sensor.txtファイルの内容です。

2020-07-12 00:01:34.735 device7 0.0028400886071015706 76.0 false 0.005114383400977071 false 0.013274836704851536 19.700000762939453
2020-07-12 00:02:02.785 device8 0.0029050147565559603 75.80000305175781 false 0.005198697479294309 false 0.013508733329556249 19.700000762939453
2020-07-12 00:02:11.476 device9 0.0029381156266604295 75.80000305175781 false 0.005241481841731117 false 0.013627521132019194 19.700000762939453
2020-07-12 00:02:15.289 device10 0.0028400886071015706 76.0 false 0.005114383400977071 false 0.013274836704851536 19.700000762939453
2020-07-12 00:02:19.641 device7 0.0028400886071015706 76.0 false 0.005114383400977071 false 0.013274836704851536 19.799999237060547
2020-07-12 00:02:28.818 device8 0.0029050147565559603 75.9000015258789 false 0.005198697479294309 false 0.013508733329556249 19.700000762939453
2020-07-12 00:02:33.172 device9 0.0028400886071015706 76.0 false 0.005114383400977071 false 0.013274836704851536 19.799999237060547
2020-07-12 00:02:39.145 device10 0.002872341154862943 76.0 false 0.005156332935627952 false 0.013391176782176004 19.799999237060547
2020-07-12 00:02:47.256 device7 0.0029050147565559603 75.9000015258789 false 0.005198697479294309 false 0.013508733329556249 19.700000762939453

これがGridDBに公開することを目的としたデータです。

次に、script_sink.shというbashスクリプトの内容を見てみましょう。

#!/bin/bash

function echo_payload {
    echo '{"payload": {"ts": "'$1 $2'","sensor": "'$3'","co": '$4',"humidity": '$5',"light": "'$6'","lpg": '$7',"motion": "'$8'","smoke": '$9',"temp": '${10}'},"schema": {"fields": [{"field": "ts","optional": false,"type": "string"},{"field": "sensor","optional": false,"type": "string"},{"field": "co","optional": false,"type": "double"},{"field": "humidity","optional": false,"type": "double"},{"field": "light","optional": false,"type": "boolean"},{"field": "lpg","optional": false,"type": "double"},{"field": "motion","optional": false,"type": "boolean"},{"field": "smoke","optional": false,"type": "double"},{"field": "temp","optional": false,"type": "double"}],"name": "iot","optional": false,"type": "struct"}}'
}

TOPICS=()

for file in `find $1 -name \*simulate_sensor.txt` ; do
    echo $file
    head -10 $file |while read -r line ; do
        SENSOR=`echo ${line} | awk '{ print $3 }'`
        if [[ ! " ${TOPICS[@]} " =~ " ${SENSOR} " ]]; then
            echo Creating topic ${SENSOR}
            kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic  ${SENSOR} 2&>1 /dev/null
            TOPICS+=(${SENSOR})
        fi
        echo_payload ${line} | kafka-console-producer.sh --topic ${SENSOR} --bootstrap-server localhost:9092
    done
done

このスクリプトは、生データのテキストファイルを読み込み、データでトピックを生成し、適切なkafkaプロセスに送信します。

つまり、1つのステップでトピック(device7、device8、device9、device10)を作成し、さらにそこにいくつかのペイロードデータを送信して、それらを使えるようにしています。

スクリプトファイルに適切なパーミッションを追加し、スクリプトを実行してkafkaプロセスにフィードすることができるようになりました。これにより、GridDBのシンク・コネクタが利用可能になった時点で、データがキューに入れられ、取り込むことができるようになります。

$ chmod +x script_sink.sh
$ ./script_sink.sh
Creating topic device7
Creating topic device8
Creating topic device9
Creating topic device10

このようにKafkaを使う場合、基本的にはKafkaのトピックにペイロードを送って溜めておき、Kafkaが使えるようになったら(あるいはこの場合はKafkaのプロセスを実行したら)、トピックを受け取ってそのままGridDBにプッシュします。

このコマンドで生成される大量の出力から、GridDBにトピックが配置されているものが確認できるはずです。

Put records to GridDB with number records 9 (com.github.griddb.kafka.connect.sink.GriddbSinkTask:54)

トピックが不正な形で終了し、それを修正することができない場合は、トピックを削除してプロセスを再開することができます。

$ kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic device7

単体での使用(Sinkコネクタ)

次に、トピック作成後、GridDBシンクを起動した後に、ペイロードを送信してみましょう。目的は、GridDBに挿入されるペイロードをライブで見せることです。Sinkを起動したまま、送信するペイロードを作成してみます。

まず、もしあなたの /griddb-kafka-connect/config/griddb-sink.properties が私のものと一致するならば(つまり、トピックセクションに「明示的」なコンテナ名を使用している)、トピック部分を更新する必要があります。例えば device30 をトピックとして追加したい場合は、シンクの設定ファイルにそれを含めてから、コネクタを再実行します。

新しいトピックを作ることをコネクタに知らせたら、実際にトピックを作るためのコマンドを実行してみましょう。

kafka-console-producer.sh --topic device30 --bootstrap-server 127.0.0.1:9092

そして、Producerはそこに座って、送信する新しいペイロードを待ちます。これで、ペイロードを送信し、実行中のGridDB Sinkがデータを受信したかどうかを確認することができます。

> { "payload": { "ts": "2022-07-12 08:01:34.126", "sensor": "device8", "co": 0.0028400886071015706, "humidity": 76.0, "light": "false", "lpg": 0.005114383400977071, "motion": "false", "smoke": 0.013274836704851536, "temp": 19.700000762939453 }, "schema": { "fields": [ { "field": "ts", "optional": false, "type": "string" }, { "field": "sensor", "optional": false, "type": "string" }, { "field": "co", "optional": false, "type": "double" }, { "field": "humidity", "optional": false, "type": "double" }, { "field": "light", "optional": false, "type": "boolean" }, { "field": "lpg", "optional": false, "type": "double" }, { "field": "motion", "optional": false, "type": "boolean" }, { "field": "smoke", "optional": false, "type": "double" }, { "field": "temp", "optional": false, "type": "double" } ], "name": "iot", "optional": false, "type": "struct" } }

これを送ると、実行中のGridDB Sinkでは、トピックの変更を受信して、そのままGridDBに登録されます。

[2022-11-18 17:40:07,168] INFO [griddb-kafka-sink|task-0] Put 1 record to buffer of container device30 (com.github.griddb.kafka.connect.sink.GriddbBufferedRecords:75)
[2022-11-18 17:40:07,169] INFO [griddb-kafka-sink|task-0] Get Container info of container device30 (com.github.griddb.kafka.connect.dialect.GriddbDatabaseDialect:130)
[2022-11-18 17:40:07,201] INFO [griddb-kafka-sink|task-0] Get Container info of container device30 (com.github.griddb.kafka.connect.dialect.GriddbDatabaseDialect:130)

GridDBソースコネクタを実行する

Source ConnectorはSink Connectorの逆で、GridDBからデータを取り込み、Kafkaトピックに配信します。このデモでは、すでに3つのかなり大きなGridDBコンテナ(device1, device2, device3)があり、PostgreSQLからインポートされた Kaggleのデータが入っています。以前のブログを参照すれば、このデータを取り込むことができます。

最初の方の手順で、Source Connectorの設定ファイルを編集しているときに、これらの特定のコンテナから取得したいことを明示的に述べました。Source Connectorを実行すると、KafkaがGridDB Databaseから関連するすべてのデータをトピックに取り込むというものです。

バッチの使用状況(ソースコネクタ)

成功すると、これを実行した後の出力に次のようなものが表示されます。

[2022-09-23 18:51:44,262] INFO [griddb-kafka-source|task-0] Get Container info of container device1 (com.github.griddb.kafka.connect.dialect.GriddbDatabaseDialect:130)
[2022-09-23 18:51:44,276] INFO [griddb-kafka-source|task-0] Get Container info of container device2 (com.github.griddb.kafka.connect.dialect.GriddbDatabaseDialect:130)
[2022-09-23 18:51:44,277] INFO [griddb-kafka-source|task-0] Get Container info of container device3 (com.github.griddb.kafka.connect.dialect.GriddbDatabaseDialect:130)

READINGセクションで検証しますが、Kafkaトピックには、これらのコンテナとそのコンテンツがKafkaにあるはずです。Kafkaはデータベースから直接データを取り込むだけなので、実質的にはソースを使ったバッチ処理の範囲です。

ライブで取り込む(ソースコネクタ)

では、上記で行ったことのライブでできるかを試してみましょう。プロセスを実行したまま、GridDBにデータを挿入して、トピックが更新されるかどうかを確認します。

Kafkaトピックから直接読み込むターミナルを起動したままにしておきます。そして、コンソールのコンシューマーが読み込んでいるコンテナに直接データを入力してみると、ライブ更新されるのが確認できます。つまり、コンテナにデータを挿入するには、Pythonスクリプトやシェルを使います。

pythonを使って、以下のようにします。

import griddb_python as griddb
from datetime import datetime, timedelta

factory = griddb.StoreFactory.get_instance()
DB_HOST = "127.0.0.1:10001"
DB_CLUSTER = "myCluster"
DB_USER = "admin"
DB_PASS = "admin"

try:
    # (1) Connect to GridDB
    # Fixed list method
    gridstore = factory.get_store(
        notification_member=DB_HOST, cluster_name=DB_CLUSTER, username=DB_USER, password=DB_PASS)

    # (2) Create a timeseries container - define the schema
    conInfo = griddb.ContainerInfo(name="device4",
                                   column_info_list=[["ts", griddb.Type.TIMESTAMP],
                                                     ["co", griddb.Type.DOUBLE],
                                                     ["humidity", griddb.Type.DOUBLE],
                                                     ["light", griddb.Type.BOOL],
                                                     ["lpg", griddb.Type.DOUBLE],
                                                     ["motion", griddb.Type.BOOL],
                                                     ["smoke", griddb.Type.DOUBLE],
                                                     ["temperature", griddb.Type.DOUBLE]],
                                   type=griddb.ContainerType.TIME_SERIES)
    # Create the container
    ts = gridstore.put_container(conInfo)
    print(conInfo.name, "container succesfully created")

    now = datetime.utcnow()

    device4 = gridstore.get_container("device4")
    if device4 == None:
        print("ERROR Container not found.")

    device4.put([now, 0.004978, 51.0, True, 0.00764837, True, 0.0204566, 55.2])

except griddb.GSException as e:
    for i in range(e.get_error_stack_size()):
        print("[", i, "]")
        print(e.get_error_code(i))
        print(e.get_location(i))
        print(e.get_message(i))

データを挿入すると、ライブロードになります。

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"ts"},{"type":"double","optional":true,"field":"co"},{"type":"double","optional":true,"field":"humidity"},{"type":"boolean","optional":true,"field":"light"},{"type":"double","optional":true,"field":"lpg"},{"type":"boolean","optional":true,"field":"motion"},{"type":"double","optional":true,"field":"smoke"},{"type":"double","optional":true,"field":"temperature"}],"optional":false,"name":"device4"},"payload":{"ts":1664308679012,"co":0.004978,"humidity":51.0,"light":true,"lpg":0.00764837,"motion":true,"smoke":0.0204566,"temperature":55.2}}

または、GridDB CLIを使用することもできます。

$ gs_sh
  gs> putrow device4 2022-09-30T12:30:01.234Z 0.003551 22.0 False 0.00754352 False 0.0232432 33.3
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"ts"},{"type":"double","optional":true,"field":"co"},{"type":"double","optional":true,"field":"humidity"},{"type":"boolean","optional":true,"field":"light"},{"type":"double","optional":true,"field":"lpg"},{"type":"boolean","optional":true,"field":"motion"},{"type":"double","optional":true,"field":"smoke"},{"type":"double","optional":true,"field":"temperature"}],"optional":false,"name":"device4"},"payload":{"ts":1664308679229,"co":0.003551,"humidity":22.0,"light":false,"lpg":0.00754352,"motion":false,"smoke":0.0232432,"temperature":34.3}}

データを読み込む

次に、移動するデータの読み込みについて見てみましょう。

GridDBからバッチで読み込む

GridDBから取り出したデータを(Sourceコネクタ経由で)Kafkaのトピックに読み込みます。

$ kafka-topics.sh --list --bootstrap-server localhost:9092
device1
device2
device3
device4

そして、今度は実際にデータを見てみましょう。

$ kafka-console-consumer.sh --topic device4 --from-beginning --bootstrap-server localhost:9092

そして、以下が出力されました。

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"ts"},{"type":"double","optional":true,"field":"co"},{"type":"double","optional":true,"field":"humidity"},{"type":"boolean","optional":true,"field":"light"},{"type":"double","optional":true,"field":"lpg"},{"type":"boolean","optional":true,"field":"motion"},{"type":"double","optional":true,"field":"smoke"},{"type":"double","optional":true,"field":"temp"}],"optional":false,"name":"device2"},"payload":{"ts":1594615046659,"co":0.004940912471056381,"humidity":75.5,"light":false,"lpg":0.007634034459861942,"motion":false,"smoke":0.020363432603022532,"temp":19.399999618530273}}

Processed a total of 2 messages

GridDBからデータをクエリする

また、KafkaトピックからGridDBに挿入されたデータを読み出してみることもできます。GridDB CLIを使って、以下のようにGridDBサーバーに直接問い合わせることができます。

$ sudo su gsadm
$ gs_sh
gs[public]> sql select * from device7;
3 results. (25 ms)
gs[public]> get
ts,sensor,co,humidity,light,lpg,motion,smoke,temp
2020-07-12T00:01:34.735Z,device7,0.0028400886071015706,76.0,false,0.005114383400977071,false,0.013274836704851536,19.700000762939453
2020-07-12T00:02:19.641Z,device7,0.0028400886071015706,76.0,false,0.005114383400977071,false,0.013274836704851536,19.799999237060547
2020-07-12T00:02:47.256Z,device7,0.0029050147565559603,75.9000015258789,false,0.005198697479294309,false,0.013508733329556249,19.700000762939453
The 3 results had been acquired.

ここでは例として、KafkaからGridDBへのデータ挿入が成功したかどうかを確認しています。この時点で、各トピック(deviceX)には、同等のGridDBコンテナが存在するはずです。

まとめ

このように、KafkaをGridDBで利用することで、デバイスから直接GridDBデータベースにリアルタイムでデータを取り込んだり、その逆を行ったりすることが非常に便利にできるようになります。ぜひ使ってみてください。

ブログの内容について疑問や質問がある場合は Q&A サイトである Stack Overflow に質問を投稿しましょう。 GridDB 開発者やエンジニアから速やかな回答が得られるようにするためにも "griddb" タグをつけることをお忘れなく。 https://stackoverflow.com/questions/ask?tags=griddb

Leave a Reply

Your email address will not be published. Required fields are marked *