브로커가 하는 일의 대부분은 클라이언트와 파티션 리플리카 및 컨트롤러부터 파티션 리더에게 전송되는 요청을 처리하는 것이다. 이러한 요청과 응답에는 TCP로 전송되는 이진 프로토콜을 가지고 있다. 모든 요청에는 다음 내용을 포함하는 헤더가 존재하며 요청 타입에 따라 서로 다른 구조의 데이터를 전송한다.
- 요청 타입 ID: 어떤 요청인지를 나타낸다. 예를 들어, 메시지를 쓰는 요청은 produce라고 하며 id는 0, 메시지를 읽는 요청은 fetch라고 하며 id는 1 등등...
- 요청 버전: 프로토콜 API의 버전을 나타낸다. 요청 버전 덕분에 서로 다른 프로토콜을 사용하는 브로커가 요청을 보내도, 카프카는 버전에 맞추어 요청을 처리할 수 있다.
- cID (correlation ID): 각 요청의 고유 식별 번호. 로깅, 디버깅에 유용하게 사용 가능
- 클라이언트 ID: 요청을 전송한 클라이언트의 ID
브로커 내부에서는...
브로커는 자신이 리스닝하는 포트에서 acceptor 스레드를 실행하며 이 스레드는 연결을 생성한다. processor 스레드 (혹은 네트워크 스레드)가 그 다음 단계를 처리한다. processor 스레드는 클라이언트 연결에서 요청을 받고 요청 큐에 넣는 일, 응답 큐에서 응답을 가져와 클라이언트 측으로 전송하는 일을 수행한다. 요청 큐에 요청이 위치하면 입출력 스레드(혹은 요청 처리 스레드)가 요청을 가져와 처리한다.
쓰기요청과 읽기요청
- 쓰기요청: 프로듀서가 전송하며 브로커에게 쓰려는 메시지를 포함한다.
- 읽기요청: 브로커로부터 메시지를 읽을 때 컨슈머와 팔로워 리플리카가 전송한다.
실질적인 요청을 처리하는 주체는 리더 리플리카이기 때문에 카프카 클라이언트들의 모든 요청은 요청과 관련된 파티션들의 리더 리플리카가 있는 브로커에게 해야한다. 프로듀서와 컨슈머, 팔로워 리플리카는 리더 특정 파티션들의 리더가 없는 브로커에게 요청하면 ‘파티션 리더가 아님’이라는 에러 응답을 받는다.
카프카 내부의 클라이언트들이 요청을 보낼 리더 리플리카를 찾는 방법
요청 타입 중에 메타데이터 요청(metadata request)라는 요청 타입이 있다. 이 요청을 하면 브로커는 토픽에 존재하는 파티션들, 각 파티션의 리플리카, 어떤 리플리카가 리더인지 등의 정보를 응답한다. 모든 브로커가 이런 정보를 담고있는 메타데이터 캐시를 갖고 있기 때문에 이 요청은 아무 브로커에게나 할 수 있다.
클라이언트들은 이 정보를 토대로 각 파티션의 올바른 브로커에게 요청을 전송한다. 클라이언트들은 config metadata.max.age.mx에서 지정한 시간이 지나거나 ‘파티션 리더가 아님’ 에러를 수신할 때 메타데이터 요청을 보낸다.
쓰기요청
특정 파티션의 리더 리플리카를 포함하는 브로커가 쓰기 요청을 받으면 다음 사항을 검사한다.
- 데이터를 전송한 사용자가 쓰기 권한이 있는가?
- acks의 값이 유효한가? (0, 1, ‘all’) 중 하나
- acks가 all이라면 메시지를 안전하게 쓰는 데 충분한 ISR들이 있는가?
위 사항을 검사한 후 적합하면 아래와 같이 쓰기를 진행한다.
- 브로커가 로컬 디스크(리눅스의 경우 파일시스템 캐시)에 메시지를 쓴다
- acks의 값을 보고, 0 혹은 1이면 즉시 응답을 전송하고 all이면 리더 리플리카가 팔로워 리플리카들이 복제를 완료했는지 확인한 후 응답을 전송한다.
읽기요청
읽기 요청을 하는 클라이언트는 읽기를 원하는 메시지의 토픽, 파티션, 오프셋을 전달하여 요청한다. (“Test 토픽, 0 파티션, 53오프셋” 같은 정보가 담기도록 요청함) 이러한 요청에 브로커는 오프셋 이후의 메시지를 전달하는데, 몇 개의 메세지를 전달받을지는 클라이언트가 지정할 수 있다. (consumer의 경우 max.poll.records config로 지정 가능) 클라이언트는 브로커가 전송한 응답을 저장하기 위해 메모리를 할당해야 하므로 클라이언트 별로 적합한 메시지 개수를 지정해야 한다.
클라이언트는 반환 데이터의 max 크기도 지정할 수 있지만 min 크기도 지정할 수 있다. 반환되는 데이터가 거의 없는데도 아주 짧은 시간동안 계속 요청, 응답을 주고받으면 네트워크 트래픽만 늘어난다. “최소한 100KB가 되면 메시지를 전송해” 라는 설정을 해 빈번한 네트워크 통신을 줄일 수 있다. 모아서 보내기 때문에 네트워크 통신 횟수는 줄어들고 결국 읽는 메시지의 양은 같다.
또한 너무 잦은 네트워크 통신을 피하고 싶은거지 너무 긴 시간 기다리게 되면 처리에 문제가 생긱 수 있다. “00초가 지나도 지정한 만큼의 메시지가 쌓이지 않으면 그냥 보내라” 같은 설정도 가능하다.
카프카는 제로카피 기법을 사용해 클라이언트에게 메시지를 전송한다. 파일 시스템에 기록한 메시지를 어플리케이션 버퍼에 쓰지 않고, 곧바로 네트워크 채널로 전송하기 때문에 성능이 향상시킬 수 있다.
대부분의 클라이언트는 모든 ISR에 쓴 메시지들만 읽을 수 있다. 리더 리플리카에만 존재하는 메시지를 읽을 수 있게 허용한다면 일관성을 잃을 수 있기 때문이다. 팔로워 리플리카에게 복제되지 않은 메세지를 어떤 컨슈머가 읽은 후 리더가 중단되면 해당 메시지를 가지고 있는 다른 파티션이 없기 때문에 그 메시지가 사라지게 되고 다른 컨슈머들은 유실된 메시지를 읽을 수 없다. config replica.lag.time.max.ms로 복제를 기대라는 최대 시간을 지정할 수 있다.
'Data Engineering > Kafka' 카테고리의 다른 글
Producer의 send() method 수행 과정 (feat. flush()) (2) | 2022.03.15 |
---|---|
카프카 내부 메커니즘 - 2.컨트롤러 (0) | 2022.02.15 |
카프카 내부 메커니즘 - 1.클러스터 멤버십 (0) | 2022.02.15 |
Kafka의 Zero copy (0) | 2022.02.12 |
Kafka의 구조와 원리 (0) | 2021.07.04 |