GridDB v5.5のリリースで、GridDBはSQLバッチインサートを追加しました。このリリースは多くの点で素晴らしいですが、非常に明確な利点は、汎用のKafka JDBCコネクタをすぐにGridDBに接続できることです。このリリース以前は、一度に1つのKafkaメッセージしかインサートできませんでしたが、今では一度に最大1,000データポイントまでバッチ更新できます。
この記事では、GridDBとKafkaのセットアップについて説明します。このトピックは以前にも取り上げているので、こちらもご参照いただきたい: データインジェスト。今回はバッチサイズを1,000に設定します。
注意点として、公式のGridDB Kafka Connectorを汎用のJDBCコネクタとして動作するように少し変更する必要がありました。そのためのレポは、https://github.com/Imisrael/jdbc/tree/master。
このプロジェクトを立ち上げるには、kafka (broker)、kafka-connect、kafka (zookeeper)、kafka schema registry、GridDBをインストール/セットアップする必要があります。また、kafka-connect サービスを使って、汎用 JDBC ライブラリと GridDB JDBC ライブラリを追加します。次に、Kafkaインスタンスがどのようなデータを扱っているのか、そしてそのデータがどこに行くのか(JDBC経由でGridDBへ)を知るために、Kafka Sinkプロパティを設定する必要があります。そして最後に、Kafkaトピックを作成し、そこにシミュレートしたデータをプッシュし、GridDBにデータが流れることを期待します。
プロジェクトの実行方法
このプロジェクトを実行したい場合は、docker composeを使って、関連するすべてのkafkaサービスを実行することができます。これらのコンテナには、それぞれのライブラリファイルも同梱されています。これらのコンテナには、それぞれのライブラリファイルもバンドルされています。
ソースコード
ソースコードは GitHub:https://github.com/griddbnet/Blogs にあります。
$ git clone https://github.com/griddbnet/Blogs.git --branch jdbc_sink
また、GridDB JDBC Connectorを独自にビルドする必要があるかもしれません。このリポジトリをクローンしてください: https://github.com/Imisrael/jdbc/tree/masterをクローンし、ソースからjarをビルドします。ライブラリファイルを入手したら、このプロジェクトのビルドプロセスの一部としてインクルードすることができます。
前提条件
もちろん、kafkaやzookeeperなどを手動でインストールしてこのプロジェクトを実行することもできますが、このプロジェクトはDockerを使ってビルドされているので、このプロジェクトに必要なのはDockerだけです。
Dockerはウェブサイトからダウンロードできます: https://docs.docker.com/get-docker/。
ビルドと実行
Dockerイメージをビルドして起動し、最後に関連するトピックにデータをプッシュして、GridDBサーバ(これもDockerコンテナで動作している)に流します。
$ docker compose build
$ docker compose up -d
そして、すべて(つまりブローカー)が起動したら、pythonスクリプトを実行してkafkaトピックを作成し、そこにデータをプッシュします:
$ cd gateway-sim
$ python3 -m pip install kafka-python
$ python3 kafka_producer.py
うまくいけば、docker GridDBコンテナ内に新しいテーブルが作成されます。
$ docker exec -it griddb-server gs_sh
gs> searchcontainer
そして、これらのテーブルにはパイソンスクリプトによって作成されたデータが入力されるはずです。
概要
上で説明したように、これらの様々なサービスは全て docker compose を使ってビルドされ、実行されています。全てのサービスに単一の compose ファイルを使うことの良い点は、Docker が自動的に全てのサービスを同じ共有ネットワーク上に置いてくれることです。つまり、kafkaブローカー・サービスはすでにzookeeperと通信することができ、その逆も可能です。また、GridDBサーバはポート20001
(SQLポート)で接続できるので、Kafkaのデータを直接GridDBに流すことができます。
docker-compose.yml`ファイルを見て、これらのサービスがどのように開始され、どのイメージから取得され、どのような設定がされているかを確認することができます。ほとんどの場合、Kafkaがここでの重労働のほとんどをこなしていることを知っておく必要があります。Kafkaがデータ・トピックをどこにプッシュするかを知るには、JDBCシンク設定ファイルと呼ばれるものを作成する必要があります。このファイルには、データフローを設定する際に使用するすべてのパラメータが含まれています。では次に、この設定ファイルの作成方法と適用方法を見ていきましょう。
JDBC シンク設定
kafka-connect サービスは、サードパーティの統合 (この場合は JDBC) を処理する役割を担っているので、設定ファイルをそこにプッシュして適用する必要があります。このサービスには REST API が付属しており、JSON ファイルをプッシュすることができます。JSONファイルを受け取ると、その設定ファイルをKafkaサービスに適用します。そして、Kafkaサービスがすべてのデータフローを処理します。
JSONファイルはいつでもプッシュできるが、ここではkafka-connectサービスの準備ができたらJSONファイルをプッシュするように設定します。docker composeのエントリを見てみましょう:
kafka-connect:
image: confluentinc/cp-kafka-connect:latest
container_name: kafka-connect
ports:
- '8083:8083'
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: device
CONNECT_CONFIG_STORAGE_TOPIC: device-config
CONNECT_OFFSET_STORAGE_TOPIC: device-offsets
CONNECT_STATUS_STORAGE_TOPIC: device-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: true
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: true
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8082'
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8082'
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m (%c:%L)%n'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_PLUGIN_PATH: >-
/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components,/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
CLASSPATH: >-
/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components,/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
volumes:
- './scripts:/scripts'
- './kafka-connect/connectors:/etc/kafka-connect/jars/'
depends_on:
- zookeeper
- broker
- kafka-schema-registry
- griddb-server
command:
- bash
- '-c'
- >
/etc/confluent/docker/run &
echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
while [ $$(curl -s -o /dev/null -w %{http_code}
http://kafka-connect:8083/connectors) -eq 000 ] ; do
echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) " (waiting for 200)"
sleep 5
done
nc -vz kafka-connect 8083
echo -e "\n--\n+> Creating Kafka Connect GridDB sink"
/scripts/create-griddb-sink.sh
sleep infinity
このエントリーのコマンドセクションでは、kafka-connect(それ自体!)をチェックし、サービスの準備が整う(HTTPリクエストに対する200レスポンス)のを待っていることがわかります。準備ができたら、HTTPリクエストのボディでjsonオブジェクトを送信するスクリプトを実行します。そのスクリプトは以下のようなものです:
#!/bin/sh
curl -s \
-X "POST" "http://localhost:8083/connectors/" \
-H "Content-Type: application/json" \
-d '{
"name": "test-sink",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics.regex": "meter.(.*)",
"table.name.format": "kafka_${topic}",
"dialect.name": "PostgreSqlDatabaseDialect",
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd hh:mm:ss.SSS",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.target.type": "string",
"time.precision.mode": "connect",
"connection.url":"jdbc:gs://griddb-server:20001/myCluster/public",
"connection.user": "admin",
"connection.password": "admin",
"batch.size": "1000",
"auto.create":"true",
"pk.mode" : "none",
"insert.mode": "insert",
"auto.evolve": "true"
}
}'
これは、GridDBサーバ(コンテナ)とKafkaサービスをjdbcで接続するための情報です。接続URL、ユーザ、パスなど、説明不要な項目もあります。ここでは、あまり知られていないオプションについて説明します。
topics.regexでは、Sinkコネクタに購読するトピックを指定します。他の手段でこれらのトピックにデータをプッシュし、Sink コネクタがそのデータを見つけて接続 URL にプッシュすることを想定しています。transforms
に関連するエントリは、メータートピックからタイムスタンプの文字列値を取得し、データベースにプッシュする前に明示的なタイムスタンプデータに変換することに関するものです。
この情報をkafka-connectにプッシュしたら、ポート8083にクエリすることで、そこにあることを確認できます:
$ curl http://localhost:8083/connectors
["test-sink"]
Kafka トピックのデータを生成する
Kafka のシンクコネクタを meter
で始まるトピックに subscribed
することに成功しました。では、データを produce
して、そのデータをトピックに送信してみよう。これは様々な方法で行うことができますが、ここでは単純に 10 個の異なるトピックを作成し、それらのトピックすべてにタイムスタンプデータをプッシュする単純な python スクリプトを使用します。JDBC コネクタはこれらのトピックにサブスクライブしているので、これらのトピックの変更を検出し、最終的にそれを GridDB にプッシュします。
def produce(meterid, usagemodel=None):
time = datetime.datetime.now()-datetime.timedelta(days=100)
register_device(meterid)
base_temp = random.uniform(-10,40)
base_kwh = random.uniform(0,2)
while True:
now = time.strftime('%Y-%m-%d %H:%M:%S.%f')
data= {
"payload":
{
'timestamp': now,
'kwh': base_kwh+random.uniform(-.2, 2),
'temp': base_temp+random.uniform(-5, 5)
},
"schema":
{
"fields": [
{ "field": "timestamp", "optional": False, "type": "string" },
{ "field": "kwh", "optional": False, "type": "double" },
{ "field": "temp", "optional": False, "type": "double" }
],
"name": "iot", "optional": False, "type": "struct"
}
}
time = time + datetime.timedelta(minutes=60)
if time > datetime.datetime.now():
time.sleep(3600)
m=json.dumps(data, indent=4, sort_keys=True, default=str)
p.send("meter_"+str(meterid), m.encode('utf-8'))
print("meter_"+str(meterid), data['payload'])
これはデータを作成し、meter_${num}
というラベルのトピックに送信する関数です。フィールドのエントリーはGridDBにプッシュされるスキーマとなります。
このスクリプトを実行すると、GridDB自体をチェックする前に、以下のようにトピックを設定することができます:
$ docker-compose exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic meter_0 --from-beginning
これで、pythonスクリプトのデータがすべて表示されます。
次に、実際のGridDBインスタンスを確認します:
$ docker exec -it griddb-server gs_sh
gs[public]> searchcontainer
kafka_meter_0
kafka_meter_1
kafka_meter_2
kafka_meter_3
kafka_meter_4
kafka_meter_5
kafka_meter_6
kafka_meter_7
kafka_meter_8
kafka_meter_9
kafka_meters
gs[public]> select * from kafka_meter_0;
2,400 results. (4 ms)
gs[public]> get 10
+-------------------------+--------------------+--------------------+
| timestamp | kwh | temp |
+-------------------------+--------------------+--------------------+
| 2023-11-15 01:43:52.299 | 2.8875713817453637 | 38.9091116816826 |
| 2023-11-15 02:43:52.299 | 1.8928477563702992 | 37.183344440257784 |
| 2023-11-15 03:43:52.299 | 1.3057612343055085 | 37.9251109201419 |
| 2023-11-15 04:43:52.299 | 1.1172883739759085 | 40.43478215590419 |
| 2023-11-15 05:43:52.299 | 1.6667172633034288 | 36.82843364324471 |
| 2023-11-15 06:43:52.299 | 2.5131139241648173 | 38.50469053566042 |
| 2023-11-15 07:43:52.299 | 2.0608077564559095 | 38.62901305523018 |
| 2023-11-15 08:43:52.299 | 2.9945117256967295 | 39.854084974922834 |
| 2023-11-15 09:43:52.299 | 1.8693091828037747 | 41.15482986965948 |
| 2023-11-15 10:43:52.299 | 1.0284230878567477 | 37.05776090626771 |
+-------------------------+--------------------+--------------------+
The 10 results had been acquired.
まとめ
この記事では、JDBCとDockerだけを使ってGridDBサーバーにデータをプッシュするKafkaのセットアップ方法を説明しました。
ブログの内容について疑問や質問がある場合は Q&A サイトである Stack Overflow に質問を投稿しましょう。 GridDB 開発者やエンジニアから速やかな回答が得られるようにするためにも "griddb" タグをつけることをお忘れなく。 https://stackoverflow.com/questions/ask?tags=griddb