python的多任务其实用了很久了,因为刚开始写代码的时候总是看网上说高并发、异步之类的,就觉得很高大上,所以刻意地去学过,后来在实际开发工作有过为了使用而使用,也有过真正因为性能问题而必须要使用。今天想把目前掌握的一些内容记录下来。
其实应该介绍一下网上流传甚广的“Python速度慢”和GIL,但是这两个话题在网上有非常多的文章讨论过,就不想再多写了。
Python多任务其实有多线程、多进程和协程三种实现方法,但是协程一般只在性能要求特别高的情况下使用,并且在实现上相对于多线程和多进程要复杂一些,所以不在这里写,以后单独为协程写一篇笔记。
多线程和多进程的适用场景
一句话总结就是:多线程适用于IO密集型的代码,多进程适用于CPU密集型的代码。
所谓IO密集型,就是代码中涉及的大量的磁盘、网络、数据库等数据交互。例如爬虫,涉及到大量的网络请求和磁盘读写操作,再比如远程数据库读写,也涉及到网络请求和磁盘读写, 还有就是内存中的内容写入到本地磁盘。
所谓CPU密集型,是指代码会进行大量的计算而导致占用大量CPU,比如像AI的算法(计算量大到CPU都不够,必须使用GPU了),或者是计算一个很大的数是不是素数(这是后面的一个例子)。
多线程
目标函数
在实际编写多线程之前,要先编写一个函数作为多任务的目标函数。在这里我以一个爬虫函数作为目标函数。
我在代码中使用cnblog作为爬取对象,cnblog是一个很不错的博客站点,代码仅用于展示功能,如果有读者想要尝试运行代码的话,希望不要过于频繁爬取他们的网站,以免给他们带来过大的请求负担。
以下代码写在blog_spider.py中
"""
爬取cnblog首页的的信息
"""
import requests
from bs4 import BeautifulSoup
# 定义需要爬取的url
urls = [f"https://www.cnblogs.com/sitehome/p/{page}" for page in range(1, 51)]
def craw(url):
"""爬取指定url的信息"""
content = requests.get(url).text
return content
def parse(html):
"""对给定的html进行解析"""
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a", class_="post-item-title")
return [(link["href"], link.get_text()) for link in links]
多线程的简单实现
下面的代码中用单线程和多线程分别进行爬虫,对比耗时以观测性能上的差距
以下代码写在multi_thread.py中
"""
对比多线程和单线程在爬虫上的效率
"""
import time
import threading
from blog_spider import urls, craw
def single_thread():
"""
单线程爬虫
"""
for url in urls:
craw(url)
def multi_thread():
"""
多线程爬虫
"""
threads = []
for url in urls:
# target是目标函数,args是目标函数的参数所组成的一个元组,
threads.append(
threading.Thread(target=craw, args=(url,))
)
# 开始线程任务
for thread in threads:
thread.start()
# 阻塞主线程,直到所有的线程多执行完成
for thread in threads:
thread.join()
if __name__ == '__main__':
start = time.time()
single_thread()
end = time.time()
print("单线程耗时:%s s" % (end - start))
start = time.time()
multi_thread()
end = time.time()
print("多线程耗时:%s s" % (end - start))
```
上面的代码中,thread.join()的作用是阻塞主线程,这样可以使用所有的子线程都运行完成后才结束主线程,避免当子线程还在执行的时候但是由于主线程的结束而被迫终止。
## 多线程中的资源竞争和线程锁
在使用多线程的时候经常会遇到资源竞争的问题,比如当多个子线程同时对一个变量进行计算,如果不加控制,最终的结果很可能就不是预期的。
下面以对一个数字进行大数量级次数的累加为例讲解这个问题。
其实逻辑很简单,将number初始化为0,然后for循环一百万次,每次对number执行+1操作,然后用两个子线程同时进行一样的操作,我们所预期的结果最终number应该等于2000000,但是由于资源竞争问题,所以不用线程锁(也叫互斥锁)加以控制,几乎不会得到正确的答案。
下面的样例代码中同时写了有互斥锁和没有互斥锁两种函数
以下代码写在multi_thread_lock.py中
```python
"""
对一个数字进行多次累加,可以观察到在多线程情况下,
如果不加互斥锁,可能会出现脏数据,
plus_with_lock是加了互斥锁的,
plus_without_lock是没有互斥锁的
"""
import threading
from concurrent.futures import ThreadPoolExecutor
number_with_lock = 0
number_without_lock = 0
lock = threading.Lock()
def plus_with_lock():
global number_with_lock
with lock:
for _ in range(1000000):
number_with_lock += 1
def plus_without_lock():
global number_without_lock
for _ in range(1000000):
number_without_lock += 1
if __name__ == '__main__':
t1 = threading.Thread(target=plus_with_lock,)
t2 = threading.Thread(target=plus_with_lock,)
t1.start()
t2.start()
t1.join()
t2.join()
print(number_with_lock)
t3 = threading.Thread(target=plus_without_lock,)
t4 = threading.Thread(target=plus_without_lock,)
t3.start()
t4.start()
t3.join()
t4.join()
print(number_without_lock)
线程池
根据我个人的实际使用经验来看,在应用多线程的时候,大部分情况下都是使用线程池,而不是像前面的两个案例那样手工控制每个线程的行为。使用线程池有两个好处:
- 降低性能消耗
创建线程这个动作会消耗一定的资源,像上面那样每次需要的时候都创建一个新的子线程,如果创建很多个子线程的话对性能有一定的影响 - 代码简单
线程池在代码实现上相对简单一点
下面是一个以爬虫为目标函数的线程池案例
以下代码写在multi_thread_pool.py中
from blog_spider import craw, parse, urls
from concurrent.futures import ThreadPoolExecutor
import concurrent
import time
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executer:
htmls = executer.map(craw, urls)
# map 方法
url_html_maps = list(zip(urls, htmls))
for url, html in url_html_maps:
print(url)
print(len(html))
end = time.time()
print("多线程爬虫耗时:%s " % (end - start))
with ThreadPoolExecutor(max_workers=5) as executer:
fs = {}
for url, html in url_html_maps:
future = executer.submit(parse, html)
fs[future] = url
for future in concurrent.futures.as_completed(fs):
# as_completed的作用是当fs中有任何一个future完成的时候会先返回,而不是顺序等待
# https://blog.csdn.net/panguangyuu/article/details/105335900
url = fs[future]
print(url, future.result())
```
在上面的代码中可以看出来,我比较喜欢配合with(上下文管理器)来使用线/进程池,因为这样不用手工管理创建和关闭线/进程池,代码更简单。
可以看到,ThreadPoolExecutor有map和submit两种运行子线程,map在代码上简单一些,适合提交线程后不用再对其进行操作和管理的情况,submit适合在线程提交后还要对其进行操作和管理的操作。个人感觉可以优先考虑使用map方法,如果map不能满足需求在考虑使用submit。
## 多线程的回调函数
ThreadPoolExecuter还有一个add_done_callback方法也是非常有用的,他可以为进程添加一个回调函数,当线程执行完成后可以触发这个回调函数,比如可以用于发送邮件、钉钉等消息通知。
这里做一个简单的示例
```python
from blog_spider import craw, parse, urls
from concurrent.futures import ThreadPoolExecutor
import concurrent
import time
def notify():
"""
模拟一个消息通知函数
"""
pass
with ThreadPoolExecutor(max_workers=5) as executer:
fs = {}
for url, html in url_html_maps:
future = executer.submit(parse, html)
fs[future] = url
for future in concurrent.futures.as_completed(fs):
future.add_done_callback(notify)
多进程
多进程和多线程在代码实现上是非常类似的,我正常也是配合with使用进程池,而不是手动控制每一个进程的创建和运行,所以只将进程池的用法。
下面的代码内容比较简单,也有详细的注释,就不多解释,只说两点:
- 代码中同时对比了单线程、多线程和多进程在CPU消耗型的场景下性能对比。
- 代码中调用了一个对代码执行时间计时的计数器。
"""
可以为函数计时的装饰器
"""
import time
def func_timer(function):
"""
:param function: function that will be timed
:return: duration
"""
def function_timer(*args, **kwargs):
t0 = time.time()
result = function(*args, **kwargs)
t1 = time.time()
print(
"[Function: {name} finished, spent time: {time:.4f}s]".format(
name=function.__name__, time=t1 - t0
)
)
return result
return function_timer
以下代码写在multi_process_pool.py中
"""
计算一个大数是不是一个素数,
这是一个CPU消耗型的代码,更适合多进程,
这段代码会对比单线程、多线程和多进程的性能区别
"""
"""
计算一个大数是不是一个素数,
这是一个CPU消耗型的代码,更适合多进程,
这段代码会对比单线程、多线程和多进程的性能区别
"""
import math
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from utils.function_timer import func_timer
def is_prime(n):
"""
判断一个数是不是素数,
n 要能走完所有的逻辑,这样才能消耗大量的CPU,
如果从中间某一步就结束的话,后面三中情况的对比结果可能就不是预期的那样
"""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def single_thread(numbers):
for number in numbers:
is_prime(number)
def multi_thread(numbers):
with ThreadPoolExecutor(max_workers=10) as executer:
executer.map(is_prime, numbers)
def multi_process(numbers):
with ProcessPoolExecutor(max_workers=10) as executer:
executer.map(is_prime, numbers)
if __name__ == '__main__':
numbers_1 = [112272535095293] * 50 # 这个数会导致代码消耗大量CPU
numbers_2 = [112272535095290] * 50 # 这个数不是素数,在判断过程中就退出了,不会消耗大量CPU
single_thread(numbers_1)
multi_thread(numbers_1)
multi_process(numbers_1)
# 以下代码说明多进程只有在CPU消耗型的情况下才有优势
single_thread(numbers_2)
multi_thread(numbers_2)
multi_process(numbers_2)