Airflow(GCP Composer) 에서 KubernetesPodOperator 사용
2023. 4. 23. 13:27
Data/Data Engineering
Airflow(Google Composer2) 를 활용해 워크플로우 오케스트레이션을 하고 있습니다. 최근 사내 파이프라인에 DBT 를 도입하다보니 composer 2의 기본 패키지 의존성과 DBT의 패키지 의존성간에 충돌이 있기에 컨테이너 기반으로 DBT 를 활용하도록 KubernetesPodOperator를 고려하였습니다. 본 문서는 비공개 환경의 k8s(GKE) 기반 airflow(GCP Composer)에서 KubernetesPodOperator를 사용하는 방법에 대해 안내합니다. 참고로 composer 1 과 composer2 에서 사용 설정이 일부 상이합니다. 본 문서는 autopilot 으로 관리되는 composer2 환경에서 KubernetesPodOperator 사용법을 설명합니다. 환경 ..
Airflow 사용 시 AssertionError: daemonic processes are not allowed to have children 의 해결
2022. 10. 10. 23:37
Data/Data Engineering
현상 파악 Airflow 2.0 버전 대에서 concurrent.futures.ProcessPoolExecutor 사용 시 AssertionError: daemonic processes are not allowed to have children 에러를 경험하였다. (local , celery, kubernates 모두 같은 현상을 재현할 수 있었음 ) 리서치 결과 정확히는 multiprocessing 패키지에서 발생하는 문제라는 것을 알 수 있었다. concurrent.futures python 작업 시 multiprocessing이나 multithread 작업을 위해 자주 사용하는 패키지로 사진의 에러를 발생시킨 것은 ProcessPoolExecutor 사용 시 경험했다. 이 ProcessPoolExe..
[Airflow] 자주 쓰는 Branch Task
2022. 7. 21. 13:05
Data/Data Engineering
I. 개요 일련의 작업 진행 시 상황에 따라 다른 작업으로 이어져야 하는 경우는 굉장히 빈번하게 발생한다. Airflow 는 기본적으로 DAG 으로 작업을 구조화해서 작업을 진행하기 때문에, 자동화할 때 이러한 조건부 작업을 구현하지 못한다면 매번 실패 후 재처리하는 작업이 필요하다. 기본적이지만 자주 사용되는 Branch task 인 BranchPythonOperator 와 BranchSQLOperator 의 사용법과 예제를 기록해둔다. II. Branch Task 1. BranchPythonOperator PythonOperator 기반으로 구성되어 task_id(s) 를 output 으로 하는 Python callable 을 통해 바로 다음에 이어지는 작업 요소를 결정한다. BranchPythonO..
[Dagster/Orchestrator] Airflow를 이길수있을까? 새로운 data orchestrator, Dagster 맛보기
2021. 5. 22. 18:24
Data/Data Engineering
Dagster 대시보드를 업데이트하거나, 파이프라인을 꽂아 데이터를 추출 및 적재해야하는 상황 또는 모델의 학습과 데이터 전처리 자동화 등 정기적으로 수행해야하는 업무들이 있습니다. 이러한 일들은 꼭 필요한 작업이지만 매번 사람이 직접 수행하기 어려운 경우가 많습니다. 우리는 이러한 일들을 여러 툴을 사용해 자동화 시키곤 하는데, 이때 가장 자주 사용되는 것이 Airflow 입니다. airflow는 python 기반으로 작성이 가능하다는 점에서 데이터 분석가도 쉽게 사용할 수 있는 장점이 있으며, 방대한 생태계와 잘 정리된 문서로 쉽게 도움을 받을 수 있다는 장점이 있습니다. 또한 나름 괜찮은 UI를 제공하고 있기도 합니다. 이러한 사유로 여태껏 airflow는 워크플로엔진의 탑티어로 군림하고 있으며 어..