SimpleGridDBSinkConnector
Overview
This file implements the connector class referenced in simple-sink.properties. It loads the settings from the .properties file that are needed to connect to Kafka and a given topic, and to connect to a specified cluster in GridDB.
Setting Configurations
First we need to set up a Kafka ConfigDef object that declares the GridDB settings, such as the IP address and port number, to be read from simple-sink.properties.
Initializing configurations of a Sink Connector (SimpleGridDBSinkConnector.java)

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;

    // Keys expected in simple-sink.properties
    public static final String HOST_CONFIG = "host";
    public static final String PORT_CONFIG = "port";
    public static final String CLUSTERNAME_CONFIG = "clusterName";
    public static final String DATABASE_CONFIG = "database";
    public static final String USER_CONFIG = "user";
    public static final String PASSWORD_CONFIG = "password";

    // Declares each key's type, importance and description
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(HOST_CONFIG, Type.STRING, Importance.HIGH, "GridDB Host")
        .define(PORT_CONFIG, Type.STRING, Importance.HIGH, "GridDB Port")
        .define(CLUSTERNAME_CONFIG, Type.STRING, Importance.HIGH, "GridDB clusterName")
        .define(DATABASE_CONFIG, Type.STRING, Importance.HIGH, "GridDB Database")
        .define(USER_CONFIG, Type.STRING, Importance.HIGH, "GridDB Username")
        .define(PASSWORD_CONFIG, Type.STRING, Importance.HIGH, "GridDB Password");
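None of the define calls above supplies a default value, so Kafka treats all six settings as required. As a rough illustration of what that means for simple-sink.properties, the sketch below parses properties-formatted text with plain java.util.Properties and reports which of the six keys are absent. The class name SinkPropertiesSketch, its methods and the sample values are hypothetical and not part of the connector source; the real check is performed by Kafka Connect, not by code like this.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of the connector source.
class SinkPropertiesSketch {
    // Same key names as the *_CONFIG constants in SimpleGridDBSinkConnector.java.
    static final String[] REQUIRED_KEYS = {
        "host", "port", "clusterName", "database", "user", "password"
    };

    // Parses .properties-formatted text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string is not expected to fail.
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Returns the required keys that are absent, in declaration order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder values only (assumed); substitute the settings of your own cluster.
        String sample = String.join("\n",
            "host=192.0.2.10",
            "port=10001",
            "clusterName=myCluster",
            "database=public",
            "user=admin",
            "password=admin");
        Properties props = parse(sample);
        System.out.println("missing keys: " + missingKeys(props));
    }
}
```

Running a check like this against a draft properties file before deploying can catch a misspelled key such as "clustername", since the key names are case-sensitive.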
Starting the Connector
The constants above are also used when the connector starts up: they name the fields, such as “host”, “user” and “port”, that are looked up in a properties object. The values of those fields are loaded from the properties object, passed on to each task, and used to connect to a specific cluster on GridDB.
Setting task configurations of a Sink Connector (SimpleGridDBSinkConnector.java)

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>();
            if (host != null)
                config.put(HOST_CONFIG, host);
            if (port != null)
                config.put(PORT_CONFIG, port);
            if (clusterName != null)
                config.put(CLUSTERNAME_CONFIG, clusterName);
            if (database != null)
                config.put(DATABASE_CONFIG, database);
            if (user != null)
                config.put(USER_CONFIG, user);
            if (password != null)
                config.put(PASSWORD_CONFIG, password);
            configs.add(config);
        }
        return configs;
    }
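The listing above reads fields such as host and port that are set elsewhere in the class. In Kafka Connect, a connector receives its settings through start(Map<String, String> props), so the following is a minimal sketch of how the two methods could fit together. It is a stand-in that does not extend SinkConnector, so it runs without Kafka on the classpath; the class name ConnectorLifecycleSketch and the putIfPresent helper are hypothetical, and the body of start is an assumption rather than the author's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for illustration only; the real class extends Kafka's SinkConnector.
class ConnectorLifecycleSketch {
    private String host;
    private String port;
    private String clusterName;
    private String database;
    private String user;
    private String password;

    // Assumed behavior: remember each setting handed over at startup.
    void start(Map<String, String> props) {
        host = props.get("host");
        port = props.get("port");
        clusterName = props.get("clusterName");
        database = props.get("database");
        user = props.get("user");
        password = props.get("password");
    }

    // Builds one configuration map per task; every task gets the same settings.
    List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>();
            putIfPresent(config, "host", host);
            putIfPresent(config, "port", port);
            putIfPresent(config, "clusterName", clusterName);
            putIfPresent(config, "database", database);
            putIfPresent(config, "user", user);
            putIfPresent(config, "password", password);
            configs.add(config);
        }
        return configs;
    }

    // Hypothetical helper: skips settings that were never provided.
    private static void putIfPresent(Map<String, String> map, String key, String value) {
        if (value != null) {
            map.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("host", "192.0.2.10"); // placeholder value
        props.put("port", "10001");      // placeholder value

        ConnectorLifecycleSketch connector = new ConnectorLifecycleSketch();
        connector.start(props);
        System.out.println(connector.taskConfigs(2));
    }
}
```

Because each task receives its own HashMap, a task that modifies its configuration does not affect the others.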
Source Code
The complete source code for the GridDB Kafka and MQTT application can be downloaded from the following link:
Download: griddb-kafka-mqtt-application.tar.gz