您现在的位置是:网站首页> 编程资料编程资料
Python快速实现一个线程池的示例代码_python_
2023-05-26
344人已围观
简介 Python快速实现一个线程池的示例代码_python_
楔子
当有多个 IO 密集型的任务要被处理时,我们自然而然会想到多线程。但如果任务非常多,我们不可能每一个任务都启动一个线程去处理,这个时候最好的办法就是实现一个线程池,至于池子里面的线程数量可以根据业务场景进行设置。
比如我们实现一个有 10 个线程的线程池,这样可以并发地处理 10 个任务,每个线程将任务执行完之后,便去执行下一个任务。通过使用线程池,可以避免因线程创建过多而导致资源耗尽,而且任务在执行时的生命周期也可以很好地把控。
而线程池的实现方式也很简单,但这里我们不打算手动实现,因为 Python 提供了一个标准库 concurrent.futures,已经内置了对线程池的支持。所以本篇文章,我们就来详细介绍一下该模块的用法。
Future 对象
当我们往线程池里面提交一个函数时,会分配一个线程去执行,同时立即返回一个 Future 对象。通过 Future 对象可以监控函数的执行状态,有没有出现异常,以及有没有执行完毕等等。如果函数执行完毕,内部便会调用 future.set_result 将返回值设置到 future 里面,然后外界便可调用 future.result 拿到返回值。
除此之外 future 还可以绑定回调,一旦函数执行完毕,就会以 future 为参数,自动触发回调。所以 future 被称为未来对象,可以把它理解为函数的一个容器,当我们往线程池提交一个函数时,会立即创建相应的 future 然后返回。函数的执行状态什么的,都通过 future 来查看,当然也可以给它绑定一个回调,在函数执行完毕时自动触发。
那么下面我们就来看一下 future 的用法,文字的话理解起来可能有点枯燥。
""" 将函数提交到线程池里面运行时,会立即返回一个对象 这个对象就叫做 Future 对象,里面包含了函数的执行状态等等 当然我们也可以手动创建一个Future对象。 """ from concurrent.futures import Future # 创建 Future 对象 future future = Future() # 给 future 绑定回调 def callback(f: Future): print("当set_result的时候会执行回调,result:", f.result()) future.add_done_callback(callback) # 通过 add_done_callback 方法即可给 future 绑定回调 # 调用的时候会自动将 future 作为参数 # 如果需要多个参数,那么就使用偏函数 # 回调函数什么时候执行呢? # 显然是当 future 执行 set_result 的时候 # 如果 future 是向线程池提交函数时返回的 # 那么当函数执行完毕时会自动执行 future.set_result(xx) # 并将自身的返回设置进去 # 而这里的 future 是我们手动创建的,因此需要手动执行 future.set_result("嘿嘿") """ 当set_result的时候会执行回调,result: 嘿嘿 """需要注意的是:只能执行一次 set_result,但是可以多次调用 result 获取结果。
from concurrent.futures import Future future = Future() future.set_result("哼哼") print(future.result()) # 哼哼 print(future.result()) # 哼哼 print(future.result()) # 哼哼执行 future.result() 之前一定要先 set_result,否则会一直处于阻塞状态。当然 result 方法还可以接收一个 timeout 参数,表示超时时间,如果在指定时间内没有获取到值就会抛出异常。
提交函数自动创建 Future 对象
我们上面是手动创建的 Future 对象,但工作中很少会手动创建。我们将函数提交到线程池里面运行的时候,会自动创建 Future 对象并返回。这个 Future 对象里面就包含了函数的执行状态,比如此时是处于暂停、运行中还是完成等等,并且函数在执行完毕之后,还会调用 future.set_result 将自身的返回值设置进去。
from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" # 创建一个线程池 # 里面还可以指定 max_workers 参数,表示最多创建多少个线程 # 如果不指定,那么每提交一个函数,都会为其创建一个线程 executor = ThreadPoolExecutor() # 通过 submit 即可将函数提交到线程池,一旦提交,就会立刻运行 # 因为开启了一个新的线程,主线程会继续往下执行 # 至于 submit 的参数,按照函数名,对应参数提交即可 # 切记不可写成task("古明地觉", 3),这样就变成调用了 future = executor.submit(task, "古明地觉", 3) # 由于函数里面出现了 time.sleep,并且指定的 n 是 3 # 所以函数内部会休眠 3 秒,显然此时处于运行状态 print(future) """ """ # 我们说 future 相当于一个容器,包含了内部函数的执行状态 # 函数是否正在运行中 print(future.running()) """ True """ # 函数是否执行完毕 print(future.done()) """ False """ # 主程序也 sleep 3 秒 time.sleep(3) # 显然此时函数已经执行完毕了 # 并且打印结果还告诉我们返回值类型是 str print(future) """ """ print(future.running()) """ False """ print(future.done()) """ True """ # 函数执行完毕时,会将返回值设置在 future 里 # 也就是说一旦执行了 future.set_result # 那么就表示函数执行完毕了,然后外界可以调用 result 拿到返回值 print(future.result()) """ 古明地觉 睡了 3 秒 """ 这里再强调一下 future.result(),这一步是会阻塞的,举个例子:
# 提交函数 future = executor.submit(task, "古明地觉", 3) start = time.perf_counter() future.result() end = time.perf_counter() print(end - start) # 3.00331525
可以看到,future.result() 这一步花了将近 3s。其实也不难理解,future.result() 是干嘛的?就是为了获取函数的返回值,可函数都还没有执行完毕,它又从哪里获取呢?所以只能先等待函数执行完毕,将返回值通过 set_result 设置到 future 里面之后,外界才能调用 future.result() 获取到值。
如果不想一直等待的话,那么在获取值的时候可以传入一个超时时间。
from concurrent.futures import ( ThreadPoolExecutor, TimeoutError ) import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() future = executor.submit(task, "古明地觉", 3) try: # 1 秒之内获取不到值,抛出 TimeoutError res = future.result(1) except TimeoutError: pass # 再 sleep 2 秒,显然函数执行完毕了 time.sleep(2) # 获取返回值 print(future.result()) """ 古明地觉 睡了 3 秒 """当然啦,这么做其实还不够智能,因为我们不知道函数什么时候执行完毕。所以最好的办法还是绑定一个回调,当函数执行完毕时,自动触发回调。
from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" def callback(f): print(f.result()) executor = ThreadPoolExecutor() future = executor.submit(task, "古明地觉", 3) # 绑定回调,3 秒之后自动调用 future.add_done_callback(callback) """ 古明地觉 睡了 3 秒 """需要注意的是,在调用 submit 方法之后,提交到线程池的函数就已经开始执行了。而不管函数有没有执行完毕,我们都可以给对应的 future 绑定回调。
如果函数完成之前添加回调,那么会在函数完成后触发回调。如果函数完成之后添加回调,由于函数已经完成,代表此时的 future 已经有值了,或者说已经 set_result 了,那么会立即触发回调。
future.set_result 到底干了什么事情
当函数执行完毕之后,会执行 set_result,那么这个方法到底干了什么事情呢?

我们看到 future 有两个被保护的属性,分别是 _result 和 _state。显然 _result 用于保存函数的返回值,而 future.result() 本质上也是返回 _result 属性的值。而 _state 属性则用于表示函数的执行状态,初始为 PENDING,执行中为 RUNING,执行完毕时被设置为 FINISHED。
调用 future.result() 的时候,会判断 _state 的属性,如果还在执行中就一直等待。当 _state 为 FINISHED 的时候,就返回 _result 属性的值。
提交多个函数
我们上面每次只提交了一个函数,但其实可以提交任意多个,我们来看一下:
from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() futures = [executor.submit(task, "古明地觉", 3), executor.submit(task, "古明地觉", 4), executor.submit(task, "古明地觉", 1)] # 此时都处于running print(futures) """ [, , ] """ time.sleep(3) # 主程序 sleep 3s 后 # futures[0]和futures[2]处于 finished # futures[1]仍处于 running print(futures) """ [, , ] """ 如果是多个函数,要如何拿到返回值呢?很简单,遍历 futures 即可。
executor = ThreadPoolExecutor() futures = [executor.submit(task, "古明地觉", 5), executor.submit(task, "古明地觉", 2), executor.submit(task, "古明地觉", 4), executor.submit(task, "古明地觉", 3), executor.submit(task, "古明地觉", 6)] for future in futures: print(future.result()) """ 古明地觉 睡了 5 秒 古明地觉 睡了 2 秒 古明地觉 睡了 4 秒 古明地觉 睡了 3 秒 古明地觉 睡了 6 秒 """
这里面有一些值得说一说的地方,首先 futures 里面有 5 个 future,记做 future1, future2, future3, future4, future5。
当使用 for 循环遍历的时候,实际上会依次遍历这 5 个 future,所以返回值的顺序就是我们添加的函数的顺序。由于 future1 对应的函数休眠了 5s,那么必须等到 5s 后,future1 里面才会有值。
但这五个函数是并发执行的,future2, future3, future4 由于只休眠了 2s, 4s, 3s,所以肯定会先执行完毕,然后执行 set_result,将返回值设置到对应的 future 里。
但 Python 的 for 循环不可能在第一次迭代还没有结束,就去执行第二次迭代。因为 futures 里面的几个 future 的顺序已经一开始就被定好了,只有当第一个 future.result() 执行完成之后,才会执行第二个 future.result(),以及第三个、第四个。
因此即便后面的函数已经执行完毕,但由于 for 循环的顺序,也只能等着,直到前面的 future.result() 执行完毕。所以当第一个 future.result() 结束时,后面三个 future.result() 会立刻输出,因为它们内部的函数已经执行结束了。
而最后一个 future,由于内部函数 sleep 了 6 秒,因此要再等待 1 秒,才会打印 future.result()。
使用 map 来提交多个函数
使用 submit 提交函数会返回一个 future,并且还可以给 future 绑定一个回调。但如果不关心回调的话,那么还可以使用 map 进行提交。
executor = ThreadPoolExecutor() # map 内部也是使用了 submit results = executor.map(task, ["古明地觉"] * 3, [3, 1, 2]) # 并且返回的是迭代器 print(results) """""" # 此时遍历得到的是不再是 future # 而是 future.result() for result in results: print(result) """ 古明地觉 睡了 3 秒 古明地觉 睡了 1 秒 古明地觉 睡了 2 秒 """
可以看到,当使用for循环的时候,map 执行的逻辑和 submit 是一样的。唯一的区别是,此时不需要再调用 result 了,因为返回的就是函数的返回值。
或者我们直接调用 list 也行。
executor = ThreadPoolExecutor() results = executor.map(task, ["古明地觉"] * 3, [3, 1, 2]) print(list(results)) """ ['古明地觉 睡了 3 秒', '古明地觉 睡了 1 秒', '古明地觉 睡了 2 秒'] """
results 是一个生成器,调用 list 的时候会将里面的值全部产出。由于 map 内部还是使用的 submit,然后通过 future.result() 拿到返回值,而耗时最长的函数需要 3 秒,因此这一步会阻塞 3 秒。3 秒过后,会打印所有函数的返回值。
按照顺序等待执行
上面在获取返回值的时候,是按照函数的提交顺序获取的。如果我希望哪个函数先执行完毕,就先获取哪个函数的返回值,该怎么做呢?
from concurrent.futures import ( ThreadPoolExecutor, as_completed ) import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() futures = [executor.submit(task, "古明地觉", 5), executor.submit(task, "古明地觉", 2), executor.submit(task, "古明地觉", 1), executor.submit(task, "古明地觉", 3), executor.submit(task, "古明地觉", 4)] for future in as_completed(futures): print(future.result()) """ 古明地觉 睡了 1 秒 古明地觉 睡了 2 秒 古明地觉 睡了 3 秒 古明地觉 睡了 4 秒 古明地觉 睡了 5 秒 """此时谁先完成,谁先返回。
取消一个函数的执行
我们通过 submit 可以将函数提交到线程池中执行,但如果我们想取消该怎么办呢?
executor = ThreadPoolExecutor() future1 = executor.submit(task, "古明地觉", 1) future2 = executo
相关内容
- python开发sdk模块的方法_python_
- Python用requests-html爬取网页的实现_python_
- Python实现智能贪吃蛇游戏的示例代码_python_
- Python argparse库的基本使用步骤_python_
- python测试开发django之使用supervisord 后台启动celery 服务(worker/beat)_python_
- python 通过dict(zip)和{}的方式构造字典的方法_python_
- python3.8+django2+celery5.2.7环境准备(python测试开发django)_python_
- Python详解复杂CSV文件处理方法_python_
- python中的集合及集合常用的使用方法_python_
- Python中.py程序在CMD控制台以指定虚拟环境运行_python_
点击排行
本栏推荐
