Dagster
대시보드를 업데이트하거나, 파이프라인을 꽂아 데이터를 추출 및 적재해야하는 상황 또는 모델의 학습과 데이터 전처리 자동화 등 정기적으로 수행해야하는 업무들이 있습니다. 이러한 일들은 꼭 필요한 작업이지만 매번 사람이 직접 수행하기 어려운 경우가 많습니다. 우리는 이러한 일들을 여러 툴을 사용해 자동화 시키곤 하는데, 이때 가장 자주 사용되는 것이 Airflow 입니다.
airflow는 python 기반으로 작성이 가능하다는 점에서 데이터 분석가도 쉽게 사용할 수 있는 장점이 있으며, 방대한 생태계와 잘 정리된 문서로 쉽게 도움을 받을 수 있다는 장점이 있습니다. 또한 나름 괜찮은 UI를 제공하고 있기도 합니다. 이러한 사유로 여태껏 airflow는 워크플로엔진의 탑티어로 군림하고 있으며 어마어마한 커뮤니티와 생태계를 조성하였습니다. 이렇게 독보적인 주체가 있는 시장에는 후발주자가 아무리 발전된 서비스를 제공하더라도 성공하기 쉽지 않습니다.
그럼에도 시장에 진입하여 괜찮은 성능과 사용성으로 입소문을 타고 있는 오픈소스 워크플로우엔진 "Dagster" 을 소개합니다.
Dagster 개발자는 Dagster를 데이터 플랫폼 엔지니어, 데이터 엔지니어 및 풀스택 데이터 과학자를 위해 설계된 ML과 ETL, 분석을 위한 data orchestrator 라 소개하고 있습니다. 따라서 Dagster의 목표는 ML 엔지니어 - 데이터 분석가 - 데이터 엔지니어간 협력을 용이하게 하고 쉽고 빠르게 서비스를 테스트 및 배포 하는 것을 목표로 합니다.
- Dagster는 workflow 구성 단계에서 구성 단계별 종속성을 정의하기 때문에 단계별 디버깅에 유리합니다.
- Dagster는 단계별 데이터의 저장 및 기억, 전달이 가능합니다.
- Dagster의 repository 개념을 적용하여 의존성 문제를 해결합니다.
- Dagster는 매우 강력한 UI를 제공하여 코드에 대한 개념이 부족한 사용자도 쉽게 사용 가능하도록 합니다.
- Dagster는 다양한 서비스와 도구의 통합 환경을 지원합니다.
- Apache Airflow/Spark, Dask, Datadog, snowflake, AWS, Azure, GCP, slack 등
함께 읽으면 좋은 자료
- Moving past Airflow: Why Dagster is the next-generation data orchestrator
- Dagster and dbt: Better Together
- Dagster Document
Dagster 맛보기
- Dagster 의 아주 기초적인 구성 개념과 몇가지 항목을 맛보기 정도로만 훑습니다.
- Dagster는 예제를 굉장히 잘 만들어 놓은 오픈소스 라이브러리 중 하나 입니다. 실제 개발을 위한 정보를 얻고자 한다면 Dagster Github의 examples 코드를 읽어보시는 것을 추천드립니다.
Setting
Dagster : pip install dagster dagit
Build Workflow
In Airflow
- Apache-Airflow 에서는 각각의 Job간의 관계와 작업순서를 ">>" 라는 표현식으로 구성
- 각각의 작업은 여러 Operator 들로 정의됨
- 작업 스케쥴링은 크론탭(cron) 형식으로 설정
- 하나의 스크립트(.py) 가 하나의 workflow 가 되며 DAG 이라 정의
In Dagster
- Dagster 에서 Job은 "solid" 이며 workflow는 "pipeline" 으로 구현...Management of computed data is the primary concern of Dagster. Another name for computed data would be software-structured data. Or SSD. Given that SSD is already a well-known acronym for Solid State Drives we named our core concept for software-structured data a Solid.
- 각각의 solid와 pipeline 은 데코레이터로 정의됨
- 작업 스케쥴링도 별도의 데코레이터로 정의되어 있으며, 용도에 따라 파티션/비파티션으로 각각 제공
- 하나의 스크립트에 다수의 pipeline을 포함할 수 있으며, 이를 통해 여러 workflow를 구현할 수 있음. 이런 스크립트 하나를 repository 라 정의
Solid
- 기본적으로 solid는 별도의 설정없이 @solid 데코레이터를 붙여주는것만으로도 동작 가능함
- 각각의 solid는 별도의 input 과 output을 가질 수 있으며, solid 간 변수 상속이 가능함
- pipeline 으로 묶지 않고 solid 단일 실행도 가능함 ( dagster.execute_solid )
from dagster import solid
@solid
def my_input_solid(abc, xyz):
pass
@solid
def my_output_solid():
return 5
- solid에 input이 있는 경우 반드시 input이 모두 전달되어야 실행됨. 이를 확인하기 위해 입력을 확인하는 함수를 포함할 수 있음
from dagster import solid, InputDefinition, DagsterType
MyDagsterType = DagsterType(type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType")
@solid(input_defs=[InputDefinition(name="abc", dagster_type=MyDagsterType)])
def my_typed_input_solid(abc):
pass
- 각각의 solid는 context object를 갖고 있으며 아래 네가지 항목을 제공함
- 솔리드 구성 ( context.solid_config )
- 로거 ( context.log )
- 리소스 ( context.resource s)
- 현재 실행 ID : ( context.run_id )
- 각각의 solid는 pipeline을 통해 종속성이 생성되지만, Composite Solid 를 통해 solid 자체의 종속성을 사전에 정의할수도 있음
- 아래 예제의 경우 add_n_times_m_solid 는 add_n solid의 input이 정확히 들어오고, 올바르게 실행되야만 실행되게됨(종속 관계 형성)
@solid(config_schema={"n": int})
def add_n(context, number: int):
return number + context.solid_config["n"]
@solid(config_schema={"m": int})
def multiply_by_m(context, number: int):
return number * context.solid_config["m"]
@composite_solid(input_defs=[InputDefinition("number", int)])
def add_n_times_m_solid(number):
return multiply_by_m(add_n(number))
- Factory pattern 구현 가능
def x_solid(
arg,
name="default_name",
input_defs=None,
**kwargs,
):
"""
Args:
args (any): One or more arguments used to generate the nwe solid
name (str): The name of the new solid.
input_defs (list[InputDefinition]): Any input definitions for the new solid. Default: None.
Returns:
function: The new solid.
"""
@solid(name=name, input_defs=input_defs or [InputDefinition("start", Nothing)], **kwargs)
def _x_solid(context):
# Solid logic here
pass
return _x_solid
Pipeline
- 워크플로우 흐름대로 실행이 설정된 메소드에 @pipeline 데코레이터를 붙여주기만 하면 구성이 끝남
execute_pipeline(pipeline method)
로 실행- 동일한 solid를 여려번 호출할 경우 실행순서에 맞게 자동으로 별칭 지정(method_name_n)
- 아래 예제의 경우 add_one, add_one_2
@solid
def return_one(context):
return 1
@solid
def add_one(context, number: int):
return number + 1
@pipeline
def multiple_usage_pipeline():
add_one(add_one(return_one()))
if __name__ == "__main__" :
execute_pipeline(multiple_usage_pipeline)
- 별도로 이름/태그를 지정하고 싶다면, 아래와 같은 방법을 사용
@pipeline
def alias_pipeline():
add_one.alias("second_addition")(add_one(return_one()))
@pipeline
def tag_pipeline():
add_one.tag({"my_tag": "my_value"})(add_one(return_one()))
- 환경 관리가 가능 함. mode 참고!
dev_mode = ModeDefinition("dev")
staging_mode = ModeDefinition("staging")
prod_mode = ModeDefinition("prod")
@pipeline(mode_defs=[dev_mode, staging_mode, prod_mode])
def my_modes_pipeline():
my_solid()
- 실행 환경도 사전에 정의하여 사용가능함
@pipeline(
preset_defs=[
PresetDefinition(
name="one",
run_config={"solids": {"add_one": {"inputs": {"number": 1}}}},
),
PresetDefinition(
name="two",
run_config={"solids": {"add_one": {"inputs": {"number": 2}}}},
),
]
)
def my_presets_pipeline():
add_one()
- 분기를 설정할 수 있고 동적 매핑이 가능
- 동적 매핑이 설정을 잘해도 문제를 일으키는 경우가 종종 있어 사용을 추천하지는 않음.. 다음 버전을 기대해보면 좋을듯
@solid(
output_defs=[
OutputDefinition(name="branch_1", is_required=False),
OutputDefinition(name="branch_2", is_required=False),
]
)
def branching_solid():
num = random.randint(0, 1)
if num == 0:
yield Output(1, "branch_1")
else:
yield Output(2, "branch_2")
@solid
def branch_1_solid(_input):
pass
@solid
def branch_2_solid(_input):
pass
@pipeline
def branching_pipeline():
branch_1, branch_2 = branching_solid()
branch_1_solid(branch_1)
branch_2_solid(branch_2)
- sensor 데코레이터를 사용하여 다른 파이프라인의 실행 여부 등 외부이벤트를 사용해 다른 동작이 실행되도록 할 수 있음
- 이를 통해 보다 쉽게 멱등한 워크플로우를 구성할 수 있음
@solid(config_schema={"filename": str})
def process_file(context):
filename = context.solid_config["filename"]
context.log.info(filename)
@pipeline
def log_file_pipeline():
process_file()
@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor(_context):
for filename in os.listdir(MY_DIRECTORY):
filepath = os.path.join(MY_DIRECTORY, filename)
if os.path.isfile(filepath):
yield RunRequest(
run_key=filename,
run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
)
Schedules
- airflow가 Dag configuration 에서 schedule_interval 을 지정하는것과 달리 스케쥴링도 데코레이터 형태로 개별 생성함
- 정해진 날짜, 요일 마다 실행하고싶으면 제공되는 partition Sshedule 데코레이터를 사용하면 됨
- 매 시간, 분 단위로 실행하고 싶으면 schedule 데코레이터로 cron tab을 구성하여 사용
- Timezone을 지원하며 인수로 datetime object를 받기 때문에 airflow에 비해 스케쥴링 관리나 구성 난이도는 낮음
@daily_schedule(
pipeline_name="my_pipeline",
start_date=datetime.datetime(2021, 1, 1),
execution_time=datetime.time(11, 0),
execution_timezone="US/Central",
)
def my_daily_schedule(date):
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}
@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_execution_time_schedule(context):
date = context.scheduled_execution_time.strftime("%Y-%m-%d")
return {"solids": {"dataset_name": "my_dataset", "execution_date": date}}
Test and Debug
- 강력한 예외처리(에러 핸들링) 기능을 갖고 있어 로깅이 굉장히 용이함
- text, float, int, json, markdown, path, url, callable method 지원
- 주로 사용하는 메일링, 슬랙 알람 등 메소드를 제공하여 에러 알람 구성이 쉬움
@solid
def my_retry_solid():
try:
result = flaky_operation()
except:
raise RetryRequested(max_retries=3)
return result
@solid
def my_failure_solid():
data_path = "/oh/my/path"
try:
data = pd.read_csv(data_path)
except:
raise Failure(
description="No files to process",
metadata={
"path": EventMetadata.path(data_path),
},
)
Workflow UI
- dagster의 ui 는 굉장히 많은 정보를 제공해줌
- 기본적으로 repository를 선택하면 해당 repository의 solid와 sensor, pipeline, scheduler 정보와 실행 여부를 보여줌
- solid tab에서는 각각의 solid가 어떤 input을 받아 어떤 output을 냈는지 확인할 수 있으며 구성된 형태를 관찰 가능
- 각각의 pipeline 클릭시 마찬가지로 세부정보 확인가능
- 최근 실행 시점
- 파이프라인의 구성
- 실행 기록(id) 를 클릭하면 세부 실행 내역을 제공
- 시간별 스냅샷을 제공하여 실행과정과 작업 흐름을 파악하고, 디버깅하기 매우 유용
- 각 스텝별 경과시간과 하위작업 등을 간트차트로 제공하여 리팩토링 우선순위 설정에 유리
- logging 을 구현한 경우 함께 관찰할 수 있음