本日の記事では、以前にも取り上げたGridDBとKafkaの組み合わせについて解説します:
- GridDBとKafkaでストリームデータを処理する
- JDBCを使用してGridDBをKafkaのソースとして使用する
- GridDB v5.5、JDBC、およびKafkaを使用したSQLバッチ挿入
- Udemyコース: Apache Kafka、Python、GridDBを使用した動作するIoTプロジェクトの作成
本記事では、GridDBをKafkaのシンクリソースとして使用可能にする新機能に焦点を当てます。これにより、TIME_SERIES
コンテナ(KafkaのトピックからGridDBにタイムシリーズデータを直接プッシュできる)を利用できるようになります(一部の設定が必要です)。v5.6以前では、Collectionコンテナに限定されていました。
前回のブログ記事「GridDBとKafkaを使用したストリームデータ処理」と一部類似点があります。主な違いは、KafkaとGridDBのすべてのコンポーネントをDockerコンテナ化してポータビリティと使いやすさを向上させた点、および前述の通りTime Seriesコンテナを使用する点です。このブログに沿って進めることで、DockerコンテナとKafkaを使用してKafkaからGridDBのTime Seriesコンテナにタイムシリーズデータを直接ストリーム送信する方法を学ぶことができます。
高度な概要
このプロジェクトの動作について簡単に説明します。まず、KafkaとGridDBをDockerコンテナ内で実行し、準備が整ったらPythonスクリプトを実行します。このスクリプトはKafkaのproducer
として機能し、ランダムなデータをbroker
に送信します。このシミュレートされたIoTデータは、Kafkaのキュー(正確には「分散ログ」)に格納され、consumer
が値を読み取るまで待機します。
当プロジェクトでは、GridDBがsink
として機能し、Pythonスクリプトが作成したデータトピックをconsume
し、そのデータをKafkaがトピックのスキームに基づいて作成するテーブルに保存します。
Kafkaトピックの保存方法と場所を適切に通信するため、GridDB Kafka Sinkプロパティファイルを設定する必要があります。まず、GridDB Kafka Connectの最新版(v5.6)を取得し、ビルドし、実行中のKafkaインストールと共有する必要があります。これにより、時系列データを直接時系列コンテナに保存できるようになります。
そのプロパティファイル内で、コンテナタイプをtime_series
に設定し、その他の重要な詳細も設定する必要があります。
開始手順
このプロジェクトを実行する方法について説明します。
事前準備
このブログをフォローするには、KafkaとGridDBを実行するためのDockerとDocker Composeが必要です。また、Kafkaにトピックとして送信するデータを作成するためにPython3がインストールされている必要があります(最終的にGridDBに保存されます)。
さらに、GridDB Kafka ConnectのJARファイルを取得してビルドする必要があります。
GridDB Kafka Connect(オプション)
最新のバージョンはここからダウンロードできます:GridDB-Kafka接続。ビルドするには、maven
がインストールされていることを確認し、以下のコマンドを実行します:
$ mvn clean install
.jar
ファイルは、target
ディレクトリ内に griddb-kafka-connector-0.6.jar
という名前で作成されます。
注: このリポジトリで提供されているソースコードにも jar ファイルが含まれています(次のセクションを参照)。リポジトリをクローンし、Docker Compose でこのプロジェクトを実行する場合、jar ファイルを自分でダウンロード/ビルドする必要はありません。
ソースコード
ソースコードは griddbnet GitHub ページにあります:
$ git clone https://github.com/griddbnet/Blogs.git --branch 7_kafka_timeseries
プロジェクトの実行
ソースコードと Docker がインストールされている場合、次のように実行できます:
$ docker compose pull && docker compose up -d
その後、処理が完了したら、以下のコマンドを実行して、KafkaコネクタにGridDBシンクのプロパティファイルが正しく配置されているかどうかを確認できます。
$ curl http://localhost:8083/connectors/
["griddb-kafka-sink"]
kafka-sink の内容を確認することもできます。
$ curl http://localhost:8083/connectors/griddb-kafka-sink
その作業が完了したら、Python スクリプトを実行できます。このスクリプトは Kafka プロデューサーとして機能します。
$ python3 -m pip install kafka-python
$ python3 scripts/producer.py
GridDB シンクのプロパティ
Kafka やその他のストリーム/イベント駆動型アーキテクチャでは、ソースとシンクという概念はデータのフローの方向を説明するものです。シンクはデータが流入する場所、またはデータが最終的に到達する場所を指します。この場合、私たちはデータペイロードをGridDB内のタイムシリーズコンテナ内にタイムシリーズデータとして永続化したいと考えています。そのため、プロパティファイルを次のように設定します:
connector.class= com.github.griddb.kafka.connect.GriddbSinkConnector
name= griddb-kafka-sink
cluster.name= myCluster
user= admin
password= admin
notification.member= griddb-server=10001
container.type= TIME_SERIES
topics= meter_0,meter_1,meter_2,meter_3
transforms= TimestampConverter
transforms.TimestampConverter.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.format= yyyy-MM-dd hh=mm=ss
transforms.TimestampConverter.field= timestamp
transforms.TimestampConverter.target.type= Timestamp
前回の記事と比べて、主な変更点はcontainer.type
の指定とtransforms
プロパティです。transforms
プロパティは、Kafkaクラスターにどの文字列値をタイムスタンプに変換するかを指定し、そのプロセスを支援するためのその他の有用な情報も提供します。その他の値は、ブローカーがデータを送信するトピックの宛先を指定するためのもので、これはホスト名griddb-server
のGridDB Dockerコンテナです。
トピックはデータトピックの名前であり、同時にGridDBのタイムシリーズコンテナの名前にもなります。
Pythonプロデューサースクリプト
ソースコードを読むだけで理解できる内容が多いため、特に説明する必要はありません。唯一追加する点は、DockerコンテナをDocker化したい場合、サーバーの場所をlocalhost
からbroker:9092
に変更する必要がある点です。
#p=KafkaProducer(bootstrap_servers=['localhost:9092'])
p=KafkaProducer(bootstrap_servers=['broker:9092'])
もう一点注意すべき点は、time_series データコンテナを time_series データを行キーとして作成しているにもかかわらず、ペイロードデータフィールドの型を string
に設定する必要がある点です(これは、シンクセクションで transform
プロパティについて説明した際に触れました)。
"schema":
{
"fields": [
{ "field": "timestamp", "optional": False, "type": "string" },
{ "field": "kwh", "optional": False, "type": "double" },
{ "field": "temp", "optional": False, "type": "double" }
],
"name": "iot", "optional": False, "type": "struct"
}
ここでのポイントは、型がstring
であっても、最初のフィールドをターゲットのタイムスタンプ型として設定する必要がある点です。その後、このデータセットのシンクで、transforms.TimestampConverter.field
をタイムスタンプ型に変換したいフィールドの名前として設定します。これらの設定が完了すると、KafkaとGridDBは設定したスキーマと適切なコンテナタイプでテーブルを作成します。
Docker コンテナで Kafka を実行する
以前の Kafka に関する記事では、Kafka と GridDB をベアメタル上で実行しました。つまり、CLI 経由でコマンドを実行してサーバーを起動するだけのシンプルな方法でした。この方法は機能しましたが、3~4つのターミナルを開き、手順を覚えて実行する必要があるため、やや複雑でした。この記事では、2~3つのコマンドでダウンロードと実行をすべて行えるDocker Composeファイルを用意しました!
Confluent Dockerコンテナ
まず、Confluentが提供するDockerイメージについて説明します。Confluentは、大規模企業向けのKafkaに関するサポートとツールを提供する企業です。ただし、彼らはDockerイメージを無料で提供しており、この記事のDocker Composeファイルで使用します。
Docker Composeの主な機能は、複数の「サービス」(Dockerコンテナ)をセットとして作成し、簡単なコマンドで同時に実行できるようにすることです。各コンテナが依存するルールを設定できます。例えば、Kafkaコンテナ同士を依存関係に設定することで、正しい順序で起動するようにできます。
この方法を選択した理由は、上述の通りKafkaを実行するプロセスが複雑で、多くの異なるコンポーネントを実行する必要があるためです。例えば、PythonスクリプトからデータをKafkaトピックに送信し、GridDbに格納するこのシンプルなプロジェクトでも、Docker Composeファイルに5つのサービスが必要です。
Docker Compose サービス
以下のすべてがサービスです。
- GridDB
- Kafka Zookeeper
- Kafka Broker
- Kafka スキーマレジストリ
- Kafka-Connect
また、省略しましたが、Kafka データプロデューサーを含むことも可能です。
Kafka Zookeeper は、Kafka の脳や主要なコンポーネントと考えることができます。Broker はデータトピックを処理するサービスで、フェイルセーフのためなどに複数のブローカーを実行することが一般的です。データトピックの生産者を Kafka に接続する際は、ブローカーを指します。
Kafka スキーマレジストリは、トピックで使用されるスキーマを強制します。当社のケースでは、Python プロデューサから送信されるデータペイロードの JSON スキーマのデシリアライズに役立ちます。
Kafka Connect コンテナには、Kafka と組み合わせて使用するサードパーティライブラリを追加します:GridDB Kafka Connect JAR ファイルと GridDB シンクプロパティファイル。Connectコンテナは少し特殊で、まずコンテナが起動していることを確認し、その後GridDBシンクプロパティの指示を含むJSONファイルをプッシュする必要があります。ただし、GridDB Kafka Connect JARファイルは、Dockerイメージの起動時にファイルシステムにプッシュします。
Docker Compose 設定
GridDBには特別な設定はありません:単にgriddbnetからイメージをプルし、いくつかの環境変数を設定するだけです:
griddb-server:
image: 'griddbnet/griddb:5.6.0'
container_name: griddb-server
expose:
- '10001'
- '10010'
- '10020'
- '10040'
- '20001'
- '41999'
environment:
NOTIFICATION_MEMBER: 1
GRIDDB_CLUSTER_NAME: myCluster
動物園の飼育員も同じような状況です:
zookeeper:
image: 'confluentinc/cp-zookeeper:7.3.0'
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ブローカーはポート9092を公開しているため、Docker Composeのネットワーク環境外からPythonのプロデューサースクリプトを実行できます(localhost:9092を指すだけです)。ZooKeeperやその他のクラスタ設定を指すための追加の環境変数も必要です。
broker:
image: 'confluentinc/cp-kafka:7.3.0'
container_name: broker
ports:
- '9092:9092'
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092,PLAINTEXT_INTERNAL://broker:29092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
また、ブローカー、スキーマレジストリ、Kafka Connectがすべて「ZooKeeperに依存している」ことに気づくでしょう。これにより、全体的なオペレーションの責任者が明確になります。
kafka-schema-registry:
image: 'confluentinc/cp-schema-registry:7.3.0'
hostname: kafka-schema-registry
container_name: kafka-schema-registry
ports:
- '8082:8082'
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://broker:9092'
SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8082'
depends_on:
- zookeeper
Kafka ConnectはConfluent Docker Hubからコンテナをプルし、多数の環境変数を設定します。さらに、ホストマシンと共有ファイルシステムを持つボリュームも含まれており、これによりGridDB Kafka ConnectのJARファイルを共有できます。最後に、サービスの最下部にスクリプトを配置し、Kafka ConnectのHTTPエンドポイントが利用可能になるまで待機する処理を実行します。200ステータスコードが返ってきたら、GridDB-Sinkプロパティファイルを送信するスクリプトを実行できます。
kafka-connect:
image: confluentinc/cp-kafka-connect:latest
container_name: kafka-connect
ports:
- '8083:8083'
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: device
CONNECT_CONFIG_STORAGE_TOPIC: device-config
CONNECT_OFFSET_STORAGE_TOPIC: device-offsets
CONNECT_STATUS_STORAGE_TOPIC: device-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: true
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: true
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8082'
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8082'
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m (%c:%L)%n'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_PLUGIN_PATH: >-
/usr/share/java,/etc/kafka-connect/jars
CLASSPATH: >-
/usr/share/java,/etc/kafka-connect/jars
volumes:
- './scripts:/scripts'
- './kafka-connect/connectors:/etc/kafka-connect/jars/'
depends_on:
- zookeeper
- broker
- kafka-schema-registry
- griddb-server
command:
- bash
- '-c'
- >
/etc/confluent/docker/run &
echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
while [ $$(curl -s -o /dev/null -w %{http_code}
http://kafka-connect:8083/connectors) -eq 000 ] ; do
echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) " (waiting for 200)"
sleep 5
done
nc -vz kafka-connect 8083
echo -e "\n--\n+> Creating Kafka Connect GridDB sink"
/scripts/create-griddb-sink.sh &&
/scripts/example-sink.sh
sleep infinity
このプロパティファイルは、Kafkaに対して、ブローカーが特定の名前を持つトピックを受信した際に、そのトピックをプロパティファイル内の指示に従って送信するように明示的に指示します。この場合、その指示は当社のGridDBコンテナを指します。
結論
プロデューサーを実行すると、GridDB CLI を使用して Docker GridDB サーバー内のすべてのデータを表示できます: $ docker exec -it griddb-server gs_sh
。
これで、Kafka 経由で IoT のようなセンサーデータを GridDB タイムシリーズ コンテナに送信する作業が完了しました。
ブログの内容について疑問や質問がある場合は Q&A サイトである Stack Overflow に質問を投稿しましょう。 GridDB 開発者やエンジニアから速やかな回答が得られるようにするためにも "griddb" タグをつけることをお忘れなく。 https://stackoverflow.com/questions/ask?tags=griddb