본문 바로가기

devops/kafka

3장 카프카 스트림즈

카프카 스트림즈는 토픽에 적재된 데이터를 상태기반 또는 비상태기반으로 변환하여 다른 토픽에 적재하는 라이브러리이다.

  • 장점
  1. 카프카에서 공식적으로 지원하여 호환성이 매우 뛰어남
  2. 장애 허용 시스템을 가져 장애에 강함
  • 테스크

스트림즈 애플리케이션을 실행하면 생기는 데이터 처리 최소 단위

  • 토폴로지

2개 이상의 노드들과 선으로 이루어진 집합

  • 프로세서
    소스 프로세서 : 데이터를 처리하기 위한 노드
    스트림 프로세서 : 다른 프로세서가 반환한 데이터를 처리하는 역할을 하는 노드
    싱크 프로세서 : 데이터를 특정 카프카 토픽으로 저장하는 역할을 하는 노드

스트림즈 DSL

KStream

레코드의 흐름을 표현한 것으로 메시지 키와 메시지 값으로 구성

KTable

유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용한 스트림

GlobalKTable

KTable과 동일하게 메시지 키를 기준으로 묶어서 사용하지만 KTable로 선언된 토픽은 1개 파티션이 1개 태스크에 할당되어 사용되고
GlobalDTable로 선언된 토픽은 모든 파티션 데이터가 가 태스크에 할당되어 사용된다는 차이가 있다.

코파티셔닝

조인을 하는 2개의 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업

리파티셔닝

새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정

조인

KTable과 KStream을 메시지 키를 기준으로 조인할 수 있다. 하지만 코파티셔닝이 되지 않으면 TopologyException이 발생한다.

GlobalKTable과 KStream은 코파티셔닝이 되지 않아도 조인이 가능하다.
하지만 정의된 모든 데이터를 저장하고 사용하는 GlobalKTable은 로컬 스토리지의 사용량이 증가하고 네트워크, 브로커에 부하가 생기므로 주의해야 한다.

코드 실습

책 예제 코드

 

GitHub - bjpublic/apache-kafka-with-java: 아파치 카프카 애플리케이션 프로그래밍 with 자바

아파치 카프카 애플리케이션 프로그래밍 with 자바. Contribute to bjpublic/apache-kafka-with-java development by creating an account on GitHub.

github.com

실습한 코드

 

GitHub - yudonggeun/kafka_practice

Contribute to yudonggeun/kafka_practice development by creating an account on GitHub.

github.com


프로세서 API

토폴로지를 기준으로 데이터를 처리한다는 관점에서는 스트림즈 DSL과 동일한 역할을 한다.
추가적인 상세 로직의 구현이 필요하다면 프로세서 API를 활용할 수 있다.

  • processor 구현
public class FilterProcessor implements Processor<String, String, String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        if (record.value().length() > 5){
            context.forward(record);
        }
        context.commit();
    }

    @Override
    public void close() {
    }
}
  • 프로세스 사용하기
public class SimpleKafkaProcessor {

    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String APPLICATION_NAME = "processor-application";
    private final static String STREAM_LOG = "test";
    private final static String STREAM_LOG_COPY = "stream_log_copy";

    public static void main(String[] args) {
        var props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());

        var topology = new Topology();

        topology.addSource("Source", STREAM_LOG)
                .addProcessor("Process",
                        () -> new FilterProcessor(),
                        "Source")
                .addSink("Sink",
                        STREAM_LOG_COPY,
                        "Process");

        var streaming = new KafkaStreams(topology, props);
        streaming.start();
    }
}