Airflow(GCP Composer) 에서 KubernetesPodOperator 사용
2023. 4. 23. 13:27
Data/Data Engineering
Airflow(Google Composer2) 를 활용해 워크플로우 오케스트레이션을 하고 있습니다. 최근 사내 파이프라인에 DBT 를 도입하다보니 composer 2의 기본 패키지 의존성과 DBT의 패키지 의존성간에 충돌이 있기에 컨테이너 기반으로 DBT 를 활용하도록 KubernetesPodOperator를 고려하였습니다. 본 문서는 비공개 환경의 k8s(GKE) 기반 airflow(GCP Composer)에서 KubernetesPodOperator를 사용하는 방법에 대해 안내합니다. 참고로 composer 1 과 composer2 에서 사용 설정이 일부 상이합니다. 본 문서는 autopilot 으로 관리되는 composer2 환경에서 KubernetesPodOperator 사용법을 설명합니다. 환경 ..
Airflow 사용 시 AssertionError: daemonic processes are not allowed to have children 의 해결
2022. 10. 10. 23:37
Data/Data Engineering
현상 파악 Airflow 2.0 버전 대에서 concurrent.futures.ProcessPoolExecutor 사용 시 AssertionError: daemonic processes are not allowed to have children 에러를 경험하였다. (local , celery, kubernates 모두 같은 현상을 재현할 수 있었음 ) 리서치 결과 정확히는 multiprocessing 패키지에서 발생하는 문제라는 것을 알 수 있었다. concurrent.futures python 작업 시 multiprocessing이나 multithread 작업을 위해 자주 사용하는 패키지로 사진의 에러를 발생시킨 것은 ProcessPoolExecutor 사용 시 경험했다. 이 ProcessPoolExe..
[Airflow] 자주 쓰는 Branch Task
2022. 7. 21. 13:05
Data/Data Engineering
I. 개요 일련의 작업 진행 시 상황에 따라 다른 작업으로 이어져야 하는 경우는 굉장히 빈번하게 발생한다. Airflow 는 기본적으로 DAG 으로 작업을 구조화해서 작업을 진행하기 때문에, 자동화할 때 이러한 조건부 작업을 구현하지 못한다면 매번 실패 후 재처리하는 작업이 필요하다. 기본적이지만 자주 사용되는 Branch task 인 BranchPythonOperator 와 BranchSQLOperator 의 사용법과 예제를 기록해둔다. II. Branch Task 1. BranchPythonOperator PythonOperator 기반으로 구성되어 task_id(s) 를 output 으로 하는 Python callable 을 통해 바로 다음에 이어지는 작업 요소를 결정한다. BranchPythonO..