빅토리 코딩
article thumbnail
728x90
반응형
CentOS7
Kafka 2.12-3.4.0
jdk 11

Kafka 설치

kafka 설치는 공식 홈페이지 https://kafka.apache.org/downloads 에서 받을 수 있다.

kafka는 zookeeper가 있어야 돌아가므로 zookeeper먼저 실행후 kafka를 실행 시킨다.

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

이다. 기본포트는 zookeeper는  2181이고 kafka는 9092 이다. 포트는 각 설정 properties에서 할 수 있다.  error가 없으면 기동 한것이다. 

나는 나중에 kafka와 spring boot로 연결하여 사용하기때문에 kafka 명령어로 말고 spring boot로 해보겠다.

SpringBoot 설정

spring boot에서 설정해서 groupId와 topic을 새롭게 설정해두면 자동으로 kafka에 추가가 된다.

pom.xml (의존성추가)

...
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>
...

application.yml (configuration)

spring:
  kafka:
    bootstrap-servers:
      - 192.168.0.103:9092
    consumer:
      # 새로운 consumer의 topic에대한  offset 설정
      # latest(가장 최근에 생산된 메시지), earliest(가장 오래된 메시지), none(offset 정보가 없으면 Exception)
      auto-offset-reset: earliest
      # JSON 데이터를 역직렬화
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # JSON 데이터를 역직렬화
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

ConsumerService

consumerService

@Slf4j
@Service
public class KafkaconsumerService {
	//
    @KafkaListener(topics = "test", groupId ="group1")
    public void consume(String message) throws IOException {
        log.info(String.format("Consumed message : %s", message));
    }

}

test라는 topic으로 들어오는 consumerService를 추가 (groupId는 테스트를 위해 추가)

ProducerService

ProducerController

@Slf4j
@RestController
public class KafakaproducerController {

	@Autowired
	KafkaProducerService kafkaProducerService;

	@PostMapping("/messageSend")
	public String sendMessage(@RequestBody ProducerVo producerVo) {

		log.info("ProducerVo : " + producerVo);
		kafkaProducerService.sendMessage(producerVo.getTopic(), producerVo.getMessage());

		return "Send message";
	}

}

ProducerVo

@Data
public class ProducerVo {

	@NotNull
	@Schema(name = "topic", description = "kafk topic")
	String topic;

	@Schema(name = "message", description = "kafk message")
	@NotNull
	String message;
}

 

ProducerService

@Slf4j
@Service
public class KafkaProducerService {

	@Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topicName, String message) {
        log.info(String.format("Produce message : %s", message));
        this.kafkaTemplate.send(topicName, message);
    }

}

이제 /messageSend를 통해서 "test"라는 topic에 message를 보내본다.

send 했을때 

consumer로 정상적으로 들어오는것을 알수 있다. 이제 Group을 설정 했을때 어떤식으로 넘어오는지 볼것이다. "test" 라는topic을 Subscriber하는 group2를 콘솔로 만들어보겠다. 

테스트로 여러게 보냈던것이 application.yml에 설정 했던 auto-offset-reset을 earliest로 설정했기에 오래된 메시지순으로 들어 오는것을 볼 수 있다.  이 상태에서 새로 message를 send 하면 group1, group2에 각각 message가 들어오는 것을 볼 수 있다.  

group1을 콘솔에서 2개를 했을때 한곳에만 message가 들어오는것을 볼 수 있었고

group1를 하나만 띄어논 상태에서 message를 보내고 group1를 하나더 띄었을때는 아무 message가 안오는걸 볼 수 있다.

즉, groupID가 같은 group들은 서로 offset을 공유하기 때문에 하나의 consumer에만 message가 오는것을 알 수 있다.

https://github.com/victory940209/kafkaSpringboot

 

GitHub - victory940209/kafkaSpringboot

Contribute to victory940209/kafkaSpringboot development by creating an account on GitHub.

github.com

 

728x90
반응형

'Open Source > messageQueue' 카테고리의 다른 글

Kafka (zookeeper) 아키텍처 [1]  (0) 2023.03.30
profile

빅토리 코딩

@빅토리 코딩

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!

검색 태그