본문 바로가기

Developer/Python

[Python] 파이썬 CPU 확장 방법 2 (퓨처스)

이번 페이지에서는 CPU 확장 방법 중 하나인 '퓨처스'에 대해 설명한다.

 

멀티 프로세스와 멀티 스레드 관련 포스트는 이전 포스트를 참고한다.

https://tttto-factory.tistory.com/7

 

파이썬 - CPU 확장 방법 1 (멀티 스레드, 멀티 프로세스)

CPU 확장 이유? CPU 속도는 무한히 빨라질 수 없으므로, 애플리케이션에 동시성 + 병렬성 도입 CPU 확장 방법 1. 스레드 <장점> 여러 함수를 동시에 실행하기에 좋다 싱글 CPU : 스레드 차례로 실행 멀�

tttto-factory.tistory.com

 

futures 사용하기

 

파이썬3 : concurrent.futures 모듈을 사용하면 비동기 작업 쉽게 처리 가능

파이썬2 : pip install futures명령으로, 파이썬 2에도 back-porting

 

concurrent.futures 모듈은 실행자(executor)를 선택해야한다.

  • 실행자는 스케줄링과 비동기 작업을 실행할 책임
  • 어떤 작업을 실행할 때 사용할 '엔진'
  • 모듈
    1. concurrent.futures.ThreadPoolExecutor : GIL 의 제약으로 완벽한 확장 솔루션이 아님
    2. concurrent.futures.ProcessPoolExecutor : 프로세스는 CPU를 사용할 때 더 큰 이득이 있으며, 오랜 시간 동안 실행되는 작업에 효율적

concurrent.futures의 장점

  • threading, multiprocessing 모듈의 상위에서 추상 계층을 쉽게 사용할 수 있음
  • concurrent.futures.Future와 같은 추상 데이터 구조를 활용해서, 코드 실행 및 병렬화를 간단히 처리 가능

 

from concurrent import futures
import random


# 공유 객체의 상태를 변경하는 대신 결괏값 반환
# Future 객체 변경해서 전송 편리, 결과 수집 쉬움
def compute():
    return sum([random.randint(1,100) for i in range(1000000)])

with futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(compute) for _ in range(8)]

results = [f.result() for f in futures]
print(results)

기본 엔진으로 threading 모듈이 사용되기 때문에, threading을 사용했던 결과와 비슷

 

from concurrent import futures
import random

def compupte():
    return sum([random.randint(1,100) for i in range(1000000)])

# multiprocessing.cpu_count 함수를 호출해서 사용할 워커 수 결정
# max_workers를 별도 설정할 필요 x
with futures.ProcessPoolExecuter() as executor:
    futures = [executor.submit(compute) for _ in range(8)]

results = [f.result() for f in futures]
print(results)


# 스레드보다 훨씬 빠름

 

 

futurist 사용하기

import futurist
from futurist import waiters
import random

def compute():
    return sum([random.randint(1,100) for i in range(10000)])

# futurist는 사용 중인 실행자에 관한 통계 제공
# 작업의 현재 상태 추적, 코드 실행 정보 얻기
with futurist.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(compute) for _ in range(8)]
    print(executor.statistics)

results = waiters.wait_for_all(futures)
print(executor.statistics)

print([r.result() for r in results.done])

 

futurist는 함수 전달과 check_and_reject 인수를 써서 작업 거부 가능

import futurist
from futurist import rejection
import random

def compute():
    return sum([random.randint(1,100) for i in range(1000000)])

# 백로그 크기를 2로 제한했기 때문에 컴퓨터 속도에 따라 실행자가 작업을 빠르게 처리하지 못하면,
# futurist.RejectedSubmission 예외가 발생할 가능성 있다
with futurist.ThreadPoolExecutor(max_workers=8, check_and_reject=rejection.reject_when_reached(2)) as executor:
    futures = [executor.submit(compute) for _ in range(20)]
    print(executor.statistics)

results = [f.result() for f in futures]
print(executor.statistics)

print(results)

 

futurist 함수 실행을 주기적으로 스케줄링해 주는 futurist.periodics.PeriodicWorker 클래스와 함께 자주 사용된다.

import time

from futurist import periodics

# 1초마다 실행되어 작업 경과 시간 출력
@periodics.periodic(1)
def every_one(started_at):
    print(time.time() - started_at))

w = periodics.PeriodicWorker([(every_one, (time.time(),),{},])

# 4초마다 실행되어 작업 진행에 관한 통계 출력
@periodics.periodic(4)
def print_stats():
    print(list(w.iter_watchers()))

w.add(print_stats)
w.start()

처리 중인 작업 통계를 제공하므로, 애플리케이션 상태를 알고 싶을 때 유용하게 사용 가능

 

 

데몬 프로세스

데몬?

오랜 시간 동안 백그라운드에서 실행되는 프로세스

주기적으로 작업 스케줄링하거나, 큐에서 작업 꺼내와서 처리

 

코들리든 라이브러리?

https://github.com/sileth/cotyledon

오랜 시간 실행되는 프로세스를 만들기 위한 목적으로 개발

 

# 코들리든을 사용한 데몬

import threading
import time
import cotyledon

class PrinterService(cotyledon.Service):
    name = "printer"
    
    def __init__(self, worker_id):
        super(PrinterService, self).__init__(worker_id)
        
        # 코틀리든은 내부적으로 여러 개의 스레드를 사용하기 때문에
        # threading.Event 객체가 run과 terminate 함수를 동기화 하는데 사용
        self._shutdown = threading.Event()
    
    # 메인 루프 포함
    def run(self):
        while not self._shutdown.is_set():
            print("Doing stuff")
            time.sleep(1)
    
    # 다른 스레드가 서비스 종료할 때 호출
    def terminate(self):
        self._shutdown.set()

# manager 생성
manager = cotyledon.ServiceManager()

# PrinterService 두 개를 실행하기 위해 추가
# 시작할 서비스 개수로 2를 지정했기 때문에, 서비스는 두 번 시작
# 프로세스 두 개를 시작하고, 각 프로세스는 PrinterService.run 함수를 하나씩 실행
manager.add(PrinterService, 2)

# manager에 추가된 작업을 모두 실행
manager.run()

 

  1. 코틀리든은 마스터 프로세스를 실행해서 자식 프로세스 관리.
  2. PrinterService 인스턴스 두 개를 시작.
  3. 독특한 프로세스 이름을 부여해서, 프로세스 목록에서 쉽게 구분할 수 있게 함
  4. 프로세스 중 하나가 죽거나 크래시되면 코틀리든이 자동으로 프로세스를 다시 실행한다.
    • os.fork 호출을 수행하고, 데몬에 올바른 모드를 설정하는 등 내부에서 많은 작업을 처리

 

위의 작업은 워커가 자신만의 고유 작업을 처리하기 때문에 서로 커뮤니케이션 필요 x

 

아래는 일반적인 생산자(producer)/소비자(consumer) 패턴

생산자 서비스는 큐에 필요한 작업을 채우고, 소비자 서비스는 큐에서 작업을 꺼내 실행한다.

 

import multiprocessing
import time

import cotyledon

# 큐 객체는 모든 서비스에 전달
class Manager(cotyledon.ServiceManager):
    def __init__(self):
        super(Manager, self).__init__()
        queue = multiprocessing.Manager().Queue()
        self.add(ProducerService, args(queue,))
        self.add(PrinterService, args-(queue,), workers=2)

# 1초마다 하나씩 증가하는 정수로 큐를 채우고
class ProducerService(cotyledon.Service):
    def __init__(self, worker_id, queue):
        super(ProducerService, self).__init__(worker_id)
        self.queue = queue
    
    def run(self):
        i=0
        while True:
            self.queue.put(i)
            i += 1
            time.sleep(1)

# 큐의 내용을 꺼내 출력한다.
class PrinterService(cotyledon.Service):
    name = "printer"
    
    def __init__(self, worker_id, queue):
        super(PrinterService, self).__init__(worker_id)
        self.queue = queue
    
    def run(self):
        while True:
            job = self.queue.get(block=True)
            print(self.workerid, self.pid, job)

multiprocessing.queues.Queue 객체는 서로 다른 프로세스 간의 커뮤니케이션을 쉽게 해 준다.

 

코틀리든은 프로그램 설정을 다시 로딩하거나, 클래스의 워커 수를 동적으로 변경하는 등 추가 기능 제공

import multiprocessing
import time
import cotyledon

class Manager(cotyledon.ServiceManager):
    
    # vmfhtptm en rofh tlwkr
    def __init__(self):
        super(Manager, self).__init__()
        queue = multiprocessing.Manager().Queue()
        self.add(ProducerService, args=(queue, ))
        self.printer = self.add(PrinterService, args=(queue,) , workers=2)
        self.register_hooks(on_reload=self.reload)
    
    # 주 프로세스에 SIGHUP(signal hang up)이 보내지면 코틀리든이 Manager.reload 함수를 호출
    # Printer Service 프로세스를 다섯 개로 늘린다.
    def reload(self):
        print("Reloading")
        self.reconfigure(self.printer, 5)

class ProducerService(cotyledon.Service):
    def __init__(self, worker_id, queue):
        super(ProducerService, self).__init__(worker_id)
        self.queue = queue
    
    def run(self):
        i=0
        while True:
            self.queue.put(i)
            i+=1
            time.sleep(1)
    
    class PrinterService(cotyledon.Service):
        name = "printer"
        
        def __init__(self,worker_id, queue):
            super(PrinterService, self).__init__(worker_id)
            self.queue = queue
        
        def run(self):
            while True:
                job = self.queue.get(block=True)
                print(self.worker_id, self.pid, job))

Manager().run()