일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- Debezium
- traceId
- 0 replica
- 동등성
- Leaderboard
- keda
- eks
- spring boot
- Salting
- SW 마에스트로
- blue-green
- yml
- zset
- logback
- minreplica
- Strimzi
- Grafana
- hammerDB
- SW Maestro
- 스프링부트
- Benchmarks
- Kafka
- Kubernetes
- Software maestro
- docket
- slow query
- Database
- propogation
- MSSQL
- Helm
- Today
- Total
김태오
Strimzi를 사용하여 Kafka, Debezium을 쿠버네티스에 띄우기 본문
Debezium은 대표적인 CDC(Change Data Capture) 툴이다.
CDC를 쓰는 이유로는 데이터베이스의 변화를 빠르게 감지하여 이벤트를 발생시켜 작업을 함에 있다.
도입하게 된 유즈케이스를 간단히 설명하자면,
어플리케이션에서 POST api 인입 -> A 데이터베이스에 정보 저장 -> 외부 api 호출 -> 호출 성공시 B 데이터베이스에 정보 저장 -> 다른 service method들 실행
의 과정에서, 외부 api 가 실패하더라도 A 데이터베이스가 업데이트되어 한 트랜잭션에 묶이지 않았기 때문이다.
@Transcational 을 걸고 실패시 롤백 핸들링을 하면 되지 않나 싶겠지만, 외부 api 호출을 async로 던지고 있었기에 불가능했다.
아무튼 외부 api 호출을 먼저하고, 호출이 성공할 시 Debezium에서 감지가 가능한 테이블에 정보를 저장하고, 이벤트를 발생시켜 이후 일련의 과정들을 실행하는 것을 목표로 하였다.
여기서 이벤트를 발생시키고 받는 과정에 여러 pub/sub이나 MQ가 사용 가능한데, Kafka를 사용하는 방법을 작성하고자 하며, 그 중에서도 Kafka와 Debezium을 따로 인스턴스로 띄우는 것이 아닌 k8s 클러스터 내에 pod으로 띄워 기존의 어플리케이션과의 연결이 쉽도록 한다.
Kafka를 k8s에 쉽게 배포하는 방법으로 Strimzi가 있다. 총 세 개의 yaml 파일로 Kafka cluster, connect, connector를 배포하여 사용할 수 있는데, 그 과정을 정리해보겠다.
우선 사용할 namespace를 정의해주자.
kubectl create ns strimzi-debezium
추가적으로 helm을 사용하고 있다면
helm install strimzi-operator strimzi/strimzi-kafka-operator -n strimzi-debezium
다음으로 Kafka를 특수 object로 가지는 클러스터를 생성해보자.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
namespace: strimzi-debezium
spec:
kafka:
version: 3.7.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
deleteClaim: false
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
ssl.endpoint.identification.algorithm: ""
log4j.logger.org.apache.kafka: DEBUG
log4j.logger.kafka: DEBUG
resources:
requests:
memory: 1Gi
cpu: "0.5"
limits:
memory: 2Gi
cpu: "1"
template:
kafkaContainer:
env:
- name: KAFKA_HEAP_OPTS
value: "-Xms1G -Xmx1G"
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 10Gi
deleteClaim: false
resources:
requests:
memory: 512Mi
cpu: "0.5"
limits:
memory: 1Gi
cpu: "1"
config:
autopurge.snapRetainCount: 3
autopurge.purgeInterval: 1
clientCnxnSocket: org.apache.zookeeper.ClientCnxnSocketNetty
serverCnxnFactory: org.apache.zookeeper.server.NettyServerCnxnFactory
quorum.ssl.enabled: true
ssl.quorum.protocol: TLSv1.2
tickTime: 2000
initLimit: 15
syncLimit: 10
logging:
type: inline
loggers:
zookeeper.root.logger: "INFO,CONSOLE"
template:
pod:
securityContext:
runAsUser: 1001
fsGroup: 1001
entityOperator:
topicOperator: {}
userOperator: {}
clusterCa:
generateCertificateAuthority: true
clientsCa:
generateCertificateAuthority: true
여기에서는 kafka와 zookeeper를 각각 3개씩 생성하였고, cpu와 memory 할당을 작지 않게 함을 볼 수 있는데, replica 개수는 용도에 맞게 세팅해도 되겠으나, cpu와 memory는 database 변화 감지를 정상적으로 전달하는데 있어 최소 스펙인것 같기도 하다.
또 apiVersion에 v1beta2 말고도 v1alpha 등을 사용할 수도 있는데, 없는 conf property들이 있으니 혼용하여 쓰지 말도록 하자.
Kafka 버젼이 3.7.0인 최신 버젼으로 적용되어 있는데, 이후 strimzi 버젼을 명시하고 모두 올렸을 때 3.2~ 이하 버젼은 호완이 되지 않았다.
Kafka listeners에 tls를 해제한 포트와 사용한 포트를 두개 열어두었는데, 이것저것 만지다 보면 tls cert 없이 접근하려 하는 경우 막히는 경우가 있었다. 미리 설정을 잘해두면 좋을 것 같다.
ssl.endpoint.identification.algorithm: ""
부분에서는 SSL connection을 위한 hostname verification을 해제해둔 부분인데, prod 환경에서는 권장하지 않는다.
template:
kafkaContainer:
env:
- name: KAFKA_HEAP_OPTS
value: "-Xms1G -Xmx1G"
는 Kafka 힙 할당인데, 이걸 안달고 올렸더니 터진 경험이 있다.
clusterCa:
generateCertificateAuthority: true
clientsCa:
generateCertificateAuthority: true
부분은 옵션인데, Ca와 pem 등을 직접 만들어 사용해도 무관하다.
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -subj "/CN=kafka-connect"
kubectl create secret generic kafka-connect-tls --from-file=tls.key=key.pem --from-file=tls.crt=cert.pem -n strimzi-debezium
직접 만들어 사용하는 부분의 일부인데, 나머지는 상세히 검색하면 잘 나와있다.
다음으로 kafka-connect.yaml이다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
namespace: strimzi-debezium
annotations:
strimzi.io/use-connector-resources: "true"
spec:
image: ystc1247/kafka-strimzi:0.0.3
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
config:
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
annotations:
strimzi.io/use-connector-resources: "true"
부분은 좀 중요한데, 이걸 해둬야 connector가 정상적으로 돌아간다. image같은 경우에는 난 직접 구성하여 DockerHub에 올린걸 사용했는데, strimzi + mysql-connector 가 달린 이미지다. 다른걸 찾아서 넣어도 관계없다.
Dockerfile을 첨부한다.
FROM strimzi/kafka:0.20.1-kafka-2.5.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001
사용한 mysql connector >
curl -L https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz | tar -xz\n
bootstrapServers: my-cluster-kafka-bootstrap:9093
부분에 no-tls로 설정한 9092를 써도 되나, 인증 이슈가 터질수도 있다.
다음으로 kafka-connector.yaml 이다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: inventory-connector
namespace: strimzi-debezium
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
topic.prefix: ${원하는 topic prefix}
database.hostname: dbhost
database.port: dbport
database.user: 유저
database.password: 비번
database.dbname: db이름
database.server.id: 185239 (고유한 id를 사용)
database.server.name: db server
table.include.list: 중요! 변화를 감지하고자 하는 table
schema.history.internal.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
schema.history.internal.kafka.topic: "schema-changes.inventory"
database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
database.history.kafka.topic: "database-changes.inventory"
include.schema.changes: "true"
database.history.kafka.reset.offset: "true"
snapshot.mode: "when_needed"
snapshot.mode 는 좀 중요한데, 사용할 수 있는 설정이 많은데 입맛껏 사용하면 된다. https://debezium.io/documentation/reference/stable/connectors/mysql.html
pod 생성과 소멸을 반복하다 Debezium에서 offset을 놓친 경우, 현재 snapshot과 이용 가능한 snapshot에 불일치가 발생할 수 있다.
│ INFO: Connected to {} at mysql-bin-changelog.400734/891 (sid:185054, cid:15583019) │
│ 2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Connected to binlog at dev.data.yeogiya.io:3306, starting at BinlogOffsetContext{sourceInfoSchema=Schema{io.debezium.co │
│ 2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Waiting for keepalive thread to start (io.debezium.connector.binlog.BinlogStreamingChangeEventSource) [debezium-mysqlco │
│ 2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Creating thread debezium-mysqlconnector-{}-binlog-client (io.debezium.util.Threads) [blc-{} │
│ 2024-07-24 06:42:19,297 ERROR [inventory-connector|task-0] Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin-changelog.400734/89 │
│ 2024-07-24 06:42:19,298 ERROR [inventory-connector|task-0] Producer failure (io.debezium.pipeline.ErrorHandler) [blc-{}:3306] │
│ io.debezium.DebeziumException: Could not find first log file name in binary log index file Error code: 1236; SQLSTATE: HY000.
이런게 나왔다면 당첨이다. mysql-bin-changelog.400734가 현재 없으나 Debezium에서 읽고 싶어하는 경우가 에러의 원인의 다수이다. 외에도 MySQL version가 업데이트된 경우도 있을 수 있다.
우선 전자의 경우 해결법이다.
SHOW VARIABLES LIKE 'log_bin';
먼저 얘가 ON 으로 되어있는지 확인한다.
다음으로
mysql> SHOW BINARY LOGS;
+----------------------------+-----------+-----------+
| Log_name | File_size | Encrypted |
+----------------------------+-----------+-----------+
| mysql-bin-changelog.400803 | 2436 | No |
| mysql-bin-changelog.400804 | 2436 | No |
| mysql-bin-changelog.400805 | 891 | No |
+----------------------------+-----------+-----------+
현재 Debezium에서 읽는 버젼과 현재 최신 버젼의 차이를 보자.
추가적으로 expire_logs_days, binlog_expire_logs_seconds 같은 global variable 들에 문제가 있을 수 있으나, 변경한 적이 없고 default로 되어있다면 문제될건 없을 것이다.
이제 Kafka, Debezium에서 읽고자 하는 offset을 강제로 변경해줘야 하는데, 다음의 절차를 밟는다.
apiVersion: v1
kind: Pod
metadata:
name: kafka-client
namespace: default
spec:
containers:
- name: kafka
image: confluentinc/cp-kafka:latest
command:
- sleep
- "3600"
일단 얘를 띄워주는데, Kafka sh 툴들을 사용할 수 있으면 생략해도 된다.
이후
kubectl exec -it kafka-client -n default -- /bin/sh
cd /usr/bin
./kafka-consumer-groups --bootstrap-server my-cluster-kafka-bootstrap.strimzi-debezium.svc.cluster.local:9092 --group inventory-connector --reset-offsets --to-latest --all-topics --execute
이후 얘를 실행해주면 되는데, 모든 topic 의 offset을 가장 최신으로 되돌려버린다. 물론 그 사이의 데이터베이스 변화 감지는 전혀 하지 못하고 다 날라가기 때문에, 주의해서 사용해야 한다.
다음은 결과들이다.
kubectl exec -it my-cluster-kafka-0 -n strimzi-debezium -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic {토픽명} --from-beginning
를 실행하면, kafka 가 produce하고 있는 message들이 나온다.
'kafka' 카테고리의 다른 글
Kafka 간단한 메시지 교환 (1) | 2023.11.27 |
---|---|
Kafka 소개 (1) | 2023.11.26 |