Airflow(Google Composer2) 를 활용해 워크플로우 오케스트레이션을 하고 있습니다. 최근 사내 파이프라인에 DBT 를 도입하다보니 composer 2의 기본 패키지 의존성과 DBT의 패키지 의존성간에 충돌이 있기에 컨테이너 기반으로 DBT 를 활용하도록 KubernetesPodOperator를 고려하였습니다.
본 문서는 비공개 환경의 k8s(GKE) 기반 airflow(GCP Composer)에서 KubernetesPodOperator를 사용하는 방법에 대해 안내합니다.
참고로 composer 1 과 composer2 에서 사용 설정이 일부 상이합니다. 본 문서는 autopilot 으로 관리되는 composer2 환경에서 KubernetesPodOperator 사용법을 설명합니다.
환경
- gcp composer 2.1 (airflow -v 2.4.3)
- 클러스터 환경
- 클러스터 : 비공개
- VPC 기반 트래픽 라우팅 : 사용
- 제어 영역 승인 네트워크 : 서비스 한정
- gcloud -v
- Google Cloud SDK 412.0.0
- core 2022.12.09
- gcloud-crc32c 1.0.0
KubernetesPodOperator란?
KubernetesPodOperator는 Composer 환경의 일부인 GKE 로 배포하여 pod을 실행하는 operator 입니다. google composer 2 가 배포되는 클러스터는 google autopilot ( Google에서 노드, 확장, 보안, 기타 사전 구성된 설정을 포함한 클러스터 구성을 관리하는 GKE의 작동 모드 ) 으로 관리되기 때문에 KubernetesPodOperator 으로 여러 pod을 배포해도 전체 시스템 성능에 영향을 주지 않기 때문에 k8s cluster 를 손쉽게 사용할 수 있는 대안이 될 수 있습니다.
만약 다른 환경 클러스터에 배포 및 관리를 하고 싶다면 Google Kubernetes Engine 연산자를 활용한 GKECreateClusterOperator, GKEStartPodOperator, GKEDeleteClusterOperator 와 같은 operator 를 활용할 수 있습니다.
KubernetesPodOperator 사용 설정
k8s 클러스터 사용자 인증
클러스터에서 액션을 위해 아래와 같이 사용자 인증 정보를 가져옵니다.
gcloud container clusters get-credentials {CLUSTER_NAME} --region {CLUSTER_REGION} --project {GCP_PROJECT_NAME}
->
Fetching cluster endpoint and auth data.
kubeconfig entry generated for {CLUSTER_NAME}.
이때 kubectl
명령어 들을 날릴 때 다음과 같은 에러가 발생할 수 있습니다.
Unable to connect to the server: dial tcp IP_ADDRESS: connect: connection timed out
Unable to connect to the server: dial tcp IP_ADDRESS: i/o timeout
이러한 경우 클러스터 인증 정보를 가져오는 과정에서 잘못된 것 일 수 있으므로 다음과 같은 명령어를 통해 클러스터의 config 설정에 cluster context 와 외부 IP 주소 존재 여부를 확인합니다.
kubectl config view
설정에 문제가 없다면 비공개 클러스터 환경에 연결하려는 머신의 발신 IP 가 승인된 기존 네트워크 목록 ( 제어 영역 승인 네트워크 ) 에 포함되어 있지 않기 때문일 수 있습니다. 다음과 같은 명령어를 통해 승인된 네트워크를 확인합니다.
gcloud container clusters describe {CLUSTER_NAME} \
--region={COMPUTE_REGION} \
--project={PROJECT_ID} \
--format "flattened(masterAuthorizedNetworksConfig.cidrBlocks[])"
->
masterAuthorizedNetworksConfig.cidrBlocks[0].cidrBlock: *.**.**.**/32
masterAuthorizedNetworksConfig.cidrBlocks[1].cidrBlock: **.**.***.**/32
제어 영역 승인 네트워크에 ip가 등록되어 있지 않다면 다음과 같은 절차를 따릅니다.
dig +short myip.opendns.com @resolver1.opendns.com
->
my.machine.ip.address
gcloud container clusters update {CLUSTER_NAME} --region {CLUSTER_REGION} --project {PROJECT_ID}\ --enable-master-authorized-networks \
--master-authorized-networks {exist_ip},{my.machine.ip.address/32}
->
Updating {CLUSTER_NAME}...done.
To inspect the contents of your cluster, go to: https://console.cloud.google.com/kubernetes/workload_/gcloud/{CLUSTER_REGION}/{CLUSTER_NAME}?project={PROJECT_ID}
다시 한번 아래 명령어로 클러스터의 사용자 인증정보를 가져옵니다.
gcloud container clusters get-credentials {CLUSTER_NAME} --region {CLUSTER_REGION} --project {GCP_PROJECT_NAME}
# Cloud shell 에서 진행하였거나 여전히 connect server error 아래 명령어로 재시도
gcloud container clusters get-credentials {CLUSTER_NAME} --region {CLUSTER_REGION} --project {GCP_PROJECT_NAME} --internal-ip
워크로드 아이덴티티 설정
Composer 2 에서 클러스터는 워크로드 아이덴티티를 사용합니다. 따라서 새로 생성된 namespace 나 default 에서 실행되는 pod은 google cloud resource에 접근하지 못합니다.
사용자는 2가지 옵션을 선택할 수 있습니다.
- 기 구성된
composer-user-workloads
namespace 를 사용한다 - 자체 namespace 를 구성하는 경우 적절한 IAM 바인딩이 생성되도록 하namespace 연결된 k8s 서비스 계정과 google service 계정을 매핑한다.
아래 내용은 2번째 옵션인 자체 namespace 를 구성하는 예입니다. 구글 서비스 계정은 이미 존재한다고 가정합니다.
# namespace 생성
kubectl create namespace data-engineering
# Kubernetes 서비스 계정을 namespace 에 생성
kubectl create serviceaccount data-engineering \
--namespace data-engineering
# Kubernetes 서비스 계정과 IAM 서비스 계정 사이에 IAM 정책 바인딩.
# 이렇게 하면 Kubernetes 서비스 계정이 IAM 서비스 계정처럼 동작할 수 있음
gcloud iam service-accounts add-iam-policy-binding {GOOGLE_SA} \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:project-id.svc.id.goog[namespace/k8s_SA]"
# Kubernetes 서비스 계정에 주석 달아 놓기
kubectl annotate serviceaccount {k8s_SA} \
--namespace {NAMESPACE} \
iam.gke.io/gcp-service-account={GOOGLE_SA}
KubernetesPodOperator 사용
KubernetesPodOperator 설치
composer2 로 구성되는 airflow 에는 기본적으로 KubernetesPodOperator 를 사용하기 위한 패키지가 사전에 설치되어 있지만 패키지가 없는 경우 PYPI 패키지에 apache-airflow-providers-cncf-kubernetes 를 추가하거나 pip install 합니다.
KubernetesPodOperator 선언
KubernetesPodOperator를 사용하려면 DAG 파일에서 다음과 같이 import합니다.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
그리고 다음과 같이 KubernetesPodOperator를 정의합니다.
KubernetesPodOperator(
task_id = "task_id", # airflow 워크로드에 생성될 task id
name = "pod_id", # 생성될 Pod id
namespace = "my_namesapce", # 기 생성한 namesapce 이름
image = "image path", # 사용할 docker container image,
image_pull_policy = "Always", # 이미지를 매번 새로 pull 할 것인지 cache 를 사용할 것인지에 대한 옵션
cmds= ["entriy point"], # 이미지의 Entrypoint. 선언하지 않으면 이미지의 Entrypoint 를 사용
arguments= ["my", "command"], # Entrypoint argument. 선언하지 않으면 이미지의 CMD 를 사용
get_logs=True, # pod 의 로그 출력 여부
log_events_on_failure=True, # pod 동작 실패시 로그 출력 여부
is_delete_operator_pod = True, # 동작 이후 pod 제거 여부
service_account_name= "k8s_servce_account",
config_file="/home/airflow/composer_kube_config",
kubernetes_conn_id="kubernetes_default",
)
실적용 예제
제 경우 KubernetesPodOperator 를 활용해 DBT 를 동작시키는게 목적이었습니다. 따라서 다음과 같이 구성하여 활용하고 있습니다.
dags/dependencies/k8s_operator.py
import json
import logging
from typing import Union
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
def generate_dbt_on_kubernetes(mode : str, target : str, tag : Union[str,list] = None , **context):
"""
args:
- mode (str) : 실행 옵션("run", "test")을 받음. 그 외의 경우 valueerror
- target (str) : 실행 model("stage", "dw", "mart")를 받음. 그 외의 경우 valueerror
- tag (str or list) : 모델에 정의된 tag 옵션
"""
target_datetime = context.get("data_interval_end")
date = target_datetime.strftime("%Y-%m-%d")
var_dict = {"execution_date" : date}
vars = json.dumps(var_dict)
if mode not in ["run", "test"]:
raise ValueError("generate_dbt_on_kubernetes() : mode must be 'run' or 'test'")
if target not in ["stage", "dw", "mart"] :
raise ValueError("generate_dbt_on_kubernetes() : target must be 'stage', 'test' or 'mart'")
args = [f"{mode}", "--vars" , vars, "--profiles-dir", f"profile/{target}", "--select", f"path:models/{target}"]
if tag is not None:
if type(tag) == str : tag = [tag]
tag_args =''
for v in tag:
tag_args += f"tag:{v},"
args.append(tag_args.rstrip(','))
logging.info(f"generate_dbt_on_kubernetes() : dbt args : [{args}]")
KubernetesPodOperator(
task_id=f"{mode}-DBT-{target}",
name=f"{mode}-DBT-{target}",
...
).execute(context)
...
dags/my_dag.py
...
from dependencies import k8s_operator
from airflow import models
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
...
with models.DAG(
...
) as dag:
with TaskGroup("EL_Job") as airbyte_job:
...
with TaskGroup("T_job") as dbt_job:
run_dbt_stage_task = PythonOperator(
task_id='dbt_run_stage_model',
provide_context=True,
python_callable=k8s_operator.generate_dbt_on_kubernetes,
op_kwargs={
"mode": "run",
"target": "stage",
"tag" : ["hourly","staging"]
},
on_failure_callback = slack_alert
)
...
참고
https://cloud.google.com/kubernetes-engine/docs/troubleshooting#connection_refused
https://cloud.google.com/kubernetes-engine/docs/how-to/private-clusters#public_cp