본문 바로가기

Data Engineering/Apache Airflow

[Airflow] 튜토리얼 따라하기 (with 공식 문서🧐)

Airflow 공식문서의 튜토리얼을 따라해보자. 
 
 
오늘 따라할 코드는 다음과 같다. 
 

airflow/example_dags/tutorial.py

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    'tutorial',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

당황하지 말기,,,

 
시작에 앞서 이 파일은 실제 데이터 처리 작업이 실행되는 파일이 아닌 DAG의 구조를 정의하는 파일이라는걸 알아두자.
 
그럼 시작!
 

Importing Modules

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

 
DAG을 인스턴트화하는데 필요한 모듈, 실제 작동하는데 필요한 모듈을 가져온다.
 
 

Default Arguments

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
},

 
사용할 argument는 위와 같이 파이썬 딕셔너리 형태로 정의할 수 있다. 
 

  • depends_on_past: True로 설정시 task instance는 순차적으로 실행되며, 이전 instance가 성공했거나 스킵된 경우에만 실행된다.
  • email: 이메일 알림을 받을 메일 주소이다. 주소가 여러개일 경우 쉼표나 세미콜론, 문자열 리스트를 사용할 수 있다.
  • email_on_failure: True로 설정시 작업이 실패했을 때 이메일 알림을 받을 수 있다.
  • email_on_retry: True로 설정시 작업을 재시도할 때 이메일 알림을 받을 수 있다.
  • retries: 작업 실패 전 재시도할 횟수
  • retry_delay: 재시도간의 딜레이. 기본값은 timedelta(seconds=300)이다.

* BaseOperator의 파라미터 정보는 여기에서 더 확인할 수 있다.

 
 

Instantiate a DAG

with DAG(
    'tutorial',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

 
DAG 객체를 생성하는 코드이다. 여기서는 DAG의 고유 식별자 역할을 하는 dag_id를 'tutorial'로 지정하고, 앞서 정의한 default_args 딕셔너리를 넣어주었다. schedule_interval은 1일로 정의했다.
 
 

Tasks

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
)

 
Task는 operator 객체를 인스턴스화할 때 생성된다. task_id는 Task의 고유 식별자 역할을 한다.
 
argument와 관련해 알아두어야 하는 것은 다음과 같다.

  • argument의 우선순위는 1) 명시적으로 전달된 값, 2) degault_args에 있는 값, 3) 디폴트 값(존재하는 경우) 순이다. 
  • Task는 task_id와 owner 값을 포함하거나 상속해아한다.

 

Templating with Jinja

templated_command = dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
)

 
airflow에서는 Jinja Templating을 활용한다. 이는 사용자에게 기본 파라미터와 매크로 집합을 제공하며, 사용자가 자체 파라미터와 매크로, 템플릿을 정의할 수도 있다.
 
template_command는 {{ ds }}로 파라미터를 참조하고 {{ macro.ds_add(ds, 7) }}과같이 함수를 호출한다. 이때 ds는 실행 날짜를, ds_add(ds, days)는 실행 날짜에서 days만큼을 더하는 것을 의미한다.
 
 

Adding DAG and Tasks documentation

t1.doc_md = dedent(
    """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

"""
)

dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
dag.doc_md = """
This is a documentation placed anywhere
"""  # otherwise, type it like this

 
DAG은 markdown을 사용해서, Task는 markdown을 포함해 text, json 등으로 설명을 작성할 수 있다.
 
아래는 웹에서 확인한 모습!
 

airflow tutorial - Task doc
airflow - DAG doc

 

Setting up Dependencies

Task의 종속성을 정의할 수 있는 코드를 보자.
 

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

 
위와 같이 set_downstream과 set_upstream, 또는 >>와 <<를 사용해 Task간의 종속성을 정의할 수 있다.
 
❗ raise exception: DAG에 cycle이 나타나거나 종속성이 두 번 이상 참조될 경우
 
 


오늘은 공식문서에서 DAG의 기본 구조를 정의하는 파이썬 코드를 따라해봤다. 다음번엔 이 코드로 테스트를 해볼 예정이다😃
 

참고자료

 

 

Tutorial — Airflow Documentation

airflow.apache.org

 

[Python] with문 이해하기

Python의 with문에 대해서 알아보겠습니다. 자원을 획득하고 사용 후 반납해야 하는 경우 주로 사용합니다...

blog.naver.com

 

객체와 인스턴스

간단하게 객체는 실제로 존재하는 것이라고 정의하는데, 실제로 존재하는 것이라면, 속성과 기능의 집합이라고 말할 수 있다. 여기서 속성과 기능에 따라 용도가 달라진다고 정의하였다. 그렇

velog.io