2025. 1. 19. 23:58ㆍData Engineering/BigQuery
들어가며
현재 제가 속한 데이터 팀에서는 준실시간 분석 요청에 대응하기 위해 Kafka와 BigQuery를 활용한 CDC 기반 데이터 파이프라인을 구축하고 있습니다. 그러나 Kafka 커넥터를 사용하는 과정에서 데이터 규모 증가로 인한 비용 문제 발생 가능성이 제기되었습니다. 이번 글에서는 Kafka 커넥터와 BigQuery MERGE 문을 사용한 기존 파이프라인의 한계를 살펴보고, 이를 개선하기 위해 테스트한 과정을 공유하고자합니다.
✔️ CDC(Change Data Capture)
이름 그대로 데이터 베이스의 변경 사항을 실시간으로 추적하고 캡처하는 기술입니다. 데이터 웨어하우스 등 시스템으로의 동기화에 사용됩니다. 실시간 분석, 이벤트 기반 아키텍처 등에 활용할 수 있습니다.
1. 기존 구조
- Kafka에서 생성된 토픽을 [1] BigQuery Sink Connector를 통해 테이블로 수집
- BigQuery Sink Connector의 upsert=True, delete=True 옵션을 사용해 변경 사항을 15분에 한번씩 반영
2. 문제 상황
데이터 INSERT 뿐만 아니라 UPDATE, DELETE 까지 반영하기 위해 커넥터 내부적으로 MERGE문이 사용되었습니다. 실행 쿼리를 확인해보았을 때 변경 데이터의 크기에 관계없이 타겟 테이블을 풀 스캔하는 구조였습니다. 예를 들어 수강 기록 데이터와 같이 크기가 큰 테이블의 경우 비용이 증가할 가능성이 있었습니다.
✔️ BigQuery의 MERGE문
[2] MERGE문은 INSERT, UPDATE, DELETE 연산을 하나의 문으로 결합하여 원자적으로 연산을 수행할 수 있는 DML 문입니다. 커넥터에서는 아래와 같은 MERGE문을 사용하고 있었습니다(깃허브의 코드를 예시로 변환하였습니다).
MERGE target_table AS target
USING (
-- intermediate table 중복 key에 대한 최신 데이터만 추출
SELECT *
FROM (
SELECT ARRAY_AGG(
x ORDER BY i DESC LIMIT 1
)[OFFSET(0)] AS src
FROM intermediate_table AS x
WHERE batch_number = batch_umber
GROUP BY key
)
) AS source
ON target.key = src.key
-- 1) key 값과 value가 둘다 있는 경우 업데이트
WHEN MATCHED AND src.value IS NOT NULL THEN
UPDATE SET
target.col1 = src.value.col1,
target.col2 = src.value.col2
-- 2) key값이 있으나 value가 없는 경우 삭제
WHEN MATCHED AND src.value IS NULL THEN
DELETE
-- 3) key값이 없으나 value가 있는 경우 삽입
WHEN NOT MATCHED AND src.value IS NOT NULL THEN
INSERT (key, col1, col2)
VALUES (
src.key,
src.value.col1,
src.value.col2
);
3. 해결 방법
1) 클러스터링 추가
임시 테이블의 ID 컬럼을 기준으로 클러스터링을 적용하고 MERGE문을 실행하였습니다.
- 실행 결과: 클러스터링을 적용하기 전과 후의 예상 비용은 약 29GB로 차이가 없었지만, 실제 청구된 바이트 수는 클러스터링 후 1.2GB로 약 28배 정도의 차이가 나는 것을 확인할 수 있었습니다.


클러스터링을 통해 데이터 스캔 범위를 줄일 수 있었지만, 여전히 업데이트되는 데이터의 크기에 비해 높은 비용이었습니다. 이를 개선하기 위해 파티셔닝과 클러스터링을 같이 적용하고 MERGE문을 최적화해야겠다고 생각했습니다.
2) 데이터 처리 구조 변경
a. 스테이징 테이블 생성
Kafka 커넥터를 INSERT 방식으로 수집하는 테이블을 생성하였습니다. 수집 시간인 kafka_ts를 기준으로 파티셔닝 하였습니다.
b. 타겟 테이블 생성
분석용 데이터를 저장하는 타겟 테이블을 생성하였습니다. 데이터 생성 시간 (예) created_at)을 기준으로 파티셔닝 하였습니다. 분석에 필요하지 않은 메타 데이터(kafka offset, topic 등)는 컬럼에서 제외하였습니다.
CREATE TABLE DEV_TEST.TARGET_TABLE
PARTITION BY DATE(created_at)
CLUSTER BY id AS
SELECT
[COLUMN_NAME] (...)
FROM [DATASET_NAME].[TABLE_NAME];
c. MERGE문 수정
[3] BigQuery MERGE문 최적화 글을 참고해 파티션 프루닝을 적용할 수 있도록 쿼리를 수정하였습니다.
DECLARE src_range STRUCT<date_min DATE, date_max DATE>;
SET src_range=(
SELECT
STRUCT(
MIN(DATE(created_at)) AS date_min,
MAX(DATE(created_at)) AS date_max
)
FROM DEV_TEST.STAGING_TABLE
);
- 수정 사항: MIN(created_at)과 MAX(created_at) 값을 미리 계산해 타겟 테이블의 데이터 스캔 범위를 제한
- 주의 사항: 값을 미리 계산하지 않고 런타임 중 계산하는 경우 파티션 프루닝을 할 수 없으니 주의
아래는 수정된 쿼리의 예시입니다.
DECLARE src_range STRUCT<date_min DATE, date_max DATE>;
SET src_range=(
SELECT
STRUCT(
MIN(DATE(created_at)) AS date_min,
MAX(DATE(created_at)) AS date_max
)
FROM DEV_TEST.STAGING_TABLE
);
MERGE DEV_TEST.TARGET_TABLE AS target USING (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY id ORDER BY kafka_ts DESC) seq_no
FROM DEV_TEST.STAGING_TABLE x
QUALIFY seq_no=1
) src
ON target.id = src.id
AND DATE(target.created_at) BETWEEN src_range.date_min AND src_range.date_max
(...)
*Kafka에서 자동으로 생성되는 임시 테이블과 스테이징 테이블의 구조가 달라 쿼리를 수정하였습니다(이때 id는 수집해오는 DB에서 PK 역할을 합니다).

- 실행 결과: 파티션 적용과 MERGE문 최적화를 통해 약 18MB를 처리 하는 것을 확인하였습니다. 테이블 크기가 12GB인 것을 고려했을때 이는 기존 방식에 비해 600배 감소한 것입니다.
4. 결론
Kafka 커넥터에서 UPSERT 및 DELETE 설정 시 생성되는 MERGE문의 비용 문제를 해결하기 위해 파티셔닝과 클러스터링을 적용하였고, 쿼리를 최적화하여 데이터 스캔 범위를 줄였습니다. 이번 테스트를 통해서 데이터가 계속 증가하는 환경에서 BigQuery의 비용 최적화와 성능 개선에 대해서 배울 수 있었습니다.
테스트 결과를 운영에 적용하기 위해 주기적으로 MERGE문을 실행하는 Airflow DAG를 만들고, 일일 데이터 처리량을 모니터링하면 좋을 것 같습니다. 그럼 다음 글로 돌아오도록 하겠습니다. 읽어주셔서 감사합니다☘️
참고 자료
[1] Kafka Connect BigQuery Connector
GitHub - confluentinc/kafka-connect-bigquery: A Kafka Connect BigQuery sink connector
A Kafka Connect BigQuery sink connector. Contribute to confluentinc/kafka-connect-bigquery development by creating an account on GitHub.
github.com
Data manipulation language (DML) statements in GoogleSQL | BigQuery | Google Cloud
Send feedback Stay organized with collections Save and categorize content based on your preferences. Data manipulation language (DML) statements in GoogleSQL The BigQuery data manipulation language (DML) enables you to update, insert, and delete data from
cloud.google.com
Bigquery Merge Optimization through Partition Pruning
Lets understand Partition Pruning using the Merge Statement.
medium.com
'Data Engineering > BigQuery' 카테고리의 다른 글
BigQuery에서 Nested Data의 처리: Dremel 논문으로 이해하기②🕵️♀️ (0) | 2025.03.16 |
---|---|
BigQuery에서 Nested Data의 처리: Dremel 논문으로 이해하기①🕵️♀️ (0) | 2025.03.02 |
데이터 분석의 경계에 선 엔지니어의 <인프런 BigQuery(SQL) 활용편>후기 (feat. 빠짝 스터디)🌱 (2) | 2024.11.24 |
이직 5개월 차의 BigQuery 최적화: 파티셔닝과 클러스터링으로 시작하기 (1) | 2024.10.27 |