파이썬 동시성/비동기 프로그래밍 3. GIL(Global interpreter Lock)
concurrent.futures
기존에 파이썬에서 비동기 실행 환경을 만드는 데 있었던 어려움을 해결해준 패키지
- ThreadPoolExecutor(), ProcessPoolExecutor()를 사용하여 다중 스레드/다중 프로세스를 구현
- concurrent.futures 특징
- 멀티쓰레딩, 멀티프로세싱 API 통일
- Promise 개념 사용 (실행 중인 작업의 취소, 재실행, 타임아웃, 완료 여부, 콜백 주기 등의 코드를 쉽게 작성 가능)
I. Executor
비동기적 호출을 실행하는 메서드를 제공하는 추상 객체로 직접 사용은 불가능하며 하위 클래스를 이용하여 호출
ThreadPoolExecutor()
와ProcessPoolExecutor()
2가지 클래스를 제공하는데 단순히 다중 스레드냐 프로세스냐의 차이일 뿐 사용법이 다르지는 않습니다.- 이름에 pool 이라는 단어가 의미하듯 초기화 후에 몇 개의 작업이 실행될 것인지 정해져 있는 work list를 queue에 받아 작업을 처리
1. Executor 사용
A. ThreadPoolExecutor(max_workers = None, thread_name_prefix='', initializer=None, initargs=())
max_workers 스레드의 풀을 사용해 호출을 비동기적으로 실행하는 executor 서브 클래스로 네트워크 또는 I/O 기반 작업에 대한 효율을 증가시키는데 유리
- GIL의 영향을 받음
- initializer 는 콜러블 객체로 각 작업 스레드의 시작 부분에서 호출되며 initargs는 initializer에 전달되는 인수 튜플 -> 예외가 발생할 경우 모든 작업에 BrokenThreadPool 발생
- Max_workers 미지정시 3.5버전 이상에서는 local process 개수 * 5로 지정되며 3.8 버전 이상에서는 min(32, os.cpu_count()+4)로 지정
B. ProcessPoolExecutor(max_workers = None, mp_context=None, initializer=None, initargs=())
max_workers 프로세스의 풀을 사용해 호출을 비동기적으로 실행하는 executor 서브 클래스로 CPU 부하가 큰 작업의 분산처리가 목적일 때 유리
- GIL의 영향을 받지 않음
- max_workers 미지정시 local process 갯수를 사용, 단 윈도우 기준 61이 최댓값이기 때문에 None으로 지정 시 더 많은 프로세스가 있더라도 61개로 자동 지정됨
- max_workers <= 0 인 경우 ValueError
2. submit(func, args, *kwargs) -> concurrent.futures._base.Future
입력된 func이 func(*args, **kwargs)
로 실행되도록 예약하고, 실행 결과를 나타내는 future 객체를 반환
import concurrent.futures as futures
class A :
pass
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 2, 3)
>>> print(type(future))
<class 'concurrent.futures._base.Future'>
>>> print(future.result())
8
3. map(func, *iter, timeout=None, chunksize : int) -> generator()
map(func, *iter)
함수에 timeout, chunksize 인수가 추가된 것 외에는 기능상 동일
map()
함수가 입력된 iter를 "순서대로" 처리하는 반면futures.map()
은 func을 비동기로 실행하기 때문에 "동시에" 처리한다는 점이 다름- 함수가 완료된 순서대로
generator()
로 저장되는데, timeout을 설정한 경우__next__()
로 호출했을 때 실행이 완료되지 않았으면 timeoutError 발생(timeout=None 또는 선언하지 않은 경우 대기시간은 무한) - 입력된 실행 함수 결과가 예외를 발생하더라도
next()
로 호출하기 전까지 알려주지 않음 ProcessPoolExecutor()
인 경우 *iter 값을 여러 개로 분할하여 프로세스에 할당하는데 이 크기를 chunksize로 지정한다.(기본 = 1) 따라서ThreadPoolExcutor()
에서는 사용할 필요가 없음
import concurrent.futures as futures
class A :
pass
def factorial(n):
if n == 1 :
return 1
return n * factorial(n-1)
with futures.ProcessPoolExecutor() as executor:
future = executor.map(factorial, [1,5,10,20,40], chunksize = 2)
>>> print(type(future))
<class 'generator'>
>>> print(set(sorted(dir(future))) - set(sorted(dir(A))))
{'__next__', '__qualname__', '__iter__', 'send', 'gi_running', 'gi_code', 'close', '__del__', 'gi_frame', 'gi_yieldfrom', '__name__', 'throw'}
>>> print(future.__next__())
1
>>>print(future.__next__())
120
...
>>>print(future.__next__())
StopIteration
4. shutdown(wait : bool)
executor에 실행 중이거나 대기 중인 모든 future 객체 중지 및 리소스 정리를 실시
- wait = True인 경우 진행중인 작업이 종료될 때까지 기다린 후 shutdown 발생
shutdown()
된 executor 객체에submit()
또는map()
호출 시 RuntimeError 발생
II. Future
asyncio.future()
와 유사한 객체로 executor.submit()
에 의해 생성되어 비동기 실행을 캡슐화하며 하나의 작업에 대해 다양한 콜백을 추가할 수 있음
- JS의 Promise API와도 유사함
import concurrent.futures as futures
class A :
pass
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 2, 3)
>>> print(type(future))
<class 'concurrent.futures._base.Future'>
>>> print(set(sorted(dir(future))) - set(sorted(dir(A))))
{'_state', 'set_result', '_condition', '_waiters', 'cancelled', '_exception', '_result', 'exception', 'result', 'set_running_or_notify_cancel', 'set_exception', '_Future__get_result', '_done_callbacks', 'cancel', 'done', 'running', 'add_done_callback', '_invoke_callbacks'}
1. cancel()
cancel()
은 executor의 작업 호출을 취소하며 실행 중인 아닌 경우 취소하며 true 반환
2. done(), cancelled()
done()
은 호출의 실행이 완료되었거나 취소가 완료된 경우 true를 반환하며 cancelled()
는 취소가 완료된 경우만 true 반환
3. result(timeout = None)
실행된 호출의 결과값을 보여주는데, timeout이 초과되는 경우 timeoutError가 발생하며 완료 전에 future가 취소된 경우 cancelledError가 발생합니다. (+ 호출에 예외가 발생하는 경우 동일한 예외를 반환합니다.)
4. exception(timeout=None)
호출의 결과가 아닌 예외를 반환하는 것을 제외하고 result()
와 동일합니다.(예외가 없으면 None 반환)
5. add_done_callback(func)
콜러블 함수를 future와 연결하여 취소되거나 실행 종료될 때 호출됩니다.
- 다수의 콜백을 추가할 경우 추가한 순서대로 동작합니다.
- 콜러블 함수가 exception을 발생시키면 기록되고 넘어가지만 BaseException을 일으키면 동작하지 않습니다.
import concurrent.futures as futures
import functools
def factorial(n):
if n == 1 :
return 1
return n * factorial(n-1)
def is_done(Future):
print(Future.done())
def show_result(Future):
print(Future.result())
def is_exception(Future):
print(Future.exception())
with futures.ThreadPoolExecutor() as executor:
future = executor.submit(factorial, 3)
>>> future.add_done_callback(is_done)
True
>>> future.add_done_callback(show_result)
6
>>> future.add_done_callback(is_exception)
None
>>> future.add_done_callback(functools.partial(print, "Future : "))
Future : <Future at 0x110573090 state=finished returned int>
III. 모듈함수
1. wait(*futures, timeout = None, return_when = 'given') -> tuple
입력된 여러 future 객체들에 대해 모두 완료될 때까지 대기하고, 입력된 future들의 성공 여부에 따른 집합을 담은 튜플 출력
- 다수의 executor에서 만들어진 future 입력 가능
- not_done에는 대기 중이거나 실행 중인 객체가, done에는 성공/실패한 객체가 입력
- return_when 인수는 아래와 같음
- FIRST_CONPLETED : 하나의 future가 종료 시 반환
- ALL_COMPLETED : 모든 future 종료 또는 취소 시 반환
- FIRST_EXCEPTION : 첫 번째 예외가 발생하는 경우 반환, 예외 없으면 all_completed와 동일
import concurrent.futures as futures
work_list = [10, 100, 1000]
def sum_generator(n):
return sum(n for n in range(1,n+1))
def main():
# 시작 시간
start_tm = time.time()
futures_list = []
with futures.ThreadPoolExecutor() as excutor:
for work in work_list:
future = excutor.submit(sum_generator, work)
# 스케쥴링
futures_list.append(future)
print('Scheduled for {} : {}'.format(work, future))
# #wait 결과 출력
result = futures.wait(futures_list, timeout=7)
print(result)
# 결과 값 출력
print([future.result() for future in result.done])
# 종료 시간
end_tm = time.time() - start_tm
# 출력 포멧
msg = '\n Time : {:.2f}s'
# 최종 결과 출력
print(msg.format(end_tm))
# 실행
>>> if __name__ == '__main__':
main()
Scheduled for 10 : <Future at 0x10b083f90 state=finished returned int>
Scheduled for 100 : <Future at 0x10b08d3d0 state=finished returned int>
Scheduled for 1000 : <Future at 0x10b08d890 state=finished returned int>
DoneAndNotDoneFutures(done={<Future at 0x10b083f90 state=finished returned int>, <Future at 0x10b08d890 state=finished returned int>, <Future at 0x10b08d3d0 state=finished returned int>}, not_done=set())
[55, 500500, 5050]
Time : 0.00s
2. as_completed(*futures, timeout=None) -> iterator()
입력된 future들이 종료되는 즉시 yield 하는 iterator를 반환
- 다수의 executor에서 만들어진 future 입력 가능
- 중복된 future가 있는 경우 한 번만 실행
* wait에서 사용한 예제와 동일하게 만들었으나 결과의 순서가 다르게 나타나는 모습을 볼 수 있습니다.
import concurrent.futures as futures
work_list = [10, 100, 1000]
def sum_generator(n):
return sum(n for n in range(1,n+1))
def main():
worker = min(10, len(work_list))
start_tm = time.time()
futures_list = []
with futures.ThreadPoolExecutor() as excutor:
for work in work_list:
future = excutor.submit(sum_generator, work)
futures_list.append(future)
print('Scheduled for {} : {}'.format(work, future))
# as_completed 결과 출력
for future in futures.as_completed(futures_list):
result = future.result()
done = future.done()
cancelled = future.cancelled()
# future 결과 확인
print('Future Result : {}, Done : {}'.format(result, done))
print('Future Cancelled : {}'.format(cancelled))
end_tm = time.time() - start_tm
msg = '\n Time : {:.2f}s'
print(msg.format(end_tm))
>>> if __name__ == '__main__':
main()
Scheduled for 10 : <Future at 0x1026f3e90 state=finished returned int>
Scheduled for 100 : <Future at 0x1026fe2d0 state=finished returned int>
Scheduled for 1000 : <Future at 0x1026fe790 state=finished returned int>
Future Result : 500500, Done : True
Future Cancelled : False
Future Result : 5050, Done : True
Future Cancelled : False
Future Result : 55, Done : True
Future Cancelled : False
Time : 0.00s