Hola, soy Luis y esta vez les traigo otro nuevo artículo.
Aquí está nuestra guía práctica paso a paso para implementar Kafka Connect en Kubernetes para conectar Kafka a sistemas externos.
Kubernetes (K8s) es uno de los proyectos de código abierto más famosos y se está adaptando continuamente.
Kafka es una plataforma de software de procesamiento de flujo de código abierto que utilizan muchas empresas. Por ejemplo, LinkedIn personaliza Apache Kafka para 7 billones de mensajes por día.
Entonces, ¿qué es Kafka Connect ahora? Kafka Connect es un componente de código abierto de Kafka, un marco para conectar Kafka con sistemas externos como bases de datos, almacenes de valores clave, índices de búsqueda y sistemas de archivos.
Facilita la definición rápida de conectores que mueven grandes conjuntos de datos dentro y fuera de Kafka.
Como Kafka Connect es un componente de Kafka, la configuración necesitará agentes de Kafka y Zookeepers (al menos, hasta se elimina la dependencia de Zookeeper). Nuestra configuración se vería a continuación:
Primero, comenzaremos configurando un clúster de Kafka. En este blog, usaremos el código abierto de Confluent. Gráfico de timón.
Para comenzar, asegúrese de que se esté ejecutando un clúster de Kubernetes (p. Ej. GKE de Google, EKS de AWS, AKS por Azure, Minikube, etc.) y las siguientes herramientas están instaladas en su sistema local:
Comencemos por clonar el repositorio y actualizar las dependencias.
git clone git@github.com:confluentinc/cp-helm-charts.git cd cp-helm-charts helm dependency update charts/cp-kafka/
El último comando actualiza las dependencias en el gráfico cp-kafka
que tiene una dependencia del gráfico cp-zookeeper
. Instalación de falla cp-kafka
sin ejecutar el comando de actualización.
Ahora sigamos adelante e implementemos agentes de Kafka con Zookeepers con un nombre de lanzamiento (p. Ej. confluent
) usando el siguiente comando:
helm install --name confluent ./charts/cp-kafka
Pasarán unos minutos antes de que todos los pods comiencen a funcionar. Verifiquemos que los recursos creados con nuestra versión funcionan bien usando kubectl.
$ kubectl get pods NAME READY STATUS RESTARTS AGE confluent-cp-kafka-0 2/2 Running 0 5m16s confluent-cp-kafka-1 2/2 Running 0 4m47s confluent-cp-kafka-2 2/2 Running 0 4m29s confluent-cp-zookeeper-0 2/2 Running 0 5m16s confluent-cp-zookeeper-1 2/2 Running 0 4m47s confluent-cp-zookeeper-2 2/2 Running 0 4m21s $ kubectl get services NAME TYPE CLUSTER-IP PORT(S) AGE cp-kafka ClusterIP xx.xx.xxx.x 9092/TCP 5m16s cp-kafka-headless ClusterIP None 9092/TCP 5m16s cp-zookeeper ClusterIP xx.xx.xxx.x 2181/TCP 5m16s cp-zookeeper-headless ClusterIP None 2888/TCP, 3888/TCP 5m16s
Si nota que todos los corredores y cuidadores del zoológico tienen 2 contenedores por cápsula, uno de estos es el envase prometheus
.
Puedes deshabilitar prometheus
editando los archivos de valores o simplemente estableciendo valores desde la línea de comandos de Helm durante la instalación (por ejemplo, helm install --set prometheus.jmx.enabled=false..
)
Dado que tenemos las dependencias de Kafka Connect en su lugar, podemos seguir adelante e implementar el gráfico de Kafka Connect también.
Sin embargo, para leer desde una base de datos MySQL, necesitaremos JDBC Source Connector instalado en nuestro contenedor.
Para hacerlo usemos la imagen confluentinc / cp-kafka-connect
proporcionada por Confluent y agregue una línea para instalar JDBC Source Connector. Coloque el contenido a continuación en el archivo llamado Dockerfile
FROM confluentinc/cp-kafka-connect:5.4 .0 RUN echo "===> Installing MySQL connector" \ && curl https: FROM confluentinc/cp-kafka-connect:5.4.0 RUN echo "===> Installing MySQL connector" \ && curl https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar --output /usr/share/java/kafka-connect-jdbc/mysql-connector-java-8.0.19.jar
Los siguientes comandos crean y empujan la imagen de la ventana acoplable a Registro de contenedores de Google:
docker build -t gcr.io/project123/cp-kafka-connect:5.4.0-jdbc. docker push gcr.io/project123/cp-kafka-connect:5.4.0-jdbc
El siguiente paso es usar la imagen de la ventana acoplable que acabamos de crear e implementar Kafka connect en Kubernetes:
helm install --name confluent-2 \ --set image="gcr\.io/project123/cp-kafka-connect" \ --set imageTag="5.4.0-jdbc" \ --set kafka.bootstrapServers="PLAINTEXT://confluent-cp-kafka-headless:9092" \ ./charts/cp-kafka-connect
Reemplace el nombre, la imagen y la etiqueta de imagen y con los valores apropiados en el comando anterior. Aquí, kafka.bootstrapServers
es el servicio y el puerto en el que se ejecutan los corredores de Kafka.
Después de correr el comando kubectl get all
nuevamente, deberíamos ver el pod, servicio, implementación, etc. ejecutándose también para Kafka Connect. Asegúrese de que el trabajador de conexión esté sano.
$ kubectl logs confluent-2-cp-kafka-connect-mvt5d \ --container cp-kafka-connect-server [datetime] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect) [datetime] INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
Aquí, confluent-2-cp-kafka-connect-mvt5d
es el nombre del pod creado para mí, debería ser algo similar para ti también, según el nombre de lanzamiento que elijas (para mí, el nombre de lanzamiento es: confluent-2
).
Ahora tenemos nuestro servidor Kafka Connect en ejecución, pero para leer de una base de datos (p. Ej. MySQL) necesitaremos crear conectores. Hagámoslo ahora.
Suponiendo que tenemos un servidor MySQL ejecutándose en algún lugar y un cliente MySQL instalado en su sistema local, conectemos al servidor MySQL usando las credenciales apropiadas y ejecutemos las siguientes declaraciones SQL:
# Repace xx.xxx.xxx.xx, and root with appropriate values $ mysql -u root -h xx.xxx.xxx.xx -p CREATE DATABASE IF NOT EXISTS test_db; USE test_db; DROP TABLE IF EXISTS test_table; CREATE TABLE IF NOT EXISTS test_table ( id serial NOT NULL PRIMARY KEY, name varchar(100), emailId varchar(200), branch varchar(200), updated timestamp default CURRENT_TIMESTAMP NOT NULL, INDEX `updated_index` (`updated`) ); INSERT INTO test_table (name, emailId, branch) VALUES ('Chandler', 'muriel@venus.com', 'Transponster'); INSERT INTO test_table (name, emailId, branch) VALUES ('Joey', 'joseph@tribbiani.com', 'DOOL'); exit;
Al implementar Kafka Brokers y Zookeepers arriba, se muestra un Kafka-client
de muestra en los resultados para la prueba. Guardemos eso en un archivo llamado sample-pod.yaml
y desplegar eso.
apiVersion: v1 kind: Pod metadata: name: kafka-client namespace: default spec: containers: - name: kafka-client image: confluentinc/cp-enterprise-kafka:5.4.1 command: - sh - -c - "exec tail -f /dev/null"
Implemente este pod de muestra con el siguiente comando:
kubectl apply -f sample-pod.yaml
Podemos verificar si el servidor Connect está funcionando enviando un simple GET
solicitud al punto final REST de Kafka Connect. Leer más sobre la API REST aquí.
$ kubectl exec -it kafka-client -- curl confluent-2-cp-kafka-connect:8083/connectors # Output []
Como todavía no hay conectores, obtenemos un SUCCESS
respuesta con una lista vacía [ ]
. Vamos exec
en el contenedor y cree un conector:
$ kubectl exec -ti confluent-2-cp-kafka-connect-mvt5d \
--container cp-kafka-connect-server -- /bin/bash
$ curl -X POST
-H "Content-Type: application/json"
--data ' "name": "k8s-connect-source",
"config":
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"tasks.max": 1,
"connection.url":"jdbc:mysql://xx.xxx.xxx.xx/test_dbuser=root&password=ayadav",
"mode": "incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "updated",
"topic.prefix": "k8s-connect-",
"poll.interval.ms": 1000 '
http://localhost:8083/connectors
Podemos verificar el estado del conector ejecutando el siguiente comando (aún desde el interior del pod de muestra de muestra de Enterprise-Kafka
):
$ curl -s -X
GET http://localhost:8083/connectors/k8s-connect-source/status
{"name":"k8s-connect-source","connector":
{"state":"RUNNING","worker_id":"10.8.4.2:8083"},"tasks":
[{"id":0,"state":"RUNNING","worker_id":"10.8.4.2:8083"}],"type":"source"}
Esto debería haber creado una lista de temas en los que Kafka Connect almacena las configuraciones del conector y envía mensajes cada vez que se agrega una nueva fila a la tabla.
Salga del contenedor y ejecute el siguiente comando en su máquina:
$ kubectl -n default exec kafka-client -- /usr/bin/kafka-topics -- zookeeper confluent-cp-zookeeper:2181 --list # Output: List of topics __confluent.support.metrics __consumer_offsets _confluent-metrics confluent-2-cp-kafka-connect-config confluent-2-cp-kafka-connect-offset confluent-2-cp-kafka-connect-status k8s-connect-test k8s-connect-test_table # Listen for the messages on the Kafka topic $ kubectl -n default exec -ti kafka-client -- /usr/bin/kafka-console-consumer --bootstrap-server confluent-10-cp-kafka:9092 --topic k8s-connect-test_table --from-beginning # Output {"id":1,"name":"Joey","emailId":"joey@tribianni.com","branch":"DOOL","updated":1585514796000} {"id":2,"name":"Chandler","emailId":"muriel@venus.com","branch":"Transponster","updated":1585514796000}
Además, puede mantener vivo el shell del oyente, volver a conectarse a MySQL y agregar una nueva fila. Debería ver un nuevo mensaje de este tema en la salida kubectl
.
Para automatizar el proceso de creación de conectores sobre la marcha mientras se implementa Kafka Connect, eche un vistazo a este Solicitud de extracción Había presentado que ahora se fusiona con la rama master
, y el archivo values.yaml
.
Por favor, avíseme en los comentarios si se queda atascado en algún lugar o si tiene alguna sugerencia de mejora.
Gracias por leer este artículo.
Añadir comentario