类定义如下
// ThreadPoolImp.h: interface for the ThreadPoolImp class. // //////////////////////////////////////////////////////////////////////
#if !defined(AFX_THREADPOOLIMP_H__82F4FC7E_2DB4_4D2A_ACC8_2EFC787CAE42__INCLUDED_) #define AFX_THREADPOOLIMP_H__82F4FC7E_2DB4_4D2A_ACC8_2EFC787CAE42__INCLUDED_
#if _MSC_VER > 1000 #pragma once #endif // _MSC_VER > 1000
#pragma warning( disable : 4705 4786)
#include <map> #include "AutoLock.h"
using namespace std;
class IJobDesc; class IWorker;
class CThreadPoolImp { public: class ThreadInfo { public: ThreadInfo() { m_hThread=0; m_bBusyWorking=false; } ThreadInfo(HANDLE handle, bool bBusy) { m_hThread=handle; m_bBusyWorking=bBusy; } ThreadInfo(const ThreadInfo& info) { m_hThread=info.m_hThread; m_bBusyWorking=info.m_bBusyWorking; } //////// HANDLE m_hThread; bool m_bBusyWorking; };
typedef map<DWORD,ThreadInfo> ThreadInfoMap; typedef ThreadInfoMap::iterator Iterator_ThreadInfoMap; friend static unsigned int CThreadPoolImp::ManagerProc(void* p); friend static unsigned int CThreadPoolImp::WorkerProc(void* p); protected: enum ThreadPoolStatus { BUSY, IDLE, NORMAL }; public: //interface to the outside void Start(unsigned short nStatic, unsigned short nmax); void Stop(bool bHash=false); void ProcessJob(IJobDesc* pJob, IWorker* pWorker) const;
//constructor and destructor CThreadPoolImp(); virtual ~CThreadPoolImp();
protected: //interfaces public: HANDLE GetMgrIoPort() const { return m_hMgrIoPort; } UINT GetMgrWaitTime() const { return 1000; } HANDLE GetWorkerIoPort() const { return m_hWorkerIoPort; }
private: static DWORD WINAPI ManagerProc(void* p); static DWORD WINAPI WorkerProc(void* p); protected: //manager thread HANDLE m_hMgrThread; HANDLE m_hMgrIoPort; protected: //configuration parameters mutable unsigned short m_nNumberOfStaticThreads; mutable unsigned short m_nNumberOfTotalThreads;
protected: //helper functions void AddThreads(); void RemoveThreads(); ThreadPoolStatus GetThreadPoolStatus(); void ChangeStatus(DWORD threadId, bool status); void RemoveThread(DWORD threadId);
protected: //all the work threads ThreadInfoMap m_threadMap; CCriticalSection m_arrayCs; HANDLE m_hWorkerIoPort; };
#endif // !defined(AFX_THREADPOOLIMP_H__82F4FC7E_2DB4_4D2A_ACC8_2EFC787CAE42__INCLUDED_)
实现如下
// ThreadPool.cpp: implementation of the CThreadPoolImp class. // //////////////////////////////////////////////////////////////////////
#include "stdafx.h" #include "ThreadPoolimp.h" #include "outdebug.h" #include <assert.h> #include "work.h"
#ifdef _DEBUG #undef THIS_FILE static char THIS_FILE[]=__FILE__; //#define new DEBUG_NEW #endif
// Constructs an idle pool. Every handle and counter starts in a known state
// (NULL / 0) instead of being left indeterminate until Start() runs.
CThreadPoolImp::CThreadPoolImp()
    : m_hMgrThread(NULL)
    , m_hMgrIoPort(NULL)
    , m_nNumberOfStaticThreads(0)
    , m_nNumberOfTotalThreads(0)
    , m_hWorkerIoPort(NULL)
{
}
// Destructor. The body is empty: no handle is closed and no thread is
// signalled from here.
// NOTE(review): callers presumably have to invoke Stop() before the object is
// destroyed - confirm against the call sites.
CThreadPoolImp::~CThreadPoolImp()
{
    // nothing to do
}
void CThreadPoolImp::Start(unsigned short nStatic, unsigned short nMax) { assert(nMax>=nStatic); HANDLE hThread; DWORD nThreadId; m_nNumberOfStaticThreads=nStatic; m_nNumberOfTotalThreads=nMax;
//lock the resource CAutoLock AutoLock(m_arrayCs);
//create an IO port m_hMgrIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0); hThread = CreateThread( NULL, // SD 0, // initial stack size (LPTHREAD_START_ROUTINE)ManagerProc, // thread function (LPVOID)this, // thread argument 0, // creation option &nThreadId ); // thread identifier m_hMgrThread = hThread;
//now we start these worker threads m_hWorkerIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0); for(long n = 0; n < nStatic; n++) { hThread = CreateThread( NULL, // SD 0, // initial stack size (LPTHREAD_START_ROUTINE)WorkerProc, // thread function (LPVOID)this, // thread argument 0, // creation option &nThreadId ); ReportDebug("generate a worker thread handle id is %d.\n", nThreadId); m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(nThreadId,ThreadInfo(hThread, false))); } }
void CThreadPoolImp::Stop(bool bHash) { CAutoLock Lock(m_arrayCs);
::PostQueuedCompletionStatus(m_hMgrIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF); WaitForSingleObject(m_hMgrThread, INFINITE); CloseHandle(m_hMgrThread); CloseHandle(m_hMgrIoPort);
//shut down all the worker threads UINT nCount=m_threadMap.size(); HANDLE* pThread = new HANDLE[nCount]; long n=0; ThreadInfo info; Iterator_ThreadInfoMap i=m_threadMap.begin(); while(i!=m_threadMap.end()) { ::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF); info=i->second; pThread[n++]=info.m_hThread; i++; }
DWORD rc=WaitForMultipleObjects(nCount, pThread, TRUE, 30000);//wait for 0.5 minutes, then start to kill threads CloseHandle(m_hWorkerIoPort); if(rc>=WAIT_OBJECT_0 && rc<WAIT_OBJECT_0+nCount) { for(unsigned int n=0;n<nCount;n++) { CloseHandle(pThread[n]); } } else if(rc==WAIT_TIMEOUT&&bHash) { //some threads not terminated, we have to stop them. DWORD exitCode; for(unsigned int i=0; i<nCount; i++) { if (::GetExitCodeThread(pThread[i], &exitCode)==STILL_ACTIVE) { TerminateThread(pThread[i], 99); } CloseHandle(pThread[i]); } } delete[] pThread; }
// Entry point of the manager thread.
//
// p - the owning CThreadPoolImp, passed as the thread argument.
//
// Repeatedly dequeues from the manager completion port using the pool's wait
// time. A packet whose OVERLAPPED pointer is 0xFFFFFFFF ends the thread; any
// other packet is only logged. When the dequeue fails with WAIT_TIMEOUT the
// pool load is inspected and the pool is grown or shrunk; any other failure
// ends the thread. Always returns 0.
//
// (A disabled branch for 0xFFFFFFFE packets used to wait on and close the
// handle of a worker that had removed itself.)
DWORD WINAPI CThreadPoolImp::ManagerProc(void* p)
{
    // convert the parameter to the server pointer.
    CThreadPoolImp* pPool = (CThreadPoolImp*)p;
    HANDLE hPort = pPool->GetMgrIoPort();

    unsigned long nBytes, nKey;
    OVERLAPPED* pPacket;

    for (;;)
    {
        if (::GetQueuedCompletionStatus(hPort, &nBytes, &nKey, &pPacket, pPool->GetMgrWaitTime()))
        {
            if (pPacket == (OVERLAPPED*)0xFFFFFFFF)
            {
                return 0;
            }
            ReportDebug("mgr events comes in!\n");
            continue;
        }

        // The dequeue failed: only a timeout keeps the manager alive.
        if (::GetLastError() != WAIT_TIMEOUT)
        {
            return 0;
        }

        // time out processing
        ReportDebug("Time out processing!\n");

        // the manager takes a look at all the workers' status.
        if (pPool->GetThreadPoolStatus() == CThreadPoolImp::BUSY)
        {
            pPool->AddThreads();
        }
        if (pPool->GetThreadPoolStatus() == CThreadPoolImp::IDLE)
        {
            pPool->RemoveThreads();
        }
    }
}
// Entry point of every worker thread.
//
// p - the owning CThreadPoolImp, passed as the thread argument.
//
// Blocks on the worker completion port and handles three kinds of packets,
// distinguished by the OVERLAPPED pointer:
//   0xFFFFFFFF - leave the loop;
//   0xFFFFFFFE - unregister this thread via RemoveThread() and leave the loop;
//   otherwise  - a job: the first dequeued value is reinterpreted as IWorker*,
//                the second as IJobDesc*, and the thread is flagged busy while
//                IWorker::ProcessJob runs.
// A failed dequeue also leaves the loop. Always returns 0.
//
// (A disabled variant additionally posted this thread's handle to the manager
// port before removing itself.)
DWORD WINAPI CThreadPoolImp::WorkerProc(void* p)
{
    // convert the parameter to the server pointer.
    CThreadPoolImp* pPool = (CThreadPoolImp*)p;
    HANDLE hPort = pPool->GetWorkerIoPort();

    unsigned long nWorkerBits, nJobBits;
    OVERLAPPED* pPacket;

    DWORD nSelfId = ::GetCurrentThreadId();
    ReportDebug("worker thread id is %d.\n", nSelfId);

    for (;;)
    {
        if (!::GetQueuedCompletionStatus(hPort, &nWorkerBits, &nJobBits, &pPacket, INFINITE))
        {
            break;
        }

        if (pPacket == (OVERLAPPED*)0xFFFFFFFE)
        {
            pPool->RemoveThread(nSelfId);
            break;
        }

        if (pPacket == (OVERLAPPED*)0xFFFFFFFF)
        {
            break;
        }

        ReportDebug("worker events comes in!\n");

        // before processing, we need to change the status to busy.
        pPool->ChangeStatus(nSelfId, true);

        // retrieve the job description and agent pointer
        IWorker* pIWorker = reinterpret_cast<IWorker*>(nWorkerBits);
        IJobDesc* pIJob = reinterpret_cast<IJobDesc*>(nJobBits);
        pIWorker->ProcessJob(pIJob);

        pPool->ChangeStatus(nSelfId, false);
    }

    return 0;
}
// Sets the busy flag of the worker identified by threadId.
//
// threadId - id of the worker thread (key of m_threadMap).
// status   - true while the worker executes a job, false when it is idle.
//
// The flag is written through the iterator. The previous code copied the
// record and re-inserted it, but std::map::insert leaves an existing key
// untouched, so the stored flag never changed. An unknown thread id is
// ignored instead of dereferencing end().
void CThreadPoolImp::ChangeStatus(DWORD threadId, bool status)
{
    CAutoLock lock(m_arrayCs);

    Iterator_ThreadInfoMap it = m_threadMap.find(threadId);
    if (it == m_threadMap.end())
    {
        return;  // thread is not (or no longer) registered
    }

    it->second.m_bBusyWorking = status;
}
// Queues a job on the worker completion port.
//
// pJob    - job description; travels in the completion-key slot.
// pWorker - object that will process the job; travels in the
//           bytes-transferred slot.
//
// The OVERLAPPED pointer of the packet is NULL. The result of the post is not
// checked.
// NOTE(review): both pointers are squeezed through DWORD, which presumably
// restricts this code to 32-bit builds - confirm before porting.
void CThreadPoolImp::ProcessJob(IJobDesc* pJob, IWorker* pWorker) const
{
    const DWORD nWorkerBits = reinterpret_cast<DWORD>(pWorker);
    const DWORD nJobBits = reinterpret_cast<DWORD>(pJob);

    ::PostQueuedCompletionStatus(m_hWorkerIoPort, nWorkerBits, nJobBits, NULL);
}
void CThreadPoolImp::AddThreads() { HANDLE hThread; DWORD nThreadId; unsigned int nCount=m_threadMap.size(); unsigned int nTotal=min(nCount+2, m_nNumberOfTotalThreads); for(unsigned int i=0; i<nTotal-nCount; i++) { hThread = CreateThread( NULL, // SD 0, // initial stack size (LPTHREAD_START_ROUTINE)WorkerProc, // thread function (LPVOID)this, // thread argument 0, // creation option &nThreadId ); ReportDebug("generate a worker thread handle id is %d.\n", nThreadId); m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(nThreadId,ThreadInfo(hThread, false))); } }
// Drops the record of the worker identified by threadId from m_threadMap
// while holding m_arrayCs. An id that is not present is ignored. The thread
// handle stored in the record is not closed here.
void CThreadPoolImp::RemoveThread(DWORD threadId)
{
    CAutoLock guard(m_arrayCs);

    Iterator_ThreadInfoMap entry = m_threadMap.find(threadId);
    if (entry != m_threadMap.end())
    {
        m_threadMap.erase(entry);
    }
}
// Asks up to two surplus workers to exit by posting 0xFFFFFFFE packets to the
// worker completion port, never going below m_nNumberOfStaticThreads.
//
// The count is computed with an explicit guard. The previous unsigned
// arithmetic (`count - max(count - 2, static)`) wrapped around: with fewer
// than two registered workers it still posted two removal packets, and with
// fewer workers than the static minimum the loop bound became enormous.
void CThreadPoolImp::RemoveThreads()
{
    const unsigned int nCount = static_cast<unsigned int>(m_threadMap.size());
    const unsigned int nFloor = m_nNumberOfStaticThreads;
    if (nCount <= nFloor)
    {
        return;  // nothing above the static minimum to remove
    }

    unsigned int nToRemove = nCount - nFloor;
    if (nToRemove > 2)
    {
        nToRemove = 2;  // shrink in steps of two at most
    }

    for (unsigned int i = 0; i < nToRemove; ++i)
    {
        ::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFE);
    }
}
// Classifies the current load of the pool from the share of workers whose
// busy flag is set:
//   more than 80% busy -> BUSY
//   less than 20% busy -> IDLE
//   otherwise          -> NORMAL
// The table is read without taking m_arrayCs.
CThreadPoolImp::ThreadPoolStatus CThreadPoolImp::GetThreadPoolStatus()
{
    const int nWorkers = m_threadMap.size();

    int nBusy = 0;
    for (Iterator_ThreadInfoMap it = m_threadMap.begin(); it != m_threadMap.end(); ++it)
    {
        if (it->second.m_bBusyWorking)
        {
            ++nBusy;
        }
    }

    // Floating-point division on purpose: with an empty table this is 0/0.0,
    // both comparisons below are false and NORMAL is returned.
    const double busyShare = nBusy / (1.0 * nWorkers);

    if (busyShare > 0.8)
    {
        return BUSY;
    }
    if (busyShare < 0.2)
    {
        return IDLE;
    }
    return NORMAL;
}

|