본문 바로가기

Data Engineering/Apache Airflow

[Airflow] Amazon S3에 파일 업로드하기 (+📚 알라딘 Open API 예제)

 
Airflow DAG를 사용해 파일을 Amazon s3에 업로드해보자.
 
*Docker로 Apache Airflow 설치하는 방법은 아래 글 참고

 

Docker로 Apache Airflow 설치하기 (with 공식 문서🧐)

데이터 파이프라인의 자동화를 구현해보고 싶어 Airflow에 대해 알아보았다. Apache Airflow Airflow is a platform to programmatically author, schedule and monitor workflows. Airflow는 Python 프로그래밍 언어를 통해 워크

donghae0230.tistory.com

 

1) AWS S3 버킷 생성

 
 

2) Airflow Connection 생성

Amazon Web Services를 선택한 후 Extra 필드에 JSON 형식으로  AWS 자격 증명(access key)을 입력
 

3) 테스트 파일 생성

docker-compose.yaml

volumes에 ./data:/opt/airflow/data 추가해 호스트의 경로와 컨테이너 내부 경로 연결 후 호스트 경로에 test.csv 파일 생성
 

4) DAG 작성

(코드는 이 블로그를 참고) 

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def upload_to_s3 (filename: str, key: str, bucket_name: str) -> None:
    hook = S3Hook('aws_conn')
    hook.load_file(
        filename=filename,
        key=key,
        bucket_name=bucket_name,
        replace=True
    )

with DAG (
    dag_id='upload_to_s3',
    schedule_interval=None,
    start_date=datetime(2024, 1, 16),
    catchup=False
) as dag:
    upload = PythonOperator(
        task_id='upload',
        python_callable=upload_to_s3,
        op_kwargs={
            'filename':'/opt/airflow/data/test.csv',
            'key': 'test/test.csv',
            'bucket_name': 'aladin-books-bucket'
        }
    )

 
 
작성한 파이썬 파일은 호스트 경로의 dag 폴더에 저장
 
- 파이썬 실행 함수의 hook은 airflow가 외부 시스템과 상호작용 할 수 있도록 특정 API를 추상화한 것(예) SqliteHook, MySqlHook, S3Hook 등)이다.

- DAG의 catchup 변수는 DAG가 실행될 때 start_date 이후의 과거 실행을 따라잡을지 여부를 표현한다.
- 매개변수 중 key는 버킷 내부의 경로(prefix + object name)를 의미한다.
 
 

5) 컨테이너 재시작

수정된 docker-compose 파일을 적용하기 위해 docker-compose up 명령어 사용

docker-compose up --build  -d

 
✔ --build: 컨테이너 시작 전에 이미지 빌드
✔ --detach (-d): 백그라운드에서 컨테이너 실행
 

6) DAG 실행

이때 권한 에러(Access Denied) 발생 시 IAM에서 사용자 권한정책 추가(AmazonS3FullAccess)

 
 

7) 결과 확인

airflow webserver
Amazon S3

 
 

💁‍♂️ 알라딘 상품 리스트 API 사용해보기

테스트 파일을 업로드 해보았으니 Open API를 사용해 S3에 데이터를 적재해보자. 

*알라딘 Open API에 관한 정보는 여기 참고

 

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

import requests
import json
import pandas as pd

def fetch_books_info(**context):
    params = {
        'ttbkey':'[발급받은 키 입력]',
        'QueryType':'Bestseller',
        'MaxResults':'50',
        'SearchTarget':'Book',
        'output':'JS',
        'Version':'20131101',
        'Cover':'Big'}

    cat_list = {
        '소설/시/희곡':'1',
        '경제경영': '170',
        '과학':'987',
        '사회과학':'798',
        '에세이':'55889',
        '역사':'74',
        '예술/대중문화':'517',
        '인문학':'656',
        '자기계발':'336',
        '장르소설':'112011',
        '가정/요리/뷰티':'1230',
        '건강/취미/레저':'55890',
        '좋은부모':'2030',
        '컴퓨터/모바일':'351',
        '여행':'1196',
        '청소년':'1137',
        '고전':'2105'}

    df = pd.DataFrame()             # 도서 정보를 저장할 데이터 프레임 생성
    for cat_name in cat_list.values():
        for page in range(1, 15):   # 총 결과는 1,000개까지만 조회 가능(페이지 당 50권)
            API_URL = f'http://www.aladin.co.kr/ttb/api/ItemList.aspx?&Start={page}&CategoryId={cat_name}'
            raw_data = requests.get(API_URL, params=params)
            try:
                parsed_data = json.loads(raw_data.text, strict=False)
                df = pd.concat([df, pd.DataFrame(parsed_data['item'])])
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
                continue
    
    # 부가 정보 필드 제거
    df = df.drop('subInfo', axis=1)
    # Parquet 파일로 저장
    df.to_parquet(f"/opt/airflow/data/raw/books.parquet")

def upload_to_s3 (filename: str, key: str, bucket_name: str, **context) -> None:
    hook = S3Hook('aws_conn')
    hook.load_file(
        filename=filename,
        key=key,
        bucket_name=bucket_name,
        replace=True
    )

dag = DAG(
    'upload_to_s3',
    start_date=datetime(2024, 1, 24),
    catchup=False,
    schedule_interval='@monthly'
)

fetch_books_info = PythonOperator(
    task_id='fetch_books_info',
    python_callable=fetch_books_info,
    dag=dag
)

upload_to_s3 = PythonOperator(
    task_id='upload_to_s3',
    python_callable=upload_to_s3,
    op_kwargs={
            'filename':'/opt/airflow/data/raw/books.parquet',
            'key': 'raw/books.parquet',
            'bucket_name': 'aladin-books-bucket'
        },
    dag=dag
)

fetch_books_info >> upload_to_s3

 
학생때 만들었던 프로젝트의 코드를 약간 수정해 사용했다. 임의로 매월 수행되도록 만들었고, 파일은 Parquet 형식으로 저장했다.
 
 

 
사내에서 파일을 생성하는 것처럼 파일 명을 books_{YYYYMM}.parquet과 같이 바꾸면 매달 데이터를 적재하는 파이프라인을 만들 수 있을 것 같다.
 


 

 

Airflow 와 AWS S3 연결하기 + S3 DAG example

이미 초기 학습데이터를 구성했지만, 시간이 지남에 따라 추가로 활용할 수 있는 데이터가 DB에 저장되고 있어 학습 데이터를 주기적인 업데이트가 필요하다.

velog.io

 

Educative Answers - Trusted Answers to Developer Questions

Level up your coding skills. No more passive learning. Interactive in-browser environments keep you engaged and test your progress as you go.

www.educative.io