summary : chapter7,8,9 내용 정리/요약, cython / concurrency / multiprocessing
ch7 cython
ctypes
ch8 동시성
https://github.com/kchhero/high_performance_python/tree/master/08_concurrency
비동기 프로그래밍
- 간단한 이벤트 루프
from queue import Queue # from functools import partial # - eventloop = None # class EventLoop(Queue): def start(self): while True: function = self.get() function() # - def do_hello(): global eventloop print("Hello") eventloop.put(do_world) # - def do_world(): global eventloop print("World") eventloop.put(do_hello) # - if __name__ == "__main__": eventloop = EventLoop() eventloop.put(do_hello) eventloop.start()
- 이벤트 루프를 asynchronous I/O 연산과 결합하면 I/O 작업을 수행하면서 엄청난 성능 향상을 얻을 수 있다.
Callback, Future
event loop를 사용하는 프로그래밍에는 callback과 future의 두 가지 형태가 있다.
- 콜백 패러다임에서는 각 함수를 호출할 때 callback이라는 인자를 넘긴다. 불려진 함수는 값을 반환하는 대신, 그 값을 인자로 실어 콜백 함수를 호출한다.
- 퓨처를 실제 결과가 아닌 미래에 얻을 결과를 담을 promise를 반환하는 비동기 함수다. 비동기 함수가 반환하는 퓨처가 완료되어 필요한 값이 채워지기를 기다려야 한다.
sequencial crawler
- 각 요청에 0.1초가 걸리고 요청을 500번 보냈으므로, 전체 요청을 처리하는데 약 50초 걸릴것이다.
import requests import string import random #. def generate_urls(base_url, num_urls): for i in range(num_urls): yield base_url + "".join(random.sample(string.ascii_lowercase, 10)) #. def run_experiments(base_url, num_iters=500): response_size = 0 for url in generate_urls(base_url, num_iters): response = requests.get(url) response_size += len(response.text) return response_size #. if __name__ == "__main__": import time delay = 100 num_iter = 500 base_url = "http://127.0.0.1:8080/add?name=serial&delay={}&".format(delay) #. start = time.time() result = run_experiments(base_url, num_iter) end = time.time() print("Result: {}, Time: {}".format(result, end - start))
- 결과
$ python3 test8_2.py Result: 500, Time: 52.93217206001282
gevent
가장 단순한 비동기 라이브러리 중 하나.
tornado
페이스북에서 개발한 비동기 I/O 패키지.
AsyncIO
파이썬 3.4 부터 이전의 asyncio 표준 라이브러리 모듈을 개선했다. gevent와 tornado 방식의 동시성에서 영향을 받았다. 코루틴을 더 간단하게 다룰 수 있도록 yield from 이라는 키워드를 추가했다.
ch9 multiprocessing
- 문제를 여러 CPU로 병렬화한다면, n 코어 시스템에서 n배의 속도 향상을 기대 할 수 있다? 프로세스를 추가하면 통신에 따른 부가 비용이 늘어나며 한 프로세스가 사용할 수 있는 RAM이 줄어들게되므로 실제 n배의 속도 향상을 온전히 얻을 수는 없다.
- multiprocessing 모듈로 처리할 수 있는 작업의 예 CPU 위주의 작업을 process나 pool 객체를 사용해 병렬화 dummy 모듈을 사용해서 I/O 위주의 작업을 스레드를 사용하는 Pool로 병렬화 Queue를 통해 pickling한 결과를 공유 병렬화한 작업자 사이에서 바이트,원시 데이터 타입, 사전, 리스트 등의 상태를 공유
multiprocessing 모듈
주 구성요소
- process : 현재 프로세스를 fork한 복사본.
- pool : process나 threading API를 wrapping 하여 작업을 공유하고, 합쳐진 결과를 반환해주는 사용하기 편리한 worker pool로 만든다.
- queue : 여러 producer와 conosumer가 사용할 수 있게 해주는 FIFO 대기열이다.
- pipe : 두 프로세스 사이의 통신 채널
- manager : process간에 파이썬 객체를 공유하기 위한 고수준의 managed interface.
- ctypes : process를 fork한 다음 여러 process가 원시 데이터 타입을 공유 할 수 있게 해준다.
몬테카를로 원주율 구하기 알고리즘
import random
TESTCNT = 100000000
def monte_carlo():
circleIn = 0
for i in range(TESTCNT):
tempX = random.random()
tempY = random.random()
if (tempX*tempX + tempY*tempY) <= 1:
circleIn += 1
print("shot cnt = %f" % (circleIn/TESTCNT))
print("pi = %f" % (4.0*circleIn/TESTCNT))
startTime = time.time()
monte_carlo()
print("running time : " + str(time.time()-startTime))
결과
$ python3 test9_2.py
shot cnt = 0.785400
pi = 3.141602
running time : 25.7069725990295
process와 thread를 사용해 원주율 추정하기
파이썬 객체 사용, numpy사용
"""Estimate Pi using Threads and Processes"""
import time
import numpy as np
from multiprocessing import Pool
def estimate_nbr_points_in_quarter_circle(nbr_samples):
# 각각의 새로운 프로세스에서 numpy를 위한 난수 시드를 생성한다.
# 그렇제 않다면 fork로 만들어진 모든 프로세스가 동일한 상태를 공유한다.
np.random.seed()
xs = np.random.uniform(0, 1, nbr_samples)
ys = np.random.uniform(0, 1, nbr_samples)
estimate_inside_quarter_unit_circle = (xs * xs + ys * ys) <= 1
nbr_trials_in_quarter_unit_circle = np.sum(
estimate_inside_quarter_unit_circle)
return nbr_trials_in_quarter_unit_circle
if __name__ == "__main__":
nbr_samples_in_total = 100000000
nbr_parallel_blocks = 8
pool = Pool()
nbr_samples_per_worker = int(nbr_samples_in_total / nbr_parallel_blocks)
print(type(nbr_samples_per_worker))
print("Making {} samples per worker".format(nbr_samples_per_worker))
# confirm we have an integer number of jobs to distribute
assert nbr_samples_per_worker == int(nbr_samples_per_worker)
nbr_samples_per_worker == int(nbr_samples_per_worker)
map_inputs = [nbr_samples_per_worker] * nbr_parallel_blocks
print(type(map_inputs))
t1 = time.time()
results = pool.map(estimate_nbr_points_in_quarter_circle, map_inputs)
pool.close()
print("Dart throws in unit circle per worker:", results)
print("Took {}s".format(time.time() - t1))
nbr_in_circle = sum(results)
combined_nbr_samples = sum(map_inputs)
pi_estimate = float(nbr_in_circle) / combined_nbr_samples * 4
print("Estimated pi", pi_estimate)
print("Pi", np.pi)
Understanding the Python GIL
- GIL (Global Interpreter Lock)
참고 : http://highthroughput.org/wp/cb-1136/
파이썬의 스레드는 I/O 위주의 작업에는 훌륭히 작동하지만, CPU 위주의 문제에는 좋은 선택이 못된다.
데이비드 비즐리, The Python GIL Visualized 참고.
refs : http://dabeaz.blogspot.com/2010/01/python-gil-visualized.html
멀티코어 시스템에서 다중 스레드를 사용하는 경우에만 문제가 된다.
여러 스레드를 실행하는 단일 코어 시스템에서는 GIL전투가 벌어지지 않는다.
Prime Number
프로세스간에 공유해야 할 상태가 전혀 없는 매우 병렬적인 문제.
chunksize 매개변수의 사용 -> 작업 단위의 크기가 소수 판정 속도에 미치는 영향
** sentinel **
import math
import time
import multiprocessing
from multiprocessing import Pool
FLAG_ALL_DONE = b"WORK_FINISHED"
FLAG_WORKER_FINISHED_PROCESSING = b"WORKER_FINISHED_PROCESSING"
def check_prime(possible_primes_queue, definite_primes_queue):
while True:
n = possible_primes_queue.get()
if n == FLAG_ALL_DONE:
definite_primes_queue.put(FLAG_WORKER_FINISHED_PROCESSING)
break
else:
if n % 2 == 0:
continue
for i in range(int(math.sqrt(n) + 1), 2):
if n % i == 0:
break
else:
definite_primes_queue.put(n)
if __name__ == "__main__":
primes = []
manager = multiprocessing.Manager()
# We could limit the input queue size with e.g. `maxsize=3`
possible_primes_queue = manager.Queue()
definite_primes_queue = manager.Queue()
NBR_PROCESSES = 4
pool = Pool(processes=NBR_PROCESSES)
processes = []
for _ in range(NBR_PROCESSES):
p = multiprocessing.Process(
target=check_prime,
args=(
possible_primes_queue,
definite_primes_queue))
processes.append(p)
p.start()
t1 = time.time()
number_range = range(1, 1000000) # A
# number_range = xrange(100000000, 100100000) # B
# number_range = range(100000000, 101000000) # C
# number_range = xrange(1000000000, 1000100000) # D
# number_range = xrange(100000000000, 100000100000) # E
for possible_prime in number_range:
possible_primes_queue.put(possible_prime)
print("ALL JOBS ADDED TO THE QUEUE")
# add poison pills to stop the remote workers
for n in range(NBR_PROCESSES):
possible_primes_queue.put(FLAG_ALL_DONE)
print("NOW WAITING FOR RESULTS...")
processors_indicating_they_have_finished = 0
while True:
# block whilst waiting for results
new_result = definite_primes_queue.get()
if new_result == FLAG_WORKER_FINISHED_PROCESSING:
print("WORKER {} HAS JUST FINISHED".format(processors_indicating_they_have_finished))
processors_indicating_they_have_finished += 1
if processors_indicating_they_have_finished == NBR_PROCESSES:
break
else:
primes.append(new_result)
assert processors_indicating_they_have_finished == NBR_PROCESSES
print("Took:", time.time() - t1)
print(len(primes), primes[:10], primes[-10:])
결과
$ python3 test9_3_prime_sentinel.py
ALL JOBS ADDED TO THE QUEUE
NOW WAITING FOR RESULTS...
WORKER 0 HAS JUST FINISHED
WORKER 1 HAS JUST FINISHED
WORKER 2 HAS JUST FINISHED
WORKER 3 HAS JUST FINISHED
Took: 199.00096464157104
500000 [1, 3, 5, 7, 9, 11, 13, 15, 17, 19] [999981, 999983, 999985, 999987, 999989, 999991, 999993, 999995, 999997, 999999]
프로세스간 통신을 사용
** 순차적 해법 **
def check_prime(n):
if n % 2 == 0:
return False
from_i = 3
to_i = math.sqrt(n) + 1
for i in xrange(from_i, int(to_i), 2):
if n % i == 0:
return False
return True
** 단순한 풀 해법 **
def check_prime(n, pool, nbr_processes):
from_i = 3
to_i = int(math.sqrt(n)) + 1
ranges_to_check = create_range.create(from_i, to_i, nbr_processes)
ranges_to_check = zip(len(ranges_to_check) * [n], ranges_to_check)
assert len(ranges_to_check) == nbr_processes
results = pool.map(check_prime_in_range, ranges_to_check)
if False in results:
return False
return True
** 조금 덜 단순한 풀 해법 **
https://github.com/kchhero/high_performance_python/blob/master/09_multiprocessing/prime_validation/primes_pool_per_number2.py 가능성이 높은 인수는 순차적으로 검사하여 빠르게 처리하고, 큰 값은 인수들은 병렬로 처리한다.
def check_prime(n, pool, nbr_processes):
# cheaply check high probability set of possible factors
from_i = 3
to_i = 21
if not check_prime_in_range((n, (from_i, to_i))):
return False
from_i = to_i
to_i = int(math.sqrt(n)) + 1
ranges_to_check = create_range.create(from_i, to_i, nbr_processes)
ranges_to_check = zip(len(ranges_to_check) * [n], ranges_to_check)
assert len(ranges_to_check) == nbr_processes
results = pool.map(check_prime_in_range, ranges_to_check)
if False in results:
return False
return True
** Manager.Value 플래그 사용하기 **
고수준 파이썬 객체를 프로세스 간에 managed 공유 객체로 활용할 수 있다. 저수준 객체들은 proxy 객체로 감싼다. 안정성을 보장하게되고 속도는 느려지지만 유연성을 얻을 수 있다. https://github.com/kchhero/high_performance_python/blob/master/09_multiprocessing/prime_validation/primes_pool_per_number_manager.py
** 레디스를 플래그로 사용하기 **
- redis : 인메모리 키/값 저장소 엔진이다. 자체 락을 제공하며, 각 연산은 원자적이다. 언어와 무관한 데이터 저장소를 만들 수 있다.
- 저장 : 문자열 리스트, 문자열의 집합, 문자열을 정렬한 집합, 문자열의 해시 ==> https://www.joinc.co.kr/w/man/12/REDIS/IntroDataType
** RawValue를 플래그로 사용하기 **
** mmap를 플래그로 사용하기 **
바이트들을 공유하는 가장 빠른 방식이다. https://github.com/kchhero/high_performance_python/blob/master/09_multiprocessing/prime_validation/primes_pool_per_number_mmap.py