Data Engineering/Kafka

Kafka tutorial - 1 [설치, topic생성, 발행, consumer group 실행]

신수동탈곡기 2021. 7. 3. 17:25

install Kafka

wget을 이용해 kafka를 download 받은 후, tar를 해제한다. kafka 버전별 wget link는 아래의 링크에서 찾을 수 있다. (다운로드 받으려는 kafka 버전의 하이퍼링크 링크를 복사하고 wget으로 받으면 된다.)

https://kafka.apache.org/downloads

~ $ wget https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
--2021-07-03 16:03:13--  https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 57215197 (55M) [application/x-gzip]
Saving to: `kafka_2.12-2.3.0.tgz'

kafka_2.12-2.3.0.tgz 100% [===========================================>]  54.56M   670KB/s    in 43s

2021-07-03 16:03:58 (1.26 MB/s) - `kafka_2.12-2.3.0.tgz' saved [57215197/57215197]
~ $
~ $
~ $ tar -zxf kafka_2.12-2.3.0.tgz
~ $ rm kafka_2.12-2.3.0.tgz
~ $ cd kafka_2.12-2.3.0
~/kafka_2.12-2.3.0 $ mkdir logs
~/kafka_2.12-2.3.0 $ vi config/server.properties # 설정에서 두 가지 config를 수정했다.

server.properties

...

log.dirs=/Users/seongminheo/kafka_2.12-2.3.0/logs # log path 수정

...

num.partitions=2 # defaulte partition 개수 수정

...

start zookeeper, kafka

주키퍼와 카프카를 실행할 때는 sh파일 바로 뒤에 config 파일 경로를 지정해야 한다.

# start zookeeper
~/kafka_2.12-2.3.0 $ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

# start kafka
~/kafka_2.12-2.3.0 $ ./bin/kafka-server-start.sh ./config/server.properties &

daemon 모드로 실행하기

~/kafka_2.12-2.3.0 $ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
~/kafka_2.12-2.3.0 $ ./bin/kafka-server-start.sh -daemon ./config/server.properties
~/kafka_2.12-2.3.0 $ ps
  PID TTY           TIME CMD
 9569 ttys000    0:00.77 -zsh
10329 ttys000    0:01.79 /usr/local/Cellar/openjdk@11/11.0.10/bin/java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:I
10620 ttys000    0:02.55 /usr/local/Cellar/openjdk@11/11.0.10/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:Initi
66507 ttys001    0:00.27 /bin/zsh --login -i

zookeeper, kafka를 daemon모드로 실행하면 프로세스 관리를 보다 수월하게 할 수 있다.

show topics list

~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets

같은 호스트에서 실행했고 기본 포트를 그대로 사용했기 때문에 localhost:9092를 bootstrap-server로 지정했다. 아직 아무런 topic을 생성하지 않았기 때문에 __consumer_offsets만 보인다.

create and delete topic

~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic topic_randint
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic_randint_2
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
topic_randint
topic_randint_2
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic topic_randint_2
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
topic_randint

send message to kafka topic by python (메세지 발행하기?)

매우 간단한 파이썬 스크립트를 만들어서 메세지를 발행해보려고 한다.

# producer.py

from kafka import KafkaProducer
import datetime
import time
import random
import json


broker = ["localhost:9092"]
topic_name = "topic_randint"
producer = KafkaProducer(bootstrap_servers=broker)

if __name__ == "__main__":
    id = 0
    while 1:
        value = random.randint(1, 10000)
        now = datetime.datetime.now().replace()

        msg = {'id': id, 'value': value, 'date': str(now)}
        msg = json.dumps(msg)

        producer.send(topic_name, msg.encode())

        id += 1

        if id % 3 == 0:
            time.sleep(2)

consumer group 관련

consumger groups list

~/kafka_2.12-2.3.0 $ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

아무것도 없어서 출력이 안된다.

~/kafka_2.12-2.3.0 $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_randint --group group_randint_1

'topic_randint`라는 topic을 구독하는 group 'group_randint_1'이라는 이름의 consumer group을 만든다. 위 명령어를 실행하면 consumger group은 topic에 message가 들어오는 것을 기다리고 있으며, message가 들어오는대로 message를 처리한다.

producer.py 스크립트를 실행시키면 위 명령어를 실행한 터미널에서 처리한 메세지의 내용을 출력한다.

스크립트 실행 전

스크립트 실행 후

consumer group 종료 후

consumer groups describe

~/kafka_2.12-2.3.0 $ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_randint_1
Consumer group 'group_randint_1' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
group_randint_1 topic_randint   0          11              11              0               -               -               -
group_randint_1 topic_randint   1          10              10              0               -  
             -               -
~/kafka_2.12-2.3.0 $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_randint --group group_randint_1 &
~/kafka_2.12-2.3.0 $ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_randint_1
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
group_randint_1 topic_randint   0          11              11              0               consumer-1-1b17b879-7f0d-4af8-a790-bb32a29bc56c /127.0.0.1      consumer-1
group_randint_1 topic_randint   1          10              10              0               consumer-1-1b17b879-7f0d-4af8-a790-bb32a29bc56c /127.0.0.1      consumer-1

위 코드박스는 group_randint_1이라는 consumer group을 실행, 종료하고 describe를 출력한 박스이고, 아래 코드박스는 group_randint_1이라는 consumer group을 백그라운드로 실행, 종료하지 않고 describe를 출력한 박스다. consumer group을 백그라운드로 실행시킨 상황에서는 Consumer group 'group_randint_1' has no active members.문구가 출력되지 않는다.

~/kafka_2.12-2.3.0 $ kill -9 3718 # consumer group을 삭제함

보충해야할 점

  • consumer_offsets topic의 역할
  • topic 생성할 때 --bootstrap-server localhost:9092, --zookeeper localhost:2181 둘 다 되는데 어떤게 맞는건지 알아보기
  • 각 용어에 대한 확실한 개념 정리
  • consumer group 운영 방법
  • consumer group describe의 각 열의 의미