[Programming Notes] Code Efficiency Optimization: Parallelism and Asynchrony

GIL

  • A mechanism used by the CPython interpreter to ensure that only one thread executes Python bytecode at a time

Thread

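Thread objects

  • How do you create a thread? There are two ways:
    • Way 1: pass a callable object to the Thread constructor
    • Way 2: subclass Thread and override run(). Note that a subclass should override only the constructor (__init__()) and run(), and no other methods
  • How do you start a thread? After creating it, call start(), which invokes run() in a separate thread of control
  • Once a thread has been started it is considered "alive". It stops being alive when run() returns normally or raises an unhandled exception; is_alive() reports whether a thread is alive
  • How do you wait for a thread to finish? Another thread calls the target thread's join(), which blocks the caller until the thread whose join() was called terminates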
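The lifecycle described above can be sketched as follows. This is a minimal illustration, not part of the original examples: the `Worker` class and the `gate` event are hypothetical names, and the event is only there so that the "alive" check does not depend on timing.

```python
import threading


class Worker(threading.Thread):
    """Way 2: subclass Thread and override only __init__() and run()."""

    def __init__(self, gate: threading.Event) -> None:
        super().__init__()
        self.gate = gate
        self.ran_in = None

    def run(self) -> None:
        # Executed in the new thread after start() is called.
        self.gate.wait()  # stay alive until the main thread opens the gate
        self.ran_in = threading.current_thread().name


gate = threading.Event()
worker = Worker(gate)

alive_before_start = worker.is_alive()   # created but not started
worker.start()
alive_while_running = worker.is_alive()  # run() is blocked on the gate
gate.set()                               # let run() return
worker.join()                            # block until the thread terminates
alive_after_join = worker.is_alive()

print(alive_before_start, alive_while_running, alive_after_join)
# False True False
```

After join() returns, `worker.ran_in` holds the worker thread's own name, which shows that run() executed in the new thread rather than in the caller.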

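Daemon threads

  • What "daemon" means: the whole Python program exits once only daemon threads are left
  • Daemon threads are stopped abruptly at shutdown, so their resources (open files, for example) may not be released properly
  • The main thread is not a daemon thread
  • The daemon parameter (or attribute) controls whether a thread is a daemon thread; it must be set before start() is called. If it is not set, the value is inherited from the creating thread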
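The rules about the daemon flag can be checked with a short sketch. The helper `spawn_child`, the `inherited` dictionary and the `gate` event are names made up for this illustration.

```python
import threading

inherited = {}
gate = threading.Event()


def spawn_child() -> None:
    # No daemon argument is given, so the child inherits the flag
    # of the thread that creates it (a daemon thread here).
    child = threading.Thread(target=lambda: None)
    inherited['child'] = child.daemon


parent = threading.Thread(target=spawn_child, daemon=True)
parent.start()
parent.join()

# The main thread is never a daemon thread.
main_is_daemon = threading.main_thread().daemon

# The flag must be set before start(); changing it afterwards fails.
late = threading.Thread(target=gate.wait)
late.start()
try:
    late.daemon = True
    error = None
except RuntimeError as exc:
    error = exc
gate.set()
late.join()

print(inherited['child'], main_is_daemon, type(error).__name__)
# True False RuntimeError
```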

basic

import threading
from datetime import datetime, timedelta

def task(task_id: int):
    # Print the task id, the current thread's name and the time shifted to UTC+8
    print('task {} thread name {}, {}'.format(
        task_id, threading.current_thread().name, datetime.utcnow() + timedelta(hours=8)))

if __name__ == '__main__':
    threads = []

    for i in range(5):
        # Way 1: pass a callable to the Thread constructor
        t = threading.Thread(target=task, args=(i, ), name='name-{}'.format(i))
        threads.append(t)
        t.start()

    # Wait for every thread to finish
    for t in threads:
        t.join()

Output:

task 0 thread name name-0, 2022-07-25 11:55:15.709096
task 1 thread name name-1, 2022-07-25 11:55:15.709423
task 2 thread name name-2, 2022-07-25 11:55:15.709562
task 3 thread name name-3, 2022-07-25 11:55:15.709658
task 4 thread name name-4, 2022-07-25 11:55:15.709718

consumer & producer

from threading import Thread
from queue import Empty, Queue
import time
import random

# Global flag shared by all threads; set by the producer when it is done
_shutdown = False

class producer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print('Producer notify: item N° %d appended to queue by %s' %
                  (item, self.name))
            time.sleep(0.1)
        # Wait until every item has been consumed
        self.queue.join()
        global _shutdown
        _shutdown = True
        print('Producer done %s' % (self.name))

class consumer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while not _shutdown:
            try:
                item = self.queue.get(block=True, timeout=1)
                print('Consumer notify : %d popped from queue by %s' %
                      (item, self.name))
                # Tell the queue that this item has been fully processed
                self.queue.task_done()
            except Empty:
                break
        print('Consumer done %s' % (self.name))

if __name__ == '__main__':
    queue = Queue()
    t1 = producer(queue)
    t2 = consumer(queue)
    t3 = consumer(queue)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

Output:

Producer notify: item N° 126 appended to queue by Thread-1
Consumer notify : 126 popped from queue by Thread-2
Producer notify: item N° 36 appended to queue by Thread-1
Consumer notify : 36 popped from queue by Thread-3
Producer notify: item N° 205 appended to queue by Thread-1
Consumer notify : 205 popped from queue by Thread-2
Producer notify: item N° 55 appended to queue by Thread-1
Consumer notify : 55 popped from queue by Thread-3
Producer notify: item N° 152 appended to queue by Thread-1
Consumer notify : 152 popped from queue by Thread-2
Producer notify: item N° 70 appended to queue by Thread-1
Consumer notify : 70 popped from queue by Thread-3
Producer notify: item N° 168 appended to queue by Thread-1
Consumer notify : 168 popped from queue by Thread-2
Producer notify: item N° 74 appended to queue by Thread-1
Consumer notify : 74 popped from queue by Thread-3
Producer notify: item N° 87 appended to queue by Thread-1
Consumer notify : 87 popped from queue by Thread-2
Producer notify: item N° 59 appended to queue by Thread-1
Consumer notify : 59 popped from queue by Thread-3
Producer done Thread-1
Consumer done Thread-2
Consumer done Thread-3

Process

basic

import multiprocessing
from datetime import datetime, timedelta

def task(task_id: int):
    # Print the task id, the worker's process id and the time shifted to UTC+8
    print('task {} process id {}, {}'.format(
        task_id, multiprocessing.current_process().pid, datetime.utcnow() + timedelta(hours=8)))

if __name__ == '__main__':
    processes = []

    for i in range(5):
        p = multiprocessing.Process(target=task, args=(i, ))
        processes.append(p)
        p.start()

    # Wait for every process to finish
    for p in processes:
        p.join()

Output:

task 0 process id 7651, 2022-07-25 11:54:22.553022
task 3 process id 7654, 2022-07-25 11:54:22.554469
task 1 process id 7652, 2022-07-25 11:54:22.555789
task 4 process id 7655, 2022-07-25 11:54:22.564164
task 2 process id 7653, 2022-07-25 11:54:22.567585

consumer & producer

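Differences from the multithreaded version:

  • Compared with the standard library's queue.Queue, multiprocessing.Queue lacks the task_done() and join() methods
  • How _shutdown is synchronized
    • The multithreaded version uses a global variable as _shutdown
    • The multiprocess version uses a shared-memory variable as _shutdown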
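The first difference can be checked directly. The sketch below also shows `multiprocessing.JoinableQueue`, which the original example does not use but which is the standard-library variant that does provide task_done() and join(). Everything runs in a single process here to keep the check small; the variable names are illustrative.

```python
import multiprocessing as mp

plain = mp.Queue()
joinable = mp.JoinableQueue()

# multiprocessing.Queue has neither task_done() nor join() ...
plain_has_join_api = hasattr(plain, 'task_done') or hasattr(plain, 'join')
# ... while JoinableQueue offers both.
joinable_has_join_api = (hasattr(joinable, 'task_done')
                         and hasattr(joinable, 'join'))

joinable.put('item')
item = joinable.get(timeout=5)
joinable.task_done()   # one task_done() per item retrieved with get()
joinable.join()        # returns because every item has been marked done

# Release the queues' background resources.
for q in (plain, joinable):
    q.close()
    q.join_thread()

print(plain_has_join_api, joinable_has_join_api, item)
# False True item
```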

from multiprocessing import Process, Queue, Value
from queue import Empty
import time
import random

class producer(Process):
    def __init__(self, queue: Queue, shutdown: Value) -> None:
        Process.__init__(self)
        self.queue = queue
        self.shutdown = shutdown

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print('Producer notify: item N° %d appended to queue by %s' %
                  (item, self.name))
            time.sleep(0.1)
        # This process will put no more objects on the queue. Once all buffered
        # data has been flushed to the pipe, the background thread exits
        self.queue.close()
        # Wait for the background thread. Only usable after close(); blocks this
        # process until the background thread exits, ensuring that all buffered
        # data has been flushed to the pipe
        self.queue.join_thread()
        self.shutdown.value = 1
        print('Producer done %s' % (self.name))

class consumer(Process):
    def __init__(self, queue: Queue, shutdown: Value) -> None:
        Process.__init__(self)
        self.queue = queue
        self.shutdown = shutdown

    def run(self):
        # Keep reading until the queue is empty AND the producer has signalled
        # shutdown, so that items still in the pipe are not dropped
        while True:
            try:
                item = self.queue.get(block=True, timeout=1)
                print('Consumer notify : %d popped from queue by %s' %
                      (item, self.name))
            except Empty:
                if self.shutdown.value:
                    break
        print('Consumer done %s' % (self.name))

if __name__ == '__main__':
    queue = Queue()
    # Returns an object allocated from shared memory
    _shutdown = Value('i', 0)
    p1 = producer(queue, _shutdown)
    p2 = consumer(queue, _shutdown)
    p3 = consumer(queue, _shutdown)
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()

Output:

Producer notify: item N° 119 appended to queue by producer-1
Consumer notify : 119 popped from queue by consumer-2
Producer notify: item N° 250 appended to queue by producer-1
Consumer notify : 250 popped from queue by consumer-2
Producer notify: item N° 215 appended to queue by producer-1
Consumer notify : 215 popped from queue by consumer-2
Producer notify: item N° 210 appended to queue by producer-1
Consumer notify : 210 popped from queue by consumer-2
Producer notify: item N° 70 appended to queue by producer-1
Consumer notify : 70 popped from queue by consumer-2
Producer notify: item N° 29 appended to queue by producer-1
Consumer notify : 29 popped from queue by consumer-2
Producer notify: item N° 237 appended to queue by producer-1
Consumer notify : 237 popped from queue by consumer-2
Producer notify: item N° 55 appended to queue by producer-1
Consumer notify : 55 popped from queue by consumer-2
Producer notify: item N° 10 appended to queue by producer-1
Consumer notify : 10 popped from queue by consumer-2
Producer notify: item N° 224 appended to queue by producer-1
Consumer notify : 224 popped from queue by consumer-2
Producer done producer-1
Consumer done consumer-2
Consumer done consumer-3

Executor

Executors run callables asynchronously: ThreadPoolExecutor does so with threads, ProcessPoolExecutor with separate processes

  • concurrent.futures.Executor
  • concurrent.futures.ThreadPoolExecutor
  • concurrent.futures.ProcessPoolExecutor
  • concurrent.futures.Future

ThreadPoolExecutor VS ProcessPoolExecutor

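  • ThreadPoolExecutor:
    • Threads
    • Lightweight workers
    • State shared through global variables
    • Constrained by the GIL
    • IO-bound tasks
  • ProcessPoolExecutor:
    • Processes
    • Heavyweight workers
    • State shared through shared-memory variables (across processes)
    • Not constrained by a single GIL (each process has its own interpreter)
    • CPU-bound tasks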
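Both executors share the same `Executor` interface, so code written against one can usually be pointed at the other. The sketch below is a minimal illustration with a made-up `square` task instead of network downloads. It uses the executor as a context manager and collects results with `as_completed`, neither of which appears in the original examples.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n: int) -> int:
    return n * n


# Leaving the with-block calls shutdown(wait=True) automatically.
with ThreadPoolExecutor(max_workers=4) as pool:
    # Map each future back to the input it was submitted with.
    futures = {pool.submit(square, n): n for n in range(5)}
    results = {}
    # as_completed() yields futures in completion order, not submission order.
    for future in as_completed(futures):
        results[futures[future]] = future.result()

print(sorted(results.items()))
# [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]
```

Switching to ProcessPoolExecutor would additionally require the task function and its arguments to be picklable and the submitting code to sit under an `if __name__ == '__main__':` guard.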

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, Future
import requests
import threading
from datetime import datetime, timedelta

def task(file_name: str, url: str) -> str:
    # Download the URL and save the response body to file_name
    res = requests.get(url)
    with open(file_name, mode='wb') as f:
        f.write(res.content)

    response = 'task thread name {}'.format(
        threading.current_thread().name)
    print(file_name, datetime.utcnow() + timedelta(hours=8))
    return response

def done_callback(future: Future) -> None:
    print('task done[thread name={}]:'.format(
        threading.current_thread().name), future.result())

if __name__ == '__main__':
    task_lists = [
        ('regex101.txt', 'https://regex101.com/'),
        ('morning.txt', 'https://www.politico.com/tipsheets/morning-money'),
        ('economics.txt', 'https://www.bloomberg.com/markets/economics')
    ]
    future_lists: list[Future] = []
    pool = ThreadPoolExecutor(10)
    for file_name, url in task_lists:
        future = pool.submit(task, file_name, url)
        future.add_done_callback(done_callback)
        future_lists.append(future)

    # Returns only after all pending futures have finished executing and the
    # resources allocated to the executor have been freed
    pool.shutdown(wait=True)
    response_lists = []
    for f in future_lists:
        response_lists.append(f.result())
    print(response_lists)

Output:

economics.txt 2022-07-25 09:49:59.536341
task done[thread name=ThreadPoolExecutor-0_2]: task thread name ThreadPoolExecutor-0_2
regex101.txt 2022-07-25 09:49:59.766962
task done[thread name=ThreadPoolExecutor-0_0]: task thread name ThreadPoolExecutor-0_0
morning.txt 2022-07-25 09:50:00.081044
task done[thread name=ThreadPoolExecutor-0_1]: task thread name ThreadPoolExecutor-0_1
['task thread name ThreadPoolExecutor-0_0',
'task thread name ThreadPoolExecutor-0_1',
'task thread name ThreadPoolExecutor-0_2']

ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor, Future
import multiprocessing
import requests
from datetime import datetime, timedelta

def task(file_name: str, url: str) -> str:
    # Download the URL and save the response body to file_name
    res = requests.get(url)
    with open(file_name, mode='wb') as f:
        f.write(res.content)

    response = 'task process id {}'.format(
        multiprocessing.current_process().pid)
    print(file_name, datetime.utcnow() + timedelta(hours=8))
    return response

def done_callback(future: Future) -> None:
    # Callbacks run in the parent process, not in the worker process
    print('task done [process id={}]:'.format(
        multiprocessing.current_process().pid), future.result())

if __name__ == '__main__':
    task_lists = [
        ('regex101.txt', 'https://regex101.com/'),
        ('morning.txt', 'https://www.politico.com/tipsheets/morning-money'),
        ('economics.txt', 'https://www.bloomberg.com/markets/economics')
    ]
    future_lists: list[Future] = []
    pool = ProcessPoolExecutor(10)
    for file_name, url in task_lists:
        future = pool.submit(task, file_name, url)
        future.add_done_callback(done_callback)
        future_lists.append(future)

    # Returns only after all pending futures have finished executing and the
    # resources allocated to the executor have been freed
    pool.shutdown(wait=True)
    response_lists = []
    for f in future_lists:
        response_lists.append(f.result())
    print(response_lists)

Output:

morning.txt 2022-07-25 09:43:24.237463
task done [process id=6920]: task process id 6924
economics.txt 2022-07-25 09:43:24.389305
task done [process id=6920]: task process id 6923
regex101.txt 2022-07-25 09:43:24.849390
task done [process id=6920]: task process id 6922
['task process id 6922',
'task process id 6924',
'task process id 6923']

Process Pool

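apply_async

  • Use AsyncResult.get() to obtain a worker's return value
  • Arguments of functions run in the Pool should preferably be basic Python types, since every argument must be picklable
  • If the tasks submitted to the Pool take a while to run, control the submission rate; otherwise the Pool's queue piles up and tasks wait a long time before they are executed. For example, if one task takes 100 milliseconds and the Pool has 10 processes, the Pool can in theory handle at most 10 x 10 = 100 tasks per second. Submitting 200 tasks per second means tasks wait longer and longer before they run. In practice, this waiting time can be written to the log so that it can be monitored and problems spotted later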
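The throughput arithmetic in the last bullet can be written out as a small calculation. The function names are made up for this sketch, and the model is deliberately idealized: it ignores scheduling and pickling overhead and assumes every task takes exactly the same time.

```python
def pool_capacity(workers: int, task_ms: int) -> float:
    """Upper bound on the number of tasks the pool completes per second."""
    return workers * 1000 / task_ms


def backlog_after(seconds: int, arrival_rate: float,
                  workers: int, task_ms: int) -> float:
    """Tasks still waiting in the queue after `seconds` of steady submission."""
    surplus = arrival_rate - pool_capacity(workers, task_ms)
    return max(0.0, surplus * seconds)


capacity = pool_capacity(workers=10, task_ms=100)   # 100 tasks per second
backlog = backlog_after(60, arrival_rate=200, workers=10, task_ms=100)
wait_seconds = backlog / capacity                   # delay seen by a new task

print(capacity, backlog, wait_seconds)
# 100.0 6000.0 60.0
```

Under these assumptions, one minute of submitting 200 tasks per second leaves 6000 tasks queued, so a task submitted at that point waits about a minute before it starts.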

import multiprocessing as mp
import time
import random
import os

def calculate(func, args):
    # Run func(*args) in the worker and describe the result
    result = func(*args)
    return '%s says that %s%s = %s' % (
        mp.current_process().name,
        func.__name__, args, result
    )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

TASKS = [(mul, (i, 7)) for i in range(10)] + \
        [(plus, (i, 8)) for i in range(10)]

if __name__ == '__main__':
    pool = mp.Pool(os.cpu_count())
    # apply_async() returns immediately with an AsyncResult per task
    results = [pool.apply_async(calculate, t) for t in TASKS]
    for r in results:
        # get() blocks until the corresponding task has finished
        print(r.get())
    pool.close()
    pool.join()

Output:

SpawnPoolWorker-4 says that mul(0, 7) = 0
SpawnPoolWorker-1 says that mul(1, 7) = 7
SpawnPoolWorker-3 says that mul(2, 7) = 14
SpawnPoolWorker-5 says that mul(3, 7) = 21
SpawnPoolWorker-6 says that mul(4, 7) = 28
SpawnPoolWorker-7 says that mul(5, 7) = 35
SpawnPoolWorker-4 says that mul(6, 7) = 42
SpawnPoolWorker-2 says that mul(7, 7) = 49
SpawnPoolWorker-8 says that mul(8, 7) = 56
SpawnPoolWorker-8 says that mul(9, 7) = 63
SpawnPoolWorker-1 says that plus(0, 8) = 8
SpawnPoolWorker-8 says that plus(1, 8) = 9
SpawnPoolWorker-4 says that plus(2, 8) = 10
SpawnPoolWorker-8 says that plus(3, 8) = 11
SpawnPoolWorker-5 says that plus(4, 8) = 12
SpawnPoolWorker-7 says that plus(5, 8) = 13
SpawnPoolWorker-3 says that plus(6, 8) = 14
SpawnPoolWorker-6 says that plus(7, 8) = 15
SpawnPoolWorker-1 says that plus(8, 8) = 16
SpawnPoolWorker-1 says that plus(9, 8) = 17

  • Use a callback function to obtain a worker's return value

# Reuses mp, os, calculate and TASKS from the previous listing

def on_success(res):
    print('on_success: {0}'.format(res))

def on_error(e):
    print('on_error: {0}'.format(e))

if __name__ == '__main__':
    pool = mp.Pool(os.cpu_count())
    for t in TASKS:
        pool.apply_async(calculate, t, callback=on_success,
                         error_callback=on_error)
    pool.close()
    pool.join()

Output:

on_success: SpawnPoolWorker-7 says that mul(4, 7) = 28
on_success: SpawnPoolWorker-5 says that mul(6, 7) = 42
on_success: SpawnPoolWorker-1 says that mul(0, 7) = 0
on_success: SpawnPoolWorker-2 says that mul(3, 7) = 21
on_success: SpawnPoolWorker-6 says that mul(5, 7) = 35
on_success: SpawnPoolWorker-3 says that mul(1, 7) = 7
on_success: SpawnPoolWorker-7 says that mul(8, 7) = 56
on_success: SpawnPoolWorker-5 says that mul(9, 7) = 63
on_success: SpawnPoolWorker-6 says that plus(2, 8) = 10
on_success: SpawnPoolWorker-4 says that mul(2, 7) = 14
on_success: SpawnPoolWorker-6 says that plus(6, 8) = 14
on_success: SpawnPoolWorker-8 says that mul(7, 7) = 49
on_success: SpawnPoolWorker-1 says that plus(0, 8) = 8
on_success: SpawnPoolWorker-3 says that plus(3, 8) = 11
on_success: SpawnPoolWorker-7 says that plus(4, 8) = 12
on_success: SpawnPoolWorker-8 says that plus(9, 8) = 17
on_success: SpawnPoolWorker-2 says that plus(1, 8) = 9
on_success: SpawnPoolWorker-6 says that plus(8, 8) = 16
on_success: SpawnPoolWorker-5 says that plus(5, 8) = 13
on_success: SpawnPoolWorker-4 says that plus(7, 8) = 15

map

import multiprocessing as mp
import time
import random
import os

def calculate(func, args):
    # Run func(*args) in the worker and describe the result
    result = func(*args)
    return '%s says that %s%s = %s' % (
        mp.current_process().name,
        func.__name__, args, result
    )

def calculatestar(args):
    # map() passes a single argument, so unpack the (func, args) tuple here
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

TASKS = [(mul, (i, 7)) for i in range(10)] + \
        [(plus, (i, 8)) for i in range(10)]

if __name__ == '__main__':
    pool = mp.Pool(os.cpu_count())
    # map() blocks until all tasks finish and returns results in input order
    for x in pool.map(calculatestar, TASKS):
        print(x)
    pool.close()
    pool.join()

Output:

SpawnPoolWorker-2 says that mul(0, 7) = 0
SpawnPoolWorker-1 says that mul(1, 7) = 7
SpawnPoolWorker-4 says that mul(2, 7) = 14
SpawnPoolWorker-3 says that mul(3, 7) = 21
SpawnPoolWorker-5 says that mul(4, 7) = 28
SpawnPoolWorker-6 says that mul(5, 7) = 35
SpawnPoolWorker-8 says that mul(6, 7) = 42
SpawnPoolWorker-7 says that mul(7, 7) = 49
SpawnPoolWorker-8 says that mul(8, 7) = 56
SpawnPoolWorker-7 says that mul(9, 7) = 63
SpawnPoolWorker-3 says that plus(0, 8) = 8
SpawnPoolWorker-6 says that plus(1, 8) = 9
SpawnPoolWorker-2 says that plus(2, 8) = 10
SpawnPoolWorker-1 says that plus(3, 8) = 11
SpawnPoolWorker-4 says that plus(4, 8) = 12
SpawnPoolWorker-8 says that plus(5, 8) = 13
SpawnPoolWorker-5 says that plus(6, 8) = 14
SpawnPoolWorker-5 says that plus(7, 8) = 15
SpawnPoolWorker-1 says that plus(8, 8) = 16
SpawnPoolWorker-8 says that plus(9, 8) = 17

Exceptions

  • AsyncResult.get([timeout]) returns the result of the call
    • If timeout is not None and the result does not arrive within timeout seconds, multiprocessing.TimeoutError is raised
    • If the remote call raised an exception, that exception is re-raised by get()

import multiprocessing as mp
import os

def f(x):
    return 1.0 / (x - 5.0)

if __name__ == '__main__':
    pool = mp.Pool(os.cpu_count())
    r = pool.apply_async(f, (5,))
    try:
        # 1. If timeout is not None and no result arrives within timeout
        #    seconds, multiprocessing.TimeoutError is raised
        # 2. If the remote call raised an exception, get() re-raises it
        r.get()
    except ZeroDivisionError as e:
        print(e)
    except mp.TimeoutError as e:
        print(e)
    pool.close()
    pool.join()

# Output:
# float division by zero
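
The listing above exercises only the re-raised exception. The timeout case can be sketched in the same way. To keep the sketch short and runnable without spawning processes, it swaps in `multiprocessing.pool.ThreadPool`, which exposes the same apply_async() and AsyncResult.get() interface as Pool. The `slow_square` function is made up for the illustration.

```python
import multiprocessing as mp
import time
from multiprocessing.pool import ThreadPool


def slow_square(x):
    time.sleep(0.5)
    return x * x


with ThreadPool(2) as pool:
    pending = pool.apply_async(slow_square, (3,))
    try:
        # The task needs about 0.5 s, so a 0.05 s timeout expires first.
        pending.get(timeout=0.05)
        timed_out = False
    except mp.TimeoutError:
        timed_out = True
    # The task keeps running after the timeout; without a timeout,
    # get() blocks until the result is ready.
    value = pending.get()

print(timed_out, value)
# True 9
```

A timeout only abandons the wait. It does not cancel the task, which is why the second get() still returns the result.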

  • Catch an exception raised in a worker process with the on_error callback

# Reuses mp, os and f from the previous listing

def on_success(res):
    print('on_success: {0}'.format(res))

def on_error(e):
    # Receives the exception raised in the worker process
    assert isinstance(e, ZeroDivisionError)
    print('on_error: {0}'.format(e))

if __name__ == '__main__':
    pool = mp.Pool(os.cpu_count())
    r = pool.apply_async(f, (5,), callback=on_success, error_callback=on_error)
    pool.close()
    pool.join()

# Output:
# on_error: float division by zero
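
  • Catch an exception raised in a worker process when using map()

# Reuses mp, os and f from the listings above

if __name__ == '__main__':
    pool = mp.Pool(os.cpu_count())
    try:
        # f(5) divides by zero in a worker; map() re-raises it in the caller
        r = pool.map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9])
    except ZeroDivisionError as e:
        print(e)
    pool.close()
    pool.join()

# Output:
# float division by zero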