본문 바로가기

devops/kafka

2장 카프카 빠르게 시작해보기 (실습용 카프카 브로커 설치)

들어가면서

책에서는 AWS EC2에 카프카를 설치하고 운영하는 예제를 보여주지만 AWS 계정에 다른 서버를 구동하고 있기도 하고
그냥 로컬에서 외부 환경에 제약 없이 자유롭게 실습을 하고 싶어서 Docker를 사용해서 실습환경을 구축하기로 결정했다.

2.1 실습용 카프카 설치

  • AWS 환경 설정을 도커 설치로 변경

2.1.1 도커 데스크톱 설치

도커를 그냥 사용해도 괜찮지만 GUI로 변하게 컨테이너를 관리하기 위해서 도커 데스크톱을 설치한다.


2.1.2 인스턴스에 접속하기

아직 컨테이너를 생성하지 않았기 때문에 PASS!!

하지만 생성된 컨테이너를 도커 데스크톱을 사용하면 쉽게 컨테이너에 접속할 수 있다.
도커 CLI도 docker exec 명령을 통해서 접근 가능!!


2.1.3 인스턴스에 자바 설치하기

도커 이미지를 통해서 환경을 구성한다면 java 의존성은 자동으로 해결~~!


2.1.4 주키퍼/카프카 브로커 실행

카프카 설치 및 주키퍼 설치

책에서는 바이너리 패키지를 다운 받아서 카프카를 실행하는 방법을 설명하지만 Docker Hub에 kafka 이미지가 존재한다.

  • kafka image 다운로드 실행
  • docker pull confluentinc/cp-kafka
  • zookeeper 역시 이미지 다운로드 실행
  • docker pull confluentinc/cp-zookeeper

카프카 브로커 힙 메모리 설정

카프카 환경 변수로 힙 메모리를 설정하기 때문에 도커 컴포즈의 환경변수 argument를 사용했다.

카프카 힙 메모리 사이즈 설정 xms 힙 메모리 사이즈 최소 크기, xmx 최대 크기

services:
  kafka:
    image: confluentinc/cp-kafka
    environment:
      KAFKA_HEAP_OPTS: "-Xmx400m -Xms400m"

카프카 브로커 실행 옵션 설정

책에서는 패키지를 다운로드하여서 진행한다. 그래서 패키지의 server.properties 파일을 수정하여 설정을 해주지만
도커 이미지를 활용한다면 모든 설정을 환경 변수를 통해서 진행하면 된다.

환경 변수의 이름 규칙은 카프카 설정 파일의 변수명을 변형한 것이다. 이 내용은 confluent docs의 기제된 문서를 보고 알게 되었다.

  • 예시 : broker.id > KAFKA_BROKER_ID

이 규칙에 따라서 책에서 나온 설정을 적용하면 이렇다.

services:
  kafka:
    image: confluentinc/cp-kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_HEAP_OPTS: "-Xmx400m -Xms400m"  
      KAFKA_BROKER_ID: 0 
      KAFKA_NUM_NETWORK_THREADS: 3
      KAFKA_NUM_IO_THREADS: 8

      KAFKA_LOG_DIRS: /Users/dvwy/Documents/test/kafka_2_12-2_5_0/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1
      KAFKA_ OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TaaaaaaRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 18000
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

주키퍼 실행

주키퍼는 특별한 설정이 없이 설정 파일 지정 및 데몬 프로세스 실행을 적용했기 때문에 이미지를 가이드에 따라서 실행하면 되었다.

    ...
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - 2181:2181

카프카 브로커 실행 및 로그 확인

카프카의 로그를 파일에 저장하는 것이 아니라 표준입출력을 사용한다. 도커를 사용하기 때문에 이러한 방식으로
로그 관리를 구현한 것으로 보인다. 따라서 책과 다르게 docker log 명령이나 도커 데스크톱의 log 탭을 통해서
실행 및 로그를 확인할 수 있다.


2.1.5 로컬 컴퓨터에서 카프카와 통신확인

카프카의 내부적인 네트워크 처리 방식을 아직 잘 모르기 때문에 책의 환경과 유사하게 구성했다.
카프카와 주키퍼를 모든 포트에서 자유롭게 접근할 수 있는 컨테이너에 로컬에 설치해서 사용하는 카프카 바이너리 패키지를 다운로드하여서 예제와 동일하게 수행하기로 결정했다.

구현 방식은 실습을 위한 네트워크를 만들어서 통신하고 추후에 클러스터링에도 대비할 수 있고 서비스 이름으로 hostname을 설정했다.

  • CLI 전용 컨테이너 Dockerfile
FROM adoptopenjdk/openjdk11
LABEL authors="ydong"
ENV CLI_DIR=/cli

RUN mkdir ${CLI_DIR}
WORKDIR ${CLI_DIR}
RUN curl https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz --output kafka.tgz --output kafka.tgz
RUN tar -xvf kafka.tgz
ENTRYPOINT ["top", "-b"]
  • CLI 컨테이너 접속 후 예제 명령어 실행
    ./kafka-broker-api-versions.sh --bootstrap-server kafka:9092

테스트를 편의를 위한 hosts 설정

서비스 이름을 통해서 라우팅이 가능하기 때문에 설정할 필요가 없었다.

전체 설정 파일 코드

  • 컴포즈 파일
services:
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_HEAP_OPTS: "-Xmx400m -Xms400m" 
      KAFKA_BROKER_ID: 0
      KAFKA_NUM_NETWORK_THREADS: 3
      KAFKA_NUM_IO_THREADS: 8

      KAFKA_LOG_DIRS: /Users/dvwy/Documents/test/kafka_2_12-2_5_0/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1
      KAFKA_ OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TaaaaaaRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 18000
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    networks:
      - data
    volumes:
      - "./log:/Users/dvwy/Documents/test/kafka_2_12-2_5_0/data"

  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - 2181:2181
    networks:
      - data

  cli:
    build:
      dockerfile: ./Dockerfile
    networks:
      - data

networks:
  data:
  • CLI 전용 컨테이너
FROM adoptopenjdk/openjdk11
LABEL authors="ydong"
ENV CLI_DIR=/cli

RUN mkdir ${CLI_DIR}
WORKDIR ${CLI_DIR}
RUN curl https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz --output kafka.tgz --output kafka.tgz
#RUN curl https://archive.apache.org/dist/kafka/3.6.0/kafka_2.12-3.6.0.tgz --output kafka.tgz --output kafka.tgz
RUN tar -xvf kafka.tgz
ENTRYPOINT ["top", "-b"]

ERROR

  • 클러스터 ID 충돌 이슈
  • 컨테이너를 재시작하는 경우 발생했던 오류로 여기까지 실습을 진행한 경우에는 그냥 log에 존재하는 meta.properties 파일을 삭제하는 것으로 해결

 

REFERENCE

아파치 카프카 애플리케이션 프로그래밍 with 자바