SimpleGridDBSinkConnector

Overview

This file implements the connector class named in simple-sink.properties. It loads the configuration from that .properties file and uses it to establish a connection to Kafka and a given topic, and to a specified cluster in GridDB.

Setting Configurations

First we need to set up a Kafka ConfigDef object that declares the GridDB settings, such as the IP address and port number, to be extracted from simple-sink.properties.

Initializing configurations of a Sink Connector (SimpleGridDBSinkConnector.java)
// Needs org.apache.kafka.common.config.ConfigDef and its nested Type and Importance.
// Keys expected in simple-sink.properties.
public static final String HOST_CONFIG = "host";
public static final String PORT_CONFIG = "port";
public static final String CLUSTERNAME_CONFIG = "clusterName";
public static final String DATABASE_CONFIG = "database";
public static final String USER_CONFIG = "user";
public static final String PASSWORD_CONFIG = "password";

// Each key is declared as a high-importance string with no default value.
private static final ConfigDef CONFIG_DEF = new ConfigDef()
    .define(HOST_CONFIG, Type.STRING, Importance.HIGH, "GridDB Host")
    .define(PORT_CONFIG, Type.STRING, Importance.HIGH, "GridDB Port")
    .define(CLUSTERNAME_CONFIG, Type.STRING, Importance.HIGH, "GridDB clusterName")
    .define(DATABASE_CONFIG, Type.STRING, Importance.HIGH, "GridDB Database")
    .define(USER_CONFIG, Type.STRING, Importance.HIGH, "GridDB Username")
    .define(PASSWORD_CONFIG, Type.STRING, Importance.HIGH, "GridDB Password");
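
Because none of the six definitions supplies a default value, Kafka's ConfigDef should treat each key as required. The standalone sketch below imitates that check with plain Java collections so that it runs without Kafka on the classpath. It does not use ConfigDef itself, and the class name RequiredKeyCheck and the method missingKeys are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the connector: imitates a "required key"
// check with plain collections instead of Kafka's ConfigDef.
final class RequiredKeyCheck {
    // Same key names as the *_CONFIG constants in the connector.
    static final List<String> REQUIRED_KEYS =
            List.of("host", "port", "clusterName", "database", "user", "password");

    // Returns the required keys that are absent from props, in declaration order.
    static List<String> missingKeys(Map<String, String> props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling RequiredKeyCheck.missingKeys with a map that holds only "host" and "port" returns the other four key names, which is a quick way to spot an incomplete simple-sink.properties before the connector is deployed.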

Starting the Connector

The above constants are also used when the connector starts up: they name fields such as "host", "user" and "port" in a Properties object. The values of those fields are loaded from the Properties object and used to connect to a specific GridDB cluster.
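
As a rough illustration of that start-up step, the sketch below copies the six values into fields. It assumes the properties arrive as a Map<String, String>, which is the parameter type of start() in Kafka Connect's Connector class. The class StartSketch is a hypothetical stand-in that does not extend SinkConnector, so it runs without Kafka libraries; the real connector may do more at this point.

```java
import java.util.Map;

// Hypothetical stand-in for the connector; the Kafka Connect base class is
// left out so the example compiles and runs on its own.
final class StartSketch {
    // Values later handed to each task through taskConfigs().
    String host;
    String port;
    String clusterName;
    String database;
    String user;
    String password;

    // Reads each setting by the same key names as the *_CONFIG constants.
    // A key missing from props leaves the matching field null.
    void start(Map<String, String> props) {
        host = props.get("host");
        port = props.get("port");
        clusterName = props.get("clusterName");
        database = props.get("database");
        user = props.get("user");
        password = props.get("password");
    }
}
```

Keeping the values in fields is what lets taskConfigs skip any setting that was never supplied: a null field is not copied into the task configuration.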

Setting task configurations of a Sink Connector (SimpleGridDBSinkConnector.java)
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<>();
    // Build one configuration map per task; every task gets the same GridDB settings.
    for (int i = 0; i < maxTasks; i++) {
        Map<String, String> config = new HashMap<>();
        // Copy only the settings that were actually provided.
        if (host != null)
            config.put(HOST_CONFIG, host);
        if (port != null)
            config.put(PORT_CONFIG, port);
        if (clusterName != null)
            config.put(CLUSTERNAME_CONFIG, clusterName);
        if (database != null)
            config.put(DATABASE_CONFIG, database);
        if (user != null)
            config.put(USER_CONFIG, user);
        if (password != null)
            config.put(PASSWORD_CONFIG, password);
        configs.add(config);
    }

    return configs;
}
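
To see what this method hands back, the following standalone sketch applies the same copying logic outside Kafka Connect. The class TaskConfigSketch and its static taskConfigs method, which takes the connector settings as a parameter, are hypothetical and exist only for this illustration; looping over a key list is a compact substitute for the six if statements, not the connector's own code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone version of the task-configuration logic.
final class TaskConfigSketch {
    // Same key names as the *_CONFIG constants in the connector.
    static final List<String> KEYS =
            List.of("host", "port", "clusterName", "database", "user", "password");

    // Returns maxTasks separate maps, each holding the GridDB settings
    // that are present (non-null) in connectorProps.
    static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>();
            for (String key : KEYS) {
                String value = connectorProps.get(key);
                if (value != null) {
                    config.put(key, value);
                }
            }
            configs.add(config);
        }
        return configs;
    }
}
```

With maxTasks set to 3, the result is a list of three maps with identical contents. Each task receives its own map instance, so a change to one task's configuration does not leak into another's.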

Source Code

The complete source code for the GridDB Kafka and MQTT application can be downloaded here:

Download: griddb-kafka-mqtt-application.tar.gz