Data Engineering 16

Airflow의 parallelism과 dag_concurrency

parallelism This defines the maximum number of task instances that can run concurrently in Airflow regardless of scheduler count and worker count. Generally, this value is reflective of the number of task instances with the running state in the metadata database. scheduler와 worker의 개수와 상관없이 Airflow 내에서 동시에 구동할 수 있는 task 인스턴스의 최대 개수 Configuration Reference — Airflow Documentation airflow.apache.o..

airflow-webserver-monitor.pid is already locked

증상 Airflow webserver, scheduler가 작동하지 않음 Error log Traceback (most recent call last): File "/home/keti/.local/lib/python3.6/site-packages/lockfile/pidlockfile.py", line 77, in acquire write_pid_to_pidfile(self.path) File "/home/keti/.local/lib/python3.6/site-packages/lockfile/pidlockfile.py", line 161, in write_pid_to_pidfile pidfile_fd = os.open(pidfile_path, open_flags, open_mode) FileExistsEr..

Producer의 send() method 수행 과정 (feat. flush())

개요 및 결론 send()는 즉시 broker에게 메세지를 전송하지 않는다. send()를 통해 메세지를 내부 버퍼에 쌓아두고 flush()를 통해 broker로 전달한다. 물론 send()와 flush() 사이에도 몇 가지 과정을 거친다. 프로듀서가 메세지를 보내는 과정 Serializer 메세지 객체를 바이트 배열(byte array)로 직렬화한다. Partitioner Kafka Producer가 send() 메소드를 호출하면 Record는 Partitioner에게 전달된다. Partitioner는 지정한 토픽에서 어느 파티션으로 전송할 지 정하는 역할을 한다. Kafka Producer 객체를 생성할 때 Partitioner를 지정할 수 있으며, 설정하지 않으면 DefaultPartitioner로..

spark-submit

spark submit summary 스파크 어플리케이션을 클러스터 매니저에 제출하기 위해 spark-submit이라는 툴을 이용한다. submit을 할 때 다양한 파라미터로 추가 정보를 제공할 수 있다. spark-submit parameter 파라미터 설명 --master 클러스터 매니저를 설정함. 아래 표에서 자세히 설명 --deploy-mode 드라이버 프로세스를 client에서 실행할지, cluster에서 실행할 지 결정함 (Default: client) --class 스파크 어플리케이션의 메인클래스 (Java, Scala 앱의 경우) --name 제출하는 스파크 어플리케이션의 이름 --jars 드라이버와 익스큐터의 CLASSPATH에 포함될 로컬 JAR 파일들 (콤마로 구분한 리스트로 지정) ..

카프카 내부 메커니즘 - 4. 요청 처리

브로커가 하는 일의 대부분은 클라이언트와 파티션 리플리카 및 컨트롤러부터 파티션 리더에게 전송되는 요청을 처리하는 것이다. 이러한 요청과 응답에는 TCP로 전송되는 이진 프로토콜을 가지고 있다. 모든 요청에는 다음 내용을 포함하는 헤더가 존재하며 요청 타입에 따라 서로 다른 구조의 데이터를 전송한다. 요청 타입 ID: 어떤 요청인지를 나타낸다. 예를 들어, 메시지를 쓰는 요청은 produce라고 하며 id는 0, 메시지를 읽는 요청은 fetch라고 하며 id는 1 등등... 요청 버전: 프로토콜 API의 버전을 나타낸다. 요청 버전 덕분에 서로 다른 프로토콜을 사용하는 브로커가 요청을 보내도, 카프카는 버전에 맞추어 요청을 처리할 수 있다. cID (correlation ID): 각 요청의 고유 식별 번..

카프카 내부 메커니즘 - 2.컨트롤러

컨트롤러 컨트롤러 역시 카프카 브로커 중 하나이며 일반 브로커의 기능에 더하여 파티션 리더를 선출하는 책임을 갖는다. 컨트롤러 선출 과정 카프카 클러스터가 실행되면 모든 브로커들은 최상위 노드에 /controller 임시 노드를 생성하려고 한다. 이 때 가장 먼저 실행된 브로커가 임시 노드를 생성하고 컨트롤러가 되며 나머지 브로커들은 ‘노드가 이미 존재한다’는 예외를 받고 이미 /controller 임시 노드가 있다는 것과 클러스터에 컨트롤러가 있다는 것을 알게 된다. 이후에 모든 브로커들은 /controller 노드에 주키퍼의 watch를 생성하여 controller 노드에 변화가 생기면 바로 알 수 있도록 한다. 컨트롤러 재선출 컨트롤러 브로커가 중단되거나 연결이 끊기면 임시 노드였던 /control..

카프카 내부 메커니즘 - 1.클러스터 멤버십

실제 업무에서 카프카를 사용하는 작성하는 어플리케이션을 위해서 내부 구조를 알 필요는 없지만 어떻게 동작하는지 알아두면 문제가 발생했을 때 신속히 원인을 파악하고 대처하는(트러블슈팅) 기반이 될 수 있다. 이 글에서는 책 을 바탕으로 카프카 복제(Replication)가 동작하는 방식, 카프카가 프로듀서와 컨슈머의 요청을 처리하는 방법, 카프카가 스토리지(파일 형식, 인덱스 등)를 처리하는 방법을 알아본다. 클러스터 멤버십 주키퍼의 트리구조 데이터 저장 카프카에서 사용하는 주키퍼의 중요사항을 알아볼 필요가 있다. 주피커는 내부적으로 디렉터리처럼 계층적인 트리구조로 데이터를 저장한다. 데이터를 저장하는 노드를 znode라고 하며 각 znode의 이름 앞에는 /(슬래시)를 붙여 디렉터리처럼 경로(path)를..

Kafka의 Zero copy

Zero copy Zero-copy란 컴퓨터에서 CPU의 개입을 받지 않고 한 메모리의 영역에서 다른 메모리의 영역으로 데이터를 카피하는 작업을 말한다. 불필요한 데이터 복사을 줄이고 CPU 자원을 아껴 성능을 개선할 수 있다. Kafka는 이 방식을 사용하여 클라이언트에서 요청하는 메시지를 빠르게 송신할 수 있다. 카프카는 파일(또는 리눅스의 경우 파일시스템 캐시)의 메시지를 중간 버퍼 메모리에 쓰지 않고 곧바로 네트워크 채널로 전송한다. 데이터를 클라이언트에게 전송하기 위해 로컬 캐시 메모리에 저장하는 대부분의 데이터베이스와 달리 Kafka가 갖는 하나의 특징이며 데이터 전송을 빠르게 하여 성능을 향상시킨다. 아래에서 데이터를 전송하는 일반적인 방식과 Kafka가 채택한 Zero copy 방식을 비교..

Spark의 핵심 RDD

RDD (Resilient Distributed Datasets) RDD는 변경 불가능하며 파티셔닝된 레코드들의 모음이다. (변경 안되고, 나누어져있는 기록들) RDD를 조작하는 코드를 짠다는 것은 실제로 조작하는 것이 아니라 어떻게 조작할지 기록하는 행위다. Directed acyclic graph(DAG)형태로 Lineage를 기록하는 과정이다. Transformation과 Actions RDD Operator에는 지연 처리 방식의 transformation과 즉시 실행 방식의 action 두 가지 operator가 있다. transformation operator는 데이터를 어떻게 조작할지 정의하는 operator(Lineage를 작성하는 operator)이고 action operator는 실제로 ..

Spark Application Architecture - 스파크의 작동 순서(스파크 클러스터)

클러스터 모드로 spark-submit 명령을 수행했을 때 spark application이 어떤 순서로 작동하는지 알아본다. 클라이언트의 요청 첫 단계는 스파크 애플리케이션(컴파인된 JAR파일이나 라이브러리 파일)을 제출하는 것. 스파크 어플리케이션을 제출하면 로컬에서 코드가 실행되어 클러스터 드라이버 노드에 요청한다. 이 과정에서 스파크 드라이버 프로세스의 자원을 함께 요구한다. 클러스터 매니저가 이 요청을 받아들이면 클러스터 워커 노드 중 한 곳에서 스파크 드라이버 프로세스를 실행한다. 스파크 잡을 제출한 클라이언트 프로세스는 이 때 종료되며 스파크 애플리케이션은 클러스터 내에서 수행된다. 애플리케이션 시작 드라이버 프로세스가 실행된 다음 사용자 코드가 실행된다. 사용자 코드에는 반드시 스파크 클러..