CentOS7
Kafka 2.12-3.4.0
jdk 11
Kafka 설치
kafka 설치는 공식 홈페이지 https://kafka.apache.org/downloads 에서 받을 수 있다.
kafka는 zookeeper가 있어야 돌아가므로 zookeeper먼저 실행후 kafka를 실행 시킨다.
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
이다. 기본포트는 zookeeper는 2181이고 kafka는 9092 이다. 포트는 각 설정 properties에서 할 수 있다. error가 없으면 기동 한것이다.
나는 나중에 kafka와 spring boot로 연결하여 사용하기때문에 kafka 명령어로 말고 spring boot로 해보겠다.
SpringBoot 설정
spring boot에서 설정해서 groupId와 topic을 새롭게 설정해두면 자동으로 kafka에 추가가 된다.
pom.xml (의존성추가)
...
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
...
application.yml (configuration)
spring:
kafka:
bootstrap-servers:
- 192.168.0.103:9092
consumer:
# 새로운 consumer의 topic에대한 offset 설정
# latest(가장 최근에 생산된 메시지), earliest(가장 오래된 메시지), none(offset 정보가 없으면 Exception)
auto-offset-reset: earliest
# JSON 데이터를 역직렬화
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# JSON 데이터를 역직렬화
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
ConsumerService
consumerService
@Slf4j
@Service
public class KafkaconsumerService {
//
@KafkaListener(topics = "test", groupId ="group1")
public void consume(String message) throws IOException {
log.info(String.format("Consumed message : %s", message));
}
}
test라는 topic으로 들어오는 consumerService를 추가 (groupId는 테스트를 위해 추가)
ProducerService
ProducerController
@Slf4j
@RestController
public class KafakaproducerController {
@Autowired
KafkaProducerService kafkaProducerService;
@PostMapping("/messageSend")
public String sendMessage(@RequestBody ProducerVo producerVo) {
log.info("ProducerVo : " + producerVo);
kafkaProducerService.sendMessage(producerVo.getTopic(), producerVo.getMessage());
return "Send message";
}
}
ProducerVo
@Data
public class ProducerVo {
@NotNull
@Schema(name = "topic", description = "kafk topic")
String topic;
@Schema(name = "message", description = "kafk message")
@NotNull
String message;
}
ProducerService
@Slf4j
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topicName, String message) {
log.info(String.format("Produce message : %s", message));
this.kafkaTemplate.send(topicName, message);
}
}
이제 /messageSend를 통해서 "test"라는 topic에 message를 보내본다.
send 했을때
consumer로 정상적으로 들어오는것을 알수 있다. 이제 Group을 설정 했을때 어떤식으로 넘어오는지 볼것이다. "test" 라는topic을 Subscriber하는 group2를 콘솔로 만들어보겠다.
테스트로 여러게 보냈던것이 application.yml에 설정 했던 auto-offset-reset을 earliest로 설정했기에 오래된 메시지순으로 들어 오는것을 볼 수 있다. 이 상태에서 새로 message를 send 하면 group1, group2에 각각 message가 들어오는 것을 볼 수 있다.
group1을 콘솔에서 2개를 했을때 한곳에만 message가 들어오는것을 볼 수 있었고
group1를 하나만 띄어논 상태에서 message를 보내고 group1를 하나더 띄었을때는 아무 message가 안오는걸 볼 수 있다.
즉, groupID가 같은 group들은 서로 offset을 공유하기 때문에 하나의 consumer에만 message가 오는것을 알 수 있다.
https://github.com/victory940209/kafkaSpringboot
'Open Source > messageQueue' 카테고리의 다른 글
Kafka (zookeeper) 아키텍처 [1] (0) | 2023.03.30 |
---|