Airflow DAG를 사용해 파일을 Amazon s3에 업로드해보자.
*Docker로 Apache Airflow 설치하는 방법은 아래 글 참고
1) AWS S3 버킷 생성
2) Airflow Connection 생성
Amazon Web Services를 선택한 후 Extra 필드에 JSON 형식으로 AWS 자격 증명(access key)을 입력
3) 테스트 파일 생성
volumes에 ./data:/opt/airflow/data를 추가해 호스트의 경로와 컨테이너 내부 경로 연결 후 호스트 경로에 test.csv 파일 생성
4) DAG 작성
(코드는 이 블로그를 참고)
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def upload_to_s3 (filename: str, key: str, bucket_name: str) -> None:
hook = S3Hook('aws_conn')
hook.load_file(
filename=filename,
key=key,
bucket_name=bucket_name,
replace=True
)
with DAG (
dag_id='upload_to_s3',
schedule_interval=None,
start_date=datetime(2024, 1, 16),
catchup=False
) as dag:
upload = PythonOperator(
task_id='upload',
python_callable=upload_to_s3,
op_kwargs={
'filename':'/opt/airflow/data/test.csv',
'key': 'test/test.csv',
'bucket_name': 'aladin-books-bucket'
}
)
작성한 파이썬 파일은 호스트 경로의 dag 폴더에 저장
- 파이썬 실행 함수의 hook은 airflow가 외부 시스템과 상호작용 할 수 있도록 특정 API를 추상화한 것(예) SqliteHook, MySqlHook, S3Hook 등)이다.
- DAG의 catchup 변수는 DAG가 실행될 때 start_date 이후의 과거 실행을 따라잡을지 여부를 표현한다.
- 매개변수 중 key는 버킷 내부의 경로(prefix + object name)를 의미한다.
5) 컨테이너 재시작
수정된 docker-compose 파일을 적용하기 위해 docker-compose up 명령어 사용
docker-compose up --build -d
✔ --build: 컨테이너 시작 전에 이미지 빌드
✔ --detach (-d): 백그라운드에서 컨테이너 실행
6) DAG 실행
이때 권한 에러(Access Denied) 발생 시 IAM에서 사용자 권한정책 추가(AmazonS3FullAccess)
7) 결과 확인
💁♂️ 알라딘 상품 리스트 API 사용해보기
테스트 파일을 업로드 해보았으니 Open API를 사용해 S3에 데이터를 적재해보자.
*알라딘 Open API에 관한 정보는 여기 참고
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import requests
import json
import pandas as pd
def fetch_books_info(**context):
params = {
'ttbkey':'[발급받은 키 입력]',
'QueryType':'Bestseller',
'MaxResults':'50',
'SearchTarget':'Book',
'output':'JS',
'Version':'20131101',
'Cover':'Big'}
cat_list = {
'소설/시/희곡':'1',
'경제경영': '170',
'과학':'987',
'사회과학':'798',
'에세이':'55889',
'역사':'74',
'예술/대중문화':'517',
'인문학':'656',
'자기계발':'336',
'장르소설':'112011',
'가정/요리/뷰티':'1230',
'건강/취미/레저':'55890',
'좋은부모':'2030',
'컴퓨터/모바일':'351',
'여행':'1196',
'청소년':'1137',
'고전':'2105'}
df = pd.DataFrame() # 도서 정보를 저장할 데이터 프레임 생성
for cat_name in cat_list.values():
for page in range(1, 15): # 총 결과는 1,000개까지만 조회 가능(페이지 당 50권)
API_URL = f'http://www.aladin.co.kr/ttb/api/ItemList.aspx?&Start={page}&CategoryId={cat_name}'
raw_data = requests.get(API_URL, params=params)
try:
parsed_data = json.loads(raw_data.text, strict=False)
df = pd.concat([df, pd.DataFrame(parsed_data['item'])])
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
continue
# 부가 정보 필드 제거
df = df.drop('subInfo', axis=1)
# Parquet 파일로 저장
df.to_parquet(f"/opt/airflow/data/raw/books.parquet")
def upload_to_s3 (filename: str, key: str, bucket_name: str, **context) -> None:
hook = S3Hook('aws_conn')
hook.load_file(
filename=filename,
key=key,
bucket_name=bucket_name,
replace=True
)
dag = DAG(
'upload_to_s3',
start_date=datetime(2024, 1, 24),
catchup=False,
schedule_interval='@monthly'
)
fetch_books_info = PythonOperator(
task_id='fetch_books_info',
python_callable=fetch_books_info,
dag=dag
)
upload_to_s3 = PythonOperator(
task_id='upload_to_s3',
python_callable=upload_to_s3,
op_kwargs={
'filename':'/opt/airflow/data/raw/books.parquet',
'key': 'raw/books.parquet',
'bucket_name': 'aladin-books-bucket'
},
dag=dag
)
fetch_books_info >> upload_to_s3
학생때 만들었던 프로젝트의 코드를 약간 수정해 사용했다. 임의로 매월 수행되도록 만들었고, 파일은 Parquet 형식으로 저장했다.
사내에서 파일을 생성하는 것처럼 파일 명을 books_{YYYYMM}.parquet과 같이 바꾸면 매달 데이터를 적재하는 파이프라인을 만들 수 있을 것 같다.
'Data Engineering > Apache Airflow' 카테고리의 다른 글
[Airflow] 튜토리얼 따라하기 (with 공식 문서🧐) (0) | 2022.06.02 |
---|---|
[Airflow] Docker로 Apache Airflow 설치하기 (with 공식 문서🧐) (0) | 2022.05.26 |