SimpleGridDBSinkTask
Overview
It is here where messages from Kafka are parsed and inserted as records into containers inside of GridDB. One method to extend and override when making a Sink-Task sub-class is the put() method, which will decide what to do with the collection of SinkRecords or messages that are sent from Kafka.
Sensor Schema
Static classes can be used to create a row or table schema for containers that log information about the readings of a sensor. They are TimeSeries containers making timestamps their row key. Below is an example schema of a sensor that represents a light and sound recorder.
Setting Schema for Sensor container(SimpleGridDBSinkTask.java)static class Sensor { @RowKey Date time; double light; double sound; public void setTime(Date time){ this.time = time; } public void setLight(double light){ this.light = light; } public void setSound(double sound){ this.sound = sound; } public String toString() { return "Timestamp: "+time +" Light: "+ light +" Sound: "+sound; } }
Sensor Schema
Time | Light | Sound |
---|---|---|
Jun 16 14:02 EDT | 42.00 | 23.00 |
Jun 17 23:09 EDT | 34.00 | 20.00 |
Jun 19 9:00 EDT | 41.00 | 24.00 |
Getting Payload Message
For each SinkRecord
we need to obtain its
value, a HashMap
representing message content, and its timestamp.
// First ensure only messages are HashMaps are allowed to go through parsing if(record.value().getClass() == HashMap.class) // From that HashMapretrieve a field called ‘payload’ // This field is the JSON-string we created and published from mosquito String payload = (String) ((HashMap)(record.value()). Inserting a Record
Now that a sensor object has been created (these objects will be inserted as rows into GridDB). They are ready to be inserted into a GridDB container. To create or find a GridDB container and load it with a certain schema, all that’s needed is its name and the schema the rows are formatted as (in this case it is the Sensor.class schema).
Source Code
Complete source code used for the GridDB Kafka and MQTT Application can be downloaded from the following:
Download: griddb-kafka-mqtt-application.tar.gz