队列元素为一个无符号字符数组(即字节数组)。循环队列中只存放该数组的地址。这个地址指向一个存储区域,该存储区的结构为:
| 数组长度(4B) | 数组内容(字节数由前面的长度字段决定) |
这个循环队列支持多线程同步操作:对队列进行改动时,由互斥锁 mutex 保证各线程之间的同步。
工作环境: Linux 9.0
编译命令: g++ BytesQueue.cpp main.cpp -o main -lpthread
下面是代码部分。其中有两个 main 函数:前一个(已用注释屏蔽)为一般的单线程应用,后一个(main.cpp)为多线程应用。
/* BytesQueue.h zhangggdlt 2004/11/15 to realize a queue storing bytes array. */ #ifndef _BYTES_QUEUE_H #define _BYTES_QUEUE_H
#include <pthread.h> #include <unistd.h>
/*
 * Status codes returned by the BytesQueue operations (as ERR_NUMBER).
 * Each directive sits on its own line so that every macro is really defined,
 * and negative values are parenthesized so they expand safely in expressions.
 */
#define OPERATION_OK     0     /* the call succeeded */
#define QUEUE_FULL       (-1)  /* inQueue: no free slot left in the ring */
#define QUEUE_EMPTY      (-2)  /* outQueue: nothing to dequeue */
#define INCREASE_FAILED  (-3)  /* increaseSize: the larger ring could not be allocated */
#define NO_AREA          (-4)  /* inQueue: storage for the element could not be allocated */
#define POINT_NULL       (-5)  /* outQueue: the caller passed a null data pointer */
#define FAILED_LOCK      (-6)  /* the queue mutex could not be acquired without blocking */

/* Type of the status codes above. */
typedef int ERR_NUMBER;

/* One byte of queue payload. */
typedef unsigned char uint_8;
/*
 * BytesQueue stores byte arrays (unsigned char arrays) in a circular queue.
 *
 * The ring itself only holds pointers. Each pointer refers to a heap record
 * laid out as a 4-byte length followed by that many payload bytes.
 * _head is the index of the oldest record and _rear the index of the next
 * free slot; the queue is empty when they are equal and full when
 * (_rear + 1) % _size == _head, so one slot always stays unused.
 *
 * inQueue() and outQueue() guard their modifications with QueMutex using a
 * non-blocking try-lock, and report FAILED_LOCK instead of waiting.
 *
 * The capacity is chosen at construction time and can be enlarged later
 * with increaseSize().
 *
 * The object owns the ring and the mutex, so it is not copyable; the
 * destructor releases whatever destroy() has not released already.
 */
class BytesQueue
{
private:
    int _size;                 /* number of slots in _buffer */
    int _head;                 /* index of the oldest queued record */
    int _rear;                 /* index of the next free slot */
    uint_8 **_buffer;          /* ring of pointers to the heap records */
    pthread_mutex_t QueMutex;  /* guards modifications of the ring */

    /* Copying would make two objects free the same ring: declared, never defined. */
    BytesQueue(const BytesQueue&);
    BytesQueue& operator=(const BytesQueue&);

public:
    /* Creates an empty queue with `size` slots. */
    BytesQueue(int size=512);

    /* Frees any records still queued, the ring, and the mutex. */
    ~BytesQueue()
    {
        destroy();
        pthread_mutex_destroy(&QueMutex);
    }

    /* Enlarges the ring by `size` slots. */
    ERR_NUMBER increaseSize(int size=512);

    /* Appends a copy of the `len` bytes at `data`. */
    ERR_NUMBER inQueue(const uint_8 *data, int len);

    /* Copies the oldest record into `data` and stores its length in `len`. */
    ERR_NUMBER outQueue(uint_8 *data, int &len);

    /* Frees all queued records and the ring itself. */
    void destroy();

    /* Prints a message describing the status code `err`. */
    void errMessage(ERR_NUMBER err);

    /* Prints size, head, rear and ring address of `bq`. */
    void showBytesQueue(BytesQueue& bq);
};
#endif //_BYTES_QUEUE_H
——————————————————————————————————————
/* BytesQueue.cpp zhangggdlt 2004/12/9 to realize a stack storing bytes array which sustain the mutitread and sycronization. */
#include "BytesQueue.h"

#include <stdio.h>
#include <string.h>

#include <new>
/*
 * Constructor.
 *
 * Creates an empty queue whose ring has `size` slots and initializes the
 * mutex that the queue operations use for synchronization.
 *
 * size: requested number of slots. A non-positive value would make every
 *       later index computation divide by zero, so it is replaced by 512,
 *       the default declared in the header.
 */
BytesQueue::BytesQueue(int size) /* default declared in the header: 512 */
{
    if (size <= 0)
    {
        size = 512;
    }

    this->_size = size;
    /* Value-initialize so that unused slots hold null pointers. */
    this->_buffer = new uint_8*[this->_size]();
    this->_head = 0;
    this->_rear = 0;
    pthread_mutex_init(&QueMutex, NULL);
}
/*
 * Enlarges the ring by `size` slots without losing queued records.
 *
 * The queued records are moved to the front of the new ring in their
 * original order, so afterwards _head is 0 and _rear equals the number of
 * queued records. The queue state is only touched once the new ring has
 * been allocated, so a failed call leaves the queue unchanged.
 *
 * size: number of slots to add; must be positive.
 *
 * Returns OPERATION_OK on success, INCREASE_FAILED when `size` is not
 * positive or the new ring cannot be allocated, and FAILED_LOCK when the
 * queue mutex is currently held by someone else.
 */
ERR_NUMBER BytesQueue::increaseSize(int size) /* default declared in the header: 512 */
{
    if (size <= 0)
    {
        return INCREASE_FAILED;
    }

    if (pthread_mutex_trylock(&QueMutex))
    {
        printf("Try lock failed!\n");
        return FAILED_LOCK;
    }

    const int oldSize = this->_size;
    const int newSize = oldSize + size;

    /* nothrow: report the failure through the status code instead of throwing. */
    uint_8 **grown = new (std::nothrow) uint_8*[newSize]();
    if (!grown)
    {
        pthread_mutex_unlock(&QueMutex);
        return INCREASE_FAILED;
    }

    /* Records live in [_head, _rear) modulo the OLD size; _rear itself is free. */
    const int count = (oldSize > 0)
        ? (this->_rear - this->_head + oldSize) % oldSize
        : 0;
    for (int k = 0; k < count; ++k)
    {
        grown[k] = this->_buffer[(this->_head + k) % oldSize];
    }

    delete [] this->_buffer;
    this->_buffer = grown;
    this->_size = newSize;
    this->_head = 0;
    this->_rear = count;

    pthread_mutex_unlock(&QueMutex);
    return OPERATION_OK;
}
/*
 * Appends one element (a byte array) to the queue.
 *
 * The bytes are copied into a freshly allocated record that starts with a
 * 4-byte copy of `len` followed by the payload; the ring stores only the
 * record's address. The full check and the allocation happen while the
 * mutex is held, so no record is leaked when the lock cannot be taken and
 * no other locked operation can change the ring in between.
 *
 * data: bytes to store; may be null only when len is 0.
 * len:  number of bytes at `data`; must not be negative.
 *
 * Returns OPERATION_OK on success, POINT_NULL when `data` is null but
 * `len` is positive, NO_AREA when `len` is negative or the record cannot
 * be allocated, FAILED_LOCK when the mutex is currently held by someone
 * else, and QUEUE_FULL when no slot is free.
 */
ERR_NUMBER BytesQueue::inQueue(const uint_8 *data, int len)
{
    const int kLengthBytes = 4; /* size of the length prefix in front of the payload */

    if (len < 0)
    {
        return NO_AREA;
    }
    if (!data && len > 0)
    {
        return POINT_NULL;
    }

    if (pthread_mutex_trylock(&QueMutex))
    {
        printf("Try lock failed!\n");
        return FAILED_LOCK;
    }

    ERR_NUMBER status = OPERATION_OK;
    if ((this->_rear + 1) % this->_size == this->_head)
    {
        status = QUEUE_FULL;
    }
    else
    {
        /* nothrow: report the failure through the status code instead of throwing. */
        uint_8 *record =
            new (std::nothrow) uint_8[static_cast<size_t>(len) + kLengthBytes];
        if (!record)
        {
            status = NO_AREA;
        }
        else
        {
            memcpy(record, &len, kLengthBytes);
            if (len > 0)
            {
                memcpy(record + kLengthBytes, data, len);
            }
            this->_buffer[this->_rear] = record;
            this->_rear = (this->_rear + 1) % this->_size;
        }
    }

    pthread_mutex_unlock(&QueMutex);

    /* Report outside the critical section. */
    if (status == QUEUE_FULL)
    {
        printf("The queue is full!\n");
    }
    return status;
}
/*
 * Removes the oldest element from the queue and hands it to the caller.
 *
 * The record is unlinked from the ring while the mutex is held, then its
 * payload is copied out and the record is freed, so dequeued elements no
 * longer leak.
 *
 * data: destination buffer; the caller must make it large enough for the
 *       stored element, because its length is only known afterwards.
 * len:  receives the number of payload bytes copied into `data`.
 *
 * Returns OPERATION_OK on success, POINT_NULL when `data` is null,
 * FAILED_LOCK when the mutex is currently held by someone else, and
 * QUEUE_EMPTY when there is nothing to dequeue.
 */
ERR_NUMBER BytesQueue::outQueue(uint_8 *data, int &len)
{
    const int kLengthBytes = 4; /* size of the length prefix in front of the payload */

    if (!data)
    {
        return POINT_NULL;
    }

    if (pthread_mutex_trylock(&QueMutex))
    {
        printf("Try lock failed!\n");
        return FAILED_LOCK;
    }

    if (this->_head == this->_rear)
    {
        pthread_mutex_unlock(&QueMutex);
        printf("The queue is empty!\n");
        return QUEUE_EMPTY;
    }

    /* Unlink the record under the lock; from here on only this call owns it. */
    uint_8 *record = this->_buffer[this->_head];
    this->_buffer[this->_head] = NULL;
    this->_head = (this->_head + 1) % this->_size;
    pthread_mutex_unlock(&QueMutex);

    int stored = 0;
    memcpy(&stored, record, kLengthBytes);
    if (stored > 0)
    {
        memcpy(data, record + kLengthBytes, stored);
    }
    len = stored;

    delete [] record;
    return OPERATION_OK;
}
/* This function is use to set free the data structure. */ void BytesQueue::destroy() { while (this->_head != this->_rear) { delete [](this->_buffer[this->_head]); this->_head = (this->_head + 1) % this->_size; } delete [](this->_buffer); this->_size = 0; this->_buffer = NULL; this->_head = 0; this->_rear = 0; }
/*
 * Test helper: prints a one-line description of the status code returned
 * by a queue operation. Codes without a dedicated message share a generic
 * one.
 */
void BytesQueue::errMessage(ERR_NUMBER err)
{
    const char *text = " other things are wrong! \n";

    if (err == OPERATION_OK)
    {
        text = " push is ok!\n";
    }
    else if (err == QUEUE_FULL)
    {
        text = " push failed! The queue is full!!\n";
    }
    else if (err == QUEUE_EMPTY)
    {
        text = " pop failed! The queue is empty!!\n";
    }
    else if (err == INCREASE_FAILED)
    {
        text = " increase queue size failed! \n";
    }

    fputs(text, stdout);
}
/*
 * Prints the current state of a queue: number of slots, head index, rear
 * index and the address of the ring.
 *
 * bq: the queue to describe.
 *
 * The address is printed with %p and a void* argument; handing a pointer
 * to %x is undefined behavior and drops the upper half of 64-bit addresses.
 */
void BytesQueue::showBytesQueue(BytesQueue& bq)
{
    printf(" %s\n", "The info of the BytesQueue is :");
    printf(" size : %d\n", bq._size);
    printf(" head : %d\n", bq._head);
    printf(" rear : %d\n", bq._rear);
    printf(" buf addr : %p\n", static_cast<void*>(bq._buffer));
}
/* using namespace NetworkProtocols;
//this is a good example to show how to use the data structure BytesQueue.
int main() { int len,i; char ch; ERR_NUMBER err; uint_8 bufi[]={1,2,3,4,5,6,7,8,9,0}; uint_8 bufo[10]; BytesQueue bs; bs.showBytesQueue(bs); ch = getchar(); while(ch != 'q') { switch(ch) { case 'i': err = bs.inQueue(bufi, 10); bs.errMessage(err); bs.showBytesQueue(bs); ch = getchar(); break; case 'o': err = bs.outQueue(bufo, len); bs.errMessage(err); bs.showBytesQueue(bs); ch = getchar(); break; case 'e': err = bs.increaseSize(); bs.errMessage(err); bs.showBytesQueue(bs); ch = getchar(); break; case 'h': printf("....................Help................\n"); printf(" i: go into an array into Queue.\n"); printf(" o: go out of an array out of the queue.\n"); printf(" e: enlarge the size of the queue.\n"); printf(" h: help\n"); printf(" q: quit the system.\n"); ch = getchar(); break; default: if (ch != '\n') printf("...........Your input is wrong! Again!..............\n"); ch = getchar(); break; } } bs.destroy(); bs.showBytesQueue(bs); return 0; } */ ———————————————————————————————————————— //main.cpp #include <stdio.h> #include "BytesQueue.h"
/* Argument bundle handed to each worker thread through its void* parameter. */
struct MyParameter
{
    int id;           /* number printed in the thread's progress messages */
    BytesQueue *bq;   /* queue the thread operates on */
    uint_8 *buf;      /* bytes to enqueue, or buffer that receives a dequeued element */
    int len;          /* length passed to inQueue, or filled in by outQueue */
    int delay;        /* pause between two rounds, passed to usleep */
};
/* Handles of the worker threads started by main. */
pthread_t threads[5];

/* File-scope mutex object. */
pthread_mutex_t QueMutex;

/* Attribute object passed to every pthread_create call. */
pthread_attr_t attr;
void *inQueue(void* pvar) { int i = 1; MyParameter *para = (MyParameter*)pvar; while( i ) { printf("Thread inQue: %d is working! \n", para->id); para->bq->inQueue(para->buf, para->len); para->bq->showBytesQueue(*(para->bq)); //para->bs->push(para->buf,para->len); //para->bs->showBytesStack(*(para->bs)); usleep(para->delay); i ++; } pthread_exit(NULL); }
void *outQueue(void* pvar) { int i = 1; MyParameter *para = (MyParameter*)pvar; while( i ) { printf("-------------Thread outQue: %d is working! \n", para->id); para->bq->outQueue(para->buf, para->len); para->bq->showBytesQueue(*(para->bq)); //para->bs->pop(para->buf,para->len); //para->bs->showBytesStack(*(para->bs)); usleep(para->delay); i ++; } pthread_exit(NULL); }
int main() { //IpStack::IpStack(int size) //size=10 uint_8 mybuf1[] = { 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x50, 0x58, 0x0D, 0x0D, 0x0D, 0x08, 0x00, 0x45, 0x00, 0x00, 0x34, 0x00, 0xF2, 0x00, 0x00, 0x40, 0x11, 0xB6, 0x65, 0xC0, 0xA8, 0x21, 0x0F, 0xC0, 0xA8, 0x21, 0x02, 0x04, 0x01, 0x00, 0x05, 0x00, 0x20, 0x60, 0x4c, 0x73, 0x66, 0x61, 0x73, 0x64, 0x66, 0x73, 0x61, 0x64, 0x66, 0x61, 0x73, 0x64, 0x66, 0x73, 0x64, 0x61, 0x66, 0x61, 0x73, 0x66, 0x73, 0x64, 0x66 }; uint_8 mybuf2[100]; int len; BytesQueue bq(100); MyParameter paras[4]={ {0,&bq,mybuf1,66,1000000}, {1,&bq,mybuf1,66,2000000}, {2,&bq,mybuf1,66,3000000}, {3,&bq,mybuf2,len,1000000} }; //bq.showBytesQueue(bq); pthread_attr_init(&attr); pthread_create(&threads[0], &attr, inQueue, (void *)¶s[0]); pthread_create(&threads[1], &attr, inQueue, (void *)¶s[1]); pthread_create(&threads[2], &attr, inQueue, (void *)¶s[2]); pthread_create(&threads[3], &attr, outQueue, (void *)¶s[3]); //pthread_create(&threads[4], &attr, outQueue, (void *)¶s[3]); for (int i=0; i<4; i++) { pthread_join(threads[i], NULL); } pthread_attr_destroy(&attr);
bq.destroy(); //bq.showBytesStack(bq); printf("ok!!\n"); pthread_exit (NULL); return 0; } //end of main ____________________________________________________________________________________ zhangggdlt 2004.12.10 (完)

|