This tutorial shows how to deploy a MigratoryData cluster — with Kafka support, in conjunction with Amazon Managed Streaming for Apache Kafka (MSK), using Elastic Kubernetes Service (EKS).

Prerequisites

Before deploying MigratoryData on EKS, ensure that you have an AWS account and have installed the following tools:

Variables

To avoid a hardcoded names of the EKS cluster and Kafka topic, let’s define environment variables as follows:

export EKS_CLUSTER=eks-migratorydata
export EKS_SERVICE_ACCOUNT=msk-service-account
export EKS_NAMESPACE=migratory
export EKS_ROLE_NAME=msk-migratory-role
export EKS_SECURITY_GROUP=eks-migratorydata-sg
export KAFKA_TOPIC=server

export IAM_POLICY_NAME=msk-cluster-access-migratorydata
export MSK_CLUSTER_NAME=msk-migratorydata
export MSK_CONFIGURATION_NAME=MSK-MigratoryData-Auto-Create-Topics-Enabled

Create an EKS cluster

Login to AWS with the following command and follow the instructions on the screen to configure your AWS credentials:

aws configure

Create an EKS cluster with at least three and at most five nodes:

  • Create cluster configuration file. For NLB load balancer to work we need to change the parameter awsLoadBalancerController from false to true:

  • To be able to configure a service account used for kafka client authentication from EKS to MSK cluster, we need to enable OIDC. We need to change the parameter withOIDC from false to true:

eksctl create cluster --name=$EKS_CLUSTER \
--version=1.28 \
--nodes-min=3 \
--nodes-max=5 \
--region=us-east-1 \
--zones=us-east-1a,us-east-1b \
--ssh-access=true \
--dry-run | sed 's/awsLoadBalancerController: false/awsLoadBalancerController: true/g' | sed 's/withOIDC: false/withOIDC: true/g' > cluster-config.yaml
  • Create the cluster:
eksctl create cluster -f cluster-config.yaml

Check if the EKS nodes are ready with the following command:

kubectl get nodes

In order to connect to MSK cluster from EKS pods, we need to create and attach an IAM policy to an eks service account. The IAM policy gives the service account the necessary permissions to interact with MSK. The service account is configured in migratorydata cluster deployment configuration from bellow and is set with config serviceAccountName: $EKS_SERVICE_ACCOUNT.

Create the file msk-policy.json

cat > msk-policy.json <<EOL
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kafka-cluster:*",
            "Resource": "*"
        }
    ]
}
EOL
aws iam create-policy --policy-name $IAM_POLICY_NAME --policy-document file://msk-policy.json

Attach the policy to the service account:

POLICY_ARN=$(aws iam list-policies --query "Policies[?PolicyName=='$IAM_POLICY_NAME'].{ARN:Arn}" --output text)

eksctl create iamserviceaccount --name $EKS_SERVICE_ACCOUNT --namespace $EKS_NAMESPACE --cluster $EKS_CLUSTER --role-name $EKS_ROLE_NAME \
--attach-policy-arn $POLICY_ARN --approve

See if role was attached to the service account with the following commands:

aws iam get-role --role-name $EKS_ROLE_NAME --query Role.AssumeRolePolicyDocument
aws iam list-attached-role-policies --role-name $EKS_ROLE_NAME --query 'AttachedPolicies[].PolicyArn' --output text

kubectl describe serviceaccount $EKS_SERVICE_ACCOUNT -n $EKS_NAMESPACE

Create an MSK cluster

Get VPC and Subnets ids from eks cluster

VPC_ID=`aws ec2 describe-vpcs --filters Name=tag:Name,Values="*$EKS_CLUSTER*" --query "Vpcs[].VpcId" --output text`
echo $VPC_ID
SUBNET_ID_1=$(aws ec2 describe-subnets --filter Name=vpc-id,Values=$VPC_ID --query 'Subnets[?AvailabilityZone==`us-east-1a`&&MapPublicIpOnLaunch==`true`].SubnetId' --output text)
echo $SUBNET_ID_1
SUBNET_ID_2=$(aws ec2 describe-subnets --filter Name=vpc-id,Values=$VPC_ID --query 'Subnets[?AvailabilityZone==`us-east-1b`&&MapPublicIpOnLaunch==`true`].SubnetId' --output text)
echo $SUBNET_ID_2

Create security group used for communication between EKS and MSK

aws ec2 create-security-group --group-name $EKS_SECURITY_GROUP --description "MigratoryData msk security group" --vpc-id $VPC_ID

Add inbound permit rules for port 9098:

SECURITY_GROUP_ID=$(aws ec2 describe-security-groups --filter Name=vpc-id,Values=$VPC_ID Name=group-name,Values=$EKS_SECURITY_GROUP --query 'SecurityGroups[*].[GroupId]' --output text)
echo $SECURITY_GROUP_ID

aws ec2 authorize-security-group-ingress --group-id $SECURITY_GROUP_ID --protocol tcp --port 9098 --cidr 0.0.0.0/0

Create configuration file for kafka to allow automatic topic creation:

cat > configuration.txt <<EOL
auto.create.topics.enable=true
EOL

MSK_CONFIGURATION_ARN=$(aws kafka create-configuration --name "${MSK_CONFIGURATION_NAME}" --description "Auto create topics enabled" --kafka-versions "3.5.1" --server-properties fileb://configuration.txt --query "Arn" --output text)
echo $MSK_CONFIGURATION_ARN

Create the following configuration files:

  • brokernodegroupinfo.json to define the broker node group
  • client-authentication.json to enable IAM client authentication
  • configuration.json to define custom kafka configuration
cat > brokernodegroupinfo.json <<EOL
{
  "InstanceType": "kafka.t3.small",
  "ClientSubnets": [
    "${SUBNET_ID_1}",
    "${SUBNET_ID_2}"
  ],
  "SecurityGroups": [
    "${SECURITY_GROUP_ID}"
  ]
}
EOL
cat > client-authentication.json <<EOL
{
  "Sasl": {
    "Iam": {
      "Enabled": true
    }
  }
}
EOL
cat > configuration.json <<EOL
{
  "Revision": 1,
  "Arn": "${MSK_CONFIGURATION_ARN}"
}
EOL

Run the following command to create the MSK cluster:

aws kafka create-cluster --cluster-name $MSK_CLUSTER_NAME \
--broker-node-group-info file://brokernodegroupinfo.json \
--kafka-version "3.5.1" \
--client-authentication file://client-authentication.json \
--configuration-info file://configuration.json \
--number-of-broker-nodes 2

This make take 15 to 30 minutes to complete.

Save the value of the ClusterArn key as you need it to perform other actions on your cluster. If you forget the cluster ARN, list all the kafka clusters with the following command:

MSK_ARN=$(aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn' --output text)
echo $MSK_ARN

See info about the cluster with the following command:

aws kafka describe-cluster --cluster-arn $MSK_ARN

Get bootstrap server

Retrieve the Kafka bootstrap server in variable KAFKA_BOOTSTRAP_SERVER as follows:

KAFKA_BOOTSTRAP_SERVER=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_ARN --output text)
echo $KAFKA_BOOTSTRAP_SERVER

Install a load balancer

Install an AWS load balancer controller as follows to be able to use a Network Load Balancer (NLB) to expose the MigratoryData:

helm repo add "eks" "https://aws.github.io/eks-charts"
helm repo update
helm upgrade -i aws-load-balancer-controller \
eks/aws-load-balancer-controller \
--namespace kube-system \
--set clusterName=$EKS_CLUSTER

kubectl get all --selector  "app.kubernetes.io/name=aws-load-balancer-controller"  \
--namespace "kube-system"

Create namespace

kubectl create namespace $EKS_NAMESPACE

Deploy MigratoryData cluster

We will use the following Kubernetes manifest to build a cluster of three MigratoryData servers:

cat > migratorydata-cluster.yaml <<EOL
apiVersion: v1
kind: ConfigMap
metadata:
  name: client-properties
  labels:
    name: client-properties
  namespace: migratory
data:
  client.properties: |-
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler    
---
apiVersion: v1
kind: Service
metadata:
  namespace: migratory
  name: migratorydata-cs
  labels:
    app: migratorydata
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: external
    service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
    service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
spec:
  type: LoadBalancer
  ports:
    - name: client-port
      port: 80
      protocol: TCP
      targetPort: 8800
  selector:
    app: migratorydata
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: migratorydata
  namespace: migratory
  labels:
    app: migratorydata
spec:
  selector:
    matchLabels:
      app: migratorydata
  replicas: 3
  template:
    metadata:
      labels:
        app: migratorydata
    spec:
      serviceAccountName: $EKS_SERVICE_ACCOUNT
      containers:
      - name: migratorydata
        imagePullPolicy: Always
        image: migratorydata/server:latest
        volumeMounts:
        - name: client-properties
          mountPath: "/migratorydata/addons/kafka/consumer.properties"
          subPath: client.properties
          readOnly: true
        - name: client-properties
          mountPath: "/migratorydata/addons/kafka/producer.properties"
          subPath: client.properties
          readOnly: true
        env:
          - name: MIGRATORYDATA_EXTRA_OPTS
            value: "-DMemory=512MB -DX.ConnectionOffload=true -DClusterEngine=kafka"
          - name: MIGRATORYDATA_KAFKA_EXTRA_OPTS
            value: "-Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVER -Dtopics=$KAFKA_TOPIC"
          - name: MIGRATORYDATA_JAVA_EXTRA_OPTS
            value: "-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap"
        resources:
          requests:
            memory: "512Mi"
        ports:
          - name: client-port
            containerPort: 8800
        readinessProbe:
          tcpSocket:
            port: 8800
          initialDelaySeconds: 10
          periodSeconds: 5
        livenessProbe:
          tcpSocket:
            port: 8800
          initialDelaySeconds: 10
          periodSeconds: 5
      volumes:
      - name: client-properties
        configMap:
          name: client-properties
EOL

This manifest contains a Service and a Deployment. The Service is used to handle the clients of the MigratoryData cluster over the port 8800. Furthermore, the NLB service create above maps this port to port number 80. Consequently, clients will establish connections with the MigratoryData cluster on port 80.

In this manifest, we’ve used the MIGRATORYDATA_EXTRA_OPTS environment variable which can be used to define specific parameters or adjust the default value of any parameter listed in the Configuration Guide. In this manifest, we’ve used this environment variable to modify the default values of the parameters such as Memory. Additionally, we’ve employed it to modify the default value of the parameter ClusterEngine, to enable the Kafka native add-on.

To customize the MigratoryData’s native add-on for Kafka, the environment variable MIGRATORYDATA_KAFKA_EXTRA_OPTS offers the flexibility to define specific parameters or adjust the default value of any parameter of the Kafka native add-on. In the manifest above, we’ve used this environment variable to modify the default values of the parameters bootstrap.servers and topics among others to connect to MSK.

To deploy the MigratoryData cluster, copy this manifest to a file migratorydata-cluster.yaml, and run:

kubectl apply -f migratorydata-cluster.yaml

Namespace switch

Because the deployment concerns the namespace migratory, switch to this namespace as follows:

kubectl config set-context --current --namespace=migratory

To return to the default namespace, run:

kubectl config set-context --current --namespace=default

Verify the deployment

Check the running pods to ensure the migratorydata pods are running:

kubectl get pods 

The output of this command should include something similar to the following:

NAME                                READY   STATUS    RESTARTS   AGE
migratorydata-57848575bd-4tnbz   1/1     Running   0          4m32s
migratorydata-57848575bd-gjmld   1/1     Running   0          4m32s
migratorydata-57848575bd-tcbtf   1/1     Running   0          4m32s

You can check the logs of each cluster member by running a command as follows:

kubectl logs migratorydata-57848575bd-4tnbz

Test deployment

Now, you can check that the service of the Docker manifest above is up and running:

kubectl get svc

You should see an output similar to the following:

NAME                  TYPE           CLUSTER-IP    EXTERNAL-IP   PORT(S)        AGE
migratorydata-cs      LoadBalancer   10.0.90.187   NLB-DNS    80:30546/TCP   5m27s

You should now be able to connect to http://NLB-DNS and run the demo app provided with each MigratoryData server of the cluster (where NLB-DNS is the external address assigned by NLB service to your client service.

Scaling

The stateless nature of the MigratoryData cluster when deployed in conjunction with MSK, where each cluster member is independent from the others, highly simplifies the horizontal scaling on EKS.

Manual scaling up

For example, if the load of your system increases substantially, and supposing your nodes have enough resources available, you can add two new members to the cluster by modifying the replicas field as follows:

kubectl scale deployment migratorydata --replicas=5

Manual scaling down

If the load of your system decreases significantly, then you might remove three members from the cluster by modifying the replicas field as follows:

kubectl scale deployment migratorydata --replicas=2

Autoscaling

Manual scaling is practical if the load of your system changes gradually. Otherwise, you can use the autoscaling feature of Kubernetes.

Kubernetes can monitor the load of your system, typically expressed in CPU usage, and scale your MigratoryData cluster up and down by automatically modifying the replicas field.

In the example above, to add one or more new members up to a maximum of 5 cluster members if the CPU usage of the existing members becomes higher than 50%, or remove one or more of the existing members if the CPU usage of the existing members becomes lower than 50%, use the following command:

kubectl autoscale deployment migratorydata \
--cpu-percent=50 --min=3 --max=5

Now, you can display information about the autoscaler object above using the following command:

kubectl get hpa

and display CPU usage of cluster members with:

kubectl top pods

While testing cluster autoscaling, it is important to understand that the Kubernetes autoscaler periodically retrieves CPU usage information from the cluster members. As a result, the autoscaling process may not appear instantaneous, but this delay aligns with the normal behavior of Kubernetes.

Node Scaling with eksctl

Get node group name with the following command:

eksctl get nodegroup --cluster=$EKS_CLUSTER --region=us-east-1

You should see an output similar to the following:

CLUSTER                 NODEGROUP       STATUS  CREATED                 MIN SIZE        MAX SIZE        DESIRED CAPACITY        INSTANCE TYPE   IMAGE ID        
eks-migratorydata       ng-78d1f82e     ACTIVE  2024-01-25T07:55:46Z    3               5               5                       m5.large        AL2_x86_64      

To scale the number of nodes in the EKS cluster, use the following command, update <NODE_GROUP> with the value from above:

eksctl scale nodegroup --cluster=$EKS_CLUSTER --nodes=5 --name=<NODE_GROUP> --region=us-east-1

See the number of nodes increased with the following command:

kubectl get nodes

You should see an output similar to the following:

NAME                             STATUS   ROLES    AGE     VERSION
ip-192-168-0-196.ec2.internal    Ready    <none>   2m26s   v1.27.9-eks-5e0fdde
ip-192-168-20-197.ec2.internal   Ready    <none>   2m26s   v1.27.9-eks-5e0fdde
ip-192-168-46-194.ec2.internal   Ready    <none>   54m     v1.27.9-eks-5e0fdde
ip-192-168-49-230.ec2.internal   Ready    <none>   54m     v1.27.9-eks-5e0fdde
ip-192-168-8-103.ec2.internal    Ready    <none>   54m     v1.27.9-eks-5e0fdde

Node Failure Testing

MigratoryData clustering tolerates a number of cluster member to be down or to fail as detailed in the Clustering section.

In order to test an EKS node failure, use:

kubectl drain <node-name> --force --delete-local-data --ignore-daemonsets

Then, to start an EKS node, use:

kubectl uncordon <node-name>

Uninstall

To uninstall or if something got wrong with the commands above, you can remove the allocated resources (and try again) as detailed below.

Delete kubernetes resources

kubectl delete namespace $EKS_NAMESPACE

Delete MSK cluster

aws kafka delete-cluster --cluster-arn $MSK_ARN

Delete the msk configuration, wait for msk cluster to be deleted before running the following command:

aws kafka delete-configuration --arn $MSK_CONFIGURATION_ARN

Delete the security group, same as above, wait for msk cluster to be deleted before running the following command:

SECURITY_GROUP_ID=$(aws ec2 describe-security-groups --filter Name=vpc-id,Values=$VPC_ID Name=group-name,Values=$EKS_SECURITY_GROUP --query 'SecurityGroups[*].[GroupId]' --output text)
aws ec2 delete-security-group --group-id $SECURITY_GROUP_ID

Delete EKS cluster

eksctl delete cluster --name=$EKS_CLUSTER --region=us-east-1

aws iam delete-policy --policy-arn $POLICY_ARN

Build realtime apps

First, please read the documentation of the Kafka native add-on to understand the automatic mapping between MigratoryData subjects and Kafka topics.

Utilize MigratoryData’s client APIs to create real-time applications that communicate with your MigratoryData cluster via MSK.

Also, employ the APIs or tools of MSK to generate real-time messages, which are subsequently delivered to MigratoryData’s clients. Similarly, consume real-time messages from MSK that originate from MigratoryData’s clients.