新闻  |   论坛  |   博客  |   在线研讨会
C/C++手撕线程池(线程池的封装和实现)
电子禅石 | 2024-04-09 11:47:24    阅读:1201   发布文章

本文使用的源码地址:

https://github.com/SCljh/thread_pool


线程池描述


池式结构

  在计算机体系结构中有许多池式结构:内存池、数据库连接池、请求池、消息队列、

对象池等等。


  池式结构解决的主要问题为缓冲问题,起到的是缓冲区的作用。


线程池

 通过使用线程池,我们可以有效降低多线程操作中任务申请和释放产生的性能消耗。

    特别是当我们每个线程的任务处理比较快时,系统大部分性能消耗都花在了

    pthread_create以及释放线程的过程中。

那既然是这样的话,何不在程序开始运行阶段提前创建好一堆线程,

等我们需要用的时候只要去这一堆线程中领一个线程,用完了再放回去,

等程序运行结束时统一释放这一堆线程呢?按照这个想法,线程池出现了。


线程池解决的问题

解决任务处理

阻塞IO

解决线程创建于销毁的成本问题

管理线程

  线程池应用之一:日志存储


  在服务器保存日志至磁盘上时,性能往往压在磁盘读写上,

     而引入线程池利用异步解耦可以解决磁盘读写性能问题。


  线程池的主要作用:异步解耦

说了那么多线程池的优点,那接下来要做的就是手撕这诱人的玩意了。


朴实无华但不枯燥的代码(以c++为例)

本文主要讲解的是c++线程池的实现,C语言实现其实思想和c++是一致的,

具体的代码可见文章开头的链接。


线程池中比较关键的东西

  若想自己编写一个线程池框架,那么可以先关注线程池中比较关键的东西:


工作队列

任务队列

线程池的池

pthread_create中的回调函数

为什么说这些东西比较关键?因为这“大四元”基本上支撑起了整个线程池的框架。

而线程池的框架如下所示:

————————————————

线程池

 如图所示,我们将整个框架以及任务添加接口定义为线程池的“”,

那么在这个池子中重要的就是工作队列任务队列

以及决定工作队列中的thread到底应该工作还是休息的回调函数

  那么手撕线程池,我们就从这几个关键点入手。

工作队列

  worker队列,首先要有worker才有队列,我们首先定义worker结构体:

  可想而知,worker中要有create_pthread函数的id参数,

还需要有控制每一个worker live or die的标志terminate,

我们最好再设置一个标志表示这个worker是否在工作。

最后,我们要知道这个worker隶属于那个线程池(至于为什么下文将介绍)

struct NWORKER{
        pthread_t threadid;		//线程id
        bool terminate;			//是否需要结束该worker的标志
        int isWorking;			//该worker是否在工作
        ThreadPool *pool;		//隶属于的线程池
    }

任务队列

  任务队列就简单得多了,想想编程语言中的任务应该是什么?不就是函数嘛。

所以我们只需要定义一个函数该有的东西就行了。

struct NJOB{
        void (*func)(void *arg);     //任务函数
        void *user_data;			 //函数参数
    };

线程池本池

  对于一个线程池,任务队列和工作队列已经是必不可少的东西了,

那线程池还有需要哪些东西辅助它以达到它该有的功能呢?

  一说到线程,那处理好线程同步就是一个绕不开的话题,

那在线程池中我们需要处理的临界资源有哪些呢?

想想我们工作队列中的每个worker都在等待一个任务队列看其是否有任务到来,

所以很容易得出结论我们必须要在线程池中实现两把锁:

一把是用来控制对任务队列操作的互斥锁,

另一把是当任务队列有新任务时唤醒worker的条件锁。

  有了这两把锁,线程池中再加点必要的一些数字以及对线程池操作的函数,

那么这个类就写完了。实现代码如下:

class ThreadPool{
private:
    struct NWORKER{
        pthread_t threadid;
        bool terminate;
        int isWorking;
        ThreadPool *pool;
    } *m_workers;

    struct NJOB{
        void (*func)(void *arg);     //任务函数
        void *user_data;
    };
public:
    //线程池初始化
    //numWorkers:线程数量
    ThreadPool(int numWorkers, int max_jobs);
    //销毁线程池
    ~ThreadPool();
    //面向用户的添加任务
    int pushJob(void (*func)(void *data), void *arg, int len);

private:
    //向线程池中添加任务
    bool _addJob(NJOB* job);
    //回调函数
    static void* _run(void *arg);
    void _threadLoop(void *arg);

private:
    std::list<NJOB*> m_jobs_list;
    int m_max_jobs;		//任务队列中的最大任务数
    int m_sum_thread;	//worker总数
    int m_free_thread;		//空闲worker数
    pthread_cond_t m_jobs_cond;      //线程条件等待
    pthread_mutex_t m_jobs_mutex;  //为任务加锁防止一个任务
    被两个线程执行等其他情况
};

可以看到我们做了一些必要的封装,只给用户提供了构造函数、

析构函数以及添加任务的函数。这也是一个基本的线程池框架必要的接口。

 

回调函数

static?

  根据上方代码可以看见,回调函数为static函数。

我可不想在我使用使用回调函数的时候自动给我加上*this参数。

  首先回调函数是每个线程创建之后就开始执行的函数,

该函数作为pthread_create的第三个参数传入。我们来看看pthread_create的函数原型:

int pthread_create(pthread_t *tidp,const pthread_attr_t *attr,
void *(*start_rtn)(void*),void *arg);

注意到,此处的我们传入的回调函数必须是接受一个void*参数,

且返回类型为void*的函数。如果我们将回调函数写成线程池的普通成员函数,

那么c++会在这个函数参数前默认加上一个*this参数,

**这也是为什么我们能在成员函数中使用当前对象中的一些属性。

**然而就是这个原因,若我们传入的回调函数指针为类的成员函数,

那c++编译器会破坏我们的函数结构(因为给我们加了一个形参),

导致pthread_create的第三个参数不符合要求而报错:

NON-STATIC

看吧,编译器不让我们用non-static的成员函数作为回调函数传入pthread_create中。

其实在c++中,大多数回调函数都有这个要求。


  那为什么static就可以呢?


这是因为static函数为类的静态函数,当类的成员函数被static修饰后,

调用该函数将不会默认传递*this指针,

这也是为什么static成员函数中不能使用对象的非static属性:

你*this指针都没传我上哪去找你的对象?

函数本身

  在运行回调函数的时候,我们又想用对象里的东西(比如锁),

编译器又不让我们用,那怎么办?别忘了虽然static函数没有*this指针,

但它却可以有一个*void的参数啊。有了这个*void,我们还怕少一个*this指针?

我们可以先写一个static函数,将需要的对象指针通过形参传到这个函数里,

再在这个函数中通过这个对象调用成员函数的方法,就能使用这个对象的成员属性了。


  就像这样:

//run为static函数
void* ThreadPool::_run(void *arg) {
    NWORKER *worker = (NWORKER *)arg;
    worker->pool->_threadLoop(arg);
}
//threadLoop为普通成员函数
void ThreadPool::_threadLoop(void *arg) {
    //在这里就能直接用当前ThreadPool对象的东西了
}

 

至于threadLoop的实现,由于线程是要一直存在的,

一个while(true)的循环肯定少不了了。这个循环中具体做什么呢:

不断检查任务队列中是否有job:


如果有,则取出这个job,并将该job从任务队列中删除,且执行job中的func函数。

如果没有,调用pthread_cond_wait函数等待job到来时被唤醒。

若当前worker的terminate为真,则退出循环结束线程。

注意在对job操作前别忘了加锁,函数实现如下:

void ThreadPool::_threadLoop(void *arg) {
    NWORKER *worker = (NWORKER*)arg;
    while (1){
        //线程只有两个状态:执行\等待
        //查看任务队列前先获取锁
        pthread_mutex_lock(&m_jobs_mutex);
        //当前没有任务
        while (m_jobs_list.size() == 0) {
        	//检查worker是否需要结束生命
            if (worker->terminate) break;
            //条件等待直到被唤醒
            pthread_cond_wait(&m_jobs_cond,&m_jobs_mutex);
        }
        //检查worker是否需要结束生命
        if (worker->terminate){
            pthread_mutex_unlock(&m_jobs_mutex);
            break;
        }
        //获取到job后将该job从任务队列移出,免得其他worker过来重复做这个任务
        struct NJOB *job = m_jobs_list.front();
        m_jobs_list.pop_front();
		//对任务队列的操作结束,释放锁
        pthread_mutex_unlock(&m_jobs_mutex);

        m_free_thread--;
        worker->isWorking = true;
        //执行job中的func
        job->func(job->user_data);
        worker->isWorking = false;

        free(job->user_data);
        free(job);
    }

    free(worker);
    pthread_exit(NULL);
}


                        

原文链接:https://blog.csdn.net/ACMer_L/article/details/107578636

                     

                  

                        

————————————————


                        

               

                        



                        

原文链接:https://blog.csdn.net/ACMer_L/article/details/107578636


*博客内容为网友个人发布,仅代表博主个人观点,如有侵权请联系工作人员删除。

参与讨论
登录后参与讨论
属于自己的技术积累分享,成为嵌入式系统研发高手。
推荐文章
最近访客