首页
论坛
课程
招聘
[原创]自己对异步IO完成端口的体会 有些不足大家帮忙说下
2011-5-17 16:20 7024

[原创]自己对异步IO完成端口的体会 有些不足大家帮忙说下

2011-5-17 16:20
7024
class CClientContext  //To store and manage client related information
{
private:
     
     OVERLAPPED        *m_pol;
     WSABUF            *m_pwbuf;
     
     int               m_nTotalBytes;
     int               m_nSentBytes;
     
     SOCKET            m_Socket;  //accepted socket
     int               m_nOpCode; //will be used by the worker thread to decide what operation to perform
     char              m_szBuffer[MAX_BUFFER_LEN];
     
public:
     
     void SetOpCode(int n){m_nOpCode = n;}
     int GetOpCode(){return m_nOpCode;}
     void SetTotalBytes(int n){m_nTotalBytes = n;}
     int GetTotalBytes(){return m_nTotalBytes;}
     void SetSentBytes(int n){m_nSentBytes = n;}
     void IncrSentBytes(int n){m_nSentBytes += n;}
     int GetSentBytes() {return m_nSentBytes;}
     void SetSocket(SOCKET s){m_Socket = s;}
     SOCKET GetSocket(){return m_Socket;}
     void SetBuffer(char *szBuffer){strcpy(m_szBuffer, szBuffer);}
     void GetBuffer(char *szBuffer){strcpy(szBuffer, m_szBuffer);}
     void ZeroBuffer(){ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);}
     void SetWSABUFLength(int nLength){m_pwbuf->len = nLength;}
     int GetWSABUFLength(){return m_pwbuf->len;}
     WSABUF* GetWSABUFPtr(){return m_pwbuf;}
     OVERLAPPED* GetOVERLAPPEDPtr(){return m_pol;}
     void ResetWSABUF()
	 {ZeroBuffer();
       m_pwbuf->buf = m_szBuffer;
        m_pwbuf->len = MAX_BUFFER_LEN;
     }

     CClientContext()
     {
        m_pol = new OVERLAPPED;
        m_pwbuf = new WSABUF;
        ZeroMemory(m_pol, sizeof(OVERLAPPED));
        m_Socket =  SOCKET_ERROR;
        ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
        m_pwbuf->buf = m_szBuffer;  //这句话的意思是不是说 将m_szBuffer 指针给了buf 
	     m_pwbuf->len = MAX_BUFFER_LEN;    //也就是说 实际上WSABUF里的数组指针指向的是m_szBuffer 实际接收的数据是 在m_szBuffer数组里 问题
        m_nOpCode = 0;
        m_nTotalBytes = 0;
        m_nSentBytes = 0;
     }
     

};
std::vector<CClientContext *> g_ClientContext;
bool InitializeIOCP();
bool Initialize();
void CleanUp();
void DeInitialize();
DWORD WINAPI AcceptThread(LPVOID lParam);
void AcceptConnection(SOCKET ListenSocket);
bool AssociateWithIOCP(CClientContext   *pClientContext);
DWORD WINAPI WorkerThread(LPVOID lpParam);
void WriteToConsole(char *szFormat, ...);
void AddToClientList(CClientContext   *pClientContext);
void RemoveFromClientListAndFreeMemory(CClientContext   *pClientContext);
void CleanClientList();
int GetNoOfProcessors();
#endif


int main(int argc, char *argv[])
{
   Initialize();
    SOCKET ListenSocket;
   struct sockaddr_in ServerAddress;
    ListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
     bind(ListenSocket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress));
      listen(ListenSocket,SOMAXCONN))
     g_hAcceptEvent = WSACreateEvent();
     WSAEventSelect(ListenSocket, g_hAcceptEvent, FD_ACCEPT);
      DWORD nThreadID;
     g_hAcceptThread = CreateThread(0, 0, AcceptThread, (void *)ListenSocket, 0, &nThreadID);
}

bool InitializeIOCP()
{
g_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );

     DWORD nThreadID;
     for (int ii = 0; ii < g_nThreads; ii++)
     {
          g_phWorkerThreads[ii] = CreateThread(0, 0, WorkerThread, (void *)(ii+1), 0, &nThreadID);
     }
     
     return true;
}

DWORD WINAPI AcceptThread(LPVOID lParam)
{
     SOCKET ListenSocket = (SOCKET)lParam;
     
     WSANETWORKEVENTS WSAEvents;
     while(WAIT_OBJECT_0 != WaitForSingleObject(g_hShutdownEvent, 0))
     {
          if (WSA_WAIT_TIMEOUT != WSAWaitForMultipleEvents(1, &g_hAcceptEvent, FALSE, 0, FALSE))
          {
               void AcceptConnection(SOCKET ListenSocket)
{
     sockaddr_in ClientAddress;
     int nClientLength = sizeof(ClientAddress);
     SOCKET Socket = accept(ListenSocket, (sockaddr*)&ClientAddress, &nClientLength);
     CClientContext   *pClientContext  = new CClientContext;
     pClientContext->SetOpCode(OP_READ);    //
     pClientContext->SetSocket(Socket);
     AddToClientList(pClientContext);  //这里是添加到列表里的
     if (true == AssociateWithIOCP(pClientContext))  //请问这个关联操作会触发OP_READ 完成操作事件吗   问题1 ?????????
     {
          pClientContext->SetOpCode(OP_WRITE);
      WSABUF *p_wbuf = pClientContext->GetWSABUFPtr();
      OVERLAPPED *p_ol = pClientContext->GetOVERLAPPEDPtr();  //我想这里的指针  得到指针 应该可以改变pClientContext 里的OVERLAPPED结构指针里的数据吧
      DWORD dwFlags = 0;  DWORD dwBytes = 0;
          //这里是接收一个字节 搞不懂为啥是一个字节 为啥 要接收难道我发送不行吗  问题2 ?????????
      int nBytesRecv = WSARecv(pClientContext->GetSocket(), p_wbuf, 1,&dwBytes, &dwFlags, p_ol, NULL);
  }
}

bool AssociateWithIOCP(CClientContext   *pClientContext)
{
     HANDLE hTemp = CreateIoCompletionPort((HANDLE)pClientContext->GetSocket(), g_hIOCompletionPort, (DWORD)pClientContext, 0);
return true;
}

DWORD WINAPI WorkerThread(LPVOID lpParam)
{   
     int nThreadNo = (int)lpParam;
    void *lpContext = NULL;
     OVERLAPPED       *pOverlapped = NULL;
     CClientContext   *pClientContext = NULL;
     DWORD            dwBytesTransfered = 0;
     int nBytesRecv = 0;
     int nBytesSent = 0;
     DWORD    dwBytes = 0, dwFlags = 0;
     
     while (WAIT_OBJECT_0 != WaitForSingleObject(g_hShutdownEvent, 0))
     {
          BOOL bReturn = GetQueuedCompletionStatus(g_hIOCompletionPort,&dwBytesTransfered,(LPDWORD)&lpContext,&pOverlapped,INFINITE);
           pClientContext = (CClientContext *)lpContext;
         WSABUF *p_wbuf = pClientContext->GetWSABUFPtr();
         OVERLAPPED *p_ol = pClientContext->GetOVERLAPPEDPtr();
          switch (pClientContext->GetOpCode())
          {
          case OP_READ:
              pClientContext->IncrSentBytes(dwBytesTransfered);   //插入到m_nSentBytes 已经传输的字节数
               if(pClientContext->GetSentBytes() < pClientContext->GetTotalBytes())  //判断发送的数据是否小于总共要发送的数据 m_nTotalBytes
               {
                    pClientContext->SetOpCode(OP_READ);   //如果是 则设置标志为OP_READ  这里设置为 OP_READ 我想应该谁重复发送吧
                  p_wbuf->buf += pClientContext->GetSentBytes();   //向后移动 然后
                    p_wbuf->len = pClientContext->GetTotalBytes() - pClientContext->GetSentBytes();
                   dwFlags = 0;
                nBytesSent = WSASend(pClientContext->GetSocket(), p_wbuf, 1, &dwBytes, dwFlags, p_ol, NULL);
               }
               else  //到这里的意思是发送完毕吗  按道理这里应该也要设置为OP_READ 来实现重复接收啊 为啥是OP_WRITE
               {   
                    pClientContext->SetOpCode(OP_WRITE);  //如果发送的数据 大于总共要发送的 到这一步 设置开始接收数据 触发下一个事件
                                            //为什么 没有没有关于WSARecv的 而且有也只有一个字节 难道接收只接收一个字节吗    问题 3?????????
                                        pClientContext->ResetWSABUF();  //清空WSABUF结构      
                    dwFlags = 0;
                   nBytesRecv = WSARecv(pClientContext->GetSocket(), p_wbuf, 1,&dwBytes, &dwFlags, p_ol, NULL);
               }
              break;
               
          case OP_WRITE:
               char szBuffer[MAX_BUFFER_LEN];
              pClientContext->GetBuffer(szBuffer);  //接收到的字节 怎么接收的没说啊 为啥只接收了一个字节 接受一个缓冲区为啥就能打印不继续接收 难道 继续接收应该设置下面那个标志为 OP_WRITE   //问题4??????
             WriteToConsole("\nThread %d: The following message was received: %s", nThreadNo, szBuffer);
              pClientContext->SetOpCode(OP_READ);   //继续接收 应该设置这个标志为OP_WRITE 为啥  
                  //设置发送的字节数为 0 总发送字节数为 接收到的字节数
              pClientContext->SetTotalBytes(dwBytesTransfered);
              pClientContext->SetSentBytes(0);
                p_wbuf->len  = dwBytesTransfered;
               dwFlags = 0;
               nBytesSent = WSASend(pClientContext->GetSocket(), p_wbuf, 1,&dwBytes, dwFlags, p_ol, NULL);
}
有些东西我省略了方便大牛查看 帮忙解释下 啥云因

《0day安全 软件漏洞分析技术(第二版)》第三次再版印刷预售开始!

收藏
点赞0
打赏
分享
最新回复 (11)
雪    币: 107
活跃值: 活跃值 (10)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
tydef 活跃值 2011-5-17 16:24
2
0
体会在????
雪    币: 106
活跃值: 活跃值 (10)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
NiGHter 活跃值 2011-5-17 17:01
3
0
1、为什么WSARecv(.., 1, ...); 因为代码中写的是每次接收1bytes
2、粗略看了一下你的大体代码,用LoadRunner测试一下,服务端程序应该会崩
3、建议处理一下粘包以及封包的校验,另外可以考虑做内存池之类的管理下,还有连接超时检测等等
雪    币: 27
活跃值: 活跃值 (10)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
smove 活跃值 2011-5-17 17:10
4
0
好的 不过我那些内存处理的暂时弄掉等下在交流 我粗略的省略掉了
雪    币: 27
活跃值: 活跃值 (10)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
smove 活跃值 2011-5-17 18:47
5
0
不礼貌的说一句 我刚才请教高手
你对一个字节的看法也是错的
麻烦请看下MSDN  我只是建议 谢谢你的上面的几个回答 对我很有用

不过能说下在哪里检测连接超时吗   还有内存池管理是否就是我自定义的那个结构 new CClientContext 管理和分配这个
雪    币: 106
活跃值: 活跃值 (10)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
NiGHter 活跃值 2011-5-17 23:01
6
0
不好意思,看错WSARecv的结构了,和recv记混了,是BufferCount,我刚才说的每次只接受1bytes是有些东西是这么做的,比如某飞信群发,这样做可以解决部分粘包问题,但是数据包大的话就会影响性能了。
建议看下内存池概念吧,申请释放多了内存不稳定,影响性能,可以考虑做成内存池,没必要每次都delete
IOCP这东西写出来都差不多,体现在细节上的处理,才会提高并发量。
建议装一个LoadRunner测试一下。

还有,代码就别老编辑掉了,免得别人看回帖发现与1楼的代码不符,造成误解。
雪    币: 106
活跃值: 活跃值 (10)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
NiGHter 活跃值 2011-5-17 23:13
7
0
连接超时的话检测方法很多,可以单独开个线程来处理一下,不过处理手法上要细腻一点,别让检测线程占用太多系统资源,否则性能会有影响。
雪    币: 130
活跃值: 活跃值 (36)
能力值: ( LV11,RANK:180 )
在线值:
发帖
回帖
粉丝
besterChen 活跃值 4 2011-7-3 19:43
8
0
http://www.codeproject.com/KB/IP/SimpleIOCPApp.aspx


人家说的好像挺详细的呀~
雪    币: 14
活跃值: 活跃值 (10)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
wting 活跃值 2011-7-4 08:42
9
0
这种通信框架有没有比较经典的代码可读的
雪    币: 184
活跃值: 活跃值 (10)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
霹雳狂风 活跃值 2011-7-4 22:15
10
0
m_pwbuf->buf = m_szBuffer;  //这句话的意思是不是说 将m_szBuffer 指针给了buf
       m_pwbuf->len = MAX_BUFFER_LEN;    //也就是说 实际上WSABUF里的数组指针指向的是m_szBuffer 实际接收的数据是 在m_szBuffer数组里 问题

微软决定要WSABUF为缓冲区参数,buf要指向一块有效内存 因为 m_szBuffer 的生命周期和 m_pwbuf是一致的 所以那样。。

if (true == AssociateWithIOCP(pClientContext))  //请问这个关联操作会触发OP_READ 完成操作事件吗   问题1 ?????????
据我猜测是不会出发读操作的

    //这里是接收一个字节 搞不懂为啥是一个字节 为啥 要接收难道我发送不行吗  问题2 ?????????
      int nBytesRecv = WSARecv(pClientContext->GetSocket(), p_wbuf, 1,&dwBytes, &dwFlags, p_ol, NULL);
这里是投递一个读的IOCP请求到队列中 也就是说有数据会触发你上面说的OP_READ操作(但是看你代码貌似是设置OP_WRITE

   pClientContext->SetOpCode(OP_WRITE);  //如果发送的数据 大于总共要发送的 到这一步 设置开始接收数据 触发下一个事件
              //为什么 没有没有关于WSARecv的 而且有也只有一个字节 难道接收只接收一个字节吗    问题 3?????????
          pClientContext->ResetWSABUF();  //清空WSABUF结构      
不是一个字节是一个WSABUF

pClientContext->GetBuffer(szBuffer);  //接收到的字节 怎么接收的没说啊 为啥只接收了一个字节 接受一个缓冲区为啥就能打印不继续接收 难道 继续接收应该设置下面那个标志为 OP_WRITE   //问题4??????

为什么只有一个字节我也不知道,不过你strcpy是很不负责任很危险的做法 小心堆栈溢出, 本次IO有多少字节不是 print(%s)说了算 是 dwBytesTransfered 说了算
是内核给你完成的,完成了给你的队列投递一个信息,然后你在Get出来, 和消息队列有一点点类似
继续接受关键是投递WSARecv操作到队列

记得点一下致谢
雪    币: 133
活跃值: 活跃值 (10)
能力值: ( LV5,RANK:60 )
在线值:
发帖
回帖
粉丝
kalrey 活跃值 1 2011-7-5 01:21
11
0
我是手机党,来膜拜十楼的
不作粘包处理和封包解析还是不行滴
雪    币: 184
活跃值: 活跃值 (10)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
霹雳狂风 活跃值 2011-7-8 21:03
12
0
不好意思哈,最近有点忙,改天一起吃饭
游客
登录 | 注册 方可回帖
返回