본문 바로가기

devops/kafka

3장 카프카 컨슈머

컨슈머 API

카프카에서 데이터를 넣어주는 프로듀서가 있으니 당연히 데이터를 사용하는 컨슈머가 있다.
컨슈머에 대해서 알아보자.

컨슈머 중요 개념

컨슈머를 운영하는 방법은 크게 2가지가 있다.

  1. 토픽의 특정 파티션을 구독하는 컨슈머를 운영하는 방식
  2. 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 방식

컨슈머 그룹으로 운영하는 경우를 좀 더 자세히 살펴보자.

1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 방식

컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있게 도와주는 방식이다.

컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있다.

컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 파티션은 오직 하나의 컨슈머만 사용할 수 있다.
이는 같은 그룹에서 동일한 데이터를 중복해서 처리하는 것을 막기 위한 것이다.

그룹이 다른 컨슈머는 중복해서 파티션을 구독할 수 있다.

컨슈머에서 장애가 발생하면?

컨슈머에 장애가 발생하면 카프카는 리밸런싱을 수행한다. 장애가 발생한 컨슈머에 할당된 파티션을 다른 컨슈머들에게
할당을 하는 과정을 리밸런싱이라고 한다.

리벨런싱은 가용성을 높여주는 동작이지만 실행 과정에서 데이터를 소비할 수 없기 때문에 처리량에서 손해가 있다.
따라서 이 과정은 자주 실행되면 좋지 않다.

컨슈머가 파티션에서 데이터를 가져가는 메커니즘

컨슈머는 파티션에서 데이터를 가져오는 poll을 실행하면 컨슈머는 브로커의 내부토픽(_consumer_offset)에 오프셋을 기록하는 commit을 날린다.

브로커는 내부 토픽의 값을 보고 컨슈머가 얼마나 데이터를 소비했는지를 알 수 있어 중복된 데이터를 전송하거나 데이터를 유실하지 않도록 동작한다.

자동 커밋

브로커에서 데이터를 안정적으로 보내는 메커니즘이 있지만 컨슈머 입장에서 데이터 중복과 유실은 다른 문제이다.

컨슈머가 pollcommit을 어떻게 사용하느냐에 따라서 데이터 중복과 유실이 생길 수 있다.

자동 커밋은 poll 수행 시 일정 간격마다 오프셋을 commit하도록 하는 비동기적인 방식이다. 비동기적인 방식은 중복과 유실이 발생할 수 있으니 조심해서 사용해야 한다.

예제
public class SimpleConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        var configs = new Properties();

        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // default 가 true

        var consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            var records = consumer.poll(Duration.ofSeconds(1));

            for(var record : records){
                logger.info("{}", record);
            }
        }
    }
}
명시적 동기 커밋

컨슈머는 명시적으로 commit을 줄 수 있다. 데이터가 처리된 이후에 커밋을 날리면 중복과 유실을 막을 수 있다.

하지만 비동기적으로 동작하는 것보다는 처리량 면에서는 아쉽다.

예제
public class SyncCommitConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        var configs = new Properties();

        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        var consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            var records = consumer.poll(Duration.ofSeconds(1));

            for(var record : records){
                logger.info("{}", record);
            }
            consumer.commitSync();
        }
    }
}
우아한 종료

컨슈머가 장애로 고장 나면 일정 시간 동안 데이터를 처리하지 못해서 랙이 늘어난다.

따라서 컨슈머가 장애로 종료되면 종료된 사실을 브로커에게 알려주는 예외처리를 구현하는 것이 좋다.

예제
public class ShutdownHookSyncCommitConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";
    private static KafkaConsumer<String, String> consumer;

    public static void main(String[] args) {
        var configs = new Properties();

        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        try {
            consumer = new KafkaConsumer<>(configs);
            consumer.subscribe(List.of(TOPIC_NAME));

            while (true) {
                var records = consumer.poll(Duration.ofSeconds(1));

                for (var record : records) {
                    logger.info("{}", record);
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            logger.warn("Wakeup consumer");
        } finally {
            consumer.close();
        }
    }

    static class ShutdownThead extends Thread {
        @Override
        public void run() {
            logger.info("Shutdown hook");
            consumer.wakeup();
        }
    }
}
데이터 중복, 유실?
  1. 데이터 중복 시나리오 : 커밋 실패 + 데이터 처리 성공

데이터 처리를 했는데 커밋이 실패하면 다음 poll 과정에서 중복된 데이터를 가져올 수 있다.

  1. 데이터 유실 시나리오 : 커밋 성공 + 데이터 처리 실패

커밋이 성공했지만 데이터 처리가 실패하면 실패한 데이터를 다시 받아서 처리해야 하는데 브로커는 처리했다고 알고 있기 때문에 데이터가 유실된다.

REFERENCE

아파치 카프카 애플리케이션 프로그래밍 with 자바