SimpleGridDBSinkTask

Overview

This is where messages from Kafka are parsed and inserted as records into GridDB containers. When writing a SinkTask subclass, the key method to override is put(), which decides what to do with the collection of SinkRecords (messages) delivered from Kafka.

Sensor Schema

Static classes can be used to define the row (table) schema of a container that logs sensor readings. These are TimeSeries containers, so the timestamp serves as the row key. Below is an example schema for a sensor that records light and sound.

Setting Schema for Sensor container (SimpleGridDBSinkTask.java)
// Needs java.util.Date and com.toshiba.mwcloud.gs.RowKey imported at the top of the file
static class Sensor {
	// The timestamp is the row key of the TimeSeries container
	@RowKey Date time;
	double light;
	double sound;
	public void setTime(Date time){
		this.time = time;
	}
	public void setLight(double light){
		this.light = light;
	}
	public void setSound(double sound){
		this.sound = sound;
	}
	@Override
	public String toString() {
		return "Timestamp: " + time + " Light: " + light + " Sound: " + sound;
	}
}

Sensor Schema

Time                Light   Sound
Jun 16 14:02 EDT    42.00   23.00
Jun 17 23:09 EDT    34.00   20.00
Jun 19 9:00 EDT     41.00   24.00

Getting Payload Message

For each SinkRecord we need two things: its value, which is a HashMap representing the message content, and its timestamp.

Extracting JSON message from Kafka (SimpleGridDBSinkTask.java)
// Only messages whose value is a HashMap are allowed through to parsing
if (record.value().getClass() == HashMap.class) {
	// From that HashMap, retrieve the field called 'payload'.
	// This field is the JSON string we created and published from Mosquitto.
	String payload = (String) ((HashMap) record.value()).get("payload");
	// ... parse the payload here ...
}
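
The extraction step above can be sketched as a pair of small helpers. This is a minimal illustration, not the code from SimpleGridDBSinkTask.java: the class name PayloadExample, the helper names, and the payload shape {"light": ..., "sound": ...} are assumptions made for the example, with the field names borrowed from the Sensor schema. It checks with instanceof Map rather than comparing against HashMap.class, and it reads numbers with a hand-rolled regular expression only to stay dependency-free; a real JSON library would be the better choice in practice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PayloadExample {

    // Returns the "payload" string if the record value is a map that carries one, else null.
    static String extractPayload(Object value) {
        if (!(value instanceof Map)) {
            return null; // not a map-shaped message, so skip it
        }
        Object payload = ((Map<?, ?>) value).get("payload");
        return (payload instanceof String) ? (String) payload : null;
    }

    // Reads one numeric field from a flat JSON object; returns NaN if the field is absent.
    // Hand-rolled for illustration only (assumed payload shape, no nesting, no exponents).
    static double readNumber(String json, String field) {
        Pattern p = Pattern.compile(
                "\"" + Pattern.quote(field) + "\"\\s*:\\s*(-?\\d+(?:\\.\\d+)?)");
        Matcher m = p.matcher(json);
        return m.find() ? Double.parseDouble(m.group(1)) : Double.NaN;
    }

    public static void main(String[] args) {
        Map<String, Object> value = new HashMap<>();
        // Assumed payload shape; the real message format may differ.
        value.put("payload", "{\"light\": 42.0, \"sound\": 23.0}");

        String payload = extractPayload(value);
        System.out.println("light = " + readNumber(payload, "light"));
        System.out.println("sound = " + readNumber(payload, "sound"));
    }
}
```

Returning null for anything that is not a map lets the caller skip unexpected records instead of failing on a bad cast.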


Inserting a Record

Now that a Sensor object has been created, it is ready to be inserted as a row into a GridDB container. To create or find a GridDB container and bind it to a schema, all that is needed is the container's name and the class that describes the row format (in this case, Sensor.class).
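
The find-or-create and insert steps can be illustrated without a running GridDB server. The sketch below is an in-memory stand-in, not the GridDB client: in the GridDB Java API the corresponding calls are GridStore.putTimeSeries(name, Sensor.class) followed by put(row) on the returned TimeSeries<Sensor>. The class InMemoryStoreSketch and the container name "sensor_1" are hypothetical. The point it shows is that a container is looked up by name and created on first use, and that rows are keyed by their timestamp, so a row with an existing timestamp replaces the earlier one.

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class InMemoryStoreSketch {

    // Same fields as the Sensor schema in this section.
    static final class Sensor {
        Date time;
        double light;
        double sound;
    }

    // Container name -> rows ordered by timestamp (the row key).
    private final Map<String, TreeMap<Date, Sensor>> containers = new HashMap<>();

    // Finds the named container, creating it on first use.
    TreeMap<Date, Sensor> putTimeSeries(String name) {
        return containers.computeIfAbsent(name, n -> new TreeMap<>());
    }

    // Inserts a row; a row with the same timestamp replaces the earlier one.
    static void put(TreeMap<Date, Sensor> container, Sensor row) {
        container.put(row.time, row);
    }

    public static void main(String[] args) {
        InMemoryStoreSketch store = new InMemoryStoreSketch();
        // Hypothetical container name, chosen for the example.
        TreeMap<Date, Sensor> container = store.putTimeSeries("sensor_1");

        Sensor row = new Sensor();
        row.time = new Date();
        row.light = 42.0;
        row.sound = 23.0;
        put(container, row);

        System.out.println(container.size() + " row(s) in sensor_1");
    }
}
```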

Source Code

The complete source code for the GridDB Kafka and MQTT application can be downloaded here:

Download: griddb-kafka-mqtt-application.tar.gz