김태오

Kafka 간단한 메시지 교환 본문

kafka

Kafka 간단한 메시지 교환

ystc1247 2023. 11. 27. 05:04

우선 kafka 설치이다.

brew install kafka

이후

zkServer start //zooKeeper 시작
kafka-server-start /opt/homebrew/etc/kafka/server.properties //kafka 시작 - brew 로 설치하지 않았다면 /usr/local/~ 에 있음

 

다음으로 간단하게 topic 을 생성해보자.

 

kafka-topics --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

 

test 라는 이름의 topic 을 kafka server (localhost:9092) 에 생성한다.

 

replication factor 는 가지고 있을 복사본의 개수를 의미한다. 여기서는 partition 당 복사본의 개수가 1이며, production 환경으로 가게 되면 fault tolerance 를 위해 3 이상의 replication factor 가 권장된다.

 

partitions 는 간단히 partition 의 개수를 의미한다. 이 개수가 늘어나면 topic log 가 parallel 하게 data 가 분산되어 fault tolerance 와 scalability 가 증가한다.

 

다음으로 CLI 에서 producer 와 consumer 를 간단히 테스트해본다.

 

kafka-console-producer --broker-list localhost:9092 --topic test

kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

 

다음으로 Spring Boot Application 에 integration 을 시도해본다.

 

implementation 'org.springframework.kafka:spring-kafka'

 

우선 build.gradle 에 dependency 를 추가한다.

 

다음으로 topic 을 생성하는 admin config 인데, 이를 kafka CLI 에서도 할 수 있지만 스프링부트 안에서도 가능하다.

 

@Configuration
public class KafkaTopicConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("baeldung", 1, (short) 1);
    }
}

 

매번 @Value annotation 사용할 때마다 실수하는데 intelliJ 사용시 annotation processing 을 켜줘야 한다.

 

일단 KafkaAdmin Bean 을 생성해준다. configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress) 는 configuration map 에 bootstrap Address 를 세팅해준다.

 

아래 NewTopic 함수는 이전 CLI 에서 topic 을 생성했듯 프로그래밍을 통해 topic 을 생성하는데,

kafka-topics --create --topic baeldung --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

 

에 상응하는 topic 생성 함수가 되겠다.

 

다음으로는 producer 와 consumer service layer 다.

 

@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "test", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message in group 'my-group': " + message);
    }
}

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

 

지원되는 함수들 쓰는거라 딱히 어려울건 없어보인다. 

'kafka' 카테고리의 다른 글

Strimzi를 사용하여 Kafka, Debezium을 쿠버네티스에 띄우기  (4) 2024.07.24
Kafka 소개  (1) 2023.11.26