이번 페이지에서는 CPU 확장 방법 중 하나인 '퓨처스'에 대해 설명한다.
멀티 프로세스와 멀티 스레드 관련 포스트는 이전 포스트를 참고한다.
https://tttto-factory.tistory.com/7
futures 사용하기
파이썬3 : concurrent.futures 모듈을 사용하면 비동기 작업 쉽게 처리 가능
파이썬2 : pip install futures명령으로, 파이썬 2에도 back-porting
concurrent.futures 모듈은 실행자(executor)를 선택해야한다.
- 실행자는 스케줄링과 비동기 작업을 실행할 책임
- 어떤 작업을 실행할 때 사용할 '엔진'
- 모듈
- concurrent.futures.ThreadPoolExecutor : GIL 의 제약으로 완벽한 확장 솔루션이 아님
- concurrent.futures.ProcessPoolExecutor : 프로세스는 CPU를 사용할 때 더 큰 이득이 있으며, 오랜 시간 동안 실행되는 작업에 효율적
concurrent.futures의 장점
- threading, multiprocessing 모듈의 상위에서 추상 계층을 쉽게 사용할 수 있음
- concurrent.futures.Future와 같은 추상 데이터 구조를 활용해서, 코드 실행 및 병렬화를 간단히 처리 가능
from concurrent import futures
import random
# 공유 객체의 상태를 변경하는 대신 결괏값 반환
# Future 객체 변경해서 전송 편리, 결과 수집 쉬움
def compute():
return sum([random.randint(1,100) for i in range(1000000)])
with futures.ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(compute) for _ in range(8)]
results = [f.result() for f in futures]
print(results)
기본 엔진으로 threading 모듈이 사용되기 때문에, threading을 사용했던 결과와 비슷
from concurrent import futures
import random
def compupte():
return sum([random.randint(1,100) for i in range(1000000)])
# multiprocessing.cpu_count 함수를 호출해서 사용할 워커 수 결정
# max_workers를 별도 설정할 필요 x
with futures.ProcessPoolExecuter() as executor:
futures = [executor.submit(compute) for _ in range(8)]
results = [f.result() for f in futures]
print(results)
# 스레드보다 훨씬 빠름
futurist 사용하기
import futurist
from futurist import waiters
import random
def compute():
return sum([random.randint(1,100) for i in range(10000)])
# futurist는 사용 중인 실행자에 관한 통계 제공
# 작업의 현재 상태 추적, 코드 실행 정보 얻기
with futurist.ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(compute) for _ in range(8)]
print(executor.statistics)
results = waiters.wait_for_all(futures)
print(executor.statistics)
print([r.result() for r in results.done])
futurist는 함수 전달과 check_and_reject 인수를 써서 작업 거부 가능
import futurist
from futurist import rejection
import random
def compute():
return sum([random.randint(1,100) for i in range(1000000)])
# 백로그 크기를 2로 제한했기 때문에 컴퓨터 속도에 따라 실행자가 작업을 빠르게 처리하지 못하면,
# futurist.RejectedSubmission 예외가 발생할 가능성 있다
with futurist.ThreadPoolExecutor(max_workers=8, check_and_reject=rejection.reject_when_reached(2)) as executor:
futures = [executor.submit(compute) for _ in range(20)]
print(executor.statistics)
results = [f.result() for f in futures]
print(executor.statistics)
print(results)
futurist 함수 실행을 주기적으로 스케줄링해 주는 futurist.periodics.PeriodicWorker 클래스와 함께 자주 사용된다.
import time
from futurist import periodics
# 1초마다 실행되어 작업 경과 시간 출력
@periodics.periodic(1)
def every_one(started_at):
print(time.time() - started_at))
w = periodics.PeriodicWorker([(every_one, (time.time(),),{},])
# 4초마다 실행되어 작업 진행에 관한 통계 출력
@periodics.periodic(4)
def print_stats():
print(list(w.iter_watchers()))
w.add(print_stats)
w.start()
처리 중인 작업 통계를 제공하므로, 애플리케이션 상태를 알고 싶을 때 유용하게 사용 가능
데몬 프로세스
데몬?
오랜 시간 동안 백그라운드에서 실행되는 프로세스
주기적으로 작업 스케줄링하거나, 큐에서 작업 꺼내와서 처리
코들리든 라이브러리?
https://github.com/sileth/cotyledon
오랜 시간 실행되는 프로세스를 만들기 위한 목적으로 개발
# 코들리든을 사용한 데몬
import threading
import time
import cotyledon
class PrinterService(cotyledon.Service):
name = "printer"
def __init__(self, worker_id):
super(PrinterService, self).__init__(worker_id)
# 코틀리든은 내부적으로 여러 개의 스레드를 사용하기 때문에
# threading.Event 객체가 run과 terminate 함수를 동기화 하는데 사용
self._shutdown = threading.Event()
# 메인 루프 포함
def run(self):
while not self._shutdown.is_set():
print("Doing stuff")
time.sleep(1)
# 다른 스레드가 서비스 종료할 때 호출
def terminate(self):
self._shutdown.set()
# manager 생성
manager = cotyledon.ServiceManager()
# PrinterService 두 개를 실행하기 위해 추가
# 시작할 서비스 개수로 2를 지정했기 때문에, 서비스는 두 번 시작
# 프로세스 두 개를 시작하고, 각 프로세스는 PrinterService.run 함수를 하나씩 실행
manager.add(PrinterService, 2)
# manager에 추가된 작업을 모두 실행
manager.run()
- 코틀리든은 마스터 프로세스를 실행해서 자식 프로세스 관리.
- PrinterService 인스턴스 두 개를 시작.
- 독특한 프로세스 이름을 부여해서, 프로세스 목록에서 쉽게 구분할 수 있게 함
- 프로세스 중 하나가 죽거나 크래시되면 코틀리든이 자동으로 프로세스를 다시 실행한다.
- os.fork 호출을 수행하고, 데몬에 올바른 모드를 설정하는 등 내부에서 많은 작업을 처리
위의 작업은 워커가 자신만의 고유 작업을 처리하기 때문에 서로 커뮤니케이션 필요 x
아래는 일반적인 생산자(producer)/소비자(consumer) 패턴
생산자 서비스는 큐에 필요한 작업을 채우고, 소비자 서비스는 큐에서 작업을 꺼내 실행한다.
import multiprocessing
import time
import cotyledon
# 큐 객체는 모든 서비스에 전달
class Manager(cotyledon.ServiceManager):
def __init__(self):
super(Manager, self).__init__()
queue = multiprocessing.Manager().Queue()
self.add(ProducerService, args(queue,))
self.add(PrinterService, args-(queue,), workers=2)
# 1초마다 하나씩 증가하는 정수로 큐를 채우고
class ProducerService(cotyledon.Service):
def __init__(self, worker_id, queue):
super(ProducerService, self).__init__(worker_id)
self.queue = queue
def run(self):
i=0
while True:
self.queue.put(i)
i += 1
time.sleep(1)
# 큐의 내용을 꺼내 출력한다.
class PrinterService(cotyledon.Service):
name = "printer"
def __init__(self, worker_id, queue):
super(PrinterService, self).__init__(worker_id)
self.queue = queue
def run(self):
while True:
job = self.queue.get(block=True)
print(self.workerid, self.pid, job)
multiprocessing.queues.Queue 객체는 서로 다른 프로세스 간의 커뮤니케이션을 쉽게 해 준다.
코틀리든은 프로그램 설정을 다시 로딩하거나, 클래스의 워커 수를 동적으로 변경하는 등 추가 기능 제공
import multiprocessing
import time
import cotyledon
class Manager(cotyledon.ServiceManager):
# vmfhtptm en rofh tlwkr
def __init__(self):
super(Manager, self).__init__()
queue = multiprocessing.Manager().Queue()
self.add(ProducerService, args=(queue, ))
self.printer = self.add(PrinterService, args=(queue,) , workers=2)
self.register_hooks(on_reload=self.reload)
# 주 프로세스에 SIGHUP(signal hang up)이 보내지면 코틀리든이 Manager.reload 함수를 호출
# Printer Service 프로세스를 다섯 개로 늘린다.
def reload(self):
print("Reloading")
self.reconfigure(self.printer, 5)
class ProducerService(cotyledon.Service):
def __init__(self, worker_id, queue):
super(ProducerService, self).__init__(worker_id)
self.queue = queue
def run(self):
i=0
while True:
self.queue.put(i)
i+=1
time.sleep(1)
class PrinterService(cotyledon.Service):
name = "printer"
def __init__(self,worker_id, queue):
super(PrinterService, self).__init__(worker_id)
self.queue = queue
def run(self):
while True:
job = self.queue.get(block=True)
print(self.worker_id, self.pid, job))
Manager().run()
'Developer > Python' 카테고리의 다른 글
[Python] 파이썬 이벤트 루프 (0) | 2020.12.29 |
---|---|
[Python] 파이썬에서 IP 버전 확인하기 (0) | 2020.07.30 |
[Python] 파이썬에서 텔넷(telnet) 사용하기 (1) | 2020.07.29 |
[Python] dict 딕셔너리 키-값 출력 (0) | 2020.07.24 |
[Python] 파이썬 CPU 확장 방법 1 (멀티 스레드, 멀티 프로세스) (0) | 2020.05.09 |