三、线程池的实现
一个线程池一般有一个任务队列,启动的各个线程从任务队列中竞争任务,得到的线程则进行处理:list<MyTask *> m_taskQueue;
任务队列由锁保护,使得线程安全:pthread_mutex_t m_queueMutex
任务队列需要条件变量来支持生产者消费者模式:pthread_cond_t m_cond
如果任务列表为空,则线程等待,等待中的线程个数为:m_numWaitThreads
需要一个列表来维护线程池中的线程:vector<MyThread *> m_threads
每个线程需要一个线程运行函数:
void * __thread_new_proc(void *p)
{
((MyThread *)p)->run();
return 0;
}
每个线程由MyThread类负责,主要函数如下:
int MyThread::start()
{
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
int ret = pthread_create(&m_thread, &attr, thread_func, args);
pthread_attr_destroy(&attr);
if(ret != 0)
return –1;
}
int MyThread::stop()
{
int ret = pthread_kill(m_thread, SIGINT);
if(ret != 0)
return –1;
}
int MyThread::join()
{
int ret = pthread_join(m_thread, NULL);
if(ret != 0)
return –1;
}
void MyThread::run()
{
while (false == m_bStop)
{
MyTask *pTask = m_threadPool->getNextTask();
if (NULL != pTask)
{
pTask->process();
}
}
}
线程池由MyThreadPool负责,主要函数如下:
int MyThreadPool::init()
{
pthread_condattr_t cond_attr;
pthread_condattr_init(&cond_attr);
pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
int ret = pthread_cond_init(&m_cond, &cond_attr);
pthread_condattr_destroy(&cond_attr);
if (ret_val != 0)
return –1;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
ret = pthread_mutex_init(&m_queueMutex, &attr);
pthread_mutexattr_destroy(&attr);
if (ret_val != 0)
return –1;
for (int i = 0; i< m_poolSize; ++i)
{
MyThread *thread = new MyThread(i+1, this);
m_threads.push_back(thread);
}
return 0;
}
int MyThreadPool::start()
{
int ret;
for (int i = 0; i< m_poolSize; ++i)
{
ret = m_threads[i]->start();
if (ret != 0)
break;
}
ret = pthread_cond_broadcast(&m_cond);
if(ret != 0)
return –1;
return 0;
}
void MyThreadPool::addTask(MyTask *ptask)
{
if (NULL == ptask)
return;
pthread_mutex_lock(&m_queueMutex);
m_taskQueue.push_back(ptask);
if (m_waitingThreadCount > 0)
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_queueMutex);
}
MyTask * MyThreadPool::getNextTask()
{
MyTask *pTask = NULL;
pthread_mutex_lock(&m_queueMutex);
while (m_taskQueue.begin() == m_taskQueue.end())
{
++m_waitingThreadCount;
pthread_cond_wait(&n_cond, &m_queueMutex);
--m_waitingThreadCount;
}
pTask = m_taskQueue.front();
m_taskQueue.pop_front();
pthread_mutex_unlock(&m_queueMutex);
return pTask;
}
其中每一个任务的执行由MyTask负责,其主要方法如下:
void MyTask::process()
{
//用read从客户端读取指令
//对指令进行处理
//用write向客户端写入结果
}
本文导航
- 第1页: 首页
- 第2页: 对Socket的封装
- 第3页: 线程池的实现
- 第4页: 连接池的实现
- 第5页: 监听线程的实现