Airflow?
스케쥴러
특징
장점
Install
~ $ conda create -n airflow_tutorial python=3.8
~ $ conda activate airflow_tutorial
(airflow_tutorial) ~ $ pip install apache-airflow
(airflow_tutorial) ~ $ airflow version # install check
2.1.1
(airflow_tutorial) ~ $ mkdir airflow && cd airflow
airflow home path 설정해주기
(airflow_tutorial) ~/airflow $ vi ~/.zshrc # 자신이 사용하는 쉘에 따라서 vi로 편집
# ~/.zshrc
export AIRFLOW_HOME=path/to/airflow/dir
export PATH=$PATH:$AIRFLOW_HOME
(airflow_tutorial) ~/airflow $ source ~/.zshrc
(airflow_tutorial) ~/airflow $ echo $AIRFLOW_HOME # path 설정 확인
/Users/seongminheo/airflow
초기설정
- airflow에서는 기본 데이터베이스로 sqlite를 사용한다. 다른 데이터베이스를 활용하기 위해서는 추가적인 설정이 필요하다.
(airflow_tutorial) ~/airflow $ ll
(airflow_tutorial) ~/airflow $ airflow db init
(airflow_tutorial) ~/airflow $ ll
total 1384
rw-r--r-- 1 seongminheo staff 41K 7 10 16:36 airflow.cfg
rw-r--r-- 1 seongminheo staff 596K 7 10 16:37 airflow.db
drwxr-xr-x 3 seongminheo staff 96B 7 10 16:36 logs
rw-r--r-- 1 seongminheo staff 4.6K 7 10 16:36 webserver_config.py
web server 유저 생성
- 아래와 같은 명령어를 실행하고 나서 패스워드를 묻는다.
- 각 args에는 각자에 맞는 값을 입력해서 유저를 생성하면 된다.
(airflow_tutorial) ~/airflow $ airflow users create \
--username heosm \
--firstname seongmin \
--lastname heo \
--role Admin \
--email admin@example.org
web server 실행
- 아래 명령어를 실행하면 기본 포트인 8080포트로 웹서버와 스케줄러가 백그라운드로 실행된다.
(airflow) [user@namenode airflow]$ airflow webserver -D
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2022-02-07 11:22:42,612] {dagbag.py:487} INFO - Filling up the DagBag from /dev/null
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
Access Logformat:
=================================================================
(airflow) [user@namenode airflow]$
(airflow) [user@namenode airflow]$
(airflow) [user@namenode airflow]$ airflow scheduler -D
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
위에서 생성한 유저네임과 패스워드로 로그인할 수 있다.
처음 로그인하면 DAG 예제가 목록에 있을텐데, 나는 list에 나타나지 않도록 수정했다. 아래의 명령어로 일일이 삭제하지 않아도 보이지 않도록 할 수 있다. (하나 있는 DAG은 본인이 생성한 DAG)
$ vi airflow.cfg
load_example = False
DAG
DAG은 하나의 워크플로우로, task들의 집합이다. operator클래스를 이용해 각 task를 정의하고, task들의 순서를 정의하여 하나의 DAG을 만든다. operator에는 bash operator, python operator 등 다양한 operator들이 제공되고, 그 외에 사람들이 만든 operator들도 사용할 수 있다.
DAG 작성
- schedule_interval
실행 주기를 지정하는 부분으로 cron 명령과 같은 방식으로 주기를 정의한다. 예시에서는 매분 실행하도록 정의했다. - start_date
DAG를 언제부터 시작할 것인지 지정한다. 현재보다 과거의 날짜로 지정한다면. 해당 시간부터 backfill을 진행한다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
import pendulum
local_tz = pendulum.timezone("Asia/Seoul")
# DAG에 대한 정보를 저장 및 유지할 객체
dag = DAG(
dag_id="hello-airflow",
description="Hello airflow DAG",
schedule_interval="* * * * *",
start_date=datetime(2021, 7, 10, 16, tzinfo=local_tz),
catchup=False
)
# "Hello hello"를 반환하는 함수
def print_hello():
return "Hello World"
# PythonOperator로 print_hello 함수를 호출하는 task를 생성
python_task = PythonOperator(
task_id="python_operator",
python_callable=print_hello,
dag=dag
)
# BashOperator로 쉘 커맨드에서 whoami 명령어를 실행할 task를 생성
bash_task = BashOperator(
task_id="print_whoami",
bash_command="whoami",
dag=dag
)
# bash_task 실행 후 python_task 실행
bash_task.set_downstream(python_task)
# 아래 코드라인들은 위 코드와 같은 코드
# python_task.set_upstream(bash_task)
# bash_task >> python_task
토글을 on시켜 DAG을 실행시켰다. airflow home 디렉토리에서도 log를 볼 수 있지만, 웹에서도 로그를 각 task별로, 실행 시간대별로 볼 수 있어 유용하다.
해당 DAG 이름을 클릭하여 tree view 메뉴로 들어갔다. 외에도 graph view, calendar view도 제공하니 필요에 따라 각 메뉴를 활용하면 유용할 것 같다. 동그라미는 DAG을 의미하고, 네모는 각 task를 의미한다. 색깔을 통해 각 task의 상태를 표현해주니 한 눈에 진행상황을 파악할 수 있다.
task를 클릭하면 다양한 기능을 사용할 수 있는데 log 기능을 통해 해당 task의 log를 바로 볼 수 있다.
쉘에서 whoami 명령어를 실행하는 task의 log를 보니 내 이름이 잘 찍혀있다. ^^
'Data Engineering > Airflow' 카테고리의 다른 글
Airflow의 parallelism과 dag_concurrency (0) | 2022.05.26 |
---|---|
airflow-webserver-monitor.pid is already locked (0) | 2022.05.26 |
Dockeroperator의 Bind mount을 활용한 Airflow 운영 (0) | 2021.09.05 |