ACE笔记(6) Proactor机制下的异步SOCKET开发 Proactor机制和reactor机制的不同 1、在reactor机制下,所有I/O请求是同步的,即接到信号请求后,立即执行信号处理, 执行完后才开始继续监听信号请求,其接收信号请求的机制是被动的 而在Proactor机制下,I/O请求是异步的,即接到信号请求后,不立即执行信号处理(而是在莫个时刻执行该处理), 然后再继续监听信号请求,其接收信号请求的机制是主动的 2、要想符合Proactor机制的信号处理,需要从 ACE_Service_Handler 派生,而reactor机制信号处理类要从ACE_Event_Handler派生 ACE_Service_Handler 中以定义的常见回调方法: /// 异步读完成时会被调用 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); /// 在UDP SOCKET中,当异步写完成时会被调用 virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result); /// 在UDP SOCKET中,当异步读完成时会被调用 virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result); /// 当异步写完成时会被调用 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); /// 当异步读文件完成时会被调用 virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result); /// 当异步写文件完成时会被调用 virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result); ///当异步接收完成时会被调用 virtual void handle_accept (const ACE_Asynch_Accept::Result &result); ///当异步连接完成时会被调用 virtual void handle_connect (const ACE_Asynch_Connect::Result &result); ///当异步传输文件完成时会被调用 virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result); ///超时时会被调用 virtual void handle_time_out (const ACE_Time_Value &tv, const void *act = 0); ACE_Service_Handler 类OPEN方法使用注意: 方法定义:open (ACE_HANDLE handle,ACE_Message_Block &message_block) 当客户端连接时会触发此方法 message_block 参数附带了伴随客户端连接发送过来的消息块 所以在实现OPEN方法中,要注意判断message_block 参数是否附带了消息,如果附带了,如果不想改变现有的事件数据统一处理模式,则需要自己模拟一个读完成动作,如下: if (message_block.length () != 0) { // 复制消息块(引用) ACE_Message_Block &duplicate =*message_block.duplicate (); // 伪装一个事件读完成对象 ACE_Asynch_Read_Stream_Result_Impl *fake_result = ACE_Proactor::instance ()->create_asynch_read_stream_result (*this, this->handle_, duplicate, initial_read_size, 0, ACE_INVALID_HANDLE, 0, 0); //移动写指针到未写入的位置,因为读完成动作中会自动移动写指针 size_t bytes_transferred = message_block.length (); duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred); //发出事件完成回调命令 fake_result->complete (message_block.length (), 1, 0); // 删除伪装的对象 delete fake_result; } ACE_Asynch_Read_Stream 类常见方法 open 方法:初始化读操作 read 方法:读操作,把数据存放在一个 ACE_Message_Block 数据结构上,该结构会自动移动写指针(wr_ptr) ACE_Asynch_Write_Stream 类常见方法 open 方法:初始化写操作 write方法:写操作,会把存在 ACE_Message_Block 数据结构上写入指定的handle中,该结构会自动移动读指针(rd_ptr) ACE_Message_Block 类常见方法 构造函数:ACE_Message_Block (长度) rd_ptr():返回读指针 wr_ptr(): 返回写指针 release():释放内存 init(data,len):分配内存 wr_ptr(len):把写指针向前移动LEN个位置 wr_ptr(×):把写指针指向当前指针 duplicate():复制当前消息块 ACE_Asynch_Read_Stream::Result 类常见方法、属性 用于在回调完成时获得相关完成信息的类 bytes_to_read ():想读取的字节数 bytes_transferred ():有多少个字节被接收 handle ():作用在那个handle上 success():操作是否成功 message_block ():返回消息块 下面附带一个异步I/O处理的例子(例子来源于ACE自带例子,稍有改动),该例子用来异步接收客户请求,并把客户请求的信息显示在控制台上 #include "ace/OS_main.h" #include "ace/Service_Config.h" #include "ace/Proactor.h" #include "ace/Asynch_IO.h" #include "ace/Asynch_IO_Impl.h" #include "ace/Asynch_Acceptor.h" #include "ace/INET_Addr.h" #include "ace/SOCK_Connector.h" #include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Stream.h" #include "ace/Message_Block.h" #include "ace/Get_Opt.h" #include "ace/OS_NS_sys_stat.h" static u_short port = ACE_DEFAULT_SERVER_PORT;
static int done = 0; static int initial_read_size = BUFSIZ; class Receiver : public ACE_Service_Handler { public: Receiver (void); ~Receiver (void); virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); protected: virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result); private: int initiate_read_stream (void); ACE_Asynch_Read_Stream rs_; ACE_HANDLE handle_; // Handle for IO to remote peer. }; Receiver::Receiver (void) : dump_file_ (ACE_INVALID_HANDLE), handle_ (ACE_INVALID_HANDLE) {} Receiver::~Receiver (void){} void Receiver::open (ACE_HANDLE handle, ACE_Message_Block &message_block) { ACE_DEBUG ((LM_DEBUG, "%N:%l:Receiver::open called\n")); this->handle_ = handle; // 打开SOCKET读取流 if (this->rs_.open (*this, this->handle_) == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::open")); return; } if (message_block.length () != 0) { // 复制消息块(引用) ACE_Message_Block &duplicate =*message_block.duplicate (); // 伪装一个事件读完成对象 ACE_Asynch_Read_Stream_Result_Impl *fake_result = ACE_Proactor::instance ()->create_asynch_read_stream_result (*this, this->handle_, duplicate, initial_read_size, 0, ACE_INVALID_HANDLE, 0, 0); //移动写指针到未写入的位置,因为读完成动作中会自动移动写指针 size_t bytes_transferred = message_block.length (); duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred); //发出事件完成回调命令 fake_result->complete (message_block.length (), 1, 0); // 删除伪装的对象 delete fake_result; } else // 没有附带数据,则开始一个新的读操作 if (this->initiate_read_stream () == -1) return; } int Receiver::initiate_read_stream (void) { ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); // 开始读操作 if (this->rs_.read (*mb, mb->size () - 1) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1); return 0; } void Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { //开始读操作 ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); //显示读取的信息 result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message:", result.message_block ().rd_ptr ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); if (result.success () && result.bytes_transferred () != 0) { result.message_block ().release(); // 如果还存在未读取数据,则继续读取 if (this->initiate_read_stream () == -1) return; } else { //不存在,则释放消息块并关闭SOCKET连接 ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); result.message_block ().release (); done = 0; ACE_OS::closesocket (this->handle_); } } int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { ACE_Asynch_Acceptor<Receiver> acceptor; //打开SOCKET端口 if (acceptor.open (ACE_INET_Addr (port), initial_read_size, 1) == -1) return -1; int success = 1; while (success > 0 && !done) // 处理和分发事件 success = ACE_Proactor::instance ()->handle_events (); return 0; } //下面的代码时帮助编译器解析上面的模板 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Asynch_Acceptor<Receiver>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Asynch_Acceptor<Receiver> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ 
|