Kafka と GridDB の時系列コンテナを組み合わせて使用する方法

本日の記事では、以前にも取り上げたGridDBとKafkaの組み合わせについて解説します:

本記事では、GridDBをKafkaのシンクリソースとして使用可能にする新機能に焦点を当てます。これにより、TIME_SERIESコンテナ(KafkaのトピックからGridDBにタイムシリーズデータを直接プッシュできる)を利用できるようになります(一部の設定が必要です)。v5.6以前では、Collectionコンテナに限定されていました。

前回のブログ記事「GridDBとKafkaを使用したストリームデータ処理」と一部類似点があります。主な違いは、KafkaとGridDBのすべてのコンポーネントをDockerコンテナ化してポータビリティと使いやすさを向上させた点、および前述の通りTime Seriesコンテナを使用する点です。このブログに沿って進めることで、DockerコンテナとKafkaを使用してKafkaからGridDBのTime Seriesコンテナにタイムシリーズデータを直接ストリーム送信する方法を学ぶことができます。

高度な概要

このプロジェクトの動作について簡単に説明します。まず、KafkaとGridDBをDockerコンテナ内で実行し、準備が整ったらPythonスクリプトを実行します。このスクリプトはKafkaのproducerとして機能し、ランダムなデータをbrokerに送信します。このシミュレートされたIoTデータは、Kafkaのキュー(正確には「分散ログ」)に格納され、consumerが値を読み取るまで待機します。

当プロジェクトでは、GridDBがsinkとして機能し、Pythonスクリプトが作成したデータトピックをconsumeし、そのデータをKafkaがトピックのスキームに基づいて作成するテーブルに保存します。

Kafkaトピックの保存方法と場所を適切に通信するため、GridDB Kafka Sinkプロパティファイルを設定する必要があります。まず、GridDB Kafka Connectの最新版(v5.6)を取得し、ビルドし、実行中のKafkaインストールと共有する必要があります。これにより、時系列データを直接時系列コンテナに保存できるようになります。

そのプロパティファイル内で、コンテナタイプをtime_seriesに設定し、その他の重要な詳細も設定する必要があります。

開始手順

このプロジェクトを実行する方法について説明します。

事前準備

このブログをフォローするには、KafkaとGridDBを実行するためのDockerとDocker Composeが必要です。また、Kafkaにトピックとして送信するデータを作成するためにPython3がインストールされている必要があります(最終的にGridDBに保存されます)。

さらに、GridDB Kafka ConnectのJARファイルを取得してビルドする必要があります。

GridDB Kafka Connect(オプション)

最新のバージョンはここからダウンロードできます:GridDB-Kafka接続。ビルドするには、mavenがインストールされていることを確認し、以下のコマンドを実行します:

$ mvn clean install

.jar ファイルは、target ディレクトリ内に griddb-kafka-connector-0.6.jar という名前で作成されます。

注: このリポジトリで提供されているソースコードにも jar ファイルが含まれています(次のセクションを参照)。リポジトリをクローンし、Docker Compose でこのプロジェクトを実行する場合、jar ファイルを自分でダウンロード/ビルドする必要はありません。

ソースコード

ソースコードは griddbnet GitHub ページにあります:

$ git clone https://github.com/griddbnet/Blogs.git --branch 7_kafka_timeseries

プロジェクトの実行

ソースコードと Docker がインストールされている場合、次のように実行できます:

$ docker compose pull && docker compose up -d

その後、処理が完了したら、以下のコマンドを実行して、KafkaコネクタにGridDBシンクのプロパティファイルが正しく配置されているかどうかを確認できます。

$ curl http://localhost:8083/connectors/

["griddb-kafka-sink"]

kafka-sink の内容を確認することもできます。

$ curl http://localhost:8083/connectors/griddb-kafka-sink

その作業が完了したら、Python スクリプトを実行できます。このスクリプトは Kafka プロデューサーとして機能します。

$ python3 -m pip install kafka-python
$ python3 scripts/producer.py

GridDB シンクのプロパティ

Kafka やその他のストリーム/イベント駆動型アーキテクチャでは、ソースとシンクという概念はデータのフローの方向を説明するものです。シンクはデータが流入する場所、またはデータが最終的に到達する場所を指します。この場合、私たちはデータペイロードをGridDB内のタイムシリーズコンテナ内にタイムシリーズデータとして永続化したいと考えています。そのため、プロパティファイルを次のように設定します:

        connector.class= com.github.griddb.kafka.connect.GriddbSinkConnector
        name= griddb-kafka-sink
        cluster.name= myCluster
        user= admin
        password= admin
        notification.member= griddb-server=10001
        container.type= TIME_SERIES
        topics= meter_0,meter_1,meter_2,meter_3
        transforms=  TimestampConverter
        transforms.TimestampConverter.type=  org.apache.kafka.connect.transforms.TimestampConverter$Value
        transforms.TimestampConverter.format=  yyyy-MM-dd hh=mm=ss
        transforms.TimestampConverter.field=  timestamp
        transforms.TimestampConverter.target.type=  Timestamp

前回の記事と比べて、主な変更点はcontainer.typeの指定とtransformsプロパティです。transformsプロパティは、Kafkaクラスターにどの文字列値をタイムスタンプに変換するかを指定し、そのプロセスを支援するためのその他の有用な情報も提供します。その他の値は、ブローカーがデータを送信するトピックの宛先を指定するためのもので、これはホスト名griddb-serverのGridDB Dockerコンテナです。

トピックはデータトピックの名前であり、同時にGridDBのタイムシリーズコンテナの名前にもなります。

Pythonプロデューサースクリプト

ソースコードを読むだけで理解できる内容が多いため、特に説明する必要はありません。唯一追加する点は、DockerコンテナをDocker化したい場合、サーバーの場所をlocalhostからbroker:9092に変更する必要がある点です。

#p=KafkaProducer(bootstrap_servers=['localhost:9092'])
p=KafkaProducer(bootstrap_servers=['broker:9092'])

もう一点注意すべき点は、time_series データコンテナを time_series データを行キーとして作成しているにもかかわらず、ペイロードデータフィールドの型を string に設定する必要がある点です(これは、シンクセクションで transform プロパティについて説明した際に触れました)。

"schema": 
{
    "fields": [ 
        { "field": "timestamp", "optional": False, "type": "string" },
        { "field": "kwh", "optional": False, "type": "double" }, 
        { "field": "temp", "optional": False, "type": "double" } 
    ], 
    "name": "iot", "optional": False, "type": "struct" 
}   

ここでのポイントは、型がstringであっても、最初のフィールドをターゲットのタイムスタンプ型として設定する必要がある点です。その後、このデータセットのシンクで、transforms.TimestampConverter.fieldをタイムスタンプ型に変換したいフィールドの名前として設定します。これらの設定が完了すると、KafkaとGridDBは設定したスキーマと適切なコンテナタイプでテーブルを作成します。

Docker コンテナで Kafka を実行する

以前の Kafka に関する記事では、Kafka と GridDB をベアメタル上で実行しました。つまり、CLI 経由でコマンドを実行してサーバーを起動するだけのシンプルな方法でした。この方法は機能しましたが、3~4つのターミナルを開き、手順を覚えて実行する必要があるため、やや複雑でした。この記事では、2~3つのコマンドでダウンロードと実行をすべて行えるDocker Composeファイルを用意しました!

Confluent Dockerコンテナ

まず、Confluentが提供するDockerイメージについて説明します。Confluentは、大規模企業向けのKafkaに関するサポートとツールを提供する企業です。ただし、彼らはDockerイメージを無料で提供しており、この記事のDocker Composeファイルで使用します。

Docker Composeの主な機能は、複数の「サービス」(Dockerコンテナ)をセットとして作成し、簡単なコマンドで同時に実行できるようにすることです。各コンテナが依存するルールを設定できます。例えば、Kafkaコンテナ同士を依存関係に設定することで、正しい順序で起動するようにできます。

この方法を選択した理由は、上述の通りKafkaを実行するプロセスが複雑で、多くの異なるコンポーネントを実行する必要があるためです。例えば、PythonスクリプトからデータをKafkaトピックに送信し、GridDbに格納するこのシンプルなプロジェクトでも、Docker Composeファイルに5つのサービスが必要です。

Docker Compose サービス

以下のすべてがサービスです。

  • GridDB
  • Kafka Zookeeper
  • Kafka Broker
  • Kafka スキーマレジストリ
  • Kafka-Connect

また、省略しましたが、Kafka データプロデューサーを含むことも可能です。

Kafka Zookeeper は、Kafka の脳や主要なコンポーネントと考えることができます。Broker はデータトピックを処理するサービスで、フェイルセーフのためなどに複数のブローカーを実行することが一般的です。データトピックの生産者を Kafka に接続する際は、ブローカーを指します。

Kafka スキーマレジストリは、トピックで使用されるスキーマを強制します。当社のケースでは、Python プロデューサから送信されるデータペイロードの JSON スキーマのデシリアライズに役立ちます。

Kafka Connect コンテナには、Kafka と組み合わせて使用するサードパーティライブラリを追加します:GridDB Kafka Connect JAR ファイルと GridDB シンクプロパティファイル。Connectコンテナは少し特殊で、まずコンテナが起動していることを確認し、その後GridDBシンクプロパティの指示を含むJSONファイルをプッシュする必要があります。ただし、GridDB Kafka Connect JARファイルは、Dockerイメージの起動時にファイルシステムにプッシュします。

Docker Compose 設定

GridDBには特別な設定はありません:単にgriddbnetからイメージをプルし、いくつかの環境変数を設定するだけです:

  griddb-server:
    image: 'griddbnet/griddb:5.6.0'
    container_name: griddb-server
    expose:
      - '10001'
      - '10010'
      - '10020'
      - '10040'
      - '20001'
      - '41999'
    environment:
      NOTIFICATION_MEMBER: 1
      GRIDDB_CLUSTER_NAME: myCluster

動物園の飼育員も同じような状況です:

  zookeeper:
    image: 'confluentinc/cp-zookeeper:7.3.0'
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

ブローカーはポート9092を公開しているため、Docker Composeのネットワーク環境外からPythonのプロデューサースクリプトを実行できます(localhost:9092を指すだけです)。ZooKeeperやその他のクラスタ設定を指すための追加の環境変数も必要です。

  broker:
    image: 'confluentinc/cp-kafka:7.3.0'
    container_name: broker
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092,PLAINTEXT_INTERNAL://broker:29092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

また、ブローカー、スキーマレジストリ、Kafka Connectがすべて「ZooKeeperに依存している」ことに気づくでしょう。これにより、全体的なオペレーションの責任者が明確になります。

  kafka-schema-registry:
    image: 'confluentinc/cp-schema-registry:7.3.0'
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    ports:
      - '8082:8082'
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://broker:9092'
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8082'
    depends_on:
      - zookeeper

Kafka ConnectはConfluent Docker Hubからコンテナをプルし、多数の環境変数を設定します。さらに、ホストマシンと共有ファイルシステムを持つボリュームも含まれており、これによりGridDB Kafka ConnectのJARファイルを共有できます。最後に、サービスの最下部にスクリプトを配置し、Kafka ConnectのHTTPエンドポイントが利用可能になるまで待機する処理を実行します。200ステータスコードが返ってきたら、GridDB-Sinkプロパティファイルを送信するスクリプトを実行できます。

  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect
    ports:
      - '8083:8083'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: device
      CONNECT_CONFIG_STORAGE_TOPIC: device-config
      CONNECT_OFFSET_STORAGE_TOPIC: device-offsets
      CONNECT_STATUS_STORAGE_TOPIC: device-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: true
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: true
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8082'
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8082'
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m (%c:%L)%n'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_PLUGIN_PATH: >-
        /usr/share/java,/etc/kafka-connect/jars
      CLASSPATH: >-
        /usr/share/java,/etc/kafka-connect/jars
    volumes:
      - './scripts:/scripts'
      - './kafka-connect/connectors:/etc/kafka-connect/jars/'
      
    depends_on:
      - zookeeper
      - broker
      - kafka-schema-registry
      - griddb-server
    command:
      - bash
      - '-c'
      - >
        /etc/confluent/docker/run & 

        echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"

        while [ $$(curl -s -o /dev/null -w %{http_code}
        http://kafka-connect:8083/connectors) -eq 000 ] ; do 
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) " (waiting for 200)"
          sleep 5 
        done

        nc -vz kafka-connect 8083

        echo -e "\n--\n+> Creating Kafka Connect GridDB sink"

        /scripts/create-griddb-sink.sh &&
        /scripts/example-sink.sh

        sleep infinity 

このプロパティファイルは、Kafkaに対して、ブローカーが特定の名前を持つトピックを受信した際に、そのトピックをプロパティファイル内の指示に従って送信するように明示的に指示します。この場合、その指示は当社のGridDBコンテナを指します。

結論

プロデューサーを実行すると、GridDB CLI を使用して Docker GridDB サーバー内のすべてのデータを表示できます: $ docker exec -it griddb-server gs_sh

これで、Kafka 経由で IoT のようなセンサーデータを GridDB タイムシリーズ コンテナに送信する作業が完了しました。

ブログの内容について疑問や質問がある場合は Q&A サイトである Stack Overflow に質問を投稿しましょう。 GridDB 開発者やエンジニアから速やかな回答が得られるようにするためにも "griddb" タグをつけることをお忘れなく。 https://stackoverflow.com/questions/ask?tags=griddb

Leave a Reply

Your email address will not be published. Required fields are marked *