"); //-->
本文使用的源码地址:
https://github.com/SCljh/thread_pool
线程池描述
池式结构
在计算机体系结构中有许多池式结构:内存池、数据库连接池、请求池、消息队列、
对象池等等。
池式结构解决的主要问题为缓冲问题,起到的是缓冲区的作用。
线程池
通过使用线程池,我们可以有效降低多线程操作中任务申请和释放产生的性能消耗。
特别是当我们每个线程的任务处理比较快时,系统大部分性能消耗都花在了
pthread_create以及释放线程的过程中。
那既然是这样的话,何不在程序开始运行阶段提前创建好一堆线程,
等我们需要用的时候只要去这一堆线程中领一个线程,用完了再放回去,
等程序运行结束时统一释放这一堆线程呢?按照这个想法,线程池出现了。
线程池解决的问题
解决任务处理
阻塞IO
解决线程创建于销毁的成本问题
管理线程
线程池应用之一:日志存储
在服务器保存日志至磁盘上时,性能往往压在磁盘读写上,
而引入线程池利用异步解耦可以解决磁盘读写性能问题。
线程池的主要作用:异步解耦
说了那么多线程池的优点,那接下来要做的就是手撕这诱人的玩意了。
朴实无华但不枯燥的代码(以c++为例)
本文主要讲解的是c++线程池的实现,C语言实现其实思想和c++是一致的,
具体的代码可见文章开头的链接。
线程池中比较关键的东西
若想自己编写一个线程池框架,那么可以先关注线程池中比较关键的东西:
工作队列
任务队列
线程池的池
pthread_create中的回调函数
为什么说这些东西比较关键?因为这“大四元”基本上支撑起了整个线程池的框架。
而线程池的框架如下所示:
————————————————
工作队列
worker队列,首先要有worker才有队列,我们首先定义worker结构体:
可想而知,worker中要有create_pthread函数的id参数,
还需要有控制每一个worker live or die的标志terminate,
我们最好再设置一个标志表示这个worker是否在工作。
最后,我们要知道这个worker隶属于那个线程池(至于为什么下文将介绍)
struct NWORKER{ pthread_t threadid; //线程id bool terminate; //是否需要结束该worker的标志 int isWorking; //该worker是否在工作 ThreadPool *pool; //隶属于的线程池 }任务队列
struct NJOB{ void (*func)(void *arg); //任务函数 void *user_data; //函数参数 };
线程池本池
对于一个线程池,任务队列和工作队列已经是必不可少的东西了,
那线程池还有需要哪些东西辅助它以达到它该有的功能呢?
一说到线程,那处理好线程同步就是一个绕不开的话题,
那在线程池中我们需要处理的临界资源有哪些呢?
想想我们工作队列中的每个worker都在等待一个任务队列看其是否有任务到来,
所以很容易得出结论我们必须要在线程池中实现两把锁:
一把是用来控制对任务队列操作的互斥锁,
另一把是当任务队列有新任务时唤醒worker的条件锁。
有了这两把锁,线程池中再加点必要的一些数字以及对线程池操作的函数,
那么这个类就写完了。实现代码如下:
class ThreadPool{ private: struct NWORKER{ pthread_t threadid; bool terminate; int isWorking; ThreadPool *pool; } *m_workers; struct NJOB{ void (*func)(void *arg); //任务函数 void *user_data; }; public: //线程池初始化 //numWorkers:线程数量 ThreadPool(int numWorkers, int max_jobs); //销毁线程池 ~ThreadPool(); //面向用户的添加任务 int pushJob(void (*func)(void *data), void *arg, int len); private: //向线程池中添加任务 bool _addJob(NJOB* job); //回调函数 static void* _run(void *arg); void _threadLoop(void *arg); private: std::list<NJOB*> m_jobs_list; int m_max_jobs; //任务队列中的最大任务数 int m_sum_thread; //worker总数 int m_free_thread; //空闲worker数 pthread_cond_t m_jobs_cond; //线程条件等待 pthread_mutex_t m_jobs_mutex; //为任务加锁防止一个任务 被两个线程执行等其他情况 };
可以看到我们做了一些必要的封装,只给用户提供了构造函数、
析构函数以及添加任务的函数。这也是一个基本的线程池框架必要的接口。
回调函数
static?
根据上方代码可以看见,回调函数为static函数。
我可不想在我使用使用回调函数的时候自动给我加上*this参数。
首先回调函数是每个线程创建之后就开始执行的函数,
该函数作为pthread_create的第三个参数传入。我们来看看pthread_create的函数原型:
int pthread_create(pthread_t *tidp,const pthread_attr_t *attr, void *(*start_rtn)(void*),void *arg);
注意到,此处的我们传入的回调函数必须是接受一个void*参数,
且返回类型为void*的函数。如果我们将回调函数写成线程池的普通成员函数,
那么c++会在这个函数参数前默认加上一个*this参数,
**这也是为什么我们能在成员函数中使用当前对象中的一些属性。
**然而就是这个原因,若我们传入的回调函数指针为类的成员函数,
那c++编译器会破坏我们的函数结构(因为给我们加了一个形参),
导致pthread_create的第三个参数不符合要求而报错:
看吧,编译器不让我们用non-static的成员函数作为回调函数传入pthread_create中。
其实在c++中,大多数回调函数都有这个要求。
那为什么static就可以呢?
这是因为static函数为类的静态函数,当类的成员函数被static修饰后,
调用该函数将不会默认传递*this指针,
这也是为什么static成员函数中不能使用对象的非static属性:
你*this指针都没传我上哪去找你的对象?
函数本身
在运行回调函数的时候,我们又想用对象里的东西(比如锁),
编译器又不让我们用,那怎么办?别忘了虽然static函数没有*this指针,
但它却可以有一个*void的参数啊。有了这个*void,我们还怕少一个*this指针?
我们可以先写一个static函数,将需要的对象指针通过形参传到这个函数里,
再在这个函数中通过这个对象调用成员函数的方法,就能使用这个对象的成员属性了。
就像这样:
//run为static函数 void* ThreadPool::_run(void *arg) { NWORKER *worker = (NWORKER *)arg; worker->pool->_threadLoop(arg); } //threadLoop为普通成员函数 void ThreadPool::_threadLoop(void *arg) { //在这里就能直接用当前ThreadPool对象的东西了 }
至于threadLoop的实现,由于线程是要一直存在的,
一个while(true)的循环肯定少不了了。这个循环中具体做什么呢:
不断检查任务队列中是否有job:
如果有,则取出这个job,并将该job从任务队列中删除,且执行job中的func函数。
如果没有,调用pthread_cond_wait函数等待job到来时被唤醒。
若当前worker的terminate为真,则退出循环结束线程。
注意在对job操作前别忘了加锁,函数实现如下:
void ThreadPool::_threadLoop(void *arg) { NWORKER *worker = (NWORKER*)arg; while (1){ //线程只有两个状态:执行\等待 //查看任务队列前先获取锁 pthread_mutex_lock(&m_jobs_mutex); //当前没有任务 while (m_jobs_list.size() == 0) { //检查worker是否需要结束生命 if (worker->terminate) break; //条件等待直到被唤醒 pthread_cond_wait(&m_jobs_cond,&m_jobs_mutex); } //检查worker是否需要结束生命 if (worker->terminate){ pthread_mutex_unlock(&m_jobs_mutex); break; } //获取到job后将该job从任务队列移出,免得其他worker过来重复做这个任务 struct NJOB *job = m_jobs_list.front(); m_jobs_list.pop_front(); //对任务队列的操作结束,释放锁 pthread_mutex_unlock(&m_jobs_mutex); m_free_thread--; worker->isWorking = true; //执行job中的func job->func(job->user_data); worker->isWorking = false; free(job->user_data); free(job); } free(worker); pthread_exit(NULL); }
原文链接:https://blog.csdn.net/ACMer_L/article/details/107578636
————————————————
原文链接:https://blog.csdn.net/ACMer_L/article/details/107578636
*博客内容为网友个人发布,仅代表博主个人观点,如有侵权请联系工作人员删除。