Setup Prerequisites

Kafka

Kafka is a message broker that supports many different input (source) and output (sink) connectors. For this application, we’ll use an MQTT source connector and a GridDB sink connector. You can download Kafka from the Apache website.

# cd /opt/
# tar zxvf kafka_2.12-0.10.2.1.tgz

Create /etc/profile.d/kafka.sh with the following contents:

#!/bin/bash
export PATH=$PATH:/opt/kafka_2.12-0.10.2.1/bin
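Because profile scripts can be sourced more than once in a session, a slightly more defensive variant is sketched below. It appends the Kafka bin directory only when it is not already on PATH. The helper name append_to_path is illustrative and not part of Kafka.

```shell
#!/bin/sh
# Sketch of a more defensive /etc/profile.d/kafka.sh.
KAFKA_BIN=/opt/kafka_2.12-0.10.2.1/bin

# Append a directory to PATH only if it is not already listed.
# (append_to_path is a hypothetical helper name.)
append_to_path() {
    case ":$PATH:" in
        *":$1:"*) ;;               # already present, nothing to do
        *) PATH="$PATH:$1" ;;      # not present, append it
    esac
}

append_to_path "$KAFKA_BIN"
export PATH
```

Either version of the script has the same effect on a fresh login; the guard only matters when the file is sourced repeatedly.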

Then log out and back in so it will take effect. Service files are not included with Kafka, so we’ll start Zookeeper and Kafka manually:

$ nohup zookeeper-server-start.sh /opt/kafka_2.12-0.10.2.1/config/zookeeper.properties > /tmp/zoo.log &
$ nohup kafka-server-start.sh /opt/kafka_2.12-0.10.2.1/config/server.properties > /tmp/kafka.log &

With Kafka running, we can now create a test topic and list the topics to confirm it exists:

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
$ kafka-topics.sh --list --zookeeper localhost:2181

Start a consumer in one terminal:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

In a different terminal, you can use a producer to send test messages:

$ echo \"hello world\" | kafka-console-producer.sh --broker-list localhost:9092 --topic test
$ echo \"hello world again\" | kafka-console-producer.sh --broker-list localhost:9092 --topic test

The consumer should print your test messages:

"hello world"
"hello world again"

Mosquitto

On CentOS 7, Mosquitto is included in Fedora’s Extra Packages for Enterprise Linux. If you haven’t already done so, install the epel-release package.

# yum -y install epel-release

And then install Mosquitto and Mosquitto Development Tools:

# yum install gcc
# yum -y install mosquitto
# yum install mosquitto-devel

Start Mosquitto:

# systemctl enable mosquitto
# systemctl start mosquitto 

Now we can test Mosquitto. In one terminal, run the Mosquitto subscriber:

$ mosquitto_sub -t 'test' -v

In another terminal, you can run the client multiple times to send multiple messages:

$ mosquitto_pub -t 'test' -m 'hello world'
$ mosquitto_pub -t 'test' -m 'hello world again'

You should see the subscriber print your publisher messages:

test hello world
test hello world again

Mqtt Kafka Connector

Clone the connector from GitHub, then change into the kafka-connect-mqtt repository:

$ git clone https://github.com/evokly/kafka-connect-mqtt
$ cd kafka-connect-mqtt

You will need to edit a few files to get the MQTT Kafka connector to build and run in a way that supplies useful messages to GridDB. The first file is config/mqtt.properties in the kafka-connect-mqtt directory. Change the lines as follows:

kafka.topic=test

Also comment out this line:

#mqtt.topic=#
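If you prefer to script these two edits rather than make them by hand, the sketch below shows one way to do it with sed. It assumes the file contains lines that begin with kafka.topic= and mqtt.topic=. The function name apply_mqtt_edits is illustrative and not part of the connector.

```shell
#!/bin/sh
# Print an edited copy of an mqtt.properties file to stdout:
#   - set kafka.topic to "test"
#   - comment out any active mqtt.topic line
# (apply_mqtt_edits is a hypothetical helper name.)
apply_mqtt_edits() {
    sed -e 's/^kafka\.topic=.*/kafka.topic=test/' \
        -e 's/^mqtt\.topic=/#mqtt.topic=/' "$1"
}
```

The function writes to stdout instead of editing in place, so you can review the result (for example by redirecting it to a temporary file and comparing) before replacing config/mqtt.properties.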

By default the mqtt-source-connector uses a String… edit src/main/java/com/evokly/kafka/connect/mqtt/sample/DumbProcessor.java, changing this:

@Override
public SourceRecord[] getRecords(String kafkaTopic) {
    return new SourceRecord[]{new SourceRecord(null, null, kafkaTopic, null,
            Schema.STRING_SCHEMA, mTopic,
            Schema.BYTES_SCHEMA, mMessage.getPayload())};
}

To this:

@Override
public SourceRecord[] getRecords(String kafkaTopic) {
    return new SourceRecord[]{new SourceRecord(null, null, kafkaTopic, null,
            Schema.STRING_SCHEMA, mTopic,
            Schema.STRING_SCHEMA, mMessage.toString())};
}

In MqttSourceTaskTest.java, change line 51 from:

assertEquals(new String((byte[]) sourceRecords.get(0).value(), "UTF-8"), "test_message");

To:

assertEquals(sourceRecords.get(0).value(), "test_message");

From there run these commands in your terminal:

$ ./gradlew clean check
$ ./gradlew clean jar
$ ./gradlew javadoc
$ ./gradlew copyRuntimeLibs

Once the build completes, add these two JAR paths to your CLASSPATH variable:

$ export CLASSPATH=${CLASSPATH}:$PWD/build/libs/kafka-connect-mqtt-{PROJECT-VERSION}.jar:$PWD/build/output/lib/org.eclipse.paho.client.mqttv3-1.0.2.jar

Replace {PROJECT-VERSION} with the version of the JAR that was built; in this case it was 1.1-SNAPSHOT.
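The same CLASSPATH setup can be written as a small script that keeps the version in one variable and warns if a JAR is missing. This is only a sketch: the variable names are illustrative, and it assumes you run it from the root of the kafka-connect-mqtt checkout after the Gradle build.

```shell
#!/bin/sh
# Run from the root of the kafka-connect-mqtt checkout after the build.
PROJECT_VERSION=1.1-SNAPSHOT   # adjust to match the JAR in build/libs

CONNECTOR_JAR="$PWD/build/libs/kafka-connect-mqtt-${PROJECT_VERSION}.jar"
PAHO_JAR="$PWD/build/output/lib/org.eclipse.paho.client.mqttv3-1.0.2.jar"

# Warn about any JAR that is missing, but keep going.
for jar in "$CONNECTOR_JAR" "$PAHO_JAR"; do
    [ -f "$jar" ] || echo "warning: $jar not found" >&2
done

# Append both JARs, without a leading ':' when CLASSPATH was empty.
export CLASSPATH="${CLASSPATH:+$CLASSPATH:}$CONNECTOR_JAR:$PAHO_JAR"
```

To affect your current shell, source the script (for example with the `.` command) rather than executing it, since an exported variable in a child process does not propagate back to the parent.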

Once the libraries have been built and the CLASSPATH has been configured, run the connector with:

$ connect-standalone.sh /opt/kafka_2.12-0.10.2.1/config/connect-standalone.properties config/mqtt.properties

Once the connector is running, switch to another terminal and start a Kafka console consumer:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

Switch to another terminal and set up a mosquitto producer process and send a message like this:

$ mosquitto_pub -t 'test' -m 'hello world'

To confirm the message was received on the Kafka side, switch back to the consumer terminal. The output should be a JSON object in this format:

{"schema": {"type":"string", "optional":false}, "payload":"hello world"}
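If you only want to see the message text, the envelope can be reduced to its payload field. The sketch below uses sed and assumes one envelope per line with a plain string payload. It does not decode JSON escape sequences, so a real JSON parser is the safer choice for anything beyond a quick check. The function name extract_payload is illustrative.

```shell
#!/bin/sh
# Extract the "payload" string from JSON envelopes read on stdin.
# Assumes one envelope per line ending in "payload":"...".
# JSON escapes inside the payload are left as-is, not decoded.
# (extract_payload is a hypothetical helper name.)
extract_payload() {
    sed -n 's/.*"payload":"\(.*\)"}$/\1/p'
}

echo '{"schema": {"type":"string", "optional":false}, "payload":"hello world"}' \
    | extract_payload
# prints: hello world
```

The same function can be placed at the end of a pipe from kafka-console-consumer.sh to filter live output.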

From there, you can keep sending messages from the Mosquitto side to the Kafka consumer until either the consumer or the connector is shut down.

Other JAR Files

A few other JAR files are used by the various components of the solution. One is the Jsonic library. The other is org.eclipse.paho.client.mqttv3-1.0.2.jar.

GridDB

GridDB will store the sensor data sent over MQTT to the Kafka server. You can download GridDB from this link.

Edit /var/lib/gridstore/conf/gs_cluster.json and set the clusterName, then set the admin password and start GridDB as the gsadm user. The documentation and included files assume a clusterName of “defaultCluster” and an admin password of “admin”.

You will need to ensure that ports 10001, 10010, 10020, 20000, and 31999 are accessible from the multicast address and any other IP that may connect to GridDB, or you can quickly disable the firewall:

# systemctl disable firewalld
# systemctl stop firewalld
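If you would rather keep the firewall running, the listed ports can be opened with firewall-cmd instead. The sketch below only prints the commands so you can review them first. The text above does not say which protocol each port uses, so opening both TCP and UDP for every port is an assumption made here, and the function name griddb_firewall_commands is illustrative.

```shell
#!/bin/sh
# Ports named in the text above. Which protocol each one needs is an
# assumption, so this sketch opens both TCP and UDP for every port.
GRIDDB_PORTS="10001 10010 10020 20000 31999"

# Print (do not run) the firewall-cmd invocations, one per line.
# (griddb_firewall_commands is a hypothetical helper name.)
griddb_firewall_commands() {
    for port in $GRIDDB_PORTS; do
        for proto in tcp udp; do
            echo "firewall-cmd --permanent --add-port=${port}/${proto}"
        done
    done
    # Reload so the permanent rules take effect in the running firewall.
    echo "firewall-cmd --reload"
}

griddb_firewall_commands
```

After checking the printed commands, run them as root. You may want to narrow the rules to the protocols and source addresses your deployment actually needs.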

# su - gsadm
$ gs_password admin
$ gs_startserver
$ gs_joincluster -c defaultCluster -u admin/admin
$ gs_stat -u admin/admin

The above gs_stat command will confirm GridDB is running on your host.

You can ensure a Java client can connect to GridDB by compiling and running an included sample application:

$ mkdir gsSample
$ cp /usr/griddb-3.0.1/docs/sample/program/Sample1.java gsSample/
$ javac gsSample/Sample1.java
$ java gsSample/Sample1 239.0.0.1 31999 your_clustername admin your_password

Output:

--> Person:  name=name02 status=false count=2 lob=[65, 66, 67, 68, 69, 70, 71, 72, 73, 74]

Source Code

Complete source code used for the GridDB Kafka and MQTT Application can be downloaded from the following:

Download: griddb-kafka-mqtt-application.tar.gz