Overview

The MigratoryData Sink Connector for Kafka consumes Kafka messages from a configurable list of topics and streams them to MigratoryData according to a configurable mapping between Kafka topics and MigratoryData subjects.

Requirements

Built upon the Kafka Connect API, the MigratoryData Sink Connector for Kafka must be installed into a Kafka Connect distributed service. The requirements are therefore:

  • A running Kafka cluster
  • A running Kafka Connect distributed service
  • A running MigratoryData cluster

Installation

  1. To install the MigratoryData Connector for Kafka, download the Kafka connector from the MigratoryData Connectors section of the downloads page.

  2. Unzip the Kafka connector archive and place its JAR file migratorydata-connector-kafka-<VERSION>.jar in a folder <CONNECTOR_PATH> accessible to your Kafka Connect distributed service.

  3. Edit the configuration file config/connect-distributed.properties of your Kafka Connect distributed service and configure the parameter plugin.path to include the folder <CONNECTOR_PATH> where you placed the JAR file in the previous step.
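For example, if you placed the JAR file in the hypothetical folder /opt/kafka/connectors, the configuration line would read:

plugin.path=/opt/kafka/connectors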

Configuration

The configuration of the MigratoryData Sink Connector for Kafka consists of Kafka-defined parameters, MigratoryData-specific parameters, and topics-to-subjects mapping parameters.

Kafka-defined parameters

Entry-point Class

The Java class of the connector is defined as follows:

"connector.class":"com.migratorydata.kafka.sink.MigratoryDataSinkConnector"

Converters

In MigratoryData, both the content and the subject of the messages are UTF-8 strings. Therefore, the connector should use the following converters:

"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter"

Tasks

MigratoryData Sink Connector can be split into multiple tasks across the workers of the Kafka Connect distributed service, where each task consumes messages from a distinct subset of topic partitions. The Kafka Connect service is in charge of:

  • the distribution of tasks across the Kafka Connect workers
  • the distribution of the Kafka topic partitions for each task
  • the restart of a task after a failure or a load re-balance

For example, to split the MigratoryData Sink Connector into at most five tasks, configure the connector as follows:

"tasks.max": "5",

MigratoryData-specific parameters

Define the MigratoryData cluster to which the connector should connect. For example, supposing the MigratoryData cluster consists of two instances which accept encrypted client connections at the addresses push1.example.com:443 and push2.example.com:443, configure the connector as follows:

"migratorydata.servers": "push1.example.com:443,push2.example.com:443",
"migratorydata.encryption": "true"

In order to be able to publish messages to a MigratoryData cluster, a client must provide an entitlement token such that message publication is allowed only on the entitled subjects. To allow the connector to publish messages, configure the entitlement token as in the following example:

"migratorydata.entitlement_token": "some-token"

Topics-to-subjects mapping parameters

The list of Kafka topics to be consumed by the connector can be specified:

  • either as a list of Kafka topics
  • or as a regexp pattern matching certain Kafka topics

A Kafka message received on a Kafka topic which is in the list or matches the regexp will be published by the connector on one or more MigratoryData subjects, according to a mapping that you define.

Indeed, each Kafka topic can be mapped to one or more MigratoryData subjects, as detailed below.

Topics as list

Assume the connector consumes the Kafka topics t_1 and t_2, and maps them to MigratoryData subjects as follows:

Kafka topic   MigratoryData subject(s)
t_1           /sbj/a
t_2           /sbj/x, /sbj/y/t_2 and /sbj/z/t_2/<KEY>

where <KEY> is the key of the Kafka message received on the Kafka topic t_2. Then, configure the connector as follows:

"topics":"t_1, t_2",
"kafka.topics.t_1":"/sbj/a",
"kafka.topics.t_2":"/sbj/x, /sbj/y/${topic}, /sbj/z/${topic}/${key}"

Topics as regexp

Assume the connector consumes the Kafka topics matching the regexp pattern t_(.*), and maps each matched topic to the MigratoryData subjects /sbj/a, /sbj/b/<TOPIC> and /sbj/c/<TOPIC>/<KEY>, where <TOPIC> is the Kafka topic on which the message was received and <KEY> is the key of the received Kafka message. Then, configure the connector as follows:

"topics.regex":"t_(.*)",
"kafka.topics.regex":"/sbj/a, /sbj/b/${topic}, /sbj/c/${topic}/${key}"

Reliability

MigratoryData Sink Connector achieves reliable messaging without message loss.

Indeed, each task of the MigratoryData Sink Connector writes into Kafka the offsets of the Kafka messages successfully published into the MigratoryData cluster, i.e. for which a publication acknowledgement has been received from the MigratoryData cluster.

Therefore, if a task is restarted after a failure or a load re-balance performed by Kafka Connect, the task will resume publication of Kafka messages from the last offsets previously acknowledged by the task.
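Because the acknowledged offsets are committed back to Kafka, you can inspect them with the standard Kafka tooling. For example, assuming the default consumer group naming scheme for sink connectors (connect-<CONNECTOR_NAME>) and the connector name migratory_data_sink_00 used in the quick start tutorial below:

$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-migratory_data_sink_00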

Quick start tutorial

In this section, we demonstrate how to stream Kafka messages to the web.

Install MigratoryData

Download the tarball package of the MigratoryData server from the downloads page, unzip the tarball to any folder, change to that folder, and run on Linux/Unix/MacOS:

$ ./start-migratorydata.sh

By default, the MigratoryData server accepts client connections on the address localhost:8800. Therefore, open the following URL in a browser:

http://localhost:8800

A welcome page should load. Click on the DEMO button and a demo web app should load. Click on the buttons Connect, then Subscribe and Publish to verify that your installation is correct.

If you encounter any issue with the installation, please check out the Installation Guide.

Install Kafka

Download the tarball binary package of Kafka from the Kafka downloads page, unzip the tarball to any folder, change to that folder, and run on Linux/Unix/MacOS (each command in a separate terminal, as both run in the foreground):

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
$ ./bin/kafka-server-start.sh config/server.properties
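To verify that the broker is up, list the existing topics; on a fresh installation the list may be empty:

$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list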

Install Kafka Connect

Download the MigratoryData Sink Connector for Kafka from the MigratoryData Connectors section of the downloads page and unzip it to any folder, say /tmp/kafka/connectors. Then, change to the folder where you installed Kafka in the previous step and edit the configuration file config/connect-distributed.properties as follows:

plugin.path = /tmp/kafka/connectors/migratorydata-connector-kafka

Finally, start the Kafka Connect distributed service as follows:

$ ./bin/connect-distributed.sh config/connect-distributed.properties
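To verify that the Kafka Connect service has picked up the connector, query its REST API and check that the class com.migratorydata.kafka.sink.MigratoryDataSinkConnector appears among the available plugins:

$ curl -s localhost:8083/connector-plugins | jq .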

Deploy the Connector

In this example, we show how to deploy the MigratoryData Sink Connector as two Kafka Connect tasks which consume the topics topic_1 and topic_2, and map them to MigratoryData subjects as follows:

Kafka topic   MigratoryData subject(s)
topic_1       /server/status
topic_2       /x/topic_2 and /y/topic_2/<KEY>

where <KEY> is the key of the received Kafka message.
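If automatic topic creation is disabled on your broker (auto.create.topics.enable=false), create the two Kafka topics before loading the connector:

$ ./bin/kafka-topics.sh --create --topic topic_1 --bootstrap-server localhost:9092
$ ./bin/kafka-topics.sh --create --topic topic_2 --bootstrap-server localhost:9092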

To load the connector, run the following command:

$ curl --header "Content-Type: application/json" \
  --request PUT \
  --data '{
      "connector.class":"com.migratorydata.kafka.sink.MigratoryDataSinkConnector",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "tasks.max": "2",
      "migratorydata.servers":"127.0.0.1:8800",
      "migratorydata.entitlement_token":"some-token",
      "topics":"topic_1, topic_2",
      "kafka.topics.topic_1":"/server/status",
      "kafka.topics.topic_2":"/x/${topic}, /y/${topic}/${key}"
}' \
http://127.0.0.1:8083/connectors/migratory_data_sink_00/config

To check that the connector is up and running, run the following command:

$ curl -s localhost:8083/connectors/migratory_data_sink_00/status | jq .
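If the connector and its two tasks are up and running, the output should look along the following lines (a sketch; the worker IDs depend on your setup):

{
  "name": "migratory_data_sink_00",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083" },
    { "id": 1, "state": "RUNNING", "worker_id": "127.0.0.1:8083" }
  ],
  "type": "sink"
}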

Test the Connector

Open the demo web app at http://localhost:8800 as detailed above, connect, and subscribe to the following subjects:

  • /server/status
  • /x/topic_2
  • /y/topic_2/key1

Change to the folder where you installed Kafka, and run the following Kafka publisher for the topic topic_1:

$ ./bin/kafka-console-producer.sh --topic topic_1 --bootstrap-server localhost:9092

You should be able to see that the web app displays in real-time the Kafka messages that you publish here.

To publish Kafka messages with keys, run the following Kafka publisher for the topic topic_2, prefixing each message with its key as follows:

$ ./bin/kafka-console-producer.sh --topic topic_2 --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
key1:abc
key1:cde
key2:efg
...

You should be able to see that the web app displays in real-time the Kafka messages that you publish here. Moreover, you will see that the example message abc is published on the two subjects /x/topic_2 and /y/topic_2/key1, according to the mapping defined for the connector and the current subscriptions of your web app.

Delete the Connector

To delete the MigratoryData Sink Connector installed above, run the following command:

$ curl --header "Content-Type: application/json" \
  --request DELETE \
  http://127.0.0.1:8083/connectors/migratory_data_sink_00
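To confirm the deletion, list the connectors registered with the Kafka Connect service; migratory_data_sink_00 should no longer appear in the output:

$ curl -s localhost:8083/connectors | jq .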