반응형

Airflow(Google Composer2) 를 활용해 워크플로우 오케스트레이션을 하고 있습니다. 최근 사내 파이프라인에 DBT 를 도입하다보니 composer 2의 기본 패키지 의존성과 DBT의 패키지 의존성간에 충돌이 있기에 컨테이너 기반으로 DBT 를 활용하도록 KubernetesPodOperator를 고려하였습니다.

본 문서는 비공개 환경의 k8s(GKE) 기반 airflow(GCP Composer)에서 KubernetesPodOperator를 사용하는 방법에 대해 안내합니다.

참고로 composer 1 과 composer2 에서 사용 설정이 일부 상이합니다. 본 문서는 autopilot 으로 관리되는 composer2 환경에서 KubernetesPodOperator 사용법을 설명합니다.

환경

  • gcp composer 2.1 (airflow -v 2.4.3)
  • 클러스터 환경
    • 클러스터 : 비공개
    • VPC 기반 트래픽 라우팅 : 사용
    • 제어 영역 승인 네트워크 : 서비스 한정
  • gcloud -v
    • Google Cloud SDK 412.0.0
    • core 2022.12.09
    • gcloud-crc32c 1.0.0

KubernetesPodOperator란?

KubernetesPodOperator는 Composer 환경의 일부인 GKE 로 배포하여 pod을 실행하는 operator 입니다. google composer 2 가 배포되는 클러스터는 google autopilot ( Google에서 노드, 확장, 보안, 기타 사전 구성된 설정을 포함한 클러스터 구성을 관리하는 GKE의 작동 모드 ) 으로 관리되기 때문에 KubernetesPodOperator 으로 여러 pod을 배포해도 전체 시스템 성능에 영향을 주지 않기 때문에 k8s cluster 를 손쉽게 사용할 수 있는 대안이 될 수 있습니다.

만약 다른 환경 클러스터에 배포 및 관리를 하고 싶다면 Google Kubernetes Engine 연산자를 활용한 GKECreateClusterOperator, GKEStartPodOperator, GKEDeleteClusterOperator 와 같은 operator 를 활용할 수 있습니다.

KubernetesPodOperator 사용 설정

k8s 클러스터 사용자 인증

클러스터에서 액션을 위해 아래와 같이 사용자 인증 정보를 가져옵니다.

gcloud container clusters get-credentials {CLUSTER_NAME} --region {CLUSTER_REGION} --project {GCP_PROJECT_NAME}

->
Fetching cluster endpoint and auth data.
kubeconfig entry generated for {CLUSTER_NAME}.

이때 kubectl 명령어 들을 날릴 때 다음과 같은 에러가 발생할 수 있습니다.

  • Unable to connect to the server: dial tcp IP_ADDRESS: connect: connection timed out
  • Unable to connect to the server: dial tcp IP_ADDRESS: i/o timeout

이러한 경우 클러스터 인증 정보를 가져오는 과정에서 잘못된 것 일 수 있으므로 다음과 같은 명령어를 통해 클러스터의 config 설정에 cluster context 와 외부 IP 주소 존재 여부를 확인합니다.

kubectl config view

설정에 문제가 없다면 비공개 클러스터 환경에 연결하려는 머신의 발신 IP 가 승인된 기존 네트워크 목록 ( 제어 영역 승인 네트워크 ) 에 포함되어 있지 않기 때문일 수 있습니다. 다음과 같은 명령어를 통해 승인된 네트워크를 확인합니다.

gcloud container clusters describe {CLUSTER_NAME} \
    --region={COMPUTE_REGION} \
    --project={PROJECT_ID} \
    --format "flattened(masterAuthorizedNetworksConfig.cidrBlocks[])"

-> 
masterAuthorizedNetworksConfig.cidrBlocks[0].cidrBlock: *.**.**.**/32
masterAuthorizedNetworksConfig.cidrBlocks[1].cidrBlock: **.**.***.**/32

제어 영역 승인 네트워크에 ip가 등록되어 있지 않다면 다음과 같은 절차를 따릅니다.

dig +short myip.opendns.com @resolver1.opendns.com

->
my.machine.ip.address
gcloud container clusters update {CLUSTER_NAME} --region {CLUSTER_REGION} --project {PROJECT_ID}\                                                                                                             --enable-master-authorized-networks \
--master-authorized-networks {exist_ip},{my.machine.ip.address/32}

->
Updating {CLUSTER_NAME}...done.     
To inspect the contents of your cluster, go to: https://console.cloud.google.com/kubernetes/workload_/gcloud/{CLUSTER_REGION}/{CLUSTER_NAME}?project={PROJECT_ID}

다시 한번 아래 명령어로 클러스터의 사용자 인증정보를 가져옵니다.

gcloud container clusters get-credentials {CLUSTER_NAME} --region {CLUSTER_REGION} --project {GCP_PROJECT_NAME}

# Cloud shell 에서 진행하였거나 여전히 connect server error 아래 명령어로 재시도
gcloud container clusters get-credentials {CLUSTER_NAME} --region {CLUSTER_REGION} --project {GCP_PROJECT_NAME} --internal-ip

워크로드 아이덴티티 설정

Composer 2 에서 클러스터는 워크로드 아이덴티티를 사용합니다. 따라서 새로 생성된 namespace 나 default 에서 실행되는 pod은 google cloud resource에 접근하지 못합니다.

사용자는 2가지 옵션을 선택할 수 있습니다.

  • 기 구성된 composer-user-workloads namespace 를 사용한다
  • 자체 namespace 를 구성하는 경우 적절한 IAM 바인딩이 생성되도록 하namespace 연결된 k8s 서비스 계정과 google service 계정을 매핑한다.

아래 내용은 2번째 옵션인 자체 namespace 를 구성하는 예입니다. 구글 서비스 계정은 이미 존재한다고 가정합니다.

# namespace 생성
kubectl create namespace data-engineering
# Kubernetes 서비스 계정을 namespace 에 생성
kubectl create serviceaccount data-engineering \                     
    --namespace data-engineering
# Kubernetes 서비스 계정과 IAM 서비스 계정 사이에 IAM 정책 바인딩.
# 이렇게 하면 Kubernetes 서비스 계정이 IAM 서비스 계정처럼 동작할 수 있음
gcloud iam service-accounts add-iam-policy-binding {GOOGLE_SA} \ 
    --role roles/iam.workloadIdentityUser \
    --member "serviceAccount:project-id.svc.id.goog[namespace/k8s_SA]"
# Kubernetes 서비스 계정에 주석 달아 놓기
kubectl annotate serviceaccount {k8s_SA} \
    --namespace {NAMESPACE} \
    iam.gke.io/gcp-service-account={GOOGLE_SA}

KubernetesPodOperator 사용

KubernetesPodOperator 설치

composer2 로 구성되는 airflow 에는 기본적으로 KubernetesPodOperator 를 사용하기 위한 패키지가 사전에 설치되어 있지만 패키지가 없는 경우 PYPI 패키지에 apache-airflow-providers-cncf-kubernetes 를 추가하거나 pip install 합니다.

KubernetesPodOperator 선언

KubernetesPodOperator를 사용하려면 DAG 파일에서 다음과 같이 import합니다.

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

그리고 다음과 같이 KubernetesPodOperator를 정의합니다.

KubernetesPodOperator(
        task_id = "task_id", # airflow 워크로드에 생성될 task id
        name = "pod_id", # 생성될 Pod id
        namespace = "my_namesapce", # 기 생성한 namesapce 이름
        image = "image path", # 사용할 docker container image,
        image_pull_policy = "Always", # 이미지를 매번 새로 pull 할 것인지 cache 를 사용할 것인지에 대한 옵션
        cmds= ["entriy point"], # 이미지의 Entrypoint. 선언하지 않으면 이미지의 Entrypoint 를 사용
        arguments= ["my", "command"], # Entrypoint argument. 선언하지 않으면 이미지의 CMD 를 사용 
        get_logs=True,  # pod 의 로그 출력 여부
        log_events_on_failure=True,  # pod 동작 실패시 로그 출력 여부
        is_delete_operator_pod = True, # 동작 이후 pod 제거 여부
        service_account_name= "k8s_servce_account",
        config_file="/home/airflow/composer_kube_config",
        kubernetes_conn_id="kubernetes_default",
    )

실적용 예제

제 경우 KubernetesPodOperator 를 활용해 DBT 를 동작시키는게 목적이었습니다. 따라서 다음과 같이 구성하여 활용하고 있습니다.

더보기

dags/dependencies/k8s_operator.py

import json
import logging
from typing import Union
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

def generate_dbt_on_kubernetes(mode : str, target : str, tag : Union[str,list] = None , **context):
    """
        args:
            - mode (str) : 실행 옵션("run", "test")을 받음. 그 외의 경우 valueerror
            - target (str) : 실행 model("stage", "dw", "mart")를 받음. 그 외의 경우 valueerror
            - tag (str or list) : 모델에 정의된 tag 옵션
    """
    target_datetime = context.get("data_interval_end")
    date = target_datetime.strftime("%Y-%m-%d")    
    var_dict = {"execution_date" : date}
    vars = json.dumps(var_dict)

    if mode not in ["run", "test"]:
        raise ValueError("generate_dbt_on_kubernetes() : mode must be 'run' or 'test'")
    if target not in ["stage", "dw", "mart"] : 
        raise ValueError("generate_dbt_on_kubernetes() : target must be 'stage', 'test' or 'mart'")

    args = [f"{mode}", "--vars" , vars, "--profiles-dir", f"profile/{target}", "--select", f"path:models/{target}"]

    if tag is not None:
        if type(tag) == str : tag = [tag]
        tag_args =''
        for v in tag:
            tag_args += f"tag:{v},"
        args.append(tag_args.rstrip(','))    

    logging.info(f"generate_dbt_on_kubernetes() : dbt args : [{args}]")
    KubernetesPodOperator(
        task_id=f"{mode}-DBT-{target}",
        name=f"{mode}-DBT-{target}",
        ...
    ).execute(context)

...

dags/my_dag.py

...

from dependencies import k8s_operator

from airflow import models
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

...

with models.DAG(
    ...
) as dag:

    with TaskGroup("EL_Job") as airbyte_job:
        ...

    with TaskGroup("T_job") as dbt_job:
        run_dbt_stage_task = PythonOperator(
            task_id='dbt_run_stage_model',
            provide_context=True,
            python_callable=k8s_operator.generate_dbt_on_kubernetes,
            op_kwargs={
                "mode": "run",
                "target": "stage",
                "tag" : ["hourly","staging"]
                },
            on_failure_callback = slack_alert
        )

                ...

 

참고

https://cloud.google.com/kubernetes-engine/docs/troubleshooting#connection_refused

https://cloud.google.com/kubernetes-engine/docs/how-to/private-clusters#public_cp

https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity?hl=ko&cloudshell=false#authenticating_to

반응형
복사했습니다!