[Spring, Kafka] Spring Kafka Consumer 사용시 오프셋 관리

[Spring, Kafka] Spring Kafka Consumer 사용시 오프셋 관리

Spring Kafka를 이용하여 대용량 데이터를 consuming 하고있는 어플리케이션을 제작하고 있었는데 어플리케이션 오류 등 여러 이유로 consumer가 죽거나 할때 재시작을 해줘야 하는 경우가 있었다.

재시작시 consumer offset이 유지될 경우 엄청나게 밀린 LAG을 처리하기에 기능이 LAG이 해소되는동안 정상동작 못하는 이슈가 있어서 해결이 필요했다.

Spring Kafka 에 ConsumerSeekAware 인터페이스가 존재하는데, 다음과 같은 기능들이 정의되어있다.

ConsumerSeekAware에 등록된 메소드들

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerSeekAware.html

내 케이스의 경우 Spring Application이 시작되며 KafkaConsumer가 파티션에 assign될때 offset을 최후방으로 지정하는것이 필요하였고, 실제로 아래 코드와 같이 작성하였을 경우 각 파티션의 마지막 offset으로 지정되어, Kafka Lag이 바로 해소된 것이 확인 가능하였다.

@Component public class KafkaConsumer implements ConsumerSeekAware { ... @Override public void onPartitionsAssigned(Map assignments, ConsumerSeekAware.ConsumerSeekCallback callback) { assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition())); } }

from http://mixify.tistory.com/8 by ccl(A) rewrite - 2021-12-30 21:27:13