일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- Leaderboard
- zset
- Strimzi
- Helm
- 스프링부트
- Grafana
- yml
- logback
- minreplica
- traceId
- Kafka
- MSSQL
- 동등성
- hammerDB
- eks
- Benchmarks
- docket
- SW 마에스트로
- blue-green
- propogation
- Debezium
- Salting
- 0 replica
- keda
- spring boot
- Database
- Kubernetes
- slow query
- Software maestro
- SW Maestro
- Today
- Total
김태오
Kafka 간단한 메시지 교환 본문
우선 kafka 설치이다.
brew install kafka
이후
zkServer start //zooKeeper 시작
kafka-server-start /opt/homebrew/etc/kafka/server.properties //kafka 시작 - brew 로 설치하지 않았다면 /usr/local/~ 에 있음
다음으로 간단하게 topic 을 생성해보자.
kafka-topics --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
test 라는 이름의 topic 을 kafka server (localhost:9092) 에 생성한다.
replication factor 는 가지고 있을 복사본의 개수를 의미한다. 여기서는 partition 당 복사본의 개수가 1이며, production 환경으로 가게 되면 fault tolerance 를 위해 3 이상의 replication factor 가 권장된다.
partitions 는 간단히 partition 의 개수를 의미한다. 이 개수가 늘어나면 topic log 가 parallel 하게 data 가 분산되어 fault tolerance 와 scalability 가 증가한다.
다음으로 CLI 에서 producer 와 consumer 를 간단히 테스트해본다.
kafka-console-producer --broker-list localhost:9092 --topic test
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
다음으로 Spring Boot Application 에 integration 을 시도해본다.
implementation 'org.springframework.kafka:spring-kafka'
우선 build.gradle 에 dependency 를 추가한다.
다음으로 topic 을 생성하는 admin config 인데, 이를 kafka CLI 에서도 할 수 있지만 스프링부트 안에서도 가능하다.
@Configuration
public class KafkaTopicConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("baeldung", 1, (short) 1);
}
}
매번 @Value annotation 사용할 때마다 실수하는데 intelliJ 사용시 annotation processing 을 켜줘야 한다.
일단 KafkaAdmin Bean 을 생성해준다. configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress) 는 configuration map 에 bootstrap Address 를 세팅해준다.
아래 NewTopic 함수는 이전 CLI 에서 topic 을 생성했듯 프로그래밍을 통해 topic 을 생성하는데,
kafka-topics --create --topic baeldung --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
에 상응하는 topic 생성 함수가 되겠다.
다음으로는 producer 와 consumer service layer 다.
@Component
public class KafkaConsumerService {
@KafkaListener(topics = "test", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message in group 'my-group': " + message);
}
}
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
지원되는 함수들 쓰는거라 딱히 어려울건 없어보인다.
'kafka' 카테고리의 다른 글
Strimzi를 사용하여 Kafka, Debezium을 쿠버네티스에 띄우기 (4) | 2024.07.24 |
---|---|
Kafka 소개 (1) | 2023.11.26 |