Python process pools / thread pools

The concurrent.futures module provides two classes, ThreadPoolExecutor and ProcessPoolExecutor, which build a higher-level abstraction on top of threading and multiprocessing and offer direct support for writing thread pools and process pools.

Problem: given 100,000 URLs to crawl, what is a reasonably simple way to fetch them with multiple processes or threads?

import logging
import requests
import datetime
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def __get_data(curr_id):
    complete_url = 'http://www.baidu.com'
    logging.info('current id: {}'.format(curr_id))
    try_times = 0
    # retry up to 3 times
    while try_times < 3:
        try_times += 1
        try:
            response = requests.get(complete_url)
            if response.status_code == 200:
                logging.info('request succeeded')
                break
        except Exception as _e:
            logging.exception(_e)


def id_gen():
    i = 0
    while i < 100000:
        i += 1
        yield i


if __name__ == '__main__':
    start_time = datetime.datetime.now()
    # 5 processes:
    # with ProcessPoolExecutor(5) as tp:
    # 5 threads:
    with ThreadPoolExecutor(5) as tp:
        tp.map(__get_data, id_gen())
    end_time = datetime.datetime.now()
    delta = end_time - start_time
    logging.info('took {} seconds'.format(delta.total_seconds()))

Here, id_gen() is a generator that simulates the 100,000 URLs. Note that Executor.map submits every task up front, so the whole iterable is consumed immediately even though it is a generator.

map(fn, *iterables, timeout=None): the first argument fn is the function each worker runs; the second accepts an iterable supplying its arguments; timeout works like the timeout of wait(), but because map returns the results of the calls, a TimeoutError is raised if a result is not available within timeout.
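A minimal sketch of that timeout behavior (slow is a made-up function that deliberately sleeps longer than the allowed timeout):

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow(x):
    time.sleep(1)
    return x * 2


timed_out = False
with ThreadPoolExecutor(2) as tp:
    try:
        # timeout is measured from the map() call; each result must
        # arrive within 0.1 s or iterating raises TimeoutError
        for r in tp.map(slow, range(4), timeout=0.1):
            print(r)
    except TimeoutError:
        timed_out = True
        print('TimeoutError: a result was not ready in time')
```

The exception is raised by the result iterator, not by submission itself, so it only fires once you start consuming map's return value.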

ProcessPoolExecutor is used in much the same way as ThreadPoolExecutor, and the two expose the same futures methods. The one difference is that ProcessPoolExecutor's map() takes an extra chunksize argument (ThreadPoolExecutor accepts it but ignores it): the iterable is cut into chunks of that size and each chunk is submitted to the pool as a separate task. For very long iterables, a large chunksize can noticeably improve performance.

submit

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time


def test(i):
    print(i)
    time.sleep(5)


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=2)
    for i in range(10):
        executor.submit(test, i)

Output (printed two at a time, since max_workers=2):

0
1
2
3
4
5
6
7
8
9

If test takes multiple arguments, simply pass them to submit as well: for test(a, b, c), call executor.submit(test, a, b, c).
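submit() returns a Future, whose result() method blocks until the call finishes and then hands back its return value. A minimal sketch with a hypothetical three-argument function:

```python
from concurrent.futures import ThreadPoolExecutor


def add(a, b, c):
    return a + b + c


with ThreadPoolExecutor(max_workers=2) as executor:
    # submit schedules add(1, 2, 3) and returns a Future immediately
    future = executor.submit(add, 1, 2, 3)
    total = future.result()  # blocks until the call completes
print(total)  # 6
```

This is the key difference from map: submit gives you one Future per call, so you can collect return values (or exceptions) individually.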