# Data Ingestion with Apache Kafka

In this tutorial we feed the CASAS dataset through Kafka and into GridDB with Confluent's kafka-connect-jdbc. The raw TSV data will be converted into JSON with a small shell script and then fed into Kafka with the Kafka Console Producer. Then kafka-connect-jdbc will write the data to GridDB without us writing any code. Finally, we'll inspect the data with SQLWorkbench/J.

# Setting Up Kafka

Kafka is a data streaming platform with a wide range of easy-to-create inputs and outputs. For this tutorial we'll use the Kafka Console Producer to put data into Kafka, which will then be consumed by the Kafka Connect JDBC sink we are going to configure. We are going to follow the Kafka Quickstart. Kafka can be downloaded from the project's downloads page; we're using version 2.5.0 (the kafka_2.12-2.5.0 release, built for Scala 2.12). You will also need a Java 1.8 development environment installed on your system. After downloading, we simply untar the archive and start the ZooKeeper and Kafka servers.

$ tar xzvf kafka_2.12-2.5.0.tgz
$ cd kafka_2.12-2.5.0
$ export PATH=$PATH:/path/to/kafka_2.12-2.5.0/bin
$ zookeeper-server-start.sh -daemon config/zookeeper.properties
$ kafka-server-start.sh -daemon config/server.properties
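
As a quick, optional sanity check you can list the topics on the freshly started broker; on a new install the list will simply be empty:

$ kafka-topics.sh --bootstrap-server localhost:9092 --list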

# Running the GridDB Sink

We will assume that you already have GridDB 4.5 CE installed and have built the latest GridDB JDBC jar. We will copy the GridDB JDBC jar and the Kafka Connect JDBC jar to the Kafka libs folder. The Kafka Connect JDBC jar can be downloaded from here and its source can be fetched from GitHub.

The GridDB JDBC connector had to be modified for use with kafka-connect-jdbc; the modified jar can be downloaded from here and the source can be viewed here.

$ cp /path/to/kafka-connect-jdbc/target/kafka-connect-jdbc-$VERSION-SNAPSHOT.jar /path/to/kafka_2.12-2.5.0/libs
$ cp /path/to/griddb-jdbc/bin/griddb-jdbc.jar /path/to/kafka_2.12-2.5.0/libs

Kafka Connect requires a configuration file, config/connect-jdbc.properties, which defines the parameters for the JDBC sink, the topics it subscribes to, and a transform that converts the datetime string into a Timestamp that can be written to the database.

bootstrap.servers=localhost:9092
# Sink connector definition
name=griddb-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
key.converter.schemas.enable=true
value.converter.schemas.enable=true
batch.size=1
# Subscribe to every topic whose name starts with "csh"
topics.regex=csh(.*)
# GridDB JDBC connection (default multicast notification address)
connection.url=jdbc:gs://239.0.0.1:41999/defaultCluster/public
connection.user=admin
connection.password=admin
auto.create=true
# Convert the "datetime" string field into a Timestamp before writing
transforms=TimestampConverter
transforms.TimestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.format=yyyy-MM-dd hh:mm:ss
transforms.TimestampConverter.field=datetime
transforms.TimestampConverter.target.type=Timestamp

Now we can start Kafka Connect:

$ cd /path/to/kafka_2.12-2.5.0
$ ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-jdbc.properties
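
Once the standalone worker is up, you can optionally confirm that the sink was loaded through Kafka Connect's REST interface (port 8083 by default); listing the connectors should return something like:

$ curl -s http://localhost:8083/connectors
["griddb-sink"]

A request to /connectors/griddb-sink/status will likewise show whether the connector and its task are in the RUNNING state.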

# Data Set

CASAS is a research group based at Washington State University that has published a large sensor dataset. The data was collected from a variety of in-home sensors while volunteers performed their normal daily routines. We are going to use this dataset in a series of tutorials to demonstrate how to use GridDB to ingest, analyze, and visualize data. The data looks like this:

2012-07-18 12:54:45.126257  D001    Ignore      Ignore      CLOSE   Control4-Door
2012-07-18 12:54:45.196564  D002    OutsideDoor FrontDoor   OPEN    Control4-Door
2012-07-18 12:54:45.247825  T102    Ignore      FrontDoorTemp   78  Control4-Temperature
2012-07-18 12:54:45.302398  BATP102 Ignore      Ignore      85  Control4-BatteryPercent
2012-07-18 12:54:45.399416  T103    Ignore      BathroomTemp    25  Control4-Temperature
2012-07-18 12:54:45.472391  BATP103 Ignore      Ignore      82  Control4-BatteryPercent
2012-07-18 12:54:45.606580  T101    Ignore      Ignore      31  Control4-Temperature
2012-07-18 12:54:45.682577  MA016   Kitchen     Kitchen     OFF Control4-MotionArea
2012-07-18 12:54:45.723461  D003    Bathroom    BathroomDoor    OPEN    Control4-Door
2012-07-18 12:54:45.767498  M009    Bedroom     Bedroom     ON  Control4-Motion

We've also made a smaller subset of the data available here in case you do not wish to download the entire 13GB original zipfile.

The fields of the TSV file are:

  • Date
  • Time
  • Sensor ID
  • The room the sensor is in
  • What the sensor is sensing
  • Sensor Message
  • Sensor Activity
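
To see how one raw line maps onto these columns, here is a quick awk one-liner that labels each whitespace-separated field; csh101.rawdata.txt is just an example file name matching the *.rawdata.txt pattern used by the ingest script in the next section:

$ head -1 csh101.rawdata.txt | awk '{ print "date: "$1; print "time: "$2; print "sensor: "$3; print "room: "$4; print "sensing: "$5; print "message: "$6; print "activity: "$7 }'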

# Ingesting the Data

We will need to convert the TSV data into the JSON-with-schema envelope expected by the Kafka Connect JDBC sink. To do so, we'll use a small shell script:

#!/bin/bash
# Wrap one CASAS record in the JSON envelope (schema + payload) that
# kafka-connect-jdbc expects when value.converter.schemas.enable=true.
function echo_payload {
    echo '{ "payload": {  "datetime": "'$1 $2'",  "sensor": "'$3'",  "translate01": "'$4'",  "translate02": "'$5'",  "message": "'$6'",  "sensoractivity": "'$7'" }, "schema": {  "fields": [   {    "field": "datetime",    "optional": false,    "type": "string"   },   {    "field": "sensor",    "optional": false,    "type": "string"   },   {    "field": "translate01",    "optional": false,    "type": "string"   },   {    "field": "translate02",    "optional": false,    "type": "string"   },   {    "field": "message",    "optional": false,    "type": "string"   },   {    "field": "sensoractivity",    "optional": false,    "type": "string"   }   ],  "name": "ksql.users",  "optional": false,  "type": "struct" }}'
}
TOPICS=()
# Find every CASAS raw data file under the directory passed as $1.
for file in `find $1 -name \*.rawdata.txt` ; do
    echo $file
    # Derive the topic prefix (e.g. csh101) from the file name.
    LOCATION=`echo $file | sed -e s/.rawdata.txt// -e s:.*/::g`
    # Only the first 10 lines of each file are ingested for this demo.
    while read -r line ; do
        SENSOR=`echo ${line} | awk '{ print $3 }'`
        # Create a topic per location/sensor pair the first time we see it.
        if [[ ! " ${TOPICS[@]} " =~ " ${LOCATION}_${SENSOR} " ]]; then
            echo Creating topic ${LOCATION}_${SENSOR}
            kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic ${LOCATION}_${SENSOR} > /dev/null 2>&1
            TOPICS+=(${LOCATION}_${SENSOR})
        fi
        echo_payload ${line} | kafka-console-producer.sh --topic ${LOCATION}_${SENSOR} --bootstrap-server localhost:9092
    done < <(head -10 $file)
done

It will find any of the CASAS data files in the given path, create the Kafka topics, and then feed the data into Kafka via the Kafka Console Producer.
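
Assuming the script has been saved as, say, ingest.sh (the name is arbitrary) and made executable, run it with the directory containing the extracted CASAS files as its argument:

$ chmod +x ingest.sh
$ ./ingest.sh /path/to/casas-data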

If everything is working as it should, Kafka Connect should output log entries like so:

[2020-12-14 18:52:37,786] INFO WorkerSinkTask{id=griddb-sink-0} Committing offsets asynchronously using sequence number 1: 
{csh101_MA016-0=OffsetAndMetadata{offset=5, leaderEpoch=null, metadata=''}, csh102_LS013-0=OffsetAndMetadata{offset=4, 
leaderEpoch=null, metadata=''}, csh101_D001-0=OffsetAndMetadata{offset=14, leaderEpoch=null, metadata=''}, 
csh101_D002-0=OffsetAndMetadata{offset=6, leaderEpoch=null, metadata=''}, csh101_D003-0=OffsetAndMetadata{offset=5, 
leaderEpoch=null, metadata=''}, csh101_M009-0=OffsetAndMetadata{offset=5, leaderEpoch=null, metadata=''}, 
csh101_-0=OffsetAndMetadata{offset=26, leaderEpoch=null, metadata=''}, csh102_MA020-0=OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''}, csh102_-0=OffsetAndMetadata{offset=10, 
leaderEpoch=null, metadata=''}, csh102_M021-0=OffsetAndMetadata{offset=12, leaderEpoch=null, metadata=''}, 
csh101_T103-0=OffsetAndMetadata{offset=6, leaderEpoch=null, metadata=''}, csh101_T102-0=OffsetAndMetadata{offset=6, 
leaderEpoch=null, metadata=''}, csh101_T101-0=OffsetAndMetadata{offset=5, leaderEpoch=null, metadata=''}, 
csh101_BATP103-0=OffsetAndMetadata{offset=5, leaderEpoch=null, metadata=''}, csh101_BATP102-0=OffsetAndMetadata{offset=6, 
leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:349)
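
To double-check that the records themselves landed in Kafka, independently of the sink, the Kafka Console Consumer can replay one of the topics from the beginning:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic csh101_D002 --from-beginning --max-messages 5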

# Inspecting the Data

In a previous blog post, we showed how to use SQLWorkbench/J to view data in GridDB, so we're going to use it here to have a look at the data. After a successful connection to the database, select the Tools -> Show Database Explorer menu item or press Ctrl-D and a list of the tables in GridDB will be shown. Selecting a table and then the Data tab on the right will let you see its data.