반응형

[python]파이썬 동시성/비동기 프로그래밍 1. 코루틴

 

[python]파이썬 동시성/비동기 프로그래밍 1. 코루틴

코루틴 3.5 버전부터 지원(async/await 문법은 3.7 이상부터) 코루틴을 사용하면 CPU와 리소스 낭비를 방지할 수 있어 I/O처리를 극대화할 수 있음 일시 정지/정지가 가능한 함수를 말하며 generator 역시

leo-bb.tistory.com

Asyncio

  • 파이썬에서 동시성 프로그래밍을 위해 제공되는 패키지로 GIL을 회피하는 방법 중 하나
  • 파이썬 3.4 버전에 표준으로 채택
  • 비동기 프레임워크의 기반으로 사용되며 IO 병목이 발생하면서 고차원의 수준이 요구되는 네트워크 코드에 가장 유용
  • 아래는 공식 레퍼런스 문서에서 설명하는 사항

    1.파이썬 코르틴을 동시에 실행하고 완전히 제어할 수 있다.
    2.네트워크 IO와 IPC를 수행
    3.Child proccess 제어
    4.Queue를 통한 작업 분산
    5.동시성 코드의 동기화
    6.콜백기반 라이브러리와 async/await 구문을 사용한 코드를 연결
    7.transport를 이용한 효율적인 프로토콜 구현
    8.비동기 API와 이벤트 루프를 만들고 관리

* 참고로 파이썬 언어 개발자는 async/await 보다 def/yield를 선호한다고 합니다. 때문에 가능한 상황에서는 def를 활용한 코루틴 구성을 권장한다고 합니다.

I.asyncio 사용

1. Native Coroutine

asyncio를 사용하여 코루틴을 구성하는 경우 def 앞에 async 를 붙히고 yield 대신 await을 사용합니다.

  • await 뒤에 코루틴이 오는 경우 네이티브 코루틴이 아닌 generator기반 코루틴도 가능 합니다. (generator 기반 코루틴위에 @types.coroutine데코레이터를 붙히기만 하면 됩니다)
  • 이렇게 await 표현식과 연결하여 사용할 수 있는 객체를 awaitable 객체라 부르며 네이티브 코루틴, 태스크, 퓨처 3가지 유형 존재
  • async로 만들어진 코루틴을 generator 기반 코루틴과 구분하기 위해 "네이티브 코루틴" 이라고 별칭
  • 3.10 버전부터 네이티브 코루틴만 지원할 예정
import asyncio

async def sub():
    print('me too')

async def main():
    print('I love u')
    await asyncio.sleep(1)
    await sub()

>>> print(type(sub()))
<class 'coroutine'>
>>> asyncio.run(main())
I love u
me too

2. Task

태스크는 asyncio.Future의 파생 클래스로 코루틴을 동시에 예약/취소/완료 등을 확인하기 위해 사용됩니다.

  • create_task(coroutine)으로 사용, 3.7버전 이하에서는 ensure_future() 를 사용
  • task 취소 시 cancel() 메서드 사용(취소 확인은 cancelled())
  • eventloop을 사용할 경우 get_runnung_loop()에 의해 반환된 루프에서 실행되며 스레드에 실행 중인 루프가 없으면 runtimeError가 발생
import asyncio

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(
        say_after(0, 'I love u'))

    task2 = asyncio.create_task(
        say_after(1, 'me too'))

    await task1
    await task2

>>> asyncio.run(main())
I love u
me too

II. Asyncio의 유용한 함수들

1. async.sleep(n)

다수의 awaitable 객체가 있을 때 현재 실행중이 객체를 입력된 시간만큼 일시 중단시켜주는 기능

  • time.sleep(n)은 입력된 시간만큼 프로그램 자체가 정지하여 cpu가 그만큼 유휴(idle)하게 되지만 asyncio.sleep()은 현재 실행중인 awaitable 객체가 정지할 뿐 다른 객체는 실행된다는 차이가 있음
import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

>>> asyncio.run(display_date())
2020-07-18 22:50:14.860300
2020-07-18 22:50:15.863113
2020-07-18 22:50:16.864836
2020-07-18 22:50:17.866177
2020-07-18 22:50:18.866458

2. async.wait(awaitable*, timeout=None, return_when = arg) -> task, future

입력된 모든 awaitable 객체를 동시에 실행하고, return_when에 지정된 조건을 만족할 때 블락하여 대기

  • 코루틴 객체를 wait()에 직접 전달할 수 없음

3. async.gather(awaitable*, return_exceptions = bool) -> list[]

  • 입력된 모든 awaitable 객체를 동시에 실행
  • awaitable*에 입력된 객체가 코루틴인 경우 자동으로 태스크 예약
  • 모든 객체가 실행된 후 결과는 입력한 순서대로 리스트에 담겨 반환
  • return_excpetions = false 일 경우 다른 객체는 계속 실행되지만 실패한 객체는 실패한 결과로 저장, true일 경우 실패하더라도 성공으로 처리하고 결과 리스트에 집계됨
    • 따라서 입력된 객체 실행 중 task 또는 future가 취소돼도 cancelledError 처리되며 gather()는 계속 실행
    • 단 gather 자체가 취소되면 return_exceptions와 무관하게 취소(3.7 버전 이상부터)
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

>>> asyncio.run(main())
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24

4.asyncio.Queue(maxsize = int)

First In First out 큐로 queue 모듈과 유사하게 설계

  • timeout 이 없기 때문에 시간제한을 두고 연산하려면 asyncio.wait_for() 사용

  • queue 모듈의 queue 클래스와 달리 최대 사이즈가 지정( 0보다 작으면 무한)

  • Queue 종류

    1.asyncio.PriorityQueue()

      * get()으로 호출될 때 FIFO가 아닌 지정된 우선순위 순서로 반출 
      * 순서는 (priority_num, data) 튜플형식

    2.asyncio.LifoQueue()

      * get()으로 호출될 때 List in First out 으로 반출되는 큐
  • Queue 함수

    1.queue.put()

      * queue에 항목을 입력
      * queue 항목수가 maxsize인 경우 get()으로 호출될 때까지 대기

     

    2.queue.get()

      * Queue 에서 대기중인 코루틴을 꺼내 실행
      * queue가 비어 있는 경우 입력될 때 까지 대기

    4.queue.join()

      * queue 안의 모든 항목을 수신하고 처리 할 때까지 블락
      * 완료하지 않은 작업 수는 큐에 항목이 추가될 때마다 증가하고, task_done()을 호출할 때마다 감소(0보다 작아지는 경우 블락 해제)

    2.queue.task_done()

      * get()으로 호출된 코루틴 실행이 종료되었음을 큐에 전달

 

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    queue = asyncio.Queue()

    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    tasks = []
    for i in range(2):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


>>> asyncio.run(main())
worker-1 has slept for 0.64 seconds
worker-0 has slept for 0.65 seconds
worker-1 has slept for 0.23 seconds
worker-1 has slept for 0.72 seconds
worker-0 has slept for 0.98 seconds
worker-0 has slept for 0.37 seconds
worker-0 has slept for 0.07 seconds
worker-1 has slept for 0.50 seconds
worker-0 has slept for 0.75 seconds
worker-1 has slept for 0.79 seconds
worker-1 has slept for 0.14 seconds
worker-0 has slept for 0.31 seconds
worker-1 has slept for 0.21 seconds
worker-0 has slept for 0.28 seconds
worker-0 has slept for 0.53 seconds
worker-1 has slept for 0.96 seconds
worker-1 has slept for 0.40 seconds
worker-0 has slept for 0.90 seconds
worker-0 has slept for 0.09 seconds
worker-1 has slept for 0.85 seconds
====
3 workers slept in parallel for 5.46 seconds
total expected sleep time: 10.37 seconds
반응형
복사했습니다!