install Kafka
wget을 이용해 kafka를 download 받은 후, tar를 해제한다. kafka 버전별 wget link는 아래의 링크에서 찾을 수 있다. (다운로드 받으려는 kafka 버전의 하이퍼링크 링크를 복사하고 wget으로 받으면 된다.)
https://kafka.apache.org/downloads
~ $ wget https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
--2021-07-03 16:03:13-- https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 57215197 (55M) [application/x-gzip]
Saving to: `kafka_2.12-2.3.0.tgz'
kafka_2.12-2.3.0.tgz 100% [===========================================>] 54.56M 670KB/s in 43s
2021-07-03 16:03:58 (1.26 MB/s) - `kafka_2.12-2.3.0.tgz' saved [57215197/57215197]
~ $
~ $
~ $ tar -zxf kafka_2.12-2.3.0.tgz
~ $ rm kafka_2.12-2.3.0.tgz
~ $ cd kafka_2.12-2.3.0
~/kafka_2.12-2.3.0 $ mkdir logs
~/kafka_2.12-2.3.0 $ vi config/server.properties # 설정에서 두 가지 config를 수정했다.
server.properties
...
log.dirs=/Users/seongminheo/kafka_2.12-2.3.0/logs # log path 수정
...
num.partitions=2 # defaulte partition 개수 수정
...
start zookeeper, kafka
주키퍼와 카프카를 실행할 때는 sh파일 바로 뒤에 config 파일 경로를 지정해야 한다.
# start zookeeper
~/kafka_2.12-2.3.0 $ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
# start kafka
~/kafka_2.12-2.3.0 $ ./bin/kafka-server-start.sh ./config/server.properties &
daemon 모드로 실행하기
~/kafka_2.12-2.3.0 $ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
~/kafka_2.12-2.3.0 $ ./bin/kafka-server-start.sh -daemon ./config/server.properties
~/kafka_2.12-2.3.0 $ ps
PID TTY TIME CMD
9569 ttys000 0:00.77 -zsh
10329 ttys000 0:01.79 /usr/local/Cellar/openjdk@11/11.0.10/bin/java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:I
10620 ttys000 0:02.55 /usr/local/Cellar/openjdk@11/11.0.10/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:Initi
66507 ttys001 0:00.27 /bin/zsh --login -i
zookeeper, kafka를 daemon모드로 실행하면 프로세스 관리를 보다 수월하게 할 수 있다.
show topics list
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
같은 호스트에서 실행했고 기본 포트를 그대로 사용했기 때문에 localhost:9092를 bootstrap-server로 지정했다. 아직 아무런 topic을 생성하지 않았기 때문에 __consumer_offsets만 보인다.
create and delete topic
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic topic_randint
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic_randint_2
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
topic_randint
topic_randint_2
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic topic_randint_2
~/kafka_2.12-2.3.0 $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
topic_randint
send message to kafka topic by python (메세지 발행하기?)
매우 간단한 파이썬 스크립트를 만들어서 메세지를 발행해보려고 한다.
# producer.py
from kafka import KafkaProducer
import datetime
import time
import random
import json
broker = ["localhost:9092"]
topic_name = "topic_randint"
producer = KafkaProducer(bootstrap_servers=broker)
if __name__ == "__main__":
id = 0
while 1:
value = random.randint(1, 10000)
now = datetime.datetime.now().replace()
msg = {'id': id, 'value': value, 'date': str(now)}
msg = json.dumps(msg)
producer.send(topic_name, msg.encode())
id += 1
if id % 3 == 0:
time.sleep(2)
consumer group 관련
consumger groups list
~/kafka_2.12-2.3.0 $ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
아무것도 없어서 출력이 안된다.
~/kafka_2.12-2.3.0 $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_randint --group group_randint_1
'topic_randint`라는 topic을 구독하는 group 'group_randint_1'이라는 이름의 consumer group을 만든다. 위 명령어를 실행하면 consumger group은 topic에 message가 들어오는 것을 기다리고 있으며, message가 들어오는대로 message를 처리한다.
producer.py
스크립트를 실행시키면 위 명령어를 실행한 터미널에서 처리한 메세지의 내용을 출력한다.
스크립트 실행 전
스크립트 실행 후
consumer group 종료 후
consumer groups describe
~/kafka_2.12-2.3.0 $ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_randint_1
Consumer group 'group_randint_1' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
group_randint_1 topic_randint 0 11 11 0 - - -
group_randint_1 topic_randint 1 10 10 0 -
- -
~/kafka_2.12-2.3.0 $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_randint --group group_randint_1 &
~/kafka_2.12-2.3.0 $ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_randint_1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
group_randint_1 topic_randint 0 11 11 0 consumer-1-1b17b879-7f0d-4af8-a790-bb32a29bc56c /127.0.0.1 consumer-1
group_randint_1 topic_randint 1 10 10 0 consumer-1-1b17b879-7f0d-4af8-a790-bb32a29bc56c /127.0.0.1 consumer-1
위 코드박스는 group_randint_1이라는 consumer group을 실행, 종료하고 describe를 출력한 박스이고, 아래 코드박스는 group_randint_1이라는 consumer group을 백그라운드로 실행, 종료하지 않고 describe를 출력한 박스다. consumer group을 백그라운드로 실행시킨 상황에서는 Consumer group 'group_randint_1' has no active members.
문구가 출력되지 않는다.
~/kafka_2.12-2.3.0 $ kill -9 3718 # consumer group을 삭제함
보충해야할 점
- consumer_offsets topic의 역할
- topic 생성할 때
--bootstrap-server localhost:9092
,--zookeeper localhost:2181
둘 다 되는데 어떤게 맞는건지 알아보기 - 각 용어에 대한 확실한 개념 정리
- consumer group 운영 방법
- consumer group describe의 각 열의 의미
'Data Engineering > Kafka' 카테고리의 다른 글
카프카 내부 메커니즘 - 4. 요청 처리 (0) | 2022.02.17 |
---|---|
카프카 내부 메커니즘 - 2.컨트롤러 (0) | 2022.02.15 |
카프카 내부 메커니즘 - 1.클러스터 멤버십 (0) | 2022.02.15 |
Kafka의 Zero copy (0) | 2022.02.12 |
Kafka의 구조와 원리 (0) | 2021.07.04 |