|
|
生产者-消费者模型的解决思路——自建队列 |
|
|
作者:未知 来源:月光软件站 加入时间:2005-2-28 月光软件站 |
笔者曾遇到这样的需求:某软件在运行中随时有可能向外发送短信,一方面发送短信的设备是个独占资源,另一方面有多个线程要发短信。按照“把不稳定因素限定在一个实体中”的原则,自然就用一个专门的线程来操作短信设备,它的任务是从消息队列中取出要发送的短信,通过短信设备发送出去。
这是比较常见的生产者-消费者模型,即一个模块产生数据,另外模块取得数据并进行处理。如何实现互斥?如何让生产者和消费者都能够方便的工作?
类似这样的情况在编程中十分常见,采用消息队列是很好的解决办法。本文给出的消息队列具有以下特色: ????·消息的大小、结构是自由的,甚至可以是一个对象; ????·消息队列的长度(容纳消息的个数)是可设定的,超过最大长度可以选择丢弃或等待; ????·新消息一般是放在队列的最后面,也可以放在最前面; ????·代码少,思路简洁,你可以根据情况扩展;
【设计思路】 设计思路其实很简单,消息队列类CHMQ是个模板类,用CList保存消息,用CriticalSection来实现互斥,用Event来控制消息队列为空或已满的情况。提供以下几个常用接口: ????·Push() 在队列尾增加消息,如果消息队列已满,根据设定或丢弃消息,或阻塞直到队列不为满的时候。 ????·Insert() 在对列头增加消息,其他同Push()。 ????·Pop() 从队列头中取消息,如果队列为空则阻塞,直到不为空的时候。 ????·SetMaxCount() 设置消息队列的最大长度,以及如果消息队列满的时候新消息的处理方式。????或丢弃消息,或阻塞直到队列不为满的时候。 ????·GetMaxCount() 返回消息队列的最大长度,如果没有设置最大长度则返回-1。 ????·GetQueueCount() 返回目前消息队列中的消息个数。
【扩展】 ????(1) 目前一个CHMQ对象仅支持一个消息队列,可以扩展成多个消息队列 ????(2) 消息队列为空时,Pop()一直阻塞到有新消息时才返回,可以扩展成超时返回 ????(3) 消息队列已满时,Push()和Insert()可能会阻塞到队列不满的时候,可以扩展成超时丢弃
#ifndef CHMQ_H #define CHMQ_H
#include #include
template class CHMQ { public: ????CHMQ() ????{ ????????m_nMax = -1; ????????m_bDrop = FALSE;
????????::InitializeCriticalSection(&m_lock);
????????m_hPushEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL); ????????ASSERT(m_hPushEvent != NULL); ????????m_hPopEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL); ????????ASSERT(m_hPopEvent != NULL); ????}
????~CHMQ() ????{ ????????::DeleteCriticalSection(&m_lock); ????????::CloseHandle(m_hPushEvent); ????????::CloseHandle(m_hPopEvent); ????????m_list.RemoveAll(); ????}
????// 队列尾增加消息,如果消息队列已满,根据设定或丢弃消息,或阻塞直到队列不为满的时候。 ????void Push(TYPE& type) ????{ ????????::EnterCriticalSection(&m_lock); ????????// 如果消息队列已满 ????????if ( m_nMax > 0 && m_list.GetCount() >= m_nMax) ????????{ ????????????::ResetEvent(m_hPushEvent); ????????????::LeaveCriticalSection(&m_lock); ????????????if( m_bDrop ) ????????????????return;
????????????if ( ::WaitForSingleObject(m_hPushEvent, INFINITE) != WAIT_OBJECT_0) ????????????????ASSERT(FALSE); ????????????::EnterCriticalSection(&m_lock); ????????}
????????m_list.AddTail(type); ????????::SetEvent(m_hPopEvent); ????????::LeaveCriticalSection(&m_lock); ????}
????// 队列头增加消息,如果消息队列已满,根据设定或丢弃消息,或阻塞直到队列不为满的时候。 ????void Insert(TYPE& type) ????{ ????????::EnterCriticalSection(&m_lock); ????????// 如果消息队列已满 ????????if ( m_nMax > 0 && m_list.GetCount() >= m_nMax) ????????{ ????????????::ResetEvent(m_hPushEvent); ????????????::LeaveCriticalSection(&m_lock); ????????????if( m_bDrop ) ????????????????return;
????????????if ( ::WaitForSingleObject(m_hPushEvent, INFINITE) != WAIT_OBJECT_0) ????????????????ASSERT(FALSE); ????????????::EnterCriticalSection(&m_lock); ????????}
????????m_list.AddHead(type); ????????::SetEvent(m_hPopEvent); ????????::LeaveCriticalSection(&m_lock); ????} ???? ????// 从队列头中取消息,如果队列为空则阻塞,直到不为空的时候。 ????TYPE Pop() ????{ ????????TYPE type;
????????::EnterCriticalSection(&m_lock);
????????// 如果队列为空 ????????if (m_list.IsEmpty()) ????????{ ????????????::ResetEvent(m_hPopEvent); ????????????::LeaveCriticalSection(&m_lock); ????????????if ( ::WaitForSingleObject(m_hPopEvent, INFINITE) != WAIT_OBJECT_0) ????????????????ASSERT(FALSE); ????????????::EnterCriticalSection(&m_lock); ????????}
????????type = m_list.RemoveHead(); ????????::SetEvent(m_hPushEvent); ????????::LeaveCriticalSection(&m_lock);
????????return type; ????}
????// 返回目前消息队列中的消息个数。 ????int GetQueueCount() ????{ ????????int nCount = 0;
????????::EnterCriticalSection(&m_lock); ????????nCount = m_list.GetCount(); ????????::LeaveCriticalSection(&m_lock); ???????? ????????return nCount; ????}
????// 设置消息队列的最大长度,以及如果消息队列满的时候新消息的处理方式。 ????//????或丢弃消息,或阻塞直到队列不为满的时候。 ????void SetMaxCount(int nMax = -1, BOOL bDrop = FALSE) ????{ ????????::EnterCriticalSection(&m_lock); ????????m_nMax = nMax; ????????m_bDrop = bDrop; ????????::LeaveCriticalSection(&m_lock); ????}
????// 返回消息队列的最大长度,如果没有设置最大长度则返回-1。 ????int GetMaxCount() ????{ ????????::EnterCriticalSection(&m_lock); ????????::LeaveCriticalSection(&m_lock); ????????return m_nMax; ????}
private: ????CRITICAL_SECTION????m_lock; ????CList????m_list;
????HANDLE????m_hPopEvent; ????HANDLE????m_hPushEvent;
????int??m_nMax; ????BOOL m_bDrop; };
#endif // CHMQ_H

|
|
相关文章:相关软件: |
|