各种线程池实现

boost::threadpool

按照boost标准开发的第三方库。下载地址在boost::threadpool 使用方法较为简单。例子如下

#include <iostream>
#include "boost/bind.hpp"
#include "boost/threapool.hpp"
using namespace std;
using namespace threadpool;
void first_task()
{
    cout << "first task" << endl;
}
void second_task()
{
    cout << "second task" << endl;
}
void task_with_parameter(int value,string str)
{
    cout << "task with parameter,value is: " << value <<",str is: " << str;
}
int main()
{
    // 声明线程池
    pool thread_pool(2);
    // 向线程池中添加任务
    thread_pool.schedule(&first_task);
    // 等待线程函数执行完成
    thread_pool.wait();
    thread_pool.schedule(&second_task);
    thread_pool.schedule(boost::bind(task_with_parameter,8,"hello"));
    thread_pool.wait();
    return 0;
}

boost::threadpool 添加任务,同步方式都相对简单,在添加多参数的任务时候需要注意 boost::bind() 传递的参数是按照拷贝的方式传递的。如果想使用引用的方式传递的话,需要使用boost::ref() 或者boost::cref()。还有需要注意的是boost::bind()最多接受九个参数
boost::threadpool 是相对比较老的一个库,在2005-2008年间更新

boost::thread_group 以及io_service

例子

#include <boost/asio/io_service.hpp>
#include <boost::bind.hpp>
#include <boost::thread/thread_group.hpp>

boost::asio::io_service ioService;
boost::thread_group threadpool;
/*The work class is used to inform the io_service when
work starts and finishes. This ensures that the
io_service object's run() function will not exit
while work is underway, and that it does exit when
there is no unfinished work remaining
*/
/* 声明一个ioService work 的原因是为了保证io service
的run方法在这个work销毁之前不会退出
*/
boost::asio::io_service::work work(ioService);
// 放在for循环中,根据线程池中线程中个数创建
// ioService 可以理解为任务队列
//Run the io_service object's event processing loop.
threadpool.create_thread(boost::bind(&boost::asio::ioservice::run,&ioService));
threadpool.create_thread(boost::bind(&boost::asio::ioservice::run,&ioService));
// 向ioService 中提交任务
ioService.post(boost::bind(myTask,"hello world"));
ioService.post(boost::bind(myTask2,"clear"));
...
// ioService 在stop 之后,post到ioService中的task 都不会被执行
ioService.stop();
threadpool.join_all();

boost::ioservice 以及 boost::thread_pool 实现的thread pool没有提供wait() 方法,因此需要调用者主动判断所提交的任务有没有完成

基于c++11 实现的线程池

主要步骤如下:

  • 设定线程池中所提供的服务线程数
int threads = thread::hardware_concurrency();
  • 每个线程都应该执行一个无限循环,无限循环中等待新任务到达,并执行任务
vector<thread> pool;
for (int i = 0; i < threads; i++)
{
    pool.push_back(thread(Infinite_loop_function));
}
  • 无限循环function
while(true)
{
    {
        unique_lock<mutex> lock(queue_mutex);
        condition.wait(lock,[]{return !Queue.empty()});
        Task = Queue.front();
        Queue.pop();
    }
    Task();
}
  • 向任务队列中添加任务
void enqueue(function<void()> new_task)
{
    {
        unique_lock<mutex> lock(queue_mutex);
        Queue.push(new_task);
    }
    condition.notify_one();
}

具体实现例子

class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false)
    {
        for(size_t i = 0;i<threads;++i)
            workers.emplace_back(
                [this]
                {
                    for(;;)
                    {
                        std::function<void()> task;
                        {
                            std::unique_lock<std::mutex> lock(this->queue_mutex);
                            this->condition.wait(lock,
                                [this]{ return this->stop || !this->tasks.empty(); });
                            if(this->stop && this->tasks.empty())
                            return;
                            task = std::move(this->tasks.front());
                            this->tasks.pop();
                        }
                        task();
                    }
                }
                );
    }
	// add new work item to the pool
    void enqueue(std::function<void()>& task)
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks.emplace(task);
        }
        condition.notify_one();
    }
    ~ThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for(std::thread &worker: workers)
            worker.join();
    }
private:
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

总结

从线程池的实现可以看到,线程在任务队列中获取任务以及向任务队列中提交任务都需要抢占队列的互斥锁,会造成时间损耗,尤其在***任务数多,每个任务需要的时间不是很长***的情况下,抢占任务队列互斥锁的时间损耗就显得更加明显。例如,在16核机器,线程池开启14个线程,向线程池中提交2000个task(每个task耗时1ms 左右)的情况下,向线程池提交任务所需时间约20ms。
因此,线程池的方式更适合***每个task消耗的时间比较长,任务数不是特别多的场景***