GridDBSinkTask
Overview
Here you can add the additional schema of different sensors as classes you want to put into GridDB containers. Like the Sensor object from SimpleGridDBSinkTask.java, they should have a time-field as they will be added to TimeSeries containers.
Sensor-Type
There will also be a general log of all the sensors in the program that will be identified by their unique sensor-id
so that the data viewer can easily index them afterwards.
static class SensorType { @RowKey String id; Date time; String type; public void setId(String id){ this.id = id; } public void setType(String type){ this.type = type; } public void setTime(Date time){ this.time = time; } public String toString() { return "Timestamp: " + time + " Sensor-Type: " + type + " Sensor-id: " + id; } }
Adding Other Sensors
Here we add different types of sensors to reflect the different types of IoT or embedded devices that might be used to read data. Some such examples are the power and heat of factory equipment or the electrical output of certain devices that can only handle specific voltages or amp levels.
Other sensor schemas(GridDBSinkTask.java)// Measures power (watts) and heat static class WattSensor { @RowKey Date time; double watts; double heat; public void setTime(Date time){ this.time = time; } public void setWatts(double watts){ this.watts = watts; } public void setHeat(double heat){ this.heat = heat; } public String toString() { return "Timestamp: "+time +" Watts: "+watts + " Heat: " + heat; } }
Parsing a Sensor-Type Record
With JSON.decode
it is possible to decode one string into different classes depending on the number and names of its fields. In this program the payload
message will be first decoded into a SensorType.
// Attempt to retrieve and decode message content ("payload") String payload = (String) ((HashMap)record.value()).get("payload") // Create a sensor-type record that corresponds to "id" and "type" fields of // the message SensorType sensorType = JSON.decode(payload,SensorType.class);
Storing a Sensor-Type Record
Since the SensorType container only stores unique sensors, we only insert a record when a new sensor-id is found.
// In GridDBSinkTask.java : put() Container<String,SensorType> typeContainer = store.putCollection(SENSOR_TYPE_NAME,SensorType.class); if(typeContainer.get(sensorType.id) == null){ // Add the record time sensorType.time = new Date(record.timestamp()); // Insert sensor-type record into container, with sensor-id as the Key typeContainer.put(sensorType.id,sensorType); }
Parsing and Storing Sensor Records
In this program all sensors (those with unique sensor-ids) get their own container to log their data. From there the type can be checked from sensorType and it can be determined what kind of schema the container should have.
Parsing a LightSensor record(GridDBSinkTask.java)if(payload.contains("light")){ // For a record of type : light // Decode the last two fields of the JSON message into a LightSensor LightSensor sensor = JSON.decode(payload, LightSensor.class); // Add the records timestamp as the time field sensor.time = new Date(record.timestamp()); // Insert into a container with a name that matches its id TimeSeries<LightSensor> ts = store.putTimeSeries(sensorType.id, LightSensor.class); ts.put(sensor); }Parsing an ElectricitySensor record(GridDBSinkTask.java)
if(payload.contains(“volts”)){ ElectricitySensor sensor = JSON.decode(payload,ElectricitySensor.class); sensor.time = new Date(record.timestamp()); TimeSeries<ElectricitySensor> ts= store.putTimeSeries(sensorType.id,ElectricitySensor.class); ts.put(sensor); }
This process can repeat for however many schema you want to define for sensors.
Source Code
Complete source code used for the GridDB Kafka and MQTT Application can be downloaded from the following:
Download: griddb-kafka-mqtt-application.tar.gz