一、线程池概述
1. 线程池的设计初衷
线程池是一种池式结构 (内存池、消息队列也属于池式结构),主要解决缓存问题 ,起缓冲作用
C++在进行多线程的创建和销毁时,会有比较大的开销,特别是在进行比快的线程操作时,会把很大的时间消耗在线程的创建和销毁上
为了减少在程序中反复创建和销毁线程,就引入了线程池的概念
线程池是在程序启动时,就创建一定数量的线程,放入一个线程队列 中,当需要使用线程时,就从线程队列中取出一个线程,使用完毕后,再放回线程队列中
2. 线程池的主要作用
线程池可以实现异步解耦 ,将任务分解为多个子任务,然后将子任务分配给线程池中的线程执行,从而提高程序的执行效率
线程池的使用举例:
a. 服务器端处理客户端请求
当有客户端请求时,服务器端就会创建一个线程来处理客户端的请求,但是如果客户端的请求量很大,服务器端就会创建很多线程,这样会导致服务器端的性能下降
因此可以使用线程池来解决这个问题,将客户端的请求放入线程池中,线程池中的线程来处理客户端的请求。具体实现如下:
1. 创建一个线程池,设置线程池的大小为n 2.
当有客户端请求时,就将客户端的请求放入线程池中 3.
线程池中的线程来处理客户端的请求 4.
处理完客户端的请求后,将线程归还给线程池
b. 保存日志文件
在日志保存时,需要进行文件的读写操作,性能会压在磁盘上,可以采用线程池实现异步解耦来解决这个问题
二、C++多线程基础学习
1.
互斥锁解决多线程数据共享问题
多线程操作中,经常会需要在多个线程任务中同时使用同一资源(变量、文件等),如果不加锁往往会出现数据竞争 问题
数据竞争问题会导致数据不一致性 ,比如对于同一个变量a
,线程1对a
进行修改,但是还没结束单条指令的时候,线程2就开始对a
进行修改,那么最终a
的值就会出现混乱
下面举一个例子(Windows下),我们开启两个线程对同一个a
变量分别进行5000次的自增操作,然后打印a
的值,我们会发现a
的值并不是10000,而是一个小于10000的值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <iostream> #include <thread> using namespace std;int a = 0 ;void func () { for (int i = 0 ; i < 5000 ; i++) { a++; } } void testMultiThread () { thread t1 (func) ; thread t2 (func) ; t1.join (); t2.join (); cout << "final a:" << a << endl; } int main () { testMultiThread (); return 0 ; }
此时我们对两个线程共享的数据a
进行加锁,就可以保证当一个线程拿到a
变量的锁之后,另一个线程就无法对a
进行修改,直到第一个线程释放锁unlock,下一个线程才可以执行对应的操作,保证了共享数据的安全性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <iostream> #include <thread> #include <mutex> using namespace std;int a = 0 ;mutex mtx; void func () { for (int i = 0 ; i < 5000 ; i++) { mtx.lock (); a++; mtx.unlock (); } } void testMultiThread () { thread t1 (func) ; thread t2 (func) ; t1.join (); t2.join (); cout << "final a:" << a << endl; } int main () { testMultiThread (); return 0 ; }
可以看到,此时经过两个线程各自进行5000次的自增操作后,a
的值变为了10000,也就是实现了多线程对共享变量a
的安全操作
2. 死锁问题的出现
1)造成死锁的条件
造成死锁有四个必要条件 :
互斥 :一个资源每次只能被一个进程使用
持有和等待 :一个进程因请求资源而阻塞时,对已获得的资源保持不放
不可剥夺 :进程已获得的资源,在未使用完之前,不能被其他进程强行剥夺(用定时释放解决)
循环等待 :若干进程之间形成头尾相接的循环等待资源关系(通过顺序加锁减少出现概率)
2)造成死锁的情况及对应解决方法
造成死锁的情况可能有:
忘记释放锁
重复加锁
循环等待:两个线程分别在等待对方释放锁
对应的解决方法:
检查锁的释放
多把锁按顺序加锁
引入死锁检查模块
通过定时释放资源解决不可剥夺问题(设置过期时间 )
通过死锁检查工具检查:
CPP用gdb
+pstack
等工具
用pstack 进程pid
查看进程的堆栈信息,发现threadA和threadB一直在lock_wait
用gdb -p 进程pid
进入进程,然后用info thread
查看有多少个线程,最后用thread 线程id
切换线程并查看发生死锁的线程的堆栈信息
Go用pprof
工具
3)死锁中循环等待问题的例子
死锁问题是指两个或多个线程互相等待对方释放资源,导致程序无法继续执行的问题
举个例子,线程1和线程2分别都有两把锁mtx1
和mtx2
,线程1先对mtx1
加锁,线程2先对mtx2
加锁
然后线程1下一条指令是对mtx2
加锁,但是此时mtx2
已经被线程2加锁了,线程1就会等待线程2释放mtx2
的锁
同时线程2下一条指令是对mtx1
加锁,但是此时mtx1
已经被线程1加锁了,线程2就会等待线程1释放mtx1
的锁
这样就会导致线程1和线程2互相等待对方释放锁,导致程序一直卡着无法继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <iostream> #include <thread> #include <mutex> using namespace std;std::mutex mtx1, mtx2; void func1 () { for (int i = 0 ; i < 10 ; i++){ mtx1.lock (); mtx2.lock (); mtx2.unlock (); mtx1.unlock (); } } void func2 () { for (int i = 0 ; i < 10 ; i++){ mtx2.lock (); mtx1.lock (); mtx1.unlock (); mtx2.unlock (); } } void testMultiThread () { thread t1 (func1) ; thread t2 (func2) ; t1.join (); t2.join (); } int main () { testMultiThread (); return 0 ; }
因此,我们在使用多线程的时候,需要注意避免死锁问题的出现:在多个线程中,尽量不要同时对多个锁进行加锁,如果需要同时对多个锁进行加锁,需要保持多个线程中对锁的加锁顺序一致
同样采用上面的例子,我们可以将func1
和func2
中对锁的加锁顺序保持一致:
func1
中先对mtx1
加锁,此时由于mtx1
被加锁了,第二个线程拿不到mtx1
的锁,就会先等待mtx1
被释放,此时func1
就可以接着直接对mtx2
加锁
等到func1
释放mtx1
的锁后,func2
就可以开始对mtx1
加锁,然后等待func2
释放mtx2
的锁
通过两个线程对锁的加锁顺序保持一致 ,就可以避免死锁问题的出现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <iostream> #include <thread> #include <mutex> using namespace std;std::mutex mtx1, mtx2; void func1 () { for (int i = 0 ; i < 10 ; i++){ mtx1.lock (); mtx2.lock (); mtx2.unlock (); mtx1.unlock (); } } void func2 () { for (int i = 0 ; i < 10 ; i++){ mtx1.lock (); mtx2.lock (); mtx2.unlock (); mtx1.unlock (); } } void testMultiThread () { thread t1 (func1) ; thread t2 (func2) ; t1.join (); t2.join (); } int main () { testMultiThread (); return 0 ; }
3.
Linux下互斥锁和条件变量的使用
Linux下c++互斥锁和条件变量的使用,需要引入<pthread.h>
头文件,使用pthread_mutex_t
和pthread_cond_t
来定义互斥锁和条件变量
互斥锁 是对多线程共享资源的保护
条件变量 是当多个线程需要等待某个条件满足时 ,就可以使用条件变量来进行线程的等待(进入阻塞)和唤醒 。如果采用互车锁进行阻塞会造成死锁,所以加入条件变量来实现线程的等待和唤醒
pthread_mutex_t的简单使用
pthread_mutex_t
定义一个互斥锁
pthread_mutex_init
初始化互斥锁,传入两个参数:第一个参数是互斥锁的地址,第二个参数是互斥锁的属性,一般传入NULL
pthread_mutex_lock
实现加锁,传入一个参数:互斥锁的地址
pthread_mutex_unlock
实现解锁,传入一个参数:互斥锁的地址
1 2 3 4 5 6 7 8 9 10 #include <pthread.h> pthread_mutex_t mutex;pthread_mutex_init (&mutex, NULL );void func () { pthread_mutex_lock (&mutex); pthread_mutex_unlock (&mutex); }
pthread_cond_t的简单使用
pthread_cond_t
定义一个条件变量
pthread_cond_init
初始化条件变量,传入两个参数:第一个参数是条件变量的地址,第二个参数是条件变量的属性,一般传入NULL
pthread_cond_wait
实现线程的等待(阻塞),传入两个参数:第一个参数是条件变量的地址,第二个参数是互斥锁的地址
该函数执行后,获得信号(signal函数)之前,将一直被阻塞。
该函数会在被阻塞之前 以原子方式释放相关的互斥锁
并在被唤醒时 以原子方式再次获取该互斥锁
所以我们在下面的线程池中,虽然被阻塞的时候会被释放互斥锁,但是在被唤醒时会再次获取互斥锁,所以唤醒后需要进行解锁 操作
pthread_cond_signal
实现线程的唤醒,传入一个参数:条件变量的地址
pthread_cond_destroy
销毁条件变量,传入一个参数:条件变量的地址
pthread_cond_broadcast
唤醒所有等待在条件变量上的线程
线程A等待条件的伪代码
1 2 3 4 5 6 pthread_mutex_lock (&mutex); while ( **条件为假**) pthread_cond_wait (cond, mutex); **修改条件** pthread_mutex_unlock (&mutex);
线程B通知线程A的伪代码
1 2 3 4 pthread_mutex_lock (&mutex); 设置条件为真 pthread_cond_signal (cond); pthread_mutex_unlock (&mutex);
至于条件变量的实际应用将在后面线程池的实现中进行详细讲解
三、线程池的实现源码及解析(C++)
1.
明确目标:剖析线程池需要实现的模块(框架)
工作队列:控制线程池中的线程状态
任务队列:线程中的任务函数(任务对应的执行函数)
线程池控制管理:两把锁(一把控制操作的互斥锁 ,一把用于新任务加入时唤醒线程的条件锁 )
管理者线程:用于自动管理线程池中线程数量
pthread_create的回调函数:回调函数是每个线程创建之后就开始执行的函数,该函数作为pthread_create的第三个参数传入
1 2 3 int pthread_create (pthread_t *tidp,const pthread_attr_t *attr, void *(*start_rtn)(void *),void *arg) ;
pthread_create
函数的陷阱:函数原型中第三个参数为函数指针,指向处理线程函数的地址,该函数要求为静态函数,所以如果回调函数(worker)是类成员函数时,需要将worker函数设置为静态成员函数
第四个参数this
指针的陷阱:静态成员函数中没有this
指针,所以如果需要在静态成员函数中调用类的成员函数,需要将代表当前实例化类对象的this
指针作为参数传入
线程池中的生产者和消费者模型 :
线程池中关于线程创建、销毁等的操作:
pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
:创建线程
thread:指向线程标识符的指针
attr:指向线程属性的指针,一般为传递NULL作为默认属性
start_routine:线程运行函数的起始地址。线程函数的返回类型必须为void,且接受一个void 类型的参数。
arg:传递给线程函数的参数,通过void*进行传递
pthread_join(pthread_t thread, void **retval);
:等待线程结束
thread:线程标识符
retval:用户定义的指针,用来存储被等待线程的返回值
pthread_exit(void *retval);
:终止线程
pthread_detach(pthread_t thread);
:分离线程
thread:线程标识符
该函数的作用是将参数thread标识的线程的状态设置为分离状态,这样线程在终止时会自动释放所有资源,而不用在其他线程中对其进行回收
下面进行线程池的实现讲解,其中在Linux下编译运行使用Makefile
自动化脚本进行编译
对Makefile
基础使用的讲解可参考我的博客:WebServer学习2:从Config文件了解Makefile编译
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 CXX = g++ TARGET = test SRC = $(wildcard *.cpp) OBJS = $(patsubst %.cpp, %.o, $(SRC) ) CXXFLAGS = -c -Wall $(TARGET) : $(OBJS) $(CXX) -o $@ $^ %.o: %.cpp $(CXX) $(CXXFLAGS) $< -o $@ .PHONY : cleanclean: rm -f *.o $(TARGET)
2. 线程池中任务队列类的实现
2.1 单任务结构体的设计
首先设计一个单任务结构体,用于封装任务的回调(执行)函数指针和回调函数对应的参数
在C++中,函数指针的声明方式为:返回类型(*函数指针名)(参数类型1, 参数类型2, ...);
这里我们的线程池任务结构体中,函数指针类型声明:void(*)(void*)
void: 表示函数的返回类型为 void,即不返回任何值。
(*): 表示这是一个指针。
(void): 表示指针所指向的函数将接受一个 void
类型的参数。
采用using
的方式创建函数指针类型别名,方便后续使用(这里我们将别名定位callback
代表回调函数)
采用别名后使用函数的方式:callback func = &func_name;
,func(*args)
实现函数的调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 using callback = void (*)(void *);struct Task { callback function; void *arg; Task (){ function = nullptr ; arg = nullptr ; } Task (callback f, void *a){ function = f; arg = a; } };
2.2 任务队列类的设计
设计一个任务队列类,用于存储任务队列中的任务
任务队列 是一种需要先进先出 的数据结构,C++中有标准库中的queue
容器可以方便地实现队列的功能
其中任务队列对于线程池来说一般只有一个,所以我们需要在多线程消费者使用任务队列的时候,对共享的数据进行加锁保护
这里共享的数据就是TaskQueue类中的queue
容器,所以我们需要对queue
容器进行加锁保护,以保证多线程对queue
容器的安全操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #include <queue> #include <pthread.h> class TaskQueue {public : TaskQueue (); ~TaskQueue (); void addTask (Task &task) ; void addTask (callback function, void *arg) ; Task takeTask () ; int getTaskCount () { return m_queue.size (); } private : std::queue<Task> m_queue; pthread_mutex_t m_mutex; };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 #include "TaskQueue.h" TaskQueue::TaskQueue () { pthread_mutex_init (&m_mutex, NULL ); } TaskQueue::~TaskQueue () { pthread_mutex_destroy (&m_mutex); } void TaskQueue::addTask (Task &task) { pthread_mutex_lock (&m_mutex); m_queue.push (task); pthread_mutex_unlock (&m_mutex); } void TaskQueue::addTask (callback function, void *arg) { pthread_mutex_lock (&m_mutex); m_queue.push (Task (function, arg)); pthread_mutex_unlock (&m_mutex); } Task TaskQueue::takeTask () { Task task; pthread_mutex_lock (&m_mutex); if (getTaskCount () > 0 ){ task = m_queue.front (); m_queue.pop (); } pthread_mutex_unlock (&m_mutex); return task; }
2.3 对当前的任务队列类进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #include "TaskQueue.h" #include <iostream> void taskFunc (void *arg) { int num = *(int *)arg; std::cout << "thread " << pthread_self () << " is working, num = " << num << std::endl; } void testTaskQueue () { TaskQueue taskQ; for (int i = 0 ; i < 10 ; i++){ int *num = new int (i); Task task (taskFunc, num) ; taskQ.addTask (task); } for (int i = 0 ; i < 10 ; i++) { Task task = taskQ.takeTask (); task.function (task.arg); } } int main () { testTaskQueue (); return 0 ; }
测试结果:
3. 线程池的实现
首先需要设计线程池类的.h
声明文件,具体包括
线程池的基本参数(私有变量)
唯一的任务队列对象
线程池中的线程对象(工作线程和管理者线程)、以及关于线程安全的共享互斥锁和条件变量
线程池中的参数:线程池的大小、线程池中忙线程 与存活线程 的数量、管理者每次控制线程销毁/创建 的数量、线程池是否关闭
线程池的私有接口
静态函数:包括所有工作线程的回调函数worker 、唯一管理者线程的回调函数manager
成员函数:线程销毁
线程池的公有接口
类构造函数和析构函数
功能函数:添加任务、获取忙线程数、获取存活线程数
ThreadPool.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #pragma once #include <pthread.h> #include <iostream> #include <cstring> #include <unistd.h> #include "TaskQueue.h" class ThreadPool {public : ThreadPool (int min, int max); ~ThreadPool (); void addTask (Task task) ; int getBusyNum () ; int getAliveNum () ; private : static void * worker (void *arg) ; static void * manger (void *arg) ; void threadExit () ; private : pthread_mutex_t m_mutex; pthread_cond_t m_cond; pthread_t *m_threadIds; pthread_t m_mangerID; TaskQueue *m_taskQ; int m_minThreads; int m_maxThreads; int m_busyThreads; int m_aliveThreads; int m_exitThreads; bool m_shutDown; static const int MangerCtlThreadNum = 2 ; };
3.1 线程池的构造函数和析构函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 #include "ThreadPool.h" ThreadPool::ThreadPool (int min, int max) { do { m_taskQ = new TaskQueue; m_minThreads = min; m_maxThreads = max; m_busyThreads = 0 ; m_aliveThreads = m_minThreads; m_shutDown = false ; m_threadIds = new pthread_t [m_maxThreads]; if (m_threadIds == nullptr ){ std::cout << "new pthread_t[] failed" << std::endl; break ; } memset (m_threadIds, 0 , sizeof (pthread_t )*m_maxThreads); if (pthread_mutex_init (&m_mutex, NULL ) != 0 || pthread_cond_init (&m_cond, NULL ) != 0 ){ std::cout << "init mutex or cond failed" << std::endl; break ; } for (int i = 0 ; i < m_minThreads; i++){ pthread_create (&m_threadIds[i], NULL , worker, this ); std::cout << "create thread ID: " << m_threadIds[i] << std::endl; } pthread_create (&m_mangerID, NULL , manger, this ); std::cout << "create manger thread ID: " << m_mangerID << std::endl; }while (0 ); if (m_taskQ) delete m_taskQ; if (m_threadIds) delete [] m_threadIds; } ThreadPool::~ThreadPool () { m_shutDown = true ; std::cout << "manger thread ID: " << m_mangerID << " is exiting" << std::endl; pthread_join (m_mangerID, NULL ); for (int i = 0 ; i < m_aliveThreads; i++){ pthread_cond_signal (&m_cond); } pthread_mutex_destroy (&m_mutex); pthread_cond_destroy (&m_cond); }
3.2
线程池的添加任务以及获取忙线程数和存活线程数
注意这三个函数中涉及的关键参数都是pool中多线程的共享数据 ,所以记得进行加锁保护
记得在添加任务后需要唤醒线程 ,让线程去取任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void ThreadPool::addTask (Task task) { if (m_shutDown){ return ; } m_taskQ->addTask (task); pthread_cond_signal (&m_cond); } int ThreadPool::getBusyNum () { int busyNum = 0 ; pthread_mutex_lock (&m_mutex); busyNum = m_busyThreads; pthread_mutex_unlock (&m_mutex); return busyNum; } int ThreadPool::getAliveNum () { int aliveNum = 0 ; pthread_mutex_lock (&m_mutex); aliveNum = m_aliveThreads; pthread_mutex_unlock (&m_mutex); return aliveNum; }
3.3 线程池中线程自我销毁的实现
线程自我销毁函数threadExit
,是在某个具体工作线程worker
中被调用的。
当threadExit
函数被调用时获取当前worker线程的ID
然后找到线程池数组中对应的位置 置为0 ,表示线程处于被销毁(不存活)状态
最后再调用pthread_exit(NULL)
函数实现执行当前程序的线程的自杀销毁
ThreadPool.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void ThreadPool::threadExit () { pthread_t tid = pthread_self (); for (int i = 0 ; i < m_maxThreads; i++){ if (m_threadIds[i] == tid){ m_threadIds[i] = 0 ; break ; } } pthread_exit (NULL ); }
3.4 线程池的工作线程
线程池中所有工作线程的工作机制都是一样的,所以我们直接将工作线程的回调函数同一为worker
,并在worker
中实现工作线程的工作机制:
worker
接受的参数是当前线程池对象,因为worker
本身是静态函数,又需要访问线程池类中的共享参数,所以需要将线程池对象传入
当当前线程中,判断出任务队列中没有任务时,就会阻塞等待 ,直到有新任务加入队列,就会被唤醒
其中阻塞的时候会释放互斥锁 ,唤醒后会再次获取互斥锁 ,所以唤醒后记得需要进行解锁操作
一般有两种情况会唤醒线程:1. 有新任务加入队列 2.
线程池销毁,需要唤醒线程自杀(包括管理者控制)
当出现第1种情况时,worker
被唤醒后会获取任务 并执行任务 ,然后再次阻塞等待
当出现第2种情况时,worker
被唤醒后会判断标志销毁线程的参数是否被设置了 ,如果是就会自杀销毁 (通过threadExit
函数自杀)
ThreadPool.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 void *ThreadPool::worker (void *arg) { ThreadPool* pool = static_cast <ThreadPool*>(arg); while (true ){ pthread_mutex_lock (&pool->m_mutex); while (pool->m_taskQ->getTaskCount () == 0 && !pool->m_shutDown){ std::cout << "thread " << pthread_self () << " is waiting" << std::endl; pthread_cond_wait (&pool->m_cond, &pool->m_mutex); if (pool->m_exitThreads > 0 ){ pool->m_exitThreads--; if (pool->m_aliveThreads > pool->m_minThreads){ pool->m_aliveThreads--; std::cout << "manger kills thread ID: " << pthread_self () << std::endl; pthread_mutex_unlock (&pool->m_mutex); pool->threadExit (); } } } if (pool->m_shutDown){ pthread_mutex_unlock (&pool->m_mutex); pool->threadExit (); } Task task = pool->m_taskQ->takeTask (); pool->m_busyThreads++; pthread_mutex_unlock (&pool->m_mutex); std::cout << "thread " << pthread_self () << " is working" << std::endl; task.function (task.arg); delete task.arg; task.arg = nullptr ; pthread_mutex_lock (&pool->m_mutex); std::cout << "thread " << pthread_self () << " is idle" << std::endl; pool->m_busyThreads--; pthread_mutex_unlock (&pool->m_mutex); } return nullptr ; }
3.5 线程池的管理者线程
管理者线程的工作机制是:根据线程池中的忙线程数和存活线程数,动态地管理线程池中的线程数量 ,manger只要在保证线程数不小于最小值,也不大于最大值的范围内,动态地控制线程数量就行
一个pool中只需要有唯一的一个管理者线程 ,管理者线程的回调函数设为manger
,并在manger
中实现管理者线程的工作机制:
manger
接受的参数同样是当前线程池对象,因为manger
本身是静态函数,所以需要将线程池对象传入
管理者线程根据一定的间隔时间t
(代码中设为3s)来轮询线程池的线程情况(每3s判断一次是否进行新增/销毁线程 )
扩增线程 :当任务数过多,线程池中的alive线程 较小不够用 时,创建线程
创建线程的条件:任务数task >
存活线程数(表示线程池不够用,需要扩大线程池),且存活线程数 <
最大线程数(表示线程池还能扩大)
管理者线程每次创建线程数量为2(本代码中)
创建线程直接从线程池数组 中找到一个空闲的位置 (值为0),直接调用pthread_create
创建线程,并更新存活线程数
销毁线程 :当线程池中忙的线程数过小 (线程池过于清闲了),且存活线程数大于最小线程数时(说明还没到最小线程数),销毁线程
销毁线程的条件:忙线程数*2 <
存活线程数(表示线程池冗余过大),且存活线程数 >
最小线程数(表示线程池还能缩小)
管理者线程每次销毁线程数量为2(本代码中),需要共享数据m_exitThreads
来通知工作线程销毁线程
销毁的方式是再管理者线程中唤醒空闲的线程,此时空闲线程的worker
会发现m_exitThreads
不为0,就会自杀销毁(也就是上述唤醒worker
线程的第2种情况 )
ThreadPool.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 void *ThreadPool::manger (void *arg) { ThreadPool* pool = static_cast <ThreadPool*>(arg); while (!pool->m_shutDown){ sleep (3 ); pthread_mutex_lock (&pool->m_mutex); int taskSize = pool->m_taskQ->getTaskCount (); int aliveNum = pool->m_aliveThreads; int busyNum = pool->m_busyThreads; pthread_mutex_unlock (&pool->m_mutex); if (taskSize > aliveNum && aliveNum < pool->m_maxThreads){ pthread_mutex_lock (&pool->m_mutex); int count = 0 ; for (int i = 0 ; i < pool->m_maxThreads && count < MangerCtlThreadNum; i++){ if (pool->m_threadIds[i] == 0 ){ pthread_create (&pool->m_threadIds[i], NULL , worker, pool); std::cout << "manger creates thread ID: " << pool->m_threadIds[i] << std::endl; count++; pool->m_aliveThreads++; } } pthread_mutex_unlock (&pool->m_mutex); } if (busyNum*2 < aliveNum && aliveNum > pool->m_minThreads){ pthread_mutex_lock (&pool->m_mutex); pool->m_exitThreads = MangerCtlThreadNum; pthread_mutex_unlock (&pool->m_mutex); for (int i = 0 ; i < MangerCtlThreadNum; i++){ pthread_cond_signal (&pool->m_cond); } } } return nullptr ; }
4. 线程池的测试
通过创建10
个Task的任务队列来测试线程池的工作情况(重点关注管理者的工作模式)
其中对于10
个Task的任务,在每次执行Task任务后都会sleep
2s
,所以如果在单线程模式下最少需要20s
才能执行完毕
因此我们将main
中创建完线程后的sleep时间也设为20s
,主要是为了观察采用线程池后的工作效率,同时也能保证main
函数不会提前结束,导致线程池资源被提前销毁
根据测试结果我们是可以看到采用线程池后,很快就会执行完所有Task,然后等待一段时间(等到main
中的20s延迟完后)整个程序才会结束,线程池的资源才会被销毁,说明通过线程池确实会节省时间,提高效率
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include <iostream> #include "TaskQueue.h" #include "ThreadPool.h" void taskFunc (void *arg) { int num = *(int *)arg; std::cout << "thread " << pthread_self () << " is working, num = " << num << std::endl; sleep (2 ); } void testTaskQueue () { TaskQueue taskQ; for (int i = 0 ; i < 10 ; i++){ int *num = new int (i); Task task (taskFunc, num) ; taskQ.addTask (task); } for (int i = 0 ; i < 10 ; i++) { Task task = taskQ.takeTask (); task.function (task.arg); } } void testThreadPool () { ThreadPool pool (3 , 12 ) ; for (int i = 0 ; i < 10 ; i++){ pool.addTask (Task (taskFunc, new int (i))); } sleep (20 ); } int main () { testThreadPool (); return 0 ; }
编译运行:
测试结果:
5. 本文线程池代码仓库
四、阻塞队列的实现
以异步日志 的实现为例,阻塞队列中,各个线程生产者 负责往阻塞队列中push
日志消息,消费者 线程负责从阻塞队列中pop
日志消息并写入日志文件
异步日志 中的消费者 为日志线程 ,因此日志线程 的worker
函数中需要不断地从阻塞队列中取出日志消息并写入日志文件。也就是worker
函数作为消费者 pop
队列中的数据时,遇到队列为空时需要通过条件变量阻塞等待 ,直到生产者 线程往队列中push
数据后唤醒日志线程 ,继续pop
队列中的数据写进日志文件缓冲区中。
具体以异步日志 为例的阻塞队列 的实现细节参考本人的另一篇关于WebServer的博客:
五、线程池work stealing优化
未完待续...
六、Reference