(编辑:jimmy 日期: 2025/1/11 浏览:2)
线程池的使用
线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。
Exectuor 提供了如下常用方法:
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
实际上,在 Java 的多线程编程中同样有 Future,此处的 Future 与 Java 的 Future 大同小异。
Future 提供了如下方法:
在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下:
a、调用 ThreadPoolExecutor 类的构造器创建一个线程池。
b、定义一个普通函数作为线程任务。
c、调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
d、当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
下面程序示范了如何使用线程池来执行线程任务:
from concurrent.futures import ThreadPoolExecutor import threading import time # 定义一个准备作为线程任务的函数 def action(max): my_sum = 0 for i in range(max): print(threading.current_thread().name + ' ' + str(i)) my_sum += i return my_sum # 创建一个包含2条线程的线程池 pool = ThreadPoolExecutor(max_workers=2) # 向线程池提交一个task, 50会作为action()函数的参数 future1 = pool.submit(action, 50) # 向线程池再提交一个task, 100会作为action()函数的参数 future2 = pool.submit(action, 100) # 判断future1代表的任务是否结束 print(future1.done()) time.sleep(3) # 判断future2代表的任务是否结束 print(future2.done()) # 查看future1代表的任务返回的结果 print(future1.result()) # 查看future2代表的任务返回的结果 print(future2.result()) # 关闭线程池 pool.shutdown()
上面程序中,第 13 行代码创建了一个包含两个线程的线程池,接下来的两行代码只要将 action() 函数提交(submit)给线程池,该线程池就会负责启动线程来执行 action() 函数。这种启动线程的方法既优雅,又具有更高的效率。
当程序把 action() 函数提交给线程池时,submit() 方法会返回该任务所对应的 Future 对象,程序立即判断 futurel 的 done() 方法,该方法将会返回 False(表明此时该任务还未完成)。接下来主程序暂停 3 秒,然后判断 future2 的 done() 方法,如果此时该任务已经完成,那么该方法将会返回 True。
程序最后通过 Future 的 result() 方法来获取两个异步任务返回的结果。
读者可以自己运行此代码查看运行结果,这里不再演示。
当程序使用 Future 的 result() 方法来获取结果时,该方法会阻塞当前线程,如果没有指定 timeout 参数,当前线程将一直处于阻塞状态,直到 Future 代表的任务返回。
获取执行结果
前面程序调用了 Future 的 result() 方法来获取线程任务的运回值,但该方法会阻塞当前主线程,只有等到钱程任务完成后,result() 方法的阻塞才会被解除。
如果程序不希望直接调用 result() 方法阻塞线程,则可通过 Future 的 add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。
下面程序使用 add_done_callback() 方法来获取线程任务的返回值:
from concurrent.futures import ThreadPoolExecutor import threading import time # 定义一个准备作为线程任务的函数 def action(max): my_sum = 0 for i in range(max): print(threading.current_thread().name + ' ' + str(i)) my_sum += i return my_sum # 创建一个包含2条线程的线程池 with ThreadPoolExecutor(max_workers=2) as pool: # 向线程池提交一个task, 50会作为action()函数的参数 future1 = pool.submit(action, 50) # 向线程池再提交一个task, 100会作为action()函数的参数 future2 = pool.submit(action, 100) def get_result(future): print(future.result()) # 为future1添加线程完成的回调函数 future1.add_done_callback(get_result) # 为future2添加线程完成的回调函数 future2.add_done_callback(get_result) print('--------------')
上面主程序分别为 future1、future2 添加了同一个回调函数,该回调函数会在线程任务结束时获取其返回值。
主程序的最后一行代码打印了一条横线。由于程序并未直接调用 future1、future2 的 result() 方法,因此主线程不会被阻塞,可以立即看到输出主线程打印出的横线。接下来将会看到两个新线程并发执行,当线程任务执行完成后,get_result() 函数被触发,输出线程任务的返回值。
另外,由于线程池实现了上下文管理协议(Context Manage Protocol),因此,程序可以使用 with 语句来管理线程池,这样即可避免手动关闭线程池,如上面的程序所示。
此外,Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1) 方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。
例如,如下程序使用 Executor 的 map() 方法来启动线程,并收集线程任务的返回值:
from concurrent.futures import ThreadPoolExecutor import threading import time # 定义一个准备作为线程任务的函数 def action(max): my_sum = 0 for i in range(max): print(threading.current_thread().name + ' ' + str(i)) my_sum += i return my_sum # 创建一个包含4条线程的线程池 with ThreadPoolExecutor(max_workers=4) as pool: # 使用线程执行map计算 # 后面元组有3个元素,因此程序启动3条线程来执行action函数 results = pool.map(action, (50, 100, 150)) print('--------------') for r in results: print(r)
上面程序使用 map() 方法来启动 3 个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。
运行上面程序,同样可以看到 3 个线程并发执行的结果,最后通过 results 可以看到 3 个线程任务的返回结果。
通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 action() 函数,但最后收集的 action() 函数的执行结果,依然与传入参数的结果保持一致。也就是说,上面 results 的第一个元素是 action(50) 的结果,第二个元素是 action(100) 的结果,第三个元素是 action(150) 的结果。
实例扩展:
# coding:utf-8 import Queue import threading import sys import time import math class WorkThread(threading.Thread): def __init__(self, task_queue): threading.Thread.__init__(self) self.setDaemon(True) self.task_queue = task_queue self.start() self.idle = True def run(self): sleep_time = 0.01 # 第1次无任务可做时休息10毫秒 multiply = 0 while True: try: # 从队列中取一个任务 func, args, kwargs = self.task_queue.get(block=False) self.idle = False multiply = 0 # 执行之 func(*args, **kwargs) except Queue.Empty: time.sleep(sleep_time * math.pow(2, multiply)) self.idle = True multiply += 1 continue except: print sys.exc_info() raise class ThreadPool: def __init__(self, thread_num=10, max_queue_len=1000): self.max_queue_len = max_queue_len self.task_queue = Queue.Queue(max_queue_len) # 任务等待队列 self.threads = [] self.__create_pool(thread_num) def __create_pool(self, thread_num): for i in xrange(thread_num): thread = WorkThread(self.task_queue) self.threads.append(thread) def add_task(self, func, *args, **kwargs): '''添加一个任务,返回任务等待队列的长度 调用该方法前最后先调用isSafe()判断一下等待的任务是不是很多,以防止提交的任务被拒绝 ''' try: self.task_queue.put((func, args, kwargs)) except Queue.Full: raise # 队列已满时直接抛出异常,不给执行 return self.task_queue.qsize() def isSafe(self): '''等待的任务数量离警界线还比较远 ''' return self.task_queue.qsize() < 0.9 * self.max_queue_len def wait_for_complete(self): '''等待提交到线程池的所有任务都执行完毕 ''' #首先任务等待队列要变成空 while not self.task_queue.empty(): time.sleep(1) # 其次,所以计算线程要变成idle状态 while True: all_idle = True for th in self.threads: if not th.idle: all_idle = False break if all_idle: break else: time.sleep(1) if __name__ == '__main__': def foo(a, b): print a + b time.sleep(0.01) thread_pool = ThreadPool(10, 100) '''在Windows上测试不通过,Windows上Queue.Queue不是线程安全的''' size = 0 for i in xrange(10000): try: size = thread_pool.add_task(foo, i, 2 * i) except Queue.Full: print 'queue full, queue size is ', size time.sleep(2)