[python]파이썬 동시성/비동기 프로그래밍 1. 코루틴
Asyncio
- 파이썬에서 동시성 프로그래밍을 위해 제공되는 패키지로 GIL을 회피하는 방법 중 하나
- 파이썬 3.4 버전에 표준으로 채택
- 비동기 프레임워크의 기반으로 사용되며 IO 병목이 발생하면서 고차원의 수준이 요구되는 네트워크 코드에 가장 유용
- 아래는 공식 레퍼런스 문서에서 설명하는 사항
1.파이썬 코르틴을 동시에 실행하고 완전히 제어할 수 있다.
2.네트워크 IO와 IPC를 수행
3.Child proccess 제어
4.Queue를 통한 작업 분산
5.동시성 코드의 동기화
6.콜백기반 라이브러리와 async/await 구문을 사용한 코드를 연결
7.transport를 이용한 효율적인 프로토콜 구현
8.비동기 API와 이벤트 루프를 만들고 관리
* 참고로 파이썬 언어 개발자는 async/await 보다 def/yield를 선호한다고 합니다. 때문에 가능한 상황에서는 def를 활용한 코루틴 구성을 권장한다고 합니다.
I.asyncio 사용
1. Native Coroutine
asyncio를 사용하여 코루틴을 구성하는 경우 def
앞에 async
를 붙히고 yield
대신 await
을 사용합니다.
await
뒤에 코루틴이 오는 경우 네이티브 코루틴이 아닌 generator기반 코루틴도 가능 합니다. (generator 기반 코루틴위에@types.coroutine
데코레이터를 붙히기만 하면 됩니다)- 이렇게
await
표현식과 연결하여 사용할 수 있는 객체를 awaitable 객체라 부르며 네이티브 코루틴, 태스크, 퓨처 3가지 유형 존재 - async로 만들어진 코루틴을 generator 기반 코루틴과 구분하기 위해 "네이티브 코루틴" 이라고 별칭
- 3.10 버전부터 네이티브 코루틴만 지원할 예정
import asyncio
async def sub():
print('me too')
async def main():
print('I love u')
await asyncio.sleep(1)
await sub()
>>> print(type(sub()))
<class 'coroutine'>
>>> asyncio.run(main())
I love u
me too
2. Task
태스크는 asyncio.Future의 파생 클래스로 코루틴을 동시에 예약/취소/완료 등을 확인하기 위해 사용됩니다.
create_task(coroutine)
으로 사용, 3.7버전 이하에서는ensure_future()
를 사용- task 취소 시
cancel()
메서드 사용(취소 확인은cancelled()
) - eventloop을 사용할 경우 get_runnung_loop()에 의해 반환된 루프에서 실행되며 스레드에 실행 중인 루프가 없으면 runtimeError가 발생
import asyncio
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.create_task(
say_after(0, 'I love u'))
task2 = asyncio.create_task(
say_after(1, 'me too'))
await task1
await task2
>>> asyncio.run(main())
I love u
me too
II. Asyncio의 유용한 함수들
1. async.sleep(n)
다수의 awaitable 객체가 있을 때 현재 실행중이 객체를 입력된 시간만큼 일시 중단시켜주는 기능
- time.sleep(n)은 입력된 시간만큼 프로그램 자체가 정지하여 cpu가 그만큼 유휴(idle)하게 되지만 asyncio.sleep()은 현재 실행중인 awaitable 객체가 정지할 뿐 다른 객체는 실행된다는 차이가 있음
import asyncio
import datetime
async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
>>> asyncio.run(display_date())
2020-07-18 22:50:14.860300
2020-07-18 22:50:15.863113
2020-07-18 22:50:16.864836
2020-07-18 22:50:17.866177
2020-07-18 22:50:18.866458
2. async.wait(awaitable*, timeout=None, return_when = arg) -> task, future
입력된 모든 awaitable 객체를 동시에 실행하고, return_when에 지정된 조건을 만족할 때 블락하여 대기
- 코루틴 객체를 wait()에 직접 전달할 수 없음
3. async.gather(awaitable*, return_exceptions = bool) -> list[]
- 입력된 모든 awaitable 객체를 동시에 실행
- awaitable*에 입력된 객체가 코루틴인 경우 자동으로 태스크 예약
- 모든 객체가 실행된 후 결과는 입력한 순서대로 리스트에 담겨 반환
- return_excpetions = false 일 경우 다른 객체는 계속 실행되지만 실패한 객체는 실패한 결과로 저장, true일 경우 실패하더라도 성공으로 처리하고 결과 리스트에 집계됨
- 따라서 입력된 객체 실행 중 task 또는 future가 취소돼도 cancelledError 처리되며 gather()는 계속 실행
- 단 gather 자체가 취소되면 return_exceptions와 무관하게 취소(3.7 버전 이상부터)
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
>>> asyncio.run(main())
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
4.asyncio.Queue(maxsize = int)
First In First out 큐로 queue 모듈과 유사하게 설계
-
timeout 이 없기 때문에 시간제한을 두고 연산하려면 asyncio.wait_for() 사용
-
queue 모듈의 queue 클래스와 달리 최대 사이즈가 지정( 0보다 작으면 무한)
-
Queue 종류
1.asyncio.PriorityQueue()
* get()으로 호출될 때 FIFO가 아닌 지정된 우선순위 순서로 반출 * 순서는 (priority_num, data) 튜플형식
2.asyncio.LifoQueue()
* get()으로 호출될 때 List in First out 으로 반출되는 큐
-
Queue 함수
1.queue.put()
* queue에 항목을 입력 * queue 항목수가 maxsize인 경우 get()으로 호출될 때까지 대기
2.queue.get()
* Queue 에서 대기중인 코루틴을 꺼내 실행 * queue가 비어 있는 경우 입력될 때 까지 대기
4.queue.join()
* queue 안의 모든 항목을 수신하고 처리 할 때까지 블락 * 완료하지 않은 작업 수는 큐에 항목이 추가될 때마다 증가하고, task_done()을 호출할 때마다 감소(0보다 작아지는 경우 블락 해제)
2.queue.task_done()
* get()으로 호출된 코루틴 실행이 종료되었음을 큐에 전달
import asyncio
import random
import time
async def worker(name, queue):
while True:
sleep_for = await queue.get()
await asyncio.sleep(sleep_for)
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
queue = asyncio.Queue()
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
tasks = []
for i in range(2):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
>>> asyncio.run(main())
worker-1 has slept for 0.64 seconds
worker-0 has slept for 0.65 seconds
worker-1 has slept for 0.23 seconds
worker-1 has slept for 0.72 seconds
worker-0 has slept for 0.98 seconds
worker-0 has slept for 0.37 seconds
worker-0 has slept for 0.07 seconds
worker-1 has slept for 0.50 seconds
worker-0 has slept for 0.75 seconds
worker-1 has slept for 0.79 seconds
worker-1 has slept for 0.14 seconds
worker-0 has slept for 0.31 seconds
worker-1 has slept for 0.21 seconds
worker-0 has slept for 0.28 seconds
worker-0 has slept for 0.53 seconds
worker-1 has slept for 0.96 seconds
worker-1 has slept for 0.40 seconds
worker-0 has slept for 0.90 seconds
worker-0 has slept for 0.09 seconds
worker-1 has slept for 0.85 seconds
====
3 workers slept in parallel for 5.46 seconds
total expected sleep time: 10.37 seconds