第十五章 进程池和线程池 - Huoke/Linux-net-Programma GitHub Wiki
在前面的章节中,我们通过动态创建子线程或者进程来实现并发服务器。这样做有如下缺点:
- 动态创建线程或者进程是比较耗费时间的,这将导致较慢的客户响应。
- 动态创建的子线程(进程)通常只用来为一个客户服务(除非我们特殊的处理),这将导致系统上产生大量的细微线程(或进程)。线程(或者进程)之间的切换将消耗大量的CPU时间。
- 动态创建的子进程是当前进程的完整映像。当前进程必须谨慎地管理它分配的文件描述符和堆内存等系统资源,否则进程可能复制这些资源,从而使系统的可用资源急剧下降,进而影响服务器的性能。 在前面简绍过的进程池和线程池可以解决上述问题。本章将分析两种“池”的细节,给出它们的通用实现,并分别用进程池和线程池来实现简单的并发服务器。
线程池和进程池相似,所以我们只以进程池为例进行介绍。如没有特殊声明,下面对进程池的讨论完全适用于线程池。
进程池是由服务器预先创建的一组子进程,这些子进程的数目在3~10个之间(当然,这只是典型情况)。比如httpd守护进程就是使用包含7个子进程的进程池来实现并发的。线程池中的线程数量应该和CPU数量差不多。(为什么呢?好好看看《现代操作系统》)
当有新的任务到来时,主进程将通过某种方式选择进程池中的某一个子进程来位置服务。相比于动态创建子进程,选择一个已经存在的子进程的代价显然要小得多。至于主进程选择哪个子进程来为新任务服务,则有两种方式:
- 主进程使用某种算法来主动选择子进程。最简单、最常用的算法是随机算法和Round Robin(轮流选取)算法,但更优秀、更智能的算法将任务在各个工作进程中更均匀地分配,从而减轻服务器整体压力。
- 主进程和所有子进程通过一个共享的工作队列来同步,子进程都睡眠在该工作队列上。当有新的任务到来时,逐进程将任务添加到工作队列中。这将唤醒正在等待任务的子进程,不过只有一个子进程获得新任务的"接管权", 它可以从工作队列中取出任务并执行之,而其他子进程将继续睡眠在工作队列上。
当选择好子进程后,主进程还需要使用某种通知机制来告诉目标子进程有新任务需要处理,并传递必要的数据。最简单的方式是,在父进程和子进程之间预先建立好一条管道,然后通过管道来实现所有的进程间通信(当然,要预先定义好一套协议来规范管道的使用)。在父线程和子线程之间传递数据就要简单得多,因为我们可以把这些数据定义为全局的,那么它们本身就是被所有线程共享的。
综上讨论,我们将进程池的一般模型描绘为图15-1所示的形式。
新任务 通知机制
主进程 ————————>选择算法(随机还是轮流/还是工作队列) ————————> 进程池(子进程1、子进程2...)
在使用进程池处理多客户任务时,首先要考虑的一个问题是: 监听socket和连接socket 是否都是由主进程来统一管理。回想一下第八章介绍过的几种并发模式,其中半同步/半反应堆模式是主进程统一管理这两种socket的; 而图8-11所示的高效的半同步/半异步模式,以及领导者/追随者模式, 则是由主进程管理所有监听socket,而各个子进程分别管理属于自己的连接socket的,对于前一种情况,主进程接受新的连接以得到连接socket,然后它需要将该socket传递给子进程(对于线程池而言, 父线程将socket传递给子线程是很简单的,因为它们可以很容易地共享该socket。但对于进程池而言,我们必须使用13.9节介绍的方法来传递该socket)。后一种情况的灵活性更大一些,因为子进程可以自己调用accept来接受新的连接,这样父进程就无须向子进程传递socket,而只需简单地通知一声: “我检测到新的连接, 你来接受它”。
在4.6.1 小节中我们曾讨论过常连接,即一个客户的多次请求可以复用一个TCP连接。那么,在设计进程池时还需要考虑: 一个客户连接上的所有任务是否始终由一个子进程来处理。如果说客户任务时无状态的,那么我们可以考虑使用不同的子进程来为客户的不同请求服务,如图15-2所示:
👍 👎 💯 🔢 🥇
但如果客户任务是存在上下文关系的,则最好一直用同一个子进程来为它服务,否则实现起来比较麻烦,因为我们不得不在各个子进程之间传递上下文数据。在9.3.4小节中,我们讨论了epoll 的EPOLLONESHOT事件,这一事件能够确保一个客户连接在整个生命周期中仅被一个线程处理。
本节我们实现一个基于图 8-10 所示的半同步/半反应堆并发模式的线程池, 如代码清单 15-3 所示。相比代码清单15-1所示的进程池实现,该线程池的通用性高得多,因为它使用一个工作队列完全解除了主线程和工作线程的耦合关系。
主线程往工作队列中插入任务,工作线程通过竞争来取得任务并执行它。 不过,如果要将该线程池应用到实际服务器程序中,那么我们必须保证所有客户请求都是无状态的,因为同一个连接上的不同请求可能会由不同的线程处理。
// filename :ThreadPool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include “Locker.h”
/*线程池类, 将它定义为模板是为了代码复用。 模板参数T是任务类*/
template< typename T>
class ThreadPool
{
public:
/* 参数number是线程池中的数量, MAXREQ 是请求队列中最多允许的,等待处理的请求的数量 */
ThreadPool(int number = 8, int MAXREQ = 10000);
~ThreadPool();
/* 往请求队列中添加任务 */
bool Append(T *request);
private:
/* 工作线程运行的函数, 它不断从工作队列中取出任务并执行之
为啥是static 函数呢 ?
*/
static void* Worker(void* arg);
void Run();
private:
int m_threadNumber; /* 线程池中的线程数 */
int m_request; /* 请求队列中允许的最大请求数 */
pthread_t* m_threads;/* 描述线程池的数组,其大小为m_threadNumber */
std::list< T* > m_workerQueue; /* 请求队列 */
Locker m_queueLocker; /* 保护请求队列的互斥锁 */
sem m_queueStat; /* 是否有任务需要处理 */
bool m_stop; /* 是否结束线程 */
};
template<typename T>
ThreadPool<T>::ThreadPool(int number, int MAXREQ) :
m_threadNumber(number),m_request(MAXREQ), m_stop(false), m_threads(NULL)
{
if((m_threadNumber <= 0) || (m_request <=0)) {
throw std::exception();
}
m_threads = new pthread_t[m_request ];
if(! m_threads) {
throw std::exception();
}
/* 创建 thread_number 个线程,并将它们都设置为脱离线程 */
for(int i=0 ;i <m_threadNumber ; ++i) {
printf("create the %dth thread\n", i);
if(pthread_create(m_threads + i, NULL, Worker, this) !=0) {
delete [] m_threads;
throw std::exception();
}
if(pthread_detach(m_threads[i])) {
delete [] m_threads;
throw std::exception();
}
}
}
template<typename T>
ThreadPool<T>::~ThreadPool()
{
delete[] m_threads;
m_stop = true;
}
template<typename T>
bool ThreadPool<T>::Append(T* request)
{
/* 操作工作队列时一定要加锁, 因为它被所有线程共享 */
m_queueLocker.lock();
if( m_workerQueue.size() > m_request)
{
m_queueLocker.unlock();
return false;
}
m_workerQueue.push_back(request);
m_queueLocker.unlock();
m_queueStat.post();
return true;
}
template<typename T>
void* ThreadPool<T>::Worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
pool->Run();
return pool;
}
template<typename T>
void ThreadPool<T>::Run()
{
while( ! m_stop)
{
m_queueStat.wait();
m_queueLocker.lock();
if(m_workerQueue.empty()) {
m_queueLocker.unlock();
continue;
}
T* request = m_workerQueue.front();
m_workerQueue.pop_front();
m_queueLocker.unlock();
if (! request) {
continue;
}
request->process();
}
}
#endif //THREADPOOL_H
这里值得一提的是:在C++程序中使用pthread_create函数时,该函数的第三个参数必须指向一个静态函数。而要在一个静态函数中使用类的动态成员(包括成员函数和成员变量), 则只能通过如下两种方式来实现:
- 通过类的静态对象类调用。比如单利模式中,静态函数可以通过类的全局唯一实例来访问动态成员函数。
- 将类的对象作为参数传递给静态函数,然后在静态函数中引用这个对象,并调用其动态方法。 代码中使用的是第二种方法:将线程参数设置为this指针,然后在Worker函数中获取该指针并调用其动态方法Run()。