Overview
The MigratoryData Source Connector for Kafka consumes MigratoryData messages from a configurable list of subjects and streams them to Kafka according to a configurable mapping between MigratoryData subjects and Kafka topics.
Requirements
Built upon the Kafka Connect API, the MigratoryData Source Connector for Kafka must be installed into a Kafka Connect distributed service. Therefore, the requirements are:
- A running Kafka cluster
- A running Kafka Connect distributed service
- A running MigratoryData cluster
Installation
- To install the MigratoryData Source Connector for Kafka, download the Kafka connector from the MigratoryData Connectors section of the downloads page.
- Unzip the Kafka connector archive and place the JAR file migratorydata-connector-kafka-<VERSION>.jar of the archive into a folder <CONNECTOR_PATH> accessible by your Kafka Connect distributed service.
- Edit the configuration file config/connect-distributed.properties of your Kafka Connect distributed service and configure the parameter plugin.path to include the folder <CONNECTOR_PATH> where you placed the JAR file in the previous step.
Configuration
The configuration of the MigratoryData Source Connector for Kafka consists of Kafka-defined parameters, MigratoryData-specific parameters, and subjects-to-topics mapping parameters.
Kafka-defined parameters
Entry-point Class
The Java class of the connector is defined as follows:
"connector.class":"com.migratorydata.kafka.source.MigratoryDataSourceConnector"
Converters
In MigratoryData, both the content and the subject of a message are UTF-8 strings. Therefore, the connector should use the following converters:
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter"
Tasks
The MigratoryData Source Connector can be split into multiple tasks across the workers of the Kafka Connect distributed service, where each task consumes messages from a distinct subset of MigratoryData subjects. The Kafka Connect service is responsible for:
- the distribution of tasks across the Kafka Connect workers
- the distribution of MigratoryData subjects across the tasks
- the restart of a task after a failure or a load re-balance
For example, to split the MigratoryData Source Connector into at most five tasks, configure the connector as follows:
"tasks.max": "5",
MigratoryData-specific parameters
Define the MigratoryData cluster to which the connector should connect. For example, supposing the MigratoryData cluster consists of two instances which accept encrypted client connections at the addresses push1.example.com:443 and push2.example.com:443, configure the connector as follows:
"migratorydata.servers": "push1.example.com:443,push2.example.com:443",
"migratorydata.encryption": "true"
In order to consume messages from a MigratoryData cluster, a client must provide an entitlement token, so that message consumption is allowed only on the entitled subjects. To allow the connector to subscribe to messages, configure the entitlement token as in the following example:
"migratorydata.entitlement_token": "some-token"
Subjects-to-topics mapping parameters
The list of MigratoryData subjects and their mapping to Kafka topics is defined as follows. A MigratoryData subject, say /mysubject/1, can be mapped to a Kafka topic, say mytopic1, using the following configuration:
"migratorydata.subject.mysubject.1": "mytopic1"
Additionally, a MigratoryData message can be published to Kafka with a key. For example, the subject /mysubject/2 can be mapped to the topic mytopic2 with the key mykey2 as follows:
"migratorydata.subject.mysubject.2": "mytopic2"
"migratorydata.key.mysubject.2": "mykey2"
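Putting these parameters together, a complete connector configuration might look like the following sketch; the server addresses, the entitlement token, and the subject-to-topic mappings below are illustrative placeholders, not values required by the connector:

```json
{
  "connector.class": "com.migratorydata.kafka.source.MigratoryDataSourceConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "tasks.max": "2",
  "migratorydata.servers": "push1.example.com:443,push2.example.com:443",
  "migratorydata.encryption": "true",
  "migratorydata.entitlement_token": "some-token",
  "migratorydata.subject.mysubject.1": "mytopic1",
  "migratorydata.subject.mysubject.2": "mytopic2",
  "migratorydata.key.mysubject.2": "mykey2"
}
```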
Reliability
The MigratoryData Source Connector achieves reliable messaging without message loss. Each task of the connector writes into Kafka the sequence numbers of the MigratoryData messages it has successfully published. Therefore, if a task is restarted after a failure or a load re-balance performed by Kafka Connect, it resumes publication from the last sequence numbers it previously acknowledged.
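One way to observe this mechanism is to read the Kafka Connect offsets topic, where each committed record holds the source positions (here, MigratoryData sequence numbers) last acknowledged by a task. This assumes the default offsets topic name connect-offsets, set by offset.storage.topic in config/connect-distributed.properties:

```shell
# Inspect the Kafka Connect offsets topic (default name: connect-offsets).
# Each record stores the source positions last committed by a connector task.
./bin/kafka-console-consumer.sh \
  --topic connect-offsets \
  --from-beginning \
  --bootstrap-server localhost:9092 \
  --property print.key=true
```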
Quick start tutorial
In this section we demonstrate how to achieve web messaging into Kafka.
Install MigratoryData
Download the tarball package of the MigratoryData server from the downloads page, unzip the tarball to any folder, change to that folder, and run on Linux/Unix/MacOS:
$ ./start-migratorydata.sh
By default, the MigratoryData server will accept client connections on the address localhost:8800. Therefore, open the following URL in a browser:
http://localhost:8800
A welcome page should load. Click on the DEMO button and a demo web app should load. Click the buttons Connect, then Subscribe, and Publish to verify that your installation is correct.
If you encounter any issue with the installation, please check out the Installation Guide.
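If you prefer the command line, a quick sanity check is to request the welcome page over HTTP; this assumes the MigratoryData server is running locally on its default port 8800:

```shell
# The MigratoryData server should respond on its default client port
curl -sI http://localhost:8800
```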
Install Kafka
Download the tarball binary package of Kafka from the Kafka downloads page, unzip the tarball to any folder, change to that folder, and run on Linux/Unix/MacOS:
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
$ ./bin/kafka-server-start.sh config/server.properties
Install Kafka Connect
Download the MigratoryData Connector for Kafka from the MigratoryData Connectors section of the downloads page and unzip it to any folder, say /tmp/kafka/connectors. Change to the folder where you installed Kafka in the previous step and edit the configuration file config/connect-distributed.properties as follows:
plugin.path = /tmp/kafka/connectors/migratorydata-connector-kafka
Finally, start the Kafka Connect distributed service as follows:
$ ./bin/connect-distributed.sh config/connect-distributed.properties
Deploy the Connector
In this example, we show how to deploy the MigratoryData Source Connector as two Kafka Connect tasks which consume the MigratoryData subjects /server/status1 and /server/status2 and map them to Kafka topics as follows:
| MigratoryData subject | Kafka topic |
|---|---|
| /server/status1 | topic1 |
| /server/status2 | topic2 (with key: mykey) |
To load the source connector, run the following command:
$ curl --header "Content-Type: application/json" \
--request PUT \
--data '{
"connector.class":"com.migratorydata.kafka.source.MigratoryDataSourceConnector",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "2",
"migratorydata.servers":"127.0.0.1:8800",
"migratorydata.entitlement_token":"some-token",
"migratorydata.subject.server.status1":"topic1",
"migratorydata.subject.server.status2":"topic2",
"migratorydata.key.server.status2":"mykey"
}' \
http://127.0.0.1:8083/connectors/migratory_data_source_00/config
To check that the connector is up and running, run the following command:
$ curl -s localhost:8083/connectors/migratory_data_source_00/status | jq .
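Independently of the status of this particular connector, the standard Kafka Connect REST API (here assumed on its default port 8083) can list the deployed connectors and the connector plugins picked up from plugin.path:

```shell
# List the deployed connectors; migratory_data_source_00 should appear
curl -s localhost:8083/connectors | jq .

# List the available connector plugins; the MigratoryData source
# connector class should appear if plugin.path is configured correctly
curl -s localhost:8083/connector-plugins | jq .
```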
Test the Connector
Open two Kafka consumers for the topics topic1
and topic2
as follows:
$ ./bin/kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server localhost:9092
$ ./bin/kafka-console-consumer.sh --topic topic2 --from-beginning --bootstrap-server localhost:9092
Open the demo web app at http://localhost:8800 as detailed above, connect and publish on the subjects:
/server/status1
/server/status2
You should see, in real time, the messages published by the web app appear in your Kafka consumers.
Delete the Connector
To delete the MigratoryData Source Connector installed above, run the following command:
$ curl --header "Content-Type: application/json" \
--request DELETE \
http://127.0.0.1:8083/connectors/migratory_data_source_00
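To verify the deletion, list the remaining connectors (again assuming the default Kafka Connect REST port 8083); migratory_data_source_00 should no longer appear in the returned list:

```shell
# After deletion, the connector name should be absent from this list
curl -s localhost:8083/connectors | jq .
```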