본문 바로가기

devops/kafka

3장 카프카 기본 개념 설명 - 브로커, 프로듀서

카프카 브로커, 클러스터 주키퍼

데이터 복제, 싱크

장애 발생시에도 데이터를 유실하지 않고 안전하게 사용하기 위해서 데이터 복제(싱크)를 실행한다.

카프카의 데이터 복제는 파티션 단위로 이루어지며 복제된 파티션은 리터팔로워로 구성된다.
팔로워의 파티션은 리더 파티션으로부터 데이터를 가져와서 자신의 파티션에 저장한다.
복제 개수만큼 저장 용량이 증가한다는 단점을 가지고 있다.
리더 파티션에 장애가 발생하면 다른 팔로워 파티션이 그 기능을 위임받는다.

컨트롤러

클러스터의 다수 브로커 중 한 대가 컨트롤러의 역할을 한다. 컨트롤러는 다른 브로커들의 상태를 체크하고
브로커가 클러스터에서 빠지는 경우 해당 브로커에 존재하는 리더 파티션을 재분배한다.

데이터 삭제

카프카는 다른 메시징 플랫폼과 다르게 컨슈머가 데이터를 가져가더라도 토픽의 데이터는 삭제되지 않는다.
오직 브로커만이 데이터를 삭제할 수 있다.
데이터 삭제는 파일 단위로 이루어지는데 이 단위를 로그 세그먼트라고 부른다. 이 세그먼트에는 다수의 데이터가 들어 있기 때문에 일반적인 데이터베이스처럼 특정 데이터를 선별해서 삭제할 수 없다.

닫힌 세그먼트 파일은 log.retention.bytes 또는 log.retention.ms 옵션에 설정값이 넘으면 삭제된다.

카프카는 데이터를 삭제하지 않고 메시지 키를 기준으로 오래된 데이터를 압축하는 정책(토픽 압축 정책)을 가져갈 수 있다.

코디네이터

브로커 중 하나가 코디네이터 역할을 수행한다.
코디네이터는 컨슈머 그룹의 상태를 체크하고 파티션을 컨슈머와 매칭되도록 분배하는 역할을 한다.
파티션을 컨슈머로 재할당하는 과정을 수행한다. 즉, 리벨런스를 수행한다.

프로듀서 API

gradle 기반 자바 애플리케이션에서 카프카를 사용하기 위해서 다음 의존성을 추가한다.

implementation 'org.apache.kafka:kafka-clients:3.5.1'
implementation 'org.slf4j:slf4j-simple:1.7.30'

카프카의 CLI 커맨드에서 사용했던 옵션들을 자바 코드로 구현해서 카프카에 데이터를 입력한다.

public class SimpleProducer {

    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        var configs = new Properties();

        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (var producer = new KafkaProducer<String, String>(configs)) {

            String messageValue = "testMessage";
            var nonKeyRecord = new ProducerRecord<String, String>(TOPIC_NAME, messageValue);
            var keyRecord = new ProducerRecord<>(TOPIC_NAME, "pangyo", "this key value");
            var recordKeyAndPartition = new ProducerRecord<>(TOPIC_NAME, 0, "key1", "this value");

            producer.send(nonKeyRecord);
            producer.send(recordKeyAndPartition);
            producer.send(keyRecord);

            producer.flush();
        }
    }
}

카프카 API에 대한 자세한 스펙은 kafka docs에서 찾아볼 수 있다.
자바를 이용한 카프카 API example


카프카 파티션 정책

카프카의 파티션 정책은 UniformStickyPartitionerRoundRobinPartitioner가 있는데 2.4.0 보다 최신 버전에서는 UniformStickyPartitioner
기본정책으로 사용한다.
데이터가 배치로 모두 묶일 때까지 기다렸다가 전송하기 때문에 라운드 로빈 방식보다 UniformStickyPartitioner방식이 성능면에서 이점을 가지기 때문에 변경되었다.

만약 커스텀 파티션 정책을 사용하고 싶다면 Partitioner인터페이스를 구현한 이후 정책을 반영시킬 수 있다.

카프카 옵션

  • 필수 옵션
option describe
bootstrap.servers 카프카 클러스터에 속한 브로커의 호스트 이름, 2개 이상 작성하여 이슈 방지 가능
key.serializer 레코드의 메시지 키를 직렬화하는 클래스
value.serializer 레코드의 메시지 값을 직렬화하는 클래스
  • 선택 옵션
option describe
acks 프로듀서가 전송한 데이터가 정상적으로 전송되었는지 성공여부 확인하는 옵션, 0(not check), 1(one check), -1(all check) 설정가능
buffer.memory 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리 크기 (default : 32MB)
retries 에러시 재전송 시도 횟수 (default : Integer.MAX_VALUE)
batch.size 배치로 전송할 레코드의 최대 용량
linger.ms 배치로 전송하기 전까지 기다리는 최소 시간 (default : 0s)
partitioner.class 레코드를 전송할 때 적용하는 파티셔너 클래스
enable.idempotence 멱등성 프로듀서로 동작할지 여부 (default : false)
transactional.id 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정

REFERENCE

아파치 카프카 애플리케이션 프로그래밍 with 자바