Build a Data Pipeline for Weather IoT with FastAPI and GridDB

In today’s blog post, we walk you through a practical guide on how you can use FastAPI and GridDB to build a robust data pipeline for your weather IoT project.

By the end of this article, you’ll learn about the key components of a basic data pipeline architecture and gain a good understanding of how to extend and apply the article’s example FastAPI implementation to your own use case.

To keep the discussion easy to follow, this blog entry primarily focuses on building the data pipeline and integrating its various components. We won’t go into detail about setting up an IoT weather station, although we’ll also talk about steps to make the weather station a seamless part of the data pipeline itself.


Data Pipeline Overview

First, let’s get a big-picture view of the example data pipeline architecture we’ll be implementing. The data pipeline consists of three main components:

  • Weather station: This includes the full hardware and software for monitoring temperature, air pressure, and humidity.
  • GridDB server: This houses the time-series database where the weather data will be stored.
  • FastAPI app: This provides the API endpoints, business logic, database connection layer, and other services for processing and returning data.

In this setup, the FastAPI app handles incoming HTTP requests from the web. These include requests that activate the sensors as well as requests that fetch stored data from GridDB. The FastAPI app is also where other business logic and services, such as parsing, validating, and processing data, are executed.

Later in this article, we’ll also cover other data pipeline designs that may suit your use case better. For now, we’ll use this design to illustrate how to integrate FastAPI and GridDB cleanly.

Requirements and Setup

Let’s now go over all the requirements and how they’re set up in this project.

Weather Station

For discussion purposes, our example assumes working with a hypothetical IoT home weather station that measures and monitors three physical quantities: temperature, pressure, and humidity. The sensors are hooked up to a Raspberry Pi, which also serves as the machine where our GridDB server and FastAPI app will be running.

As mentioned, we won’t explain how to set up and configure the weather station here. Besides, you may even choose a different hardware setup for your weather station altogether.

The important thing to note is that, for the weather station to integrate properly into the data pipeline, all the libraries and packages for the sensors must be installed in an environment where they’re visible to the Python code in the FastAPI app.

For example, if you’re using the DHT22 temperature and humidity sensor, the statement import Adafruit_DHT shouldn’t raise an ImportError.
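As a quick sanity check, a guarded import can confirm that the sensor library is visible from the app’s environment, and also lets development machines without the hardware fall back to simulated readings (an illustrative pattern, not part of the original project):

```python
# Guarded import: on the Raspberry Pi with the sensor library installed,
# the real module loads; elsewhere, the app can fall back to simulated
# readings instead of crashing at import time.
try:
    import Adafruit_DHT  # real sensor library on the Pi
    HAS_SENSOR = True
except ImportError:
    Adafruit_DHT = None
    HAS_SENSOR = False

print("sensor library available:", HAS_SENSOR)
```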

GridDB Server

GridDB is a good choice as the database for weather IoT systems. It’s optimized for IoT and supports a number of helpful time-series functionalities out of the box.

To learn how to set up GridDB, check out this guide on installing GridDB on a Raspberry Pi. Or, to get it up and running quickly, build and run a suitable GridDB Docker image instead.

In our example, we use a GridDB container to store the sensor data into appropriate columns:

  • timestamp of type TIMESTAMP
  • temperature of type DOUBLE
  • pressure of type DOUBLE
  • humidity of type DOUBLE

To interact with GridDB, the FastAPI app uses the GridDB Python client which can be built and installed using this guide.

FastAPI App

FastAPI is a high-performance framework for quickly building web APIs. It’s easy to pick up and use, so apps typically take less development time than with other Python web frameworks.

We install FastAPI with pip:

$ pip install fastapi

We also install an ASGI server for running the app in production:

$ pip install "uvicorn[standard]"

To ensure there’s separation of concerns among different parts of the FastAPI app, here’s how we structure the project:

.
|   .dockerignore
|   Dockerfile
|   init_griddb.py
|   requirements.txt
|   run.sh
|   
\---app
    |   main.py
    |   __init__.py
    |   
    +---config
    |       base.py
    |       __init__.py
    |       
    +---db
    |       connection.py
    |       utils.py
    |       __init__.py
    |       
    +---schema
    |       response.py
    |       __init__.py
    |       
    \---services
            weatherstation.py
            __init__.py

Here’s a summary of the key files and directories in the project:

  • ./init_griddb.py is a stand-alone Python script that creates the GridDB container.
  • ./run.sh executes the GridDB initialization script and runs the uvicorn server.
  • ./app/main.py is where the FastAPI app instance and endpoints are defined and set up.
  • ./app/config/ contains settings and configuration files.
  • ./app/db/ is the database access layer.
  • ./app/schema/ contains pydantic models that ensure the response body has the correct format.
  • ./app/services/ contains the app’s business logic (e.g., activating sensors and capturing data).

Later, we’ll take a more in-depth look at the roles that these modules play in the data pipeline.

Data Pipeline

The FastAPI app ties together the different components of the data pipeline. Here’s how.

Configuration

Values for configuring the app are loaded or defined inside modules in the ./app/config/ directory. In ./app/config/base.py, the following settings are defined:

import os

GRIDDB_CONNECTION_PARAMS = {
    "notification_member": os.environ.get(
        "GRIDDB_NOTIFICATION_MEMBER",
        "griddb-server:10001"
    ),
    "cluster_name": os.environ.get("GRIDDB_CLUSTER_NAME", "defaultCluster"),
    "username": os.environ.get("GRIDDB_USERNAME", "admin"),
    "password": os.environ.get("GRIDDB_PASSWORD", "admin"),
}
GRIDDB_CONTAINER_NAME = os.environ.get(
    "GRIDDB_CONTAINER_NAME",
    "weatherstation"
)

It’s important to note that the settings are read from environment variables, with fallback defaults. This helps prevent hard-coding sensitive information like database credentials and accidentally committing it to source control.

Another thing to consider is that the connection parameters above imply that we’re acquiring a GridDB Store object via the fixed-list method. The other methods are the multicast method and the provider method, each of which requires a different set of parameters.
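To illustrate, here’s roughly how the three methods’ parameter sets differ when calling StoreFactory.get_store(); the multicast and provider values below are placeholders, not from this project:

```python
# Fixed-list method: a comma-separated list of cluster nodes (host:port),
# as used in this project's config.
fixed_list_params = {
    "notification_member": "griddb-server:10001",
    "cluster_name": "defaultCluster",
    "username": "admin",
    "password": "admin",
}

# Multicast method: a multicast notification address and port (placeholders).
multicast_params = {
    "host": "239.0.0.1",
    "port": 31999,
    "cluster_name": "defaultCluster",
    "username": "admin",
    "password": "admin",
}

# Provider method: a URL that serves the cluster's address list (placeholder).
provider_params = {
    "notification_provider": "http://example.com/provider",
    "cluster_name": "defaultCluster",
    "username": "admin",
    "password": "admin",
}
```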

Business Logic (Service Layer)

In our example, the app’s interaction with the weather station is defined in the file ./app/services/weatherstation.py. That file defines a function get_sensor_data() which activates all the sensors and captures their readings. This function returns a dictionary that contains the current timestamp, temperature, pressure, and humidity.

Again, since we’re only using a hypothetical weather station, we won’t go into detail about this function’s implementation. The key thing to note is that the ./app/services/ directory is where the code for your business logic should reside. You can add other functionalities here, such as helper or utility functions/classes for processing the data further.
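Since the weather station is hypothetical, here’s a minimal, simulated sketch of what get_sensor_data() could look like; the real version would read from the actual sensors instead of generating random values:

```python
import random
from datetime import datetime

def get_sensor_data() -> dict:
    """Returns one simulated reading with the fields the GridDB container expects."""
    return {
        "timestamp": datetime.utcnow(),
        "temperature": round(random.uniform(20.0, 35.0), 2),   # degrees Celsius
        "pressure": round(random.uniform(980.0, 1030.0), 2),   # hPa
        "humidity": round(random.uniform(30.0, 95.0), 2),      # percent RH
    }
```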

GridDB Access Layer

Next, we define a class that contains methods and attributes we need to manage GridDB connections and queries. We put this in the file ./app/db/connection.py:

import griddb_python as griddb

from .utils import handle_exceptions

class GridDBConnection:
    """Convenience class for managing GridDB connection."""

    def __init__(self, connection_params: dict) -> None:
        self.connection_params = connection_params
        self.container_name = None
        self.gridstore = None
        self.container = None

    @handle_exceptions
    def init(self, container_name: str = None) -> None:
        """Sets gridstore and container instance."""
        factory = griddb.StoreFactory.get_instance()
        self.gridstore = factory.get_store(**self.connection_params)
        if container_name is not None:
            self.container_name = container_name
            self.container = self.gridstore.get_container(self.container_name)

    @handle_exceptions
    def create_container(self, container_info: griddb.ContainerInfo) -> None:
        """Creates container with given container info."""
        self.container_name = container_info.name
        self.container = self.gridstore.put_container(container_info)

    @handle_exceptions
    def execute_and_fetch(self, query_stmt: str, as_dict: bool = False) -> list:
        """Executes query on `self.container` and returns results."""
        query = self.container.query(query_stmt)
        row_set = query.fetch()

        results = []
        columns = row_set.get_column_names()

        while row_set.has_next():
            row = row_set.next()

            if as_dict:
                row = dict(zip(columns, row))

            results.append(row)

        return results

    def cleanup(self) -> None:
        """Closes container and store objects."""
        if self.container is not None:
            self.container.close()

        if self.gridstore is not None:
            self.gridstore.close()

We’ll see below how this class is used throughout the app. Note that the handle_exceptions decorator is defined in the file ./app/db/utils.py.
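The article doesn’t show ./app/db/utils.py, but handle_exceptions can be as simple as a decorator that logs errors from GridDB operations and re-raises them (a plausible sketch, not the project’s exact code):

```python
import functools
import logging

logger = logging.getLogger("api")

def handle_exceptions(func):
    """Logs exceptions raised by GridDB operations, then re-raises them."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            logger.error("GridDB operation %s failed: %s", func.__name__, exc)
            raise
    return wrapper
```

Re-raising keeps the failure visible to callers (and to FastAPI’s error handling) while still leaving a trace in the logs.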

GridDB Container Initialization

To create the container for our weather station data, we include a stand-alone Python script that defines and initializes the container:

#!/usr/bin/python
import griddb_python as griddb

from app.db.connection import GridDBConnection
from app.config import GRIDDB_CONNECTION_PARAMS, GRIDDB_CONTAINER_NAME

def main() -> None:
    try:
        gdb = GridDBConnection(GRIDDB_CONNECTION_PARAMS)

        print("Initializing GridDB connection")
        gdb.init()

        print(f"Creating container {GRIDDB_CONTAINER_NAME}")
        con_info = griddb.ContainerInfo(
            name=GRIDDB_CONTAINER_NAME,
            column_info_list=[
                ["timestamp", griddb.Type.TIMESTAMP],
                ["temperature", griddb.Type.DOUBLE],
                ["pressure", griddb.Type.DOUBLE],
                ["humidity", griddb.Type.DOUBLE],
            ],
            type=griddb.ContainerType.TIME_SERIES
        )

        gdb.create_container(con_info)

        print('Done')
    except Exception as e:
        print(f"Error initializing GridDB: {e}")

if __name__ == "__main__":
    main()

This script needs to be run before starting the uvicorn server. Alternatively, you can run the bash script ./run.sh, which takes care of both steps.
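For reference, ./run.sh needs to do little more than chain the two steps; this is a sketch, and the actual script is in the linked repository:

```shell
#!/bin/bash
# Create the GridDB container, then start the ASGI server.
python init_griddb.py
uvicorn app.main:app --host 0.0.0.0 --port 80
```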

GridDB Connection Setup

We want to establish a connection to the GridDB server once the app server starts, as well as to close the connection once the app server shuts down. In addition, we need the ability to access the instance of our GridDBConnection class throughout the app.

To meet these requirements, we write part of the ./app/main.py as follows:

import logging
from typing import Optional

from fastapi import FastAPI, Response, status

from .config import (
    GRIDDB_CONNECTION_PARAMS,
    GRIDDB_CONTAINER_NAME,
)
from .db import GridDBConnection
from .schema import ResponseModel
from .services.weatherstation import get_sensor_data

logger = logging.getLogger("api")

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    logger.info("Server startup")

    logger.info("Initializing GridDB connection")
    gdb = GridDBConnection(GRIDDB_CONNECTION_PARAMS)
    gdb.init(GRIDDB_CONTAINER_NAME)

    app.state.griddb = gdb
    logger.info("GridDB connection initialized")

@app.on_event("shutdown")
async def shutdown_event():
    app.state.griddb.cleanup()
    logger.info("Server Shutdown")

# ... Continued on next code block ...

This ensures that connection to the GridDB server and selection of the weather station container both happen during app server startup. The created GridDBConnection is then assigned to the app.state.griddb attribute so that it can be used throughout the app.

API Endpoints

The data pipeline receives requests from the web via two API endpoints. These endpoints are defined in the next part of the ./app/main.py file:

# ... Continued from previous code block ...

@app.post("/record", response_model=ResponseModel)
async def save_current_sensor_data(response: Response):
    """Gets data from sensors and stores it in GridDB."""
    sensor_data = get_sensor_data()
    app.state.griddb.container.put(list(sensor_data.values()))

    response.status_code = status.HTTP_201_CREATED
    return {
        "status": "Successfully stored new reading",
        "records": 1,
        "data": [sensor_data],
    }

@app.get("/retrieve", response_model=ResponseModel)
async def get_stored_readings(minutes: Optional[int] = 5):
    """Retrieves stored readings within given number of minutes ago."""
    stmt = f"""
        select * 
        where timestamp > TIMESTAMPADD(MINUTE, NOW(), -{minutes})
        order by timestamp desc
    """
    data = app.state.griddb.execute_and_fetch(stmt, as_dict=True)

    return {
        "status": f"Retrieved stored records within last {minutes} minutes",
        "records": len(data),
        "data": data,
    }

Here, we can see the two API endpoints are:

  • POST /record
  • GET /retrieve

/record activates the weather sensors and stores the current reading into GridDB. It accepts a POST request and returns a 201 CREATED response and a JSON body that includes the status message, record count, and the record stored.

/retrieve fetches the most recent records within a given number of minutes ago. It accepts a GET request and an optional query parameter minutes. It then returns a 200 OK response and a JSON body that includes the status message, record count, and an array of retrieved records.

Response Schema

A key advantage of working with the FastAPI framework is that it offers various features that ensure the app accepts and returns objects with the right data types and formatting without additional developer effort.

One such feature is its use of pydantic models to parse and validate response content. We saw this above in the definitions of the two API endpoints where we pass in ResponseModel into the response_model argument of the function decorator. This makes sure that the response body follows the schema defined in the ResponseModel class in the file ./app/schema/response.py:

from typing import List, Optional

from pydantic import BaseModel

class ResponseModel(BaseModel):
    status: str = "ok"
    records: Optional[int] = 0
    data: Optional[List[dict]] = []
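As a quick illustration of the defaults at work, the model can be instantiated directly:

```python
from typing import List, Optional

from pydantic import BaseModel

class ResponseModel(BaseModel):
    status: str = "ok"
    records: Optional[int] = 0
    data: Optional[List[dict]] = []

# Omitted fields fall back to the declared defaults.
empty = ResponseModel()
print(empty.status, empty.records, empty.data)  # ok 0 []

# Supplied fields are validated against the declared types.
full = ResponseModel(status="Retrieved", records=3, data=[{"humidity": 64.0}])
print(full.records)  # 3
```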

We’ll see usage examples of the API endpoints and response body in the next section.

Usage

To run the entire data pipeline, we first make sure that the weather station is properly hooked up and the GridDB server is already running. Once ready, we initialize the weather station container with:

$ python init_griddb.py

After this, we then run the app server with:

$ uvicorn app.main:app --host 0.0.0.0 --port 80

You can also just run the ./run.sh bash file to execute both of the above commands in one go.

This will make the API endpoints accessible via 127.0.0.1 (localhost) on port 80.

Note that if you’re running this in a Docker container, the endpoints may not be accessible via localhost but through the Docker container’s or Docker machine’s IP address instead.

So, in the subsequent examples, please substitute the correct IP address and port for your settings.

Let’s test the API endpoints using curl. First let’s activate the sensors and capture the current weather data:

$ curl -X POST http://IP_ADDRESS:PORT/record

This should return a response like this:

{"status":"Successfully stored new reading","records":1,"data":[{"timestamp":"2021-02-17T10:18:06.509054","temperature":29.530784788460004,"pressure":1004.8003564430331,"humidity":64}]}

To get the most recently stored readings within the past 5 minutes (recall that if you don’t pass a value for the query parameter minutes, the default is 5):

$ curl http://IP_ADDRESS:PORT/retrieve

This produces the following response:

{"status":"Retrieved stored records within last 5 minutes","records":3,"data":[{"timestamp":"2021-02-17T10:23:54.940000","temperature":29.649967592036077,"pressure":1005.0168541036603,"humidity":53.0},{"timestamp":"2021-02-17T10:23:52.531000","temperature":29.767032247389704,"pressure":991.8437893041985,"humidity":73.0},{"timestamp":"2021-02-17T10:23:46.351000","temperature":31.720923387508172,"pressure":1011.998375306113,"humidity":95.0}]}

And for the past 10 minutes:

$ curl http://IP_ADDRESS:PORT/retrieve?minutes=10

which gives us:

{"status":"Retrieved stored records within last 10 minutes","records":7,"data":[{"timestamp":"2021-02-17T10:23:54.940000","temperature":29.649967592036077,"pressure":1005.0168541036603,"humidity":53.0},{"timestamp":"2021-02-17T10:23:52.531000","temperature":29.767032247389704,"pressure":991.8437893041985,"humidity":73.0},{"timestamp":"2021-02-17T10:23:46.351000","temperature":31.720923387508172,"pressure":1011.998375306113,"humidity":95.0},{"timestamp":"2021-02-17T10:18:34.288000","temperature":28.10441189566165,"pressure":1028.2311879177432,"humidity":67.0},{"timestamp":"2021-02-17T10:18:30.304000","temperature":29.996273963137778,"pressure":1007.5716385639391,"humidity":52.0},{"timestamp":"2021-02-17T10:18:22.655000","temperature":28.823514215164877,"pressure":989.7959636620639,"humidity":66.0},{"timestamp":"2021-02-17T10:18:06.509000","temperature":29.530784788460004,"pressure":1004.8003564430331,"humidity":64.0}]}

Now, we can set up a cron job or an AWS Lambda function to invoke the /record API regularly. We can also integrate a dashboard or data visualization app that consumes the /retrieve API.
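For example, a crontab entry like the following (with your own host and port substituted for IP_ADDRESS:PORT) would capture a new reading every five minutes:

```shell
# m h dom mon dow  command
*/5 * * * * curl -s -X POST http://IP_ADDRESS:PORT/record > /dev/null
```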

Further Considerations

Here are some ideas for extending the basic data pipeline we’ve implemented, as well as ways to take it in another direction:

  • Implement additional business logic such as data cleaning or data processing. With our project structure, it’s very easy to add new functionality and wire it up with an existing or additional endpoint.
  • Use an MQTT message broker so that GridDB can directly listen for new sensor data. This is implemented in another GridDB tutorial here.
  • Handle both HTTP and MQTT protocols. There’s a FastAPI library called fastapi-mqtt that lets you do this.

Conclusion

In this article, we built a simple but extensible data pipeline using FastAPI and GridDB for weather IoT. We learned that these tools can help us easily create and maintain data-intensive systems, especially for use cases that require seamless integration among many different IoT components.

Source Code

GitHub