반응형

사진 1. concurrent.futures 사용시 경험한 Error ( written by author )

현상 파악

Airflow 2.0 버전 대에서 concurrent.futures.ProcessPoolExecutor 사용 시 AssertionError: daemonic processes are not allowed to have children 에러를 경험하였다. (local , celery, kubernates 모두 같은 현상을 재현할 수 있었음 )

 

리서치 결과 정확히는 multiprocessing 패키지에서 발생하는 문제라는 것을 알 수 있었다.

 

concurrent.futures

python 작업 시 multiprocessing이나 multithread 작업을 위해 자주 사용하는 패키지로 사진의 에러를 발생시킨 것은 ProcessPoolExecutor 사용 시 경험했다. 이 ProcessPoolExecutor 클래스는 프로세스 풀을 사용하여 호출을 비동기적으로 실행 가능하게 한다. ( code )

 

pyhton에서 multiprocessing을 구현하는 경우 다음과 같은 특징을 갖는다

  • GIL의 영향을 받지 않고 병렬성을 구현하기 좋다! → 시스템적으로 보면 멀티 프로세싱이 아니라 멀티프로그래밍으로 처리해야 한다!
  • 이러한 이유로 CPU 부하가 큰 작업일수록 해당 클래스를 활용해서 작업하면 이점을 갖는다
  • 다만 개별 프로세스는 서로 데이터(메모리)를 공유하지 않고 간섭할 수 없기 때문에 copy 가 필요하여 리소스 낭비가 발생할 수 있다

 

때문에 multiprocessing은 보통 대량의 데이터를 한번에 처리할 때 많이 쓴다. 이미지 처리( 화소 값 분리 혹은 전환 (CMYK ↔ RGB), convolution 등) 나 신호 처리 ( FFT 등) 작업을 할 때 또는 어떤 연산 결과들을 병합하는 작업 등에 적합하다

 

현상 분석

def start(self):
        '''
        Start child process
        '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
               'can only start a process object created by current process'
        assert not _current_process._daemonic, \
               'daemonic processes are not allowed to have children'
        _cleanup()
        if self._Popen is not None:
            Popen = self._Popen
        else:
            from .forking import Popen
        self._popen = Popen(self)
        # Avoid a refcycle if the target function holds an indirect
        # reference to the process object (see bpo-30775)
        del self._target, self._args, self._kwargs
        _current_process._children.add(self)

AssertionError: daemonic processes are not allowed to have children 에러는 assert not _current_process._config.get(’daemon’) 조건을 만족시키지 못해서 발생 하는 것으로 데몬 프로세스는 자식 프로세스를 만들 수 없는데 현재 process 가 데몬 프로세스이므로 새로운 프로세스를 만드는 작업을 할 수 없다는 의미이다. 다만 이해할 수 없는 것은 airflow 1.x 버전에서는 문제가 생긴 적이 한 번 도 없다는 것…

 

프로세스와 데몬 프로세스

1) 프로세스

 

프로세스란 컴퓨터에서 실제로 실행되고 있는 프로그램 또는 명령을 의미한다. 언제나 유니크한 ID 값(PID) 를 갖는데, 때문에 같은 프로그램이나 명령을 반복적으로 요청하면 PID는 항상 바뀐다.

 

이 프로세스는 계층 구조를 형성할 수 있는데, 사용자가 최초에 생성한(실행한) 프로세스가 상위 프로세스 이며 이 상위 프로세스 동작 과정에서 필요시 하위 프로세스(자식 프로세스)를 생성하여 처리한다.

 

이때 자식 프로세스는 n>0 개일 수 있지만 자식 프로세스가 자신의 자식을 생성하는 것은 불가능하다.

 

2) 데몬 프로세스

 

OS적으로는 사용자가 실제로 실행시키지 않아도 지속적으로 background에 존재하는 프로세스로 둘 이상의 작업자 또는 프로세스가 공유할 수 있다.

 

시스템(프로세스)의 실행 시 생성되며 종료 시 제거되는 것으로 이때 작업이 끝나지 않았어도 종료(SIGTERM.KILLED) 되기 때문에 데몬 프로세스를 활용하는 경우 중도에 강제 종료되어도 문제가 없도록 고려해야 한다. (자식 프로세스는 다르다. 모든 자식 프로세스의 종료가 상위 프로세스의 종료보다 선행되기 때문)

 

프린터 연결 드라이버나 캡쳐보드와 같은 프로세스를 생각하면 된다.

 

파이썬은 인터프리터 언어로 모든 애플리케이션은 파이썬 인터프리터의 new main instance(process)에서 실행되고 os에서 관리된다.

 

파이썬의 multiprocessing 모듈 에서는 프로세스를 start() 하기 전에 daemon = True/False를 선언하여 생성할 수 있으며 그 개념과 같게 프로세스가 종료될 때 daemon 으로 선언된 모든 프로세스를 종료시킨다.( 당연히 이때 생성되는 데몬 프로세스는 unix daemon 이나 서비스와는 다르다.)

 

중요한 것은 daemon=True 인 프로세스는 자식 프로세스를 생성할 수 없는데, 상위 프로세스가 종료되어 daemon 프로세스가 제거될 때 자식 프로세스가 있다면 그 프로세스는 종료되지 않고 고아 상태로 남겨지기 때문이다. (고아는 좀비와는 조금 다른 개념이다)

 

Airflow에서 Process

....
def check_pid():
    print(f"start checking pid task 1")
    print(
        f"parent process : {os.getppid()} is daemon? : {multiprocessing.parent_process().daemon if multiprocessing.parent_process() is not None else None}"
    )
    print(
        f"process : {os.getpid()} is daemon? : {multiprocessing.current_process().daemon}"
    )
....
pp_task = PythonOperator(
        task_id="pp_task",
        python_callable=check_pid,
    )
{standard_task_runner.py:52} INFO - Started process 67058 to run task
{standard_task_runner.py:52} INFO - Job 54375: Subtask pp_task 
...

start checking pid task 1
parent process : 67046 is daemon? : None
process : 67058 is daemon? : True

os.getppid()os.geitpid() 를 통해 실제로 작업이 실행되고 있는 process를 알 수 있고, multiprocessing.<>.daemon 을 통해 각각의 프로세스의 데몬 세팅이 어떻게 되어 있는지 보고자 했다.

 

실제 작업이 실행되고 있는 current process 가 daemon process 임을 알 수 있는데, 이는 DAG의 정상 종료가 아닌 다른 이유로 상태가 바뀔 때 강제로 종료되어야 하기 때문으로 이해할 수 있다.

 

여기서 의문이었던 점은 왜 부모 프로세스인 67046의 daemon이 None 인가? 였다. (이는 multiprocessing.parent_process = None 임을 뜻한다.) 찾고 찾다가 답을 얻을 수 없어 직접 질문을 올린 결과 답변을 받을 수 있었다.
multiprocessing.parent_process()는 자식 프로세스가 선언될 때 결정되는데 이 경우 multiprocessing 은 별도의 프로세스를 선언한 적이 없기 때문이며 os를 통해 나온 결과는 실제 부모 거나 또 다른 프로세스 일 수 있다는 것이다.
...
def check_pid():
    print(f"start checking pid task 1")
    print(
        f"parent process : {os.getppid()} is daemon? : {multiprocessing.parent_process().daemon if multiprocessing.parent_process() is not None else None}"
    )
    print(
        f"process : {os.getpid()} is daemon? : {multiprocessing.current_process().daemon}"
    )

def process_function(i):
    parent_process = multiprocessing.parent_process().pid
    parent_process_daemon = (
        multiprocessing.parent_process().daemon
        if multiprocessing.parent_process() is not None
        else None
    )
    current_process = multiprocessing.current_process().pid
    is_daemon = multiprocessing.current_process().daemon
    result = (
        f"{i}th task : "
        + "partent_process : "
        + str(parent_process)
        + " is daemon : "
        + str(parent_process_daemon)
        + " current_process : "
        + str(current_process)
        + " is daemon : "
        + str(is_daemon)
    )
    time.sleep(3)
    return result

def mp(run_n: int):
    print(f"start checking multiprocessing pid task")
    print("[1] check pid using os modlue")
    print(f"parent process : {os.getppid()} current process : {os.getpid()}")
    print("[2] check pid using multiprocessing modlue")
    print(
        f"parent process : {multiprocessing.parent_process().pid if multiprocessing.parent_process() is not None else None} is daemon? : {multiprocessing.parent_process().daemon if multiprocessing.parent_process() is not None else None} process : {multiprocessing.current_process().pid} is daemon? : {multiprocessing.current_process().daemon}"
    )

    results = []
    print(f"start job")
    with concurrent.futures.ProcessPoolExecutor() as process_executor:
        for pp_res in process_executor.map(process_function, [i for i in range(run_n)]):
            results.append(pp_res)
    print(f"job done")
    for c in results:
        print(c)
...

pp_task = PythonOperator(
        task_id="pp_task",
        python_callable=check_pid,
    )

mpp_job = PythonOperator(
        task_id="mpp_job",
        python_callable=mp,
        op_kwargs={
            "run_n": 5,
        },
    )

pp_task >> mpp_job
{standard_task_runner.py:52} INFO - Started process 67070 to run task
{standard_task_runner.py:52} INFO - Job 54376: Subtask mpp_job
...
start checking multiprocessing pid task
[1] check pid using os modlue
parent process : 67069 current process : 67070

[2] check pid using multiprocessing modlue
parent process : None is daemon? : None process : 67070 is daemon? : True
...
0th task : partent_process : 67070 is daemon : False current_process : 67071 is daemon : True
1th task : partent_process : 67070 is daemon : False current_process : 67072 is daemon : True
2th task : partent_process : 67070 is daemon : False current_process : 67073 is daemon : True
3th task : partent_process : 67070 is daemon : False current_process : 67074 is daemon : True
4th task : partent_process : 67070 is daemon : False current_process : 67071 is daemon : True

조금 더 상황을 확장하면 어떨까? (여기서도 multiprocessing 에서 parent가 None인 것은 위에 기술한 이유와 같다.)

 

가장 먼저 눈에 보이는 차이는 pp_task 와 mpp_job은 1개의 파이썬 코드 내에서 실행되는 task이지만 별도의 job으로 분리되며 별도의 프로세스에서 동작함을 알 수 있다.

 

중요한 것은 해당 프로세스는 실제로 daemon process가 맞으나 ProcessPoolExecutor로 인해 프로세스가 생성될 때 daemon이 아닌 것으로 취급받는다. 현재 프로세스는 multiprocessing 기준 start() 되기 전에 daemon=True으로 설정되어 있지 않은 프로세스이기 때문에 False로 되어 있는 것으로 의심할 수 있다. (주의. 제가 틀렸거나 잘못된 설명 일 수 있습니다.)

 

지금까지 관찰한 결과를 정리하면 아래와 같다.

  1. DAG이 실행될 때 각각의 PythonOperator 는 Task 마다 새로운 Daemon process를 생성하여 작업한다
  2. 실제로 프로세스가 daemon이기 때문에 debuging 시 AssertionError: daemonic processes are not allowed to have children 를 발생시킨다.
  3.  multiprocessing 사용 시 해당 process 가 start() 될 때 daemon=True를 선언하지 않았기 때문에 daemon 프로세스로 취급되지 않고 자식 프로세스를 생성하는 것으로 의심된다.

 

그럼 지금까진 왜 됐다가 이제 안되는 건데..?

의문에 대한 해답은 실제로 같은 문제를 겪어 raise 된 issue를 통해 해소될 수 있었다. 이 issue는 celery 사용 시 동일한 에러 현상을 겪었던 사용자의 issue로 celery contributor인 Ask Solem 는 다음과 같이 설명한다.

I figured out what caused the change in the behaviour.
Tasks are run using daemon processes both in 3.0 and 3.1, but until celery/billiard@4c32d2e and celery/billiard@c676b94 multiprocessing module wasn't aware of that and hence was allowing creating subprocesses.

To my understanding, there was a bug prior to version 3.1 (tasks were allowed to create subprocesses, which could result in orphaned state) and now this bug has been fixed.

위에서 재현한 대로 multiprocessing 모듈이 현재 실행 중인 프로세스가 데몬 프로세스인지 체크하지 못하고 있고 이 것이 assert 조건이 추가되면서 문제를 발생시켰다는 것이다. 즉 쉽게 말해 버그였다는 것이다. (WTF)

 

문제 해결

우선 airflow contributor 인 Jarek Potiuk 은 이 문제에 대해 부정적인 의견을 표했다.

It broke because of optimisations implemented in airflow that make use of multiprocessing. The best way for you to proceed will be to turn your multiprocessing jobs into separate Airflow tasks

생각해보면 기본적으로 각각의 task 가 서로 다른 프로세스로 움직이니까… 분리하는 것이 더 좋아 보이기도 한다. 다만 이러면 구현하는 과정이 매우 까다로워(귀찮아) 지고 생각해보면 쪼갠 거 안에서도 더 쪼갤 수 있다면… 이건 참을 수 없다. GPU 스레드랑 코어도 쪼개 쓰는 세상이니까!

 

여러 방법을 시도해보고 적용할 법한 방법은 다음과 같다.

Airflow 환경변수 바꾸기

가장 간단한 방법이다. Airflow의 환경변수에 PYTHONOPTIMIZE = 1 옵션을 주어 PYTHONOPTIMIZE -O

가 되도록 한다.

 

PYTHONOPTIMIZE-O 옵션이 들어가면 debug 값에서 assert statements를 제거할 수 있다.

 

다만 이 방법을 사용하면 데몬 프로세스가 자식 프로세스를 형성하는 문제는 여전히 존재하기 때문에 추천할 방법이라고 보기는 어렵다.

virtualenv에서 실행

PythonOperator 가 실행될 때 viretual env 에 진입하여 실행되도록 DAG을 짠다. 이 경우 가상 환경에서 코드가 동작하면서 파이썬 인터프리터는 데몬이 아닌 새 프로세스를 생성하기 때문에 문제를 해결할 수 있다.

 

billiard 패키지를 사용한다. ( 권장 사항 )

python2.7 버전의 multiprocessing package에서 포크 돼 만들어진 패키지로 위에서 언급한 celery issue 등 의 문제 해결을 위해 celery 팀에서 개발하고 관리하고 있다.

 

기존에 import multiprocessing을 import billiard as multiprocessing 으로 처리해주면 된다.

billiard 는 기존 multiprocessing 패키지보다 더 많은 변수를 사용 가능 및 요구하는 등 더 적극적으로 커스터마이즈를 지원한다.

 

다만 이 방법은 concurrent.futures를 꼭 사용하겠다!라고 할 때 해결 방법으로는 적합하지 않아 보인다.

 

IV. 참고자료

반응형
복사했습니다!