김태오

Strimzi를 사용하여 Kafka, Debezium을 쿠버네티스에 띄우기 본문

kafka

Strimzi를 사용하여 Kafka, Debezium을 쿠버네티스에 띄우기

ystc1247 2024. 7. 24. 18:57

Debezium은 대표적인 CDC(Change Data Capture) 툴이다. 

CDC를 쓰는 이유로는 데이터베이스의 변화를 빠르게 감지하여 이벤트를 발생시켜 작업을 함에 있다.

 

도입하게 된 유즈케이스를 간단히 설명하자면, 

어플리케이션에서 POST api 인입 -> A 데이터베이스에 정보 저장 -> 외부 api 호출 -> 호출 성공시 B 데이터베이스에 정보 저장 -> 다른 service method들 실행

의 과정에서, 외부 api 가 실패하더라도 A 데이터베이스가 업데이트되어 한 트랜잭션에 묶이지 않았기 때문이다. 

@Transcational 을 걸고 실패시 롤백 핸들링을 하면 되지 않나 싶겠지만, 외부 api 호출을 async로 던지고 있었기에 불가능했다.

아무튼 외부 api 호출을 먼저하고, 호출이 성공할 시 Debezium에서 감지가 가능한 테이블에 정보를 저장하고, 이벤트를 발생시켜 이후 일련의 과정들을 실행하는 것을 목표로 하였다.

 

여기서 이벤트를 발생시키고 받는 과정에 여러 pub/sub이나 MQ가 사용 가능한데, Kafka를 사용하는 방법을 작성하고자 하며, 그 중에서도 Kafka와 Debezium을 따로 인스턴스로 띄우는 것이 아닌 k8s 클러스터 내에 pod으로 띄워 기존의 어플리케이션과의 연결이 쉽도록 한다.

 

Kafka k8s 쉽게 배포하는 방법으로 Strimzi 있다. 개의 yaml 파일로 Kafka cluster, connect, connector 배포하여 사용할 있는데, 과정을 정리해보겠다.

 

우선 사용할 namespace를 정의해주자.

kubectl create ns strimzi-debezium

 

추가적으로 helm을 사용하고 있다면

helm install strimzi-operator strimzi/strimzi-kafka-operator -n strimzi-debezium

 

다음으로 Kafka를 특수 object로 가지는 클러스터를 생성해보자.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi-debezium
spec:
  kafka:
    version: 3.7.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      ssl.endpoint.identification.algorithm: ""
      log4j.logger.org.apache.kafka: DEBUG
      log4j.logger.kafka: DEBUG
    resources:
      requests:
        memory: 1Gi
        cpu: "0.5"
      limits:
        memory: 2Gi
        cpu: "1"
    template:
      kafkaContainer:
        env:
          - name: KAFKA_HEAP_OPTS
            value: "-Xms1G -Xmx1G"
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
    resources:
      requests:
        memory: 512Mi
        cpu: "0.5"
      limits:
        memory: 1Gi
        cpu: "1"
    config:
      autopurge.snapRetainCount: 3
      autopurge.purgeInterval: 1
      clientCnxnSocket: org.apache.zookeeper.ClientCnxnSocketNetty
      serverCnxnFactory: org.apache.zookeeper.server.NettyServerCnxnFactory
      quorum.ssl.enabled: true
      ssl.quorum.protocol: TLSv1.2
      tickTime: 2000
      initLimit: 15
      syncLimit: 10
    logging:
      type: inline
      loggers:
        zookeeper.root.logger: "INFO,CONSOLE"
    template:
      pod:
        securityContext:
          runAsUser: 1001
          fsGroup: 1001
  entityOperator:
    topicOperator: {}
    userOperator: {}
  clusterCa:
    generateCertificateAuthority: true
  clientsCa:
    generateCertificateAuthority: true

 

여기에서는 kafka와 zookeeper를 각각 3개씩 생성하였고, cpu와 memory 할당을 작지 않게 함을 볼 수 있는데, replica 개수는 용도에 맞게 세팅해도 되겠으나, cpu와 memory는 database 변화 감지를 정상적으로 전달하는데 있어 최소 스펙인것 같기도 하다.

 

또 apiVersion에 v1beta2 말고도 v1alpha 등을 사용할 수도 있는데, 없는 conf property들이 있으니 혼용하여 쓰지 말도록 하자.

Kafka 버젼이 3.7.0인 최신 버젼으로 적용되어 있는데, 이후 strimzi 버젼을 명시하고 모두 올렸을 때 3.2~ 이하 버젼은 호완이 되지 않았다.

 

Kafka listeners에 tls를 해제한 포트와 사용한 포트를 두개 열어두었는데, 이것저것 만지다 보면 tls cert 없이 접근하려 하는 경우 막히는 경우가 있었다. 미리 설정을 잘해두면 좋을 것 같다.

ssl.endpoint.identification.algorithm: ""

 

부분에서는 SSL connection을 위한 hostname verification을 해제해둔 부분인데, prod 환경에서는 권장하지 않는다.

template:
  kafkaContainer:
    env:
      - name: KAFKA_HEAP_OPTS
        value: "-Xms1G -Xmx1G"

 

는 Kafka 힙 할당인데, 이걸 안달고 올렸더니 터진 경험이 있다.

clusterCa:
  generateCertificateAuthority: true
clientsCa:
  generateCertificateAuthority: true

 

부분은 옵션인데, Ca와 pem 등을 직접 만들어 사용해도 무관하다.

openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -subj "/CN=kafka-connect"
kubectl create secret generic kafka-connect-tls --from-file=tls.key=key.pem --from-file=tls.crt=cert.pem -n strimzi-debezium

 

직접 만들어 사용하는 부분의 일부인데, 나머지는 상세히 검색하면 잘 나와있다.

 

다음으로 kafka-connect.yaml이다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: strimzi-debezium
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: ystc1247/kafka-strimzi:0.0.3
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider

 

annotations:
  strimzi.io/use-connector-resources: "true"

 

부분은 좀 중요한데, 이걸 해둬야 connector가 정상적으로 돌아간다. image같은 경우에는 난 직접 구성하여 DockerHub에 올린걸 사용했는데, strimzi + mysql-connector 가 달린 이미지다. 다른걸 찾아서 넣어도 관계없다.

 

Dockerfile을 첨부한다.

FROM strimzi/kafka:0.20.1-kafka-2.5.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001

 

사용한 mysql connector >

curl -L https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz | tar -xz\n

 

bootstrapServers: my-cluster-kafka-bootstrap:9093

 

부분에 no-tls로 설정한 9092를 써도 되나, 인증 이슈가 터질수도 있다.

 

다음으로 kafka-connector.yaml 이다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: inventory-connector
  namespace: strimzi-debezium
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    topic.prefix: ${원하는 topic prefix}
    database.hostname: dbhost
    database.port: dbport
    database.user: 유저
    database.password: 비번
    database.dbname: db이름
    database.server.id: 185239 (고유한 id를 사용)
    database.server.name: db server
    table.include.list: 중요! 변화를 감지하고자 하는 table
    schema.history.internal.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    schema.history.internal.kafka.topic: "schema-changes.inventory"
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "database-changes.inventory"
    include.schema.changes: "true"
    database.history.kafka.reset.offset: "true"
    snapshot.mode: "when_needed"

 

snapshot.mode 는 좀 중요한데, 사용할 수 있는 설정이 많은데 입맛껏 사용하면 된다. https://debezium.io/documentation/reference/stable/connectors/mysql.html

 

Debezium connector for MySQL :: Debezium Documentation

An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. Each additional condition is an object that specifies the criteria for filtering the data that an ad

debezium.io

pod 생성과 소멸을 반복하다 Debezium에서 offset을 놓친 경우, 현재 snapshot과 이용 가능한 snapshot에 불일치가 발생할 수 있다.

│ INFO: Connected to {} at mysql-bin-changelog.400734/891 (sid:185054, cid:15583019)                                                                          │
│ 2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Connected to binlog at dev.data.yeogiya.io:3306, starting at BinlogOffsetContext{sourceInfoSchema=Schema{io.debezium.co │
│ 2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Waiting for keepalive thread to start (io.debezium.connector.binlog.BinlogStreamingChangeEventSource) [debezium-mysqlco │
│ 2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Creating thread debezium-mysqlconnector-{}-binlog-client (io.debezium.util.Threads) [blc-{} │
│ 2024-07-24 06:42:19,297 ERROR [inventory-connector|task-0] Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin-changelog.400734/89 │
│ 2024-07-24 06:42:19,298 ERROR [inventory-connector|task-0] Producer failure (io.debezium.pipeline.ErrorHandler) [blc-{}:3306]                                    │
│ io.debezium.DebeziumException: Could not find first log file name in binary log index file Error code: 1236; SQLSTATE: HY000.

 

이런게 나왔다면 당첨이다. mysql-bin-changelog.400734가 현재 없으나 Debezium에서 읽고 싶어하는 경우가 에러의 원인의 다수이다. 외에도 MySQL version가 업데이트된 경우도 있을 수 있다.

 

우선 전자의 경우 해결법이다.

SHOW VARIABLES LIKE 'log_bin';

 

먼저 얘가 ON 으로 되어있는지 확인한다.

 

다음으로 

mysql> SHOW BINARY LOGS;
+----------------------------+-----------+-----------+
| Log_name                   | File_size | Encrypted |
+----------------------------+-----------+-----------+
| mysql-bin-changelog.400803 |      2436 | No        |
| mysql-bin-changelog.400804 |      2436 | No        |
| mysql-bin-changelog.400805 |       891 | No        |
+----------------------------+-----------+-----------+

 

현재 Debezium에서 읽는 버젼과 현재 최신 버젼의 차이를 보자.

 

추가적으로 expire_logs_days, binlog_expire_logs_seconds 같은 global variable 들에 문제가 있을 수 있으나, 변경한 적이 없고 default로 되어있다면 문제될건 없을 것이다.

 

이제 Kafka, Debezium에서 읽고자 하는 offset을 강제로 변경해줘야 하는데, 다음의 절차를 밟는다.

apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
    - name: kafka
      image: confluentinc/cp-kafka:latest
      command:
        - sleep
        - "3600"

 

일단 얘를 띄워주는데, Kafka sh 툴들을 사용할 수 있으면 생략해도 된다.

이후

kubectl exec -it kafka-client -n default -- /bin/sh
cd /usr/bin
./kafka-consumer-groups --bootstrap-server my-cluster-kafka-bootstrap.strimzi-debezium.svc.cluster.local:9092 --group inventory-connector --reset-offsets --to-latest --all-topics --execute

 

이후 얘를 실행해주면 되는데, 모든 topic 의 offset을 가장 최신으로 되돌려버린다. 물론 그 사이의 데이터베이스 변화 감지는 전혀 하지 못하고 다 날라가기 때문에, 주의해서 사용해야 한다.

 

다음은 결과들이다.

 

 kubectl exec -it my-cluster-kafka-0 -n strimzi-debezium -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic {토픽명} --from-beginning

 

를 실행하면, kafka 가 produce하고 있는 message들이 나온다.

'kafka' 카테고리의 다른 글

Kafka 간단한 메시지 교환  (1) 2023.11.27
Kafka 소개  (1) 2023.11.26