GridDBSinkTask

Overview

Here you can add the additional schema of different sensors as classes you want to put into GridDB containers. Like the Sensor object from SimpleGridDBSinkTask.java, they should have a time-field as they will be added to TimeSeries containers.

Sensor-Type

There will also be a general log of all the sensors in the program that will be identified by their unique sensor-id so that the data viewer can easily index them afterwards.

Create Sensor-type schema(GridDBSinkTask.java)
static class SensorType {
	@RowKey String id;
	Date time;
	String type;

	public void setId(String id){
		this.id = id;
	}

	public void setType(String type){
		this.type = type;
	}

	public void setTime(Date time){
		this.time = time;
	}
	public String toString() {
	return "Timestamp: " + time + " Sensor-Type: " + type +            " Sensor-id: " + id;
	}
}

Adding Other Sensors

Here we add different types of sensors to reflect the different types of IoT or embedded devices that might be used to read data. Some such examples are the power and heat of factory equipment or the electrical output of certain devices that can only handle specific voltages or amp levels.

Other sensor schemas(GridDBSinkTask.java)
// Measures power (watts) and heat
static class WattSensor {
	@RowKey Date time;
	double watts;
	double heat;
	public void setTime(Date time){
		this.time = time;
	}
	public void setWatts(double watts){
		this.watts = watts;
	}
	public void setHeat(double heat){
		this.heat = heat;
	}
	public String toString() {
		return "Timestamp: "+time +" Watts: "+watts + " Heat: " + heat;
	}
}

Parsing a Sensor-Type Record

With JSON.decode it is possible to decode one string into different classes depending on the number and names of its fields. In this program the payload message will be first decoded into a SensorType.

Decoding the JSON-string into a Sensor-type(GridDBSinkTask.java)
// Attempt to retrieve and decode message content ("payload")
String payload = (String) ((HashMap)record.value()).get("payload")
// Create a sensor-type record that corresponds to "id" and "type" fields of // the message
SensorType sensorType = JSON.decode(payload,SensorType.class);

Storing a Sensor-Type Record

Since the SensorType container only stores unique sensors, we only insert a record when a new sensor-id is found.

// In GridDBSinkTask.java : put()
Container<String,SensorType> typeContainer = store.putCollection(SENSOR_TYPE_NAME,SensorType.class); 
if(typeContainer.get(sensorType.id) == null){
	// Add the record time
	sensorType.time = new Date(record.timestamp());
// Insert sensor-type record into container, with sensor-id as the Key
	typeContainer.put(sensorType.id,sensorType);
}

Parsing and Storing Sensor Records

In this program all sensors (those with unique sensor-ids) get their own container to log their data. From there the type can be checked from sensorType and it can be determined what kind of schema the container should have.

Parsing a LightSensor record(GridDBSinkTask.java)
if(payload.contains("light")){
// For a record of type : light
// Decode the last two fields of the JSON message into a LightSensor 
LightSensor sensor = JSON.decode(payload, LightSensor.class);
// Add the records timestamp as the time field
sensor.time = new Date(record.timestamp());
// Insert into a container with a name that matches its id
TimeSeries<LightSensor> ts = store.putTimeSeries(sensorType.id, LightSensor.class);
ts.put(sensor);
}
Parsing an ElectricitySensor record(GridDBSinkTask.java)
if(payload.contains(“volts”)){
ElectricitySensor sensor = JSON.decode(payload,ElectricitySensor.class);
	sensor.time = new Date(record.timestamp());
TimeSeries<ElectricitySensor> ts= store.putTimeSeries(sensorType.id,ElectricitySensor.class);
ts.put(sensor); 
}

This process can repeat for however many schema you want to define for sensors.

Source Code

Complete source code used for the GridDB Kafka and MQTT Application can be downloaded from the following:

Download: griddb-kafka-mqtt-application.tar.gz