컨슈머 API
카프카에서 데이터를 넣어주는 프로듀서가 있으니 당연히 데이터를 사용하는 컨슈머가 있다.
컨슈머에 대해서 알아보자.
컨슈머 중요 개념
컨슈머를 운영하는 방법은 크게 2가지가 있다.
- 토픽의 특정 파티션을 구독하는 컨슈머를 운영하는 방식
- 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 방식
컨슈머 그룹으로 운영하는 경우를 좀 더 자세히 살펴보자.
1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 방식
컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있게 도와주는 방식이다.
컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있다.
컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 파티션은 오직 하나의 컨슈머만 사용할 수 있다.
이는 같은 그룹에서 동일한 데이터를 중복해서 처리하는 것을 막기 위한 것이다.
그룹이 다른 컨슈머는 중복해서 파티션을 구독할 수 있다.
컨슈머에서 장애가 발생하면?
컨슈머에 장애가 발생하면 카프카는 리밸런싱
을 수행한다. 장애가 발생한 컨슈머에 할당된 파티션을 다른 컨슈머들에게
할당을 하는 과정을 리밸런싱
이라고 한다.
리벨런싱
은 가용성을 높여주는 동작이지만 실행 과정에서 데이터를 소비할 수 없기 때문에 처리량에서 손해가 있다.
따라서 이 과정은 자주 실행되면 좋지 않다.
컨슈머가 파티션에서 데이터를 가져가는 메커니즘
컨슈머는 파티션에서 데이터를 가져오는 poll
을 실행하면 컨슈머는 브로커의 내부토픽(_consumer_offset)에 오프셋을 기록하는 commit
을 날린다.
브로커는 내부 토픽의 값을 보고 컨슈머가 얼마나 데이터를 소비했는지를 알 수 있어 중복된 데이터를 전송하거나 데이터를 유실하지 않도록 동작한다.
자동 커밋
브로커에서 데이터를 안정적으로 보내는 메커니즘이 있지만 컨슈머 입장에서 데이터 중복과 유실은 다른 문제이다.
컨슈머가 poll
과 commit
을 어떻게 사용하느냐에 따라서 데이터 중복과 유실이 생길 수 있다.
자동 커밋은 poll
수행 시 일정 간격마다 오프셋을 commit
하도록 하는 비동기적인 방식이다. 비동기적인 방식은 중복과 유실이 발생할 수 있으니 조심해서 사용해야 한다.
예제
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
var configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // default 가 true
var consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
var records = consumer.poll(Duration.ofSeconds(1));
for(var record : records){
logger.info("{}", record);
}
}
}
}
명시적 동기 커밋
컨슈머는 명시적으로 commit
을 줄 수 있다. 데이터가 처리된 이후에 커밋을 날리면 중복과 유실을 막을 수 있다.
하지만 비동기적으로 동작하는 것보다는 처리량 면에서는 아쉽다.
예제
public class SyncCommitConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
var configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
var consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
var records = consumer.poll(Duration.ofSeconds(1));
for(var record : records){
logger.info("{}", record);
}
consumer.commitSync();
}
}
}
우아한 종료
컨슈머가 장애로 고장 나면 일정 시간 동안 데이터를 처리하지 못해서 랙이 늘어난다.
따라서 컨슈머가 장애로 종료되면 종료된 사실을 브로커에게 알려주는 예외처리를 구현하는 것이 좋다.
예제
public class ShutdownHookSyncCommitConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
private static KafkaConsumer<String, String> consumer;
public static void main(String[] args) {
var configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
try {
consumer = new KafkaConsumer<>(configs);
consumer.subscribe(List.of(TOPIC_NAME));
while (true) {
var records = consumer.poll(Duration.ofSeconds(1));
for (var record : records) {
logger.info("{}", record);
}
consumer.commitSync();
}
} catch (WakeupException e) {
logger.warn("Wakeup consumer");
} finally {
consumer.close();
}
}
static class ShutdownThead extends Thread {
@Override
public void run() {
logger.info("Shutdown hook");
consumer.wakeup();
}
}
}
데이터 중복, 유실?
- 데이터 중복 시나리오 : 커밋 실패 + 데이터 처리 성공
데이터 처리를 했는데 커밋이 실패하면 다음 poll
과정에서 중복된 데이터를 가져올 수 있다.
- 데이터 유실 시나리오 : 커밋 성공 + 데이터 처리 실패
커밋이 성공했지만 데이터 처리가 실패하면 실패한 데이터를 다시 받아서 처리해야 하는데 브로커는 처리했다고 알고 있기 때문에 데이터가 유실된다.
REFERENCE
아파치 카프카 애플리케이션 프로그래밍 with 자바
'devops > kafka' 카테고리의 다른 글
4장 카프카 상세 개념 설명 (0) | 2023.10.31 |
---|---|
3장 카프카 스트림즈 (0) | 2023.10.23 |
3장 카프카 기본 개념 설명 - 브로커, 프로듀서 (1) | 2023.10.17 |
2장 카프카 빠르게 시작해보기 (카프카 커맨드 라인 ) (0) | 2023.10.16 |
2장 카프카 빠르게 시작해보기 (실습용 카프카 브로커 설치) (0) | 2023.10.15 |