FastAPIとGridDBによる気象IoTのデータパイプライン構築

本日のブログ記事では、FastAPIとGridDBを使用して、気象IoTプロジェクトのための堅牢なデータパイプラインを構築する方法について、実践的なガイドをご紹介します。

この記事を読み終える頃には、基本的なデータパイプラインアーキテクチャの主要なコンポーネントについて学び、記事のFastAPI実装例をさらに拡張して自分のユースケースに適用する方法について理解することができます。

議論をわかりやすくするために、このブログエントリでは、主にデータパイプラインの構築とその様々なコンポーネントの統合に焦点を当てています。IoTウェザーステーションのセットアップについては詳しく説明しませんが、ウェザーステーションをデータパイプライン自体のシームレスな一部にするための手順についても説明します。

全てのソースコード

データパイプラインの概要

まず、これから実装するデータパイプラインのアーキテクチャ例の全体像を把握しておきましょう。データパイプラインは3つの主要コンポーネントで構成されています。

  • ウェザーステーション:温度、気圧、湿度を監視するためのハードウェアとソフトウェアが含まれています。
  • GridDBサーバ:気象データを保存する時系列データベースを格納します。
  • FastAPI アプリ:APIエンドポイント、ビジネスロジック、データベース接続層、およびデータを処理して返すためのその他のサービスを提供します。

この設定では、FastAPIアプリはウェブからの受信 HTTP リクエストを処理します。これには、センサーをアクティブにするリクエストや、GridDBから保存されたデータをフェッチするリクエストが含まれます。また、FastAPIアプリは、データの解析、検証、および処理など、その他のビジネスロジックやサービスを実行する場所でもあります。

この記事の後半では、ユースケースに適した他のデータパイプラインのデザインについても説明します。今のところ、このデザインを使って、FastAPIとGridDBをうまく統合する方法のソリューションを説明します。

要件と設定

それでは、今回のプロジェクトにおけるすべての要件と、その設定方法について説明します。

ウェザーステーション

ここでは、温度、気圧、湿度の3つの物理量を測定・監視する仮想的なIoTホームウェザーステーションを想定しています。センサーはRaspberry Piに接続されており、Raspberry PiはGridDBサーバーとFastAPIアプリを実行するマシンとしても機能します。

前述の通り、ウェザーステーションの設定方法はここでは説明しません。また、ウェザーステーションのハードウェア設定は、別のものを選択することもできます。

重要なことは、ウェザーステーションをデータパイプラインに適切に統合するためには、センサー用のすべてのライブラリとパッケージが、FastAPIアプリのPythonコードから見える環境にインストールされていることを確認する必要があるということです。

例えば、DHT22 温度・気圧センサーを使用している場合、import Adafruit_DHT コマンドは ImportError を発生させないはずです。

GridDBサーバー

GridDBは、気象IoTシステムのデータベースとして適しています。IoT向けに最適化されており、多くの有用な時系列機能をすぐにサポートしています。

GridDBのセットアップ方法については、Raspberry Pi上でARM用のGridDBと時系列データベースを構築する方法のガイドをご覧ください。また、すぐに起動したい場合は、適切なGridDB Dockerイメージをビルドして実行してください。

今回の例では、GridDBコンテナを使って、センサーデータを適切なカラムに格納します。

  • タイムスタンプ(TIMESTAMP型)
  • 温度(DOUBLE型)
  • 圧力(DOUBLE型)
  • 湿度(DOUBLE型)

GridDBと対話するために、FastAPIアプリはGridDB Pythonクライアントを使用します。GridDB Pythonクライアントは、このガイドを使用して構築およびインストールすることができます。

FastAPIアプリ

FastAPIは、Web APIを迅速に構築するための高性能フレームワークです。使いやすく、他のPythonウェブフレームワークと比較して、FastAPIでアプリケーションを作成すると開発期間が短縮されます。

FastAPIをpipでインストールします。

$ pip install fastapi

また、本番環境でアプリを実行するためのASGIサーバーもインストールします。

$ pip install uvicorn[standard]

FastAPIアプリのさまざまな部分で懸念事項を確実に分離するために、プロジェクトをどのように構成するかを説明します。

    .
    |   .dockerignore
    |   Dockerfile
    |   init_griddb.py
    |   requirements.txt
    |   run.sh
    |   
    \---app
        |   main.py
        |   __init__.py
        |   
        +---config
        |       base.py
        |       __init__.py
        |       
        +---db
        |       connection.py
        |       utils.py
        |       __init__.py
        |       
        +---schema
        |       response.py
        |       __init__.py
        |       
        \---services
                weatherstation.py
                __init__.py

プロジェクトの主要なファイルとディレクトリの概要は以下の通りです。

  • ./init_griddb.py は GridDB コンテナを作成するスタンドアロンの Python スクリプトです。
  • ./run.sh は GridDB の初期化スクリプトを実行し、uvicorn サーバーを実行します。
  • ./app/main.py は、FastAPI アプリのインスタンスとエンドポイントの定義と設定を行います。
  • ./app/config/ は、設定と構成ファイルを含みます。
  • .app/db/ は、データベースアクセスレイヤーです。
  • .app/schema/ は、レスポンスボディが正しいフォーマットであることを保証する pydantic モデルを含みます。
  • .app/services/ は、アプリのビジネスロジック(センサーの起動やデータの取得など)を含みます。

後ほど、これらのモジュールがデータパイプラインで果たす役割について、より詳しくご紹介します。

データパイプライン

FastAPIアプリは、データパイプラインのさまざまなコンポーネントを結びつけます。その方法をご紹介します。

コンフィグレーション

アプリを設定するための値は、./app/config/ディレクトリ内のモジュール内に読み込まれたり、定義されたりします。./app/config/base.pyでは、以下の設定が定義されています。


import os

GRIDDB_CONNECTION_PARAMS = {
    "notification_member": os.environ.get(
        "GRIDDB_NOTIFICATION_MEMBER",
        "griddb-server:10001"
    ),
    "cluster_name": os.environ.get("GRIDDB_CLUSTER_NAME", "defaultCluster"),
    "username": os.environ.get("GRIDDB_USERNAME", "admin"),
    "password": os.environ.get("GRIDDB_PASSWORD", "admin"),
}
GRIDDB_CONTAINER_NAME = os.environ.get(
    "GRIDDB_CONTAINER_NAME",
    "weatherstation"
)
  

注意すべき点は、設定の値が環境変数として渡されることです。それ以外の場合は、デフォルト値が利用できます。これにより、データベースの認証情報などの機密情報をハードコーディングして、誤ってソースコントロールにコミットしてしまうことを防ぐことができます。

また、上記の接続パラメータは、固定リスト方式で GridDB ストアオブジェクトを取得していることを意味してい ることに注意してください。他の方法としては、マルチキャスト方式やプロバイダ方式がありますが、それぞれ異なるパラメータセットが必要です。

ビジネスロジック(サービスレイヤー)

今回の例では,アプリとウェザーステーションとのインタラクションは,ファイル ./app/services/weatherstation.py で定義されています.このファイルでは,すべてのセンサを起動してその測定値を取得する関数 get_sensor_data() が定義されています.この関数は、現在のタイムスタンプ、温度、気圧、湿度を含む辞書を返します。

繰り返しになりますが、ここでは仮想的なウェザーステーションを使用しているだけなので、この関数の実装については詳しく説明しません。ここで重要なのは、./app/services/ディレクトリには、ビジネスロジックのコードが置かれるべきだということです。ここには、データをさらに処理するためのヘルパー関数やユーティリティー関数/クラスなど、他の機能を追加することができます。

GridDBアクセスレイヤー

次に、GridDBの接続とクエリを管理するために必要なメソッドと属性を含むクラスを定義します。このクラスは、./app/db/connection.pyというファイルに記述します。


import griddb_python as griddb

from .utils import handle_exceptions

class GridDBConnection:
    """Convenience class for managing GridDB connection."""

    def __init__(self, connection_params: dict) -> None:
        self.connection_params = connection_params
        self.container_name = None
        self.gridstore = None
        self.container = None

    @handle_exceptions
    def init(self, container_name: str = None) -> None:
        """Sets gridstore and container instance."""
        factory = griddb.StoreFactory.get_instance()
        self.gridstore = factory.get_store(**self.connection_params)
        if container_name is not None:
            self.container_name = container_name
            self.container = self.gridstore.get_container(self.container_name)

    @handle_exceptions
    def create_container(self, container_info: griddb.ContainerInfo) -> None:
        """Creates container with given container info."""
        self.container_name = container_info.name
        self.container = self.gridstore.put_container(container_info)

    @handle_exceptions
    def execute_and_fetch(self, query_stmt: str, as_dict: bool = False) -> list:
        """Executes query on `self.container` and returns results."""
        query = self.container.query(query_stmt)
        row_set = query.fetch()

        results = []
        columns = row_set.get_column_names()

        while row_set.has_next():
            row = row_set.next()

            if as_dict:
                row = dict((k, v) for k, v in zip(columns, row))

            results.append(row)

        return results

    def cleanup(self) -> None:
        """Closes container and store objects."""
        if self.container is not None:
            self.container.close()

        if self.gridstore is not None:
            self.gridstore.close()

このクラスがアプリ全体でどのように使用されているか、以下で説明します。なお、handle_exceptionデコレーターは、./app/db/utils.pyというファイルで定義されています。

GridDBコンテナの初期化

ウェザーステーションデータ用のコンテナを作成するために、コンテナの定義と初期化を行うスタンドアローンのPythonスクリプトを用意しました。


#!/usr/bin/python
import griddb_python as griddb

from app.db.connection import GridDBConnection
from app.config import GRIDDB_CONNECTION_PARAMS, GRIDDB_CONTAINER_NAME

def main() -> None:
    try:
        gdb = GridDBConnection(GRIDDB_CONNECTION_PARAMS)

        print("Initializing GridDB connection")
        gdb.init()

        print(f"Creating container {GRIDDB_CONTAINER_NAME}")
        con_info = griddb.ContainerInfo(
            name=GRIDDB_CONTAINER_NAME,
            column_info_list=[
                ["timestamp", griddb.Type.TIMESTAMP],
                ["temperature", griddb.Type.DOUBLE],
                ["pressure", griddb.Type.DOUBLE],
                ["humidity", griddb.Type.DOUBLE],
            ],
            type=griddb.ContainerType.TIME_SERIES
        )

        gdb.create_container(con_info)

        print('Done')
    except Exception as e:
        print(f"Error initializing GridDB: {e}")

if __name__ == "__main__":
    main()
  

このスクリプトは、uvicornサーバを起動する前に実行する必要があります。代わりに、bashスクリプト ./run.sh を実行することもできます。

GridDB 接続の設定

アプリサーバの起動時に GridDB サーバへの接続を確立し、アプリサーバの停止時に接続を終了する必要があります。また、アプリ全体でGridDBConnectionクラスのインスタンスにアクセスできるようにする必要があります。

これらの要件を満たすために、./app/main.pyの一部を以下のように記述します。


import logging
from typing import Optional

from fastapi import FastAPI, Response, status

from .config import (
    GRIDDB_CONNECTION_PARAMS,
    GRIDDB_CONTAINER_NAME,
)
from .db import GridDBConnection
from .schema import ResponseModel
from .services.weatherstation import get_sensor_data

logger = logging.getLogger("api")

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    logger.info("Server startup")

    logger.info("Initializing GridDB connection")
    gdb = GridDBConnection(GRIDDB_CONNECTION_PARAMS)
    gdb.init(GRIDDB_CONTAINER_NAME)

    app.state.griddb = gdb
    logger.info("Server Startup")

@app.on_event("shutdown")
async def shutdown_event():
    app.state.griddb.cleanup()
    logger.info("Server Shutdown")

# ... Continued on next code block ...
  

これにより、GridDBサーバへの接続とウェザーステーションコンテナの選択が、アプリサーバの起動時に行われるようになります。作成されたGridDBConnectionはapp.state.griddb属性に割り当てられ、アプリ全体で使用できるようになります。

API エンドポイント

データパイプラインは、2つのAPIエンドポイントを介してWebからのリクエストを受け取ります。これらのエンドポイントは、./app/main.pyファイルの次の部分で定義されています。


# ... Continued from previous code block ...

@app.post("/record", response_model=ResponseModel)
async def save_current_sensor_data(response: Response):
    """Gets data from sensors and stores it in GridDB."""
    sensor_data = get_sensor_data()
    app.state.griddb.container.put(list(sensor_data.values()))

    response.status_code = status.HTTP_201_CREATED
    return {
        "status": "Successfully stored new reading",
        "records": 1,
        "data": [sensor_data],
    }

@app.get("/retrieve", response_model=ResponseModel)
async def get_stored_readings(minutes: Optional[int] = 5):
    """Retrieves stored readings within given number of minutes ago."""
    stmt = f"""
        select * 
        where timestamp > TIMESTAMPADD(MINUTE, NOW(), -{minutes})
        order by timestamp desc
    """
    data = app.state.griddb.execute_and_fetch(stmt, as_dict=True)

    return {
        "status": f"Retrieved stored records within last {minutes} minutes",
        "records": len(data),
        "data": data,
    }
  

ここでは、2つのAPIエンドポイントがあることがわかります。

  • POST /record
  • GET /retrieve

/record は、気象センサーを起動し、現在の読み取り値を GridDB に保存します。POSTリクエストを受け付け、201 CREATEDレスポンスと、ステータスメッセージ、レコード数、保存されているレコードを含むJSONボディを返します。

/retrieve は、指定された数分前以内の最新のレコードをフェッチします。GETリクエストと、オプションのクエリパラメータ minutes を受け取ります。そして、200 OKレスポンスと、ステータスメッセージ、レコード数、取得したレコードの配列を含むJSONボディを返します。

レスポンススキーマ

FastAPIフレームワークを使用する主な利点は、開発者の手間をかけずに、アプリが正しいデータタイプとフォーマットのオブジェクトを受け入れて返すことを保証するさまざまな機能を提供していることです。

そのような機能の1つとして、pydanticモデルを使用してレスポンスコンテンツを解析し、検証することが挙げられます。上記の2つのAPIエンドポイントの定義では、関数デコレーターのresponse_model引数にResponseModelを渡しています。これにより、レスポンスボディが、ファイル ./app/schema/response.py の ResponseModel クラスで定義されたスキーマに従っていることが確認できます。


from typing import List, Optional

from pydantic import BaseModel

class ResponseModel(BaseModel):
    status: str = "ok"
    records: Optional[int] = 0
    data: Optional[List[dict]] = []
  

次のセクションでは、APIのエンドポイントとレスポンスボディの使用例を紹介します。

使用方法

データパイプライン全体を実行するには、まずウェザーステーションが正しく接続され、GridDBサーバがすでに稼働していることを確認します。準備ができたら、ウェザーステーションコンテナを以下のように初期化します。

$ python init_griddb.py

その後に次のようにしてアプリケーションサーバーを起動します。

$ uvicorn app.main:app --host 0.0.0.0 --port 80

また、./run.shというbashファイルを実行すれば、上記の両方のコマンドを一度に実行することができます。

これにより、APIエンドポイントは、127.0.01(localhost)の80番ポートからアクセスできるようになります。

なお、dockerコンテナ経由で実行している場合、エンドポイントへのアクセスはlocalhostではなく、dockerコンテナやdockerマシンのIPアドレスを介して行われることがあります。

そのため、以降の例では、正しいIPアドレスとポートを設定に代えてください。

curlを使ってAPIエンドポイントをテストしてみましょう。まず、センサーを起動し、現在の気象データを取得します。

$ curl -X POST http://IP_ADDRESS:PORT/record

これにより、以下のようなレスポンスが返ってくるはずです。

{"status":"Successfully stored new reading","records":1,"data":[{"timestamp":"2021-02-17T10:18:06.509054","temperature":29.530784788460004,"pressure":1004.8003564430331,"humidity":64}]}

過去5分以内の直近に保存された読み取り値を取得します(クエリパラメータ minutesに値を渡さない場合、デフォルトは5です)。

$ curl http://IP_ADDRESS:PORT/retrieve

これにより、以下のようなレスポンスが得られます。

{"status":"Retrieved stored records within last 5 minutes","records":3,"data":[{"timestamp":"2021-02-17T10:23:54.940000","temperature":29.649967592036077,"pressure":1005.0168541036603,"humidity":53.0},{"timestamp":"2021-02-17T10:23:52.531000","temperature":29.767032247389704,"pressure":991.8437893041985,"humidity":73.0},{"timestamp":"2021-02-17T10:23:46.351000","temperature":31.720923387508172,"pressure":1011.998375306113,"humidity":95.0}]}

過去10分以内に保存された読み取り値を取得します。

$ curl http://IP_ADDRESS:PORT/retrieve?minutes=10

この場合には以下のようなレスポンスが得られます。

{"status":"Retrieved stored records within last 10 minutes","records":7,"data":[{"timestamp":"2021-02-17T10:23:54.940000","temperature":29.649967592036077,"pressure":1005.0168541036603,"humidity":53.0},{"timestamp":"2021-02-17T10:23:52.531000","temperature":29.767032247389704,"pressure":991.8437893041985,"humidity":73.0},{"timestamp":"2021-02-17T10:23:46.351000","temperature":31.720923387508172,"pressure":1011.998375306113,"humidity":95.0},{"timestamp":"2021-02-17T10:18:34.288000","temperature":28.10441189566165,"pressure":1028.2311879177432,"humidity":67.0},{"timestamp":"2021-02-17T10:18:30.304000","temperature":29.996273963137778,"pressure":1007.5716385639391,"humidity":52.0},{"timestamp":"2021-02-17T10:18:22.655000","temperature":28.823514215164877,"pressure":989.7959636620639,"humidity":66.0},{"timestamp":"2021-02-17T10:18:06.509000","temperature":29.530784788460004,"pressure":1004.8003564430331,"humidity":64.0}]}

これで、cronジョブやLambda Functionを設定して、/record APIを定期的に呼び出すことができるようになりました。また、ダッシュボードやデータビジュアライゼーションアプリを統合して、/retrieve APIを利用することもできます。

その他の検討事項

今回実装した基本的なデータパイプラインをさらに拡張したり、別の方向に持っていくためのアイデアをいくつか紹介します。

  • データクリーニングやデータ処理などのビジネスロジックを追加する。このプロジェクトの構造では、新しい機能を追加して、既存または追加のエンドポイントと接続することが非常に簡単です。
  • MQTTメッセージブローカを使用して、GridDBが新しいセンサデータを直接リッスンできるようにする。これは、別のGridDBチュートリアルで実装されています
  • HTTPとMQTTの両方のプロトコルを扱うことができます。fastapi-mqttというFastAPIライブラリがあり、これを使うことができます。

結論

この記事では、気象IoT向けにFastAPIとGridDBを使用して、シンプルかつ拡張可能なデータパイプラインの実装を構築しました。これらのツールは、特に多くの異なるIoTコンポーネント間のシームレスな統合を必要とするユースケースにおいて、データ集約型のシステムを簡単に作成・維持するのに役立つことを学びました。

ソースコード

GitHub

Leave a Reply

Your email address will not be published.