Data Engineering/Kafka

Producer의 send() method 수행 과정 (feat. flush())

신수동탈곡기 2022. 3. 15. 17:49

개요 및 결론

send()는 즉시 broker에게 메세지를 전송하지 않는다. send()를 통해 메세지를 내부 버퍼에 쌓아두고 flush()를 통해 broker로 전달한다. 물론 send()와 flush() 사이에도 몇 가지 과정을 거친다.

프로듀서가 메세지를 보내는 과정

카프카 프로듀서의 작업 처리 과정

Serializer

메세지 객체를 바이트 배열(byte array)로 직렬화한다.

Partitioner

Kafka Producer가 send() 메소드를 호출하면 Record는 Partitioner에게 전달된다. Partitioner는 지정한 토픽에서 어느 파티션으로 전송할 지 정하는 역할을 한다. Kafka Producer 객체를 생성할 때 Partitioner를 지정할 수 있으며, 설정하지 않으면 DefaultPartitioner로 설정된다.

프로듀서API를 사용하면 ‘UniformStickyPartitioner’와 ‘RoundRobinPartitioner’를 제공하는데, Kafka Client Library 2.5.0 버전에서는 UniformStickyPartitioner가 DefaultPartitioner로 설정된다. 두 Parititioner는 메시지를 배치로 묶는 알고리즘이 다르며, UniformSticky가 더 효율적인 방ㅂ법을 사용하여 높은 처리량, 낮은 리소스 사용률을 가진다.

사용자가 원하는 Partitioner를 만들 수 있도록 Kafka Client Library는 Partitioner Interface를 제공한다. 메시지 키, 값을 통해 어떤 파티션에 전송할지 정하는 클래스를 생성할 수 있다.

Accumulator

파티셔너로 어떤 파티션으로 전송될 지 정해진 메세지들은 Accumulator에 버퍼로 쌓인다. 센더 스레드는 accumulator에 쌓인 메세지를 broker에 전송한다.

flush() 메소드

flush() 메소드의 주석과 코드는 다음과 같다.

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
    @Override
    public void flush() {
        log.trace("Flushing accumulated records in producer.");
        this.accumulator.beginFlush();
        this.sender.wakeup();
        try {
            this.accumulator.awaitFlushCompletion();
        } catch (InterruptedException e) {
            throw new InterruptException("Flush interrupted.", e);
        }
    }