Airflow 공식문서의 튜토리얼을 따라해보자.
오늘 따라할 코드는 다음과 같다.
airflow/example_dags/tutorial.py
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
'tutorial',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
},
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
t1.doc_md = dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
시작에 앞서 이 파일은 실제 데이터 처리 작업이 실행되는 파일이 아닌 DAG의 구조를 정의하는 파일이라는걸 알아두자.
그럼 시작!
Importing Modules
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
DAG을 인스턴트화하는데 필요한 모듈, 실제 작동하는데 필요한 모듈을 가져온다.
Default Arguments
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
},
사용할 argument는 위와 같이 파이썬 딕셔너리 형태로 정의할 수 있다.
- depends_on_past: True로 설정시 task instance는 순차적으로 실행되며, 이전 instance가 성공했거나 스킵된 경우에만 실행된다.
- email: 이메일 알림을 받을 메일 주소이다. 주소가 여러개일 경우 쉼표나 세미콜론, 문자열 리스트를 사용할 수 있다.
- email_on_failure: True로 설정시 작업이 실패했을 때 이메일 알림을 받을 수 있다.
- email_on_retry: True로 설정시 작업을 재시도할 때 이메일 알림을 받을 수 있다.
- retries: 작업 실패 전 재시도할 횟수
- retry_delay: 재시도간의 딜레이. 기본값은 timedelta(seconds=300)이다.
* BaseOperator의 파라미터 정보는 여기에서 더 확인할 수 있다.
Instantiate a DAG
with DAG(
'tutorial',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
},
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
DAG 객체를 생성하는 코드이다. 여기서는 DAG의 고유 식별자 역할을 하는 dag_id를 'tutorial'로 지정하고, 앞서 정의한 default_args 딕셔너리를 넣어주었다. schedule_interval은 1일로 정의했다.
Tasks
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
Task는 operator 객체를 인스턴스화할 때 생성된다. task_id는 Task의 고유 식별자 역할을 한다.
argument와 관련해 알아두어야 하는 것은 다음과 같다.
- argument의 우선순위는 1) 명시적으로 전달된 값, 2) degault_args에 있는 값, 3) 디폴트 값(존재하는 경우) 순이다.
- Task는 task_id와 owner 값을 포함하거나 상속해아한다.
Templating with Jinja
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
)
airflow에서는 Jinja Templating을 활용한다. 이는 사용자에게 기본 파라미터와 매크로 집합을 제공하며, 사용자가 자체 파라미터와 매크로, 템플릿을 정의할 수도 있다.
template_command는 {{ ds }}로 파라미터를 참조하고 {{ macro.ds_add(ds, 7) }}과같이 함수를 호출한다. 이때 ds는 실행 날짜를, ds_add(ds, days)는 실행 날짜에서 days만큼을 더하는 것을 의미한다.
Adding DAG and Tasks documentation
t1.doc_md = dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
DAG은 markdown을 사용해서, Task는 markdown을 포함해 text, json 등으로 설명을 작성할 수 있다.
아래는 웹에서 확인한 모습!
Setting up Dependencies
Task의 종속성을 정의할 수 있는 코드를 보자.
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
위와 같이 set_downstream과 set_upstream, 또는 >>와 <<를 사용해 Task간의 종속성을 정의할 수 있다.
❗ raise exception: DAG에 cycle이 나타나거나 종속성이 두 번 이상 참조될 경우
오늘은 공식문서에서 DAG의 기본 구조를 정의하는 파이썬 코드를 따라해봤다. 다음번엔 이 코드로 테스트를 해볼 예정이다😃
참고자료
'Data Engineering > Apache Airflow' 카테고리의 다른 글
[Airflow] Amazon S3에 파일 업로드하기 (+📚 알라딘 Open API 예제) (0) | 2024.01.24 |
---|---|
[Airflow] Docker로 Apache Airflow 설치하기 (with 공식 문서🧐) (0) | 2022.05.26 |