计算机网络试验要求写一个文件传输程序。由于以前自己写的线程类和SOCKET类丢失掉了(寝室电脑被盗 ),现在重写这几个类,干脆就写了一个线程池,用的是C++STL和纯API。而且,为了保证这个线程类本身是线程安全的,我还使用了WinApi中的互斥量。同时仿照C#的类库,在线程类中加入了Join方法。调用线程对象Join方法的线程将等待线程对象直到执行完毕。以下是源代码。 /////////////////////////////////MyThread.h////////////////////////////////////// #ifndef _MYTHREAD_H_ #define _MYTHREAD_H_ #include "MyException.h" #include <windows.h> #include <list> using namespace std; //前向声明 class CMyThread;//线程类 class CMyThreadPool;//线程池类 class CMyThreadExp;//线程异常类 #define MyThreadProc LPTHREAD_START_ROUTINE //线程状态枚举 enum MyThreadStatus { THREADRUN,//运行 THREADPAUSE,//暂停 }; //线程类,可动态更换执行函数 class CMyThread { struct MyThreadParam { LPTHREAD_START_ROUTINE proc;//用户线程函数 void *lpParam;//用户线程参数 CMyThread *pCMyThread;//线程类对象 }; public: //构造,proc线程函数 CMyThread(); //析构 ~CMyThread(); //运行线程 bool Run(); //暂停线程 bool Pause(); //调用Join的线程将阻塞,直到该线程执行完毕 void Join(); //设置线程运行的函数,和要传给线程的参数 void SetProc(LPTHREAD_START_ROUTINE proc, void* lpParam); protected: CMyThread(CMyThread&) {} //线程实际运行的函数 static DWORD WINAPI RealThreadProc(void* lpParam); friend class CMyThreadPool; protected: HANDLE m_hThread; DWORD m_id; HANDLE m_hEvt; MyThreadParam m_param; MyThreadStatus m_status; HANDLE m_hMtx; }; //线程池类 class CMyThreadPool { public: //构造,initNum初始情况线程数量 CMyThreadPool(int initNum); //析构 ~CMyThreadPool(); //申请线程进行工作,proc工作函数,lpParam工作函数参数,run是否立即运行,返回线程ID DWORD DoWork(LPTHREAD_START_ROUTINE proc, void *lpParam, bool run=true); //运行线程,id线程ID bool Run(DWORD id); //暂停运行线程,id线程ID bool Pause(DWORD id); //调整线程池大小为size,返回调整后线程池大小 unsigned SetSize(unsigned size);//when decrease num, its dangerous protected: list<CMyThread*> m_lst; }; //线程异常种类枚举 enum EnumMyThreadExp { EXPCREATETHREAD = 0, EXPTERMINATETHREAD, EXPRESUMETHREAD, EXPSUSPENDTHREAD, EXPCREATEEVENT, EXPCREATEMUTEX, }; //线程异常类 class CMyThreadExp : public CMyException { public: CMyThreadExp(EnumMyThreadExp exp, CMyThread *pobj); ~CMyThreadExp(); void GetInfo(); EnumMyThreadExp m_exp; CMyThread *m_pobj; }; #endif//_MYTHREAD_H_ //////////////////////MyThread.cpp////////////////////////////////// #include "mythread.h" #include <iostream> using namespace std; CMyThreadExp::CMyThreadExp(EnumMyThreadExp exp, CMyThread *pobj) : m_exp(exp), m_pobj(pobj) { GetInfo(); ShowExp(); } CMyThreadExp::~CMyThreadExp() { } void CMyThreadExp::GetInfo() { sprintf(m_strInfo, "\nCMyThread Exception When %p Execute ", (void*)(m_pobj)); switch (m_exp) { case EXPCREATETHREAD: strcat(m_strInfo, "CreateThread()"); break; case EXPTERMINATETHREAD: strcat(m_strInfo,"TerminateThread()"); break; case EXPRESUMETHREAD: strcat(m_strInfo, "ResumeThread()"); break; case EXPSUSPENDTHREAD: strcat(m_strInfo, "SuspendThread()"); break; case EXPCREATEEVENT: strcat(m_strInfo, "CreateEvent()"); break; case EXPCREATEMUTEX: strcat(m_strInfo, "CreateMutex()"); break; } char temp[100]; sprintf(temp, "\nError Code = %d", GetLastError()); strcat(m_strInfo, temp); } /**********************************************************************************************/ CMyThread::CMyThread() : m_hThread(NULL), m_status(THREADPAUSE), m_hEvt(0), m_hMtx(0) { m_hThread = CreateThread(0, 0, (LPTHREAD_START_ROUTINE)(CMyThread::RealThreadProc), (void*)&m_param, CREATE_SUSPENDED, &m_id); if (m_hThread == NULL)//fail to create thread { CMyThreadExp(EXPCREATETHREAD, this); throw;//construct exception } m_param.proc = NULL; m_param.lpParam = NULL; m_param.pCMyThread = this; m_hEvt = CreateEvent(0, FALSE, FALSE, 0);//自动复位 if (m_hEvt == 0) { CloseHandle(m_hThread); CMyThreadExp(EXPCREATEEVENT, this); throw;//construct exception } m_hMtx = CreateMutex(0, 0, 0); if (m_hMtx == 0)//fail to create mutex { unsigned long i = GetLastError(); CloseHandle(m_hEvt); CloseHandle(m_hThread); CMyThreadExp(EXPCREATEMUTEX, this); throw;//construct exception } } CMyThread::~CMyThread() { CloseHandle(m_hMtx); if (TerminateThread(m_hThread, -1) == 0)//fail to terminate CMyThreadExp(EXPTERMINATETHREAD, this); } bool CMyThread::Run() { WaitForSingleObject(m_hMtx, INFINITE);//get mutex if (m_status == THREADPAUSE) if (ResumeThread(m_hThread) == -1)//fail to resume { CMyThreadExp(EXPRESUMETHREAD, this); ReleaseMutex(m_hMtx);//release mutex return false; } m_status = THREADRUN; ReleaseMutex(m_hMtx);//release mutex return true; } bool CMyThread::Pause() { WaitForSingleObject(m_hMtx, INFINITE);//get mutex if (m_status == THREADRUN) if (SuspendThread(m_hThread) == -1)//fail to suspend { CMyThreadExp(EXPSUSPENDTHREAD, this); ReleaseMutex(m_hMtx);//release mutex return false; } m_status = THREADPAUSE; ReleaseMutex(m_hMtx);//release mutex return true; } void CMyThread::Join() { WaitForSingleObject(m_hEvt, INFINITE); } void CMyThread::SetProc(LPTHREAD_START_ROUTINE proc, void* lpParam) { WaitForSingleObject(m_hMtx, INFINITE);//get mutex m_param.proc = proc; m_param.lpParam = lpParam; ReleaseMutex(m_hMtx);//release mutex } DWORD WINAPI CMyThread::RealThreadProc(void* lpParam) { LPTHREAD_START_ROUTINE proc; MyThreadParam *pp = (MyThreadParam*)lpParam; while (true) { proc = pp->proc; if (proc) (*proc)(pp->lpParam); pp->proc = NULL;//clear function pp->lpParam = NULL;//clear param pp->pCMyThread->Pause();//pause automatic SetEvent(pp->pCMyThread->m_hEvt); } } /**********************************************************************************************/ CMyThreadPool::CMyThreadPool(int initNum) { CMyThread *pt; for (int i=0; i<initNum; i++) { pt = new CMyThread; m_lst.push_back(pt); } } CMyThreadPool::~CMyThreadPool() { list<CMyThread*>::iterator it = m_lst.begin(); list<CMyThread*>::iterator ite = m_lst.end(); for ( ; it != ite; it++)//terminate all thread { delete (*it); } } DWORD CMyThreadPool::DoWork(LPTHREAD_START_ROUTINE proc, void *lpParam, bool run) { list<CMyThread*>::iterator it = m_lst.begin(); list<CMyThread*>::iterator ite = m_lst.end(); for ( ; it != ite; it++)//is there a idle thread? { if ((*it)->m_param.proc == NULL && (*it)->m_status == THREADPAUSE ) { (*it)->SetProc(proc, lpParam); if (run)//run at once (*it)->Run(); return (*it)->m_id; } } //no idle thread CMyThread *pt = new CMyThread;//create a new thread m_lst.push_back(pt); pt->SetProc(proc, lpParam); if (run) pt->Run(); return pt->m_id; } bool CMyThreadPool::Run(DWORD id) { list<CMyThread*>::iterator it = m_lst.begin(); list<CMyThread*>::iterator ite = m_lst.end(); for ( ; it != ite; it++) { if ((*it)->m_id == id) return ((*it)->Run()); } return false; } bool CMyThreadPool::Pause(DWORD id) { list<CMyThread*>::iterator it = m_lst.begin(); list<CMyThread*>::iterator ite = m_lst.end(); for ( ; it != ite; it++) { if ((*it)->m_id == id) return ((*it)->Pause()); } return false; } unsigned CMyThreadPool::SetSize(unsigned size) { unsigned nowsize = m_lst.size(); if (nowsize <= size) { CMyThread *pt; unsigned inc = size-nowsize; for (unsigned i=0; i<inc; i++) { pt = new CMyThread; m_lst.push_back(pt); } return size; } else { unsigned dec = nowsize - size; list<CMyThread*>::iterator it = m_lst.begin(); list<CMyThread*>::iterator ite = m_lst.end(); list<CMyThread*>::iterator itemp; unsigned i=0; for ( ; it!=ite && i<dec; ) { if ((*it)->m_status == THREADPAUSE) { itemp = it++; delete ((*itemp)); m_lst.erase(itemp); i++; continue; } it++; } Sleep(100*i); return m_lst.size(); } } 
|