关于socket的地址复用和端口复用技术与UDP并发

这篇具有很好参考价值的文章主要介绍了关于socket的地址复用和端口复用技术与UDP并发。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 一. socket五元组

linux:

setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,(const void *)&reuse , sizeof(int));

setsockopt(fd, SOL_SOCKET, SO_REUSEPORT,(const void *)&reuse , sizeof(int));

windows:

setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,(const char*)&reuse , sizeof(int)); 

socket是系统级资源(区别于进程级和线程级,在本机上是全局唯一的,https://zhuanlan.zhihu.com/p/612436510),可以人为绑定本地ip和端口,与协议类型,对端ip,对端端口一起,作为该socket在本机上的唯一标识。当网卡有数据来临时,根据数据包的协议类型,目的ip,目的端口,源ip,源端口查找到对应的socket,将数据送到相应的缓冲区。

        看上去socket的唯一标识确实是一个五元组,然而有些情况下,socket 五元组中的信息并不能全都生效。比如创建udp类型的socket,在接收到数据之前,无法确定对端的ip和端口;创建的用于listen的TCP socket,在监听到客户端连接请求之前,也不确定客户端的ip和端口。只有tcp accept拿到连接请求时,由操作系统为我们创建的socket,才会有完整的五元组信息。因此在五元组不确定的情况下,内核只能利用有限的三元信息,去寻找socket。如果局限在udp通讯,则协议类型无法提供有效的区分依据,五元组最终退化成二元组(tcp同理)。

        如果只利用socket的二元组信息已经可以区分不同的socket,那就皆大欢喜,因此操作系统默认不允许端口同时被多个socket占用就显得很合理。然而随着计算机技术和网络技术的日益复杂化,这一单纯的假设在很多场景下已经不再适用。比如为了提高网络吞吐量,引入了多进程网络编程技术,允许本机多个socket绑定到同一ip和端口上,此时二元组不能一一对应一个socket。linux操作系统内核为了管理这些五元组信息不全的socket,构建了hash表数据结构,以有限的二元组信息作为key把socket映射到散列桶上,并以拉链法处理hash冲突。当网卡数据包来临,根据数据包中的报头,查找到对应的散列桶,随机将数传给桶内的某一个socket,一般是round robin 算法。可以看作是内核级负载均衡。

        通常基于tcp的server,很少会在同一台物理机上开太多的服务进程,在业务量很大时可以做服务器集群,可以很好的避开上述问题。然而,近年来基于UDP的高并发网络模型开始流行,尤其是基于QUIC协议的低延时网络传输技术在流媒体传输领域取得的巨大成功,使得UDP并发越来越受到关注。下面是一个典型的UDP并发网络模型:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#include <assert.h>

#define SO_REUSEPORT    15

#define MAXBUF 10240
#define MAXEPOLLSIZE 100


int read_data(int sd)
{
    char recvbuf[MAXBUF + 1];
    int  ret;
    struct sockaddr_in client_addr;
    socklen_t cli_len=sizeof(client_addr);

    bzero(recvbuf, MAXBUF + 1);
  
    ret = recvfrom(sd, recvbuf, MAXBUF, 0, (struct sockaddr *)&client_addr, &cli_len);
    if (ret > 0) {
        printf("read[%d]: %s  from  %d\n", ret, recvbuf, sd);
    } else {
        printf("read err:%s  %d\n", strerror(errno), ret);
      
    }
    fflush(stdout);
}

int udp_accept(int sd, struct sockaddr_in my_addr)
{
    int new_sd = -1;
    int ret = 0;
    int reuse = 1;
    char buf[16];
    struct sockaddr_in peer_addr;
    socklen_t cli_len = sizeof(peer_addr);

    ret = recvfrom(sd, buf, 16, 0, (struct sockaddr *)&peer_addr, &cli_len);
    if (ret < 0) {
        return -1;
    }

    if ((new_sd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("child socket");
        exit(1);
    } else {
        printf("parent:%d  new:%d\n", sd, new_sd);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEADDR, &reuse,sizeof(reuse));
    if (ret) {
        exit(1);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
    if (ret) {
        exit(1);
    }

    ret = bind(new_sd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr));
    if (ret){
        perror("chid bind");
        exit(1);
    } else {
    }

    peer_addr.sin_family = PF_INET;
    printf("aaa:%s\n", inet_ntoa(peer_addr.sin_addr));
    if (connect(new_sd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr)) == -1) {
        perror("chid connect");
        exit(1);
    } else {
    }

out:
    return new_sd;
}

int main(int argc, char **argv)
{
    int listener, kdpfd, nfds, n, curfds;
    socklen_t len;
    struct sockaddr_in my_addr, their_addr;
    unsigned int port;
    struct epoll_event ev;
    struct epoll_event events[MAXEPOLLSIZE];
    int opt = 1;;
    int ret = 0;

    port = 1234;
  
    if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("socket");
        exit(1);
    } else {
        printf("socket OK\n");
    }

    ret = setsockopt(listener,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
    if (ret) {
        exit(1);
    }

    ret = setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
    if (ret) {
        exit(1);
    }
  
    bzero(&my_addr, sizeof(my_addr));
    my_addr.sin_family = PF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = INADDR_ANY;
    if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1) {
        perror("bind");
        exit(1);
    } else {
        printf("IP bind OK\n");
    }
    int flag = fcntl(listener,F_GETFL,0);
    fcntl(listener, F_SETFL, flag | O_NONBLOCK)
    kdpfd = epoll_create(MAXEPOLLSIZE);

    ev.events = EPOLLIN;//|EPOLLET;
    ev.data.fd = listener;

    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%dn", listener);
        return -1;
    } else {
        printf("ep add OK\n");
    }
 
    while (1) {
      
        nfds = epoll_wait(kdpfd, events, 10000, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            break;
        }
      
        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                printf("listener:%d\n", n);
                int new_sd;               
                struct epoll_event child_ev;
                while(1)
                {
                    new_sd = udp_accept(listener, my_addr);
                    if(new_sd==-1)break;
                    child_ev.events = EPOLLIN;
                    child_ev.data.fd = new_sd;
                    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) 
                    {
                        fprintf(stderr, "epoll set insertion error: fd=%dn", new_sd);
                        return -1;
                    }
                }
            } else {
                read_data(events[n].data.fd);
            }
        }
    }
    close(listener);
    return 0;
}

//源码地址:https://zhuanlan.zhihu.com/p/296666065

上面的代码,每收到一个新客户端发来的数据,会手动创建一个新的udp socket来与该客户端交互。新的udp socket会与初始udp socket绑定相同的地址和端口号,因此该网络模型中创建的所有的socket都会被放到一个散列桶内,hash表退化成了链表。观察上述代码,可以注意到,当获知了客户端的源端口和源ip后,会手动将新udp socket的五元组信息补全(执行了connect操作),这样一来,新的udp socket五元组就唯一确定了,下一次该客户端再发数据包,就会直接找到这个新的udp socket(socket还是在原来的散列桶内,找到它需要在这个桶中完成查找),放到指定缓冲区,而不会随机分配了。

        看上去非常nice,又回到了一个socket只处理一个客户端交互的熟悉套路上,然而还存在严重的问题,操作系统内核每次都要去在这个散列桶(链表)查询,寻找那个唯一匹配五元组的socket,在并发量起来后,查找效率十分低下(https://blog.csdn.net/dog250/article/details/104219341),这个有赖于操作系统内核进一步优化,在应用层很难做出改进。另一个严重问题是,新建的udp socket没有关闭机制,随着客户量的增加很快就会耗尽系统资源。必须增加超时退出机制,将一定时间内没有数据接收的socket 关闭掉。此外,由于udp本身是不可靠协议,客户端第一次发的连接请求包可能会丢失,尤其是短时时间内大量客户端连接请求数据包涌进上面的listener socket时,recv_from调用不及时,就会在服务端造成数据包丢失。为了解决该问题,可以让listener socket变成非阻塞,水平触发的运行模式,可以适当缓解,但是从根源上来说,客户端要有重发机制,这样不仅能解决服务端丢包问题,而且还能解决网络链路丢包问题。

二. Windows 基于IOCP的UDP并发网络模型

在之前的一篇博客中提到过windows 基于IOCP的TCP并发网络模型(https://blog.csdn.net/yuanshenqiang/article/details/120454822),如果将其改为UDP并发也是可以的,不过fpConnectEx不能用于udp socket,可直接调用connect进行客户端地址绑定。但是windows的UDP协议栈好像不支持在多个具有相同端口号的socket之间进行路由,即便绑定到相同的端口号,并且connect到指定地址,数据包也总是会汇聚到同一个socket。因此,客户端在收到第一个回复包后,需要根据回复包携带的端口修改目的端口,否则无法正常通讯(实际上UDP协议本身也不保证通信过程中对方发来的数据包携带的端口始终不变)。

    下面的UDP服务器也需要为新建的udp socket增加超时退出机制,否则服务端也很容易资源耗尽。与epoll_wait一样,当调用closesocket后,GetQueuedCompletionStatus会立即检测到socket状态变化,将已经投递到完成队列的事件吐出来,返回值为false,错误码为ERROR_OPERATION_ABORTED。

#include <iostream>
#include <set>
#include <mutex>
#define _WINSOCK_DEPRECATED_NO_WARNINGS
 
#define BUFFER_SIZE 1024
#define START_POST_ACCEPTEX 2
#define THREAD_COUNT 2
#define UDP_SERVER_PORT 50721
 
#include <Winsock2.h>
#pragma comment(lib, "Ws2_32.lib")
 
#include <MSWSock.h>
#pragma comment(lib, "Mswsock.lib")
 
struct ServerParams
{
	SOCKET listenerSock;
	HANDLE iocp;
};
struct ConnectionInfo
{
	unsigned long ip;
	int port;
	__int64 overlp;
	ConnectionInfo(unsigned long _ip, int _port, __int64 _overlp) :ip(_ip), port(_port), overlp(_overlp) {}
	bool operator<(const ConnectionInfo& node) const
	{
		if (ip < node.ip)
			return true;
		else if (port < node.port)
			return true;
		else
			return false;
	}
};

enum IO_OP_TYPE
{
	IO_ACCEPT,
	IO_RECV,
	IO_SEND,
	IO_CONNECT,
	IO_DISCONNECT,
	IO_EX_SEND
};

typedef struct UdpOverlappedPerIO
{
	OVERLAPPED overlapped;
	SOCKET socket;
	WSABUF wsaBuf;
	IO_OP_TYPE type;
	__int64 lifetime;                  //存活时间
	char wbuff[BUFFER_SIZE];		   //写缓冲区
	char rbuff[BUFFER_SIZE];         //读缓冲区
	SOCKADDR_IN remoteAddr;     //存储数据来源IP地址
	int remoteAddrLen;                      //存储数据来源IP地址长度
	BOOL deprecated;					   //数据是否失效,仅当type为IO_EX_SEND时生效
	BOOL destroyEnable;				   //数据失效后是否允许回收,仅当type为IO_EX_SEND时生效
}*LPUdpOverlappedPerIO;

static std::mutex ConMx;
static std::set<ConnectionInfo> ConMap;

int InitUdpServer(IN unsigned short port, OUT ServerParams& pms)
{
	WSADATA wsaData;
	int ret;
	ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (ret == 0)
	{
		pms.listenerSock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
		if (pms.listenerSock != INVALID_SOCKET)
		{
			int reuse = 1;
			setsockopt(pms.listenerSock, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(int));
			sockaddr_in address;
			address.sin_family = AF_INET;
			address.sin_addr.s_addr = inet_addr("127.0.0.1");
			address.sin_port = htons(port);
			ret = bind(pms.listenerSock, (SOCKADDR*)&address, sizeof(SOCKADDR));
			if (ret == 0)
			{
				pms.iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
				if (pms.iocp != NULL)
				{
					if (NULL != CreateIoCompletionPort((HANDLE)pms.listenerSock, pms.iocp, NULL, 0))
					{
						return 0;
					}
					CloseHandle(pms.iocp);
				}
			}
			closesocket(pms.listenerSock);
		}
		WSACleanup();
	}
	if (ret == 0)
		ret = -1;
	return ret;
}

int UdpPostAcceptEx(IN HANDLE handle, IN SOCKADDR_IN* clientAddr, struct UdpOverlappedPerIO** slot)
{
	SOCKET sock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
	if (sock == INVALID_SOCKET)
		return -1;
	sockaddr_in address;
	address.sin_family = AF_INET;
	address.sin_addr.s_addr = inet_addr("127.0.0.1");
	address.sin_port = 0;//htons(UDP_SERVER_PORT);
	//int reuse = 1;
	//setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(int));
	int ret = bind(sock, (SOCKADDR*)&address, sizeof(SOCKADDR));
	UdpOverlappedPerIO* overlp = new UdpOverlappedPerIO;
	if (overlp == nullptr || ret != 0)
	{
		if (overlp != nullptr)
			delete overlp;
		closesocket(sock);
		return -1;
	}
	CreateIoCompletionPort((HANDLE)sock, handle, NULL, 0);
	memset(overlp, 0x00, sizeof(UdpOverlappedPerIO));
	overlp->socket = sock;
	overlp->wsaBuf.buf = overlp->rbuff;
	overlp->wsaBuf.len = BUFFER_SIZE;
	overlp->type = IO_OP_TYPE::IO_RECV;
	overlp->remoteAddr = *clientAddr;
	overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
	ret = connect(sock, (struct sockaddr*)clientAddr, sizeof(SOCKADDR_IN));
	if (ret != 0)
	{
		int errorCode = WSAGetLastError();
		std::cout << "Connect to Peer Failed: " << errorCode << std::endl;
	}
	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
	*slot = overlp;
	return sock;
}

void MessageProcess(IN HANDLE handle, IN SOCKADDR_IN* clientAddr, IN int fd, IN void* msg, IN DWORD msglen, OUT void* res, IN OUT DWORD* reslen, BOOL first_package = FALSE);

DWORD WINAPI udpworkthread(LPVOID lpParam)
{
	ServerParams* pms = (ServerParams*)lpParam;
	HANDLE complectionPort = pms->iocp;
	SOCKET listenSock = pms->listenerSock;
	DWORD bytesTrans;
	ULONG_PTR completionKey;
	LPUdpOverlappedPerIO overlp;
	ConnectionInfo conn_checker = { 0,0,0 };
	DWORD errCode;
	int ret;
	while (1)
	{
		//if udp socket has been closed locally, GetQueuedCompletionStatus will return false immediately with a ERROR_OPERATION_ABORTED error code.
		BOOL result = GetQueuedCompletionStatus(complectionPort, &bytesTrans, &completionKey, (LPOVERLAPPED*)&overlp, INFINITE);
		if (!result)
		{
			errCode = GetLastError();
			if ((errCode == WAIT_TIMEOUT) || (errCode == ERROR_NETNAME_DELETED) || (errCode == ERROR_OPERATION_ABORTED))
			{
				if (overlp != NULL)
				{
					std::cout << "socket closed, fd = " << overlp->socket << std::endl;
					closesocket(overlp->socket);
                    conn_checker.ip = overlp->remoteAddr.sin_addr.S_un.S_addr;
			        conn_checker.port = overlp->remoteAddr.sin_port;
                    ConMx.lock();
			        auto it = ConMap.find(conn_checker);
                    if(it!=ConMap.end()){ConMap.erase(it);};
                    ConMx.unlock();
					delete overlp;
				}
				continue;
			}
			std::cout << "GetQueuedCompletionStatus failed, Maybe IOCP handle has been closed." << std::endl;
			break;
		}
		if (overlp == NULL)
		{
			std::cout << "GetQueuedCompletionStatus Quit Normally." << std::endl;
			break;
		}
		switch (overlp->type)
		{
		case IO_OP_TYPE::IO_ACCEPT:
		{
			std::cout << "happened IO_ACCEPT, Data length = " << bytesTrans << ", sock fd= " << overlp->socket << std::endl;
			conn_checker.ip = overlp->remoteAddr.sin_addr.S_un.S_addr;
			conn_checker.port = overlp->remoteAddr.sin_port;

			int new_sock = INVALID_SOCKET;
			LPUdpOverlappedPerIO slot = NULL;
			DWORD dwRecv = 0, dwSend = 0, dwFlag = 0;
			DWORD sendBuffSz = BUFFER_SIZE;
			ConMx.lock();
			auto it = ConMap.find(conn_checker);
			if (it == ConMap.end())
			{
				std::cout << "client accepted, ip = " << inet_ntoa(overlp->remoteAddr.sin_addr) << " port = " << ntohs(overlp->remoteAddr.sin_port) << std::endl;
				new_sock = UdpPostAcceptEx(complectionPort, &overlp->remoteAddr, &slot);
				if (slot != NULL && new_sock > 0)
				{
					MessageProcess(complectionPort, &slot->remoteAddr, new_sock, overlp->rbuff, bytesTrans, slot->wbuff, &sendBuffSz, TRUE);
					if (sendBuffSz > 0)/*first packet process*/
					{
						slot->type = IO_OP_TYPE::IO_SEND;
						slot->wsaBuf.buf = slot->wbuff;
						slot->wsaBuf.len = sendBuffSz;
						slot->remoteAddrLen = sizeof(SOCKADDR_IN);
						ret = WSASend(new_sock, &(slot->wsaBuf), 1, &dwSend, dwFlag, (LPOVERLAPPED)slot, NULL);
						//ret = WSASendTo(new_sock,&(slot->wsaBuf),1,&dwSend,dwFlag,(struct sockaddr*)&slot->remoteAddr,sizeof(SOCKADDR_IN),(LPOVERLAPPED)slot,NULL);
					}
					conn_checker.overlp = (__int64)(slot);
					ConMap.insert(conn_checker);
				}
			}
			else
			{
				slot = (LPUdpOverlappedPerIO)(it->overlp);
				MessageProcess(complectionPort, &slot->remoteAddr, slot->socket, overlp->rbuff, bytesTrans, slot->wbuff, &sendBuffSz, TRUE);
				if (sendBuffSz > 0)/*first packet process*/
				{
					slot->type = IO_OP_TYPE::IO_SEND;
					slot->wsaBuf.buf = slot->wbuff;
					slot->wsaBuf.len = sendBuffSz;
					slot->remoteAddrLen = sizeof(SOCKADDR_IN);
					ret = WSASend(slot->socket, &(slot->wsaBuf), 1, &dwSend, dwFlag, (LPOVERLAPPED)slot, NULL);
					//ret = WSASendTo(new_sock,&(slot->wsaBuf),1,&dwSend,dwFlag,(struct sockaddr*)&slot->remoteAddr,sizeof(SOCKADDR_IN),(LPOVERLAPPED)slot,NULL);
				}
			}
			ConMx.unlock();
			ZeroMemory(overlp->wbuff, BUFFER_SIZE);
			ZeroMemory(overlp->rbuff, BUFFER_SIZE);
			overlp->type = IO_OP_TYPE::IO_ACCEPT;
			overlp->wsaBuf.buf = overlp->rbuff;
			overlp->wsaBuf.len = BUFFER_SIZE;
			overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
			ret = WSARecvFrom(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, (SOCKADDR*)&(overlp->remoteAddr), &(overlp->remoteAddrLen), (LPOVERLAPPED)overlp, NULL);
		}
		break;
		case IO_OP_TYPE::IO_RECV:
		{
			std::cout << "happened IO_RECV, Data length = " << bytesTrans << ", sock fd= " << overlp->socket << std::endl;
			DWORD dwRecv = 0, dwSend = 0, dwFlag = 0;
			DWORD sendBuffSz = BUFFER_SIZE;
			ZeroMemory(overlp->wbuff, BUFFER_SIZE);
			MessageProcess(complectionPort, &overlp->remoteAddr, overlp->socket, overlp->rbuff, bytesTrans, overlp->wbuff, &sendBuffSz, FALSE);
			ZeroMemory(overlp->rbuff, BUFFER_SIZE);
			if (sendBuffSz > 0)
			{
				overlp->type = IO_OP_TYPE::IO_SEND;
				overlp->wsaBuf.buf = overlp->wbuff;
				overlp->wsaBuf.len = sendBuffSz;
				overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
				ret = WSASend(overlp->socket, &(overlp->wsaBuf), 1, &dwSend, dwFlag, (LPOVERLAPPED)overlp, NULL);
				//ret = WSASendTo(overlp->socket, &(overlp->wsaBuf), 1, &dwSend, dwFlag, (struct sockaddr*)&overlp->remoteAddr, sizeof(SOCKADDR_IN), (LPOVERLAPPED)overlp, NULL);
			}
			else
			{
				overlp->type = IO_OP_TYPE::IO_RECV;
				overlp->wsaBuf.buf = overlp->rbuff;
				overlp->wsaBuf.len = BUFFER_SIZE;
				overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
				ret = WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), NULL);
				//ret = WSARecvFrom(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, (SOCKADDR*)& (overlp->remoteAddr), &(overlp->remoteAddrLen), &(overlp->overlapped), NULL);
			}
			if (ret == SOCKET_ERROR)
			{
				errCode = GetLastError();
				if (errCode != WSA_IO_PENDING)
					std::cout << "WSARecvFrom/WSASendTo failed: " << errCode << std::endl;
			}
		}
		break;
		case IO_OP_TYPE::IO_SEND:
		{
			std::cout << "happened IO_SEND: " << bytesTrans << std::endl;
			ZeroMemory(overlp->wbuff, BUFFER_SIZE);
			ZeroMemory(overlp->rbuff, BUFFER_SIZE);
			overlp->type = IO_OP_TYPE::IO_RECV;
			overlp->wsaBuf.buf = overlp->rbuff;
			overlp->wsaBuf.len = BUFFER_SIZE;
			overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
			DWORD dwRecv = 0, dwFlag = 0;
			ret = WSARecv(overlp->socket, &(overlp->wsaBuf), 1, &dwRecv, &dwFlag, &(overlp->overlapped), NULL);
			//ret = WSARecvFrom(overlp->socket, &(overlp->wsaBuf), 1, &dwRecv, &dwFlag, (SOCKADDR*)&(overlp->remoteAddr), &(overlp->remoteAddrLen), &(overlp->overlapped), NULL);
			if (ret == SOCKET_ERROR)
			{
				errCode = GetLastError();
				if (errCode != WSA_IO_PENDING)
					std::cout << "IO_SEND WSARecv failed: " << errCode << std::endl;
			}
		}
		break;
		case IO_OP_TYPE::IO_EX_SEND:
		{
			if (overlp->deprecated == TRUE)
			{
				std::cout << "happened IO_EX_SEND: " << bytesTrans << std::endl;
				if (overlp->destroyEnable)
					delete overlp;
				break;
			}
			DWORD dwFlag = 0, dwSend = 0;
			overlp->deprecated = TRUE;
			ret = WSASendTo(overlp->socket, &(overlp->wsaBuf), 1, &dwSend, dwFlag, (struct sockaddr*)&overlp->remoteAddr, sizeof(SOCKADDR_IN), (LPOVERLAPPED)overlp, NULL);
			if (ret == SOCKET_ERROR)
			{
				errCode = GetLastError();
				if (errCode != WSA_IO_PENDING)
				{
					closesocket(overlp->socket);
					delete overlp;
					std::cout << "IO_EX_SEND WSASendTo failed: " << errCode << std::endl;
				}
			}
		}
		break;
		default:
			break;
		}
	}
	//_endthread();//<process.h>
	return 0;
}


void MessageProcess(IN HANDLE handle, IN SOCKADDR_IN* clientAddr, IN int fd, IN void* msg, IN DWORD msglen, OUT void* res, IN OUT DWORD* reslen, BOOL first_package)
{
	std::string msgstr((char*)msg, msglen);
	if (first_package)
	{
		*reslen = sprintf_s((char*)res, *reslen, "get your first msg: ");
		memcpy((char*)res + *reslen, msg, msglen);
		*reslen = *reslen + msglen;
	}
	else
	{
		*reslen = sprintf_s((char*)res, *reslen, "Hello there ");
		memcpy((char*)res + *reslen, msg, msglen);
		*reslen = *reslen + msglen;
	}
	return;
	//*reslen = 0;//根据业务情况,如果无法立即返回结果,则可以在这里置为0,然后在合适的时机,通过PostQueuedCompletionStatus投递写事件

	//*********************************** examples for write delay*******************************
	//{
		//LPUdpOverlappedPerIO overlp = new UdpOverlappedPerIO;
		//memset(overlp, 0x00, sizeof(UdpOverlappedPerIO));
		//overlp->socket = fd;
		//overlp->type = IO_OP_TYPE::IO_EX_SEND;
		//overlp->wsaBuf.len = sprintf_s(overlp->wbuff, BUFFER_SIZE, "Delay reply from server socket, with fd=%d.\n", fd);
		//overlp->wsaBuf.buf = overlp->wbuff;
		//overlp->deprecated = FALSE;
		//overlp->destroyEnable = TRUE;
		//overlp->remoteAddr = *clientAddr;
		//overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
		//PostQueuedCompletionStatus(handle, 0, 0, (LPOVERLAPPED)overlp);
	//}
	//********************************** end for examples***************************************
}

int main()
{
	int ret;
	ServerParams pms{ 0 };
	ret = InitUdpServer(UDP_SERVER_PORT, pms);
	if (ret != 0)
	{
		std::cout << "Init server failed." << std::endl;
		return -1;
	}
	for (int i = 0; i < THREAD_COUNT; i++)
	{
		CreateThread(NULL, 0, udpworkthread, &pms, 0, NULL);
	}

	for (int i = 0; i < START_POST_ACCEPTEX; i++)
	{
		UdpOverlappedPerIO* overlp = new UdpOverlappedPerIO;
		memset(overlp, 0x00, sizeof(UdpOverlappedPerIO));
		overlp->socket = pms.listenerSock;
		overlp->wsaBuf.buf = overlp->rbuff;
		overlp->wsaBuf.len = BUFFER_SIZE;
		overlp->type = IO_OP_TYPE::IO_ACCEPT;
		overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
		DWORD dwFlag = 0, dwRecv = 0;
		std::cout << "udp server listenning sock= " << overlp->socket << std::endl;
		ret = WSARecvFrom(overlp->socket, &(overlp->wsaBuf), 1, &dwRecv, &dwFlag, (struct sockaddr*)&(overlp->remoteAddr), &overlp->remoteAddrLen, (LPOVERLAPPED)overlp, NULL);
		if (ret == SOCKET_ERROR)
		{
			int errCode = GetLastError();
			if (errCode != WSA_IO_PENDING)
				std::cout << "WSARecvFrom failed: " << errCode << std::endl;
		}
	}
	getchar();
	for (int i = 0; i < THREAD_COUNT; i++)
	{
		PostQueuedCompletionStatus(pms.iocp, 0, 0, NULL);
	}
	Sleep(1000);
	closesocket(pms.listenerSock);
	CloseHandle(pms.iocp);
	WSACleanup();
	return 0;
}

 三. UDP并发网络模型的用途

1. 分布式系统日志收集

2. 内网穿透p2p

3. 进程通信

(欢迎补充)文章来源地址https://www.toymoban.com/news/detail-754018.html

到了这里,关于关于socket的地址复用和端口复用技术与UDP并发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java中网络的基本介绍。网络通信,网络,ip地址,域名,端口,网络通信协议,TCP/IP传输过程,网络通信协议模型,TCP协议,UDP协议

    - 网络通信 概念:网络通信是指 通过计算机网络进行信息传输的过程 ,包括数据传输、语音通话、视频会议等。在网络通信中,数据被分成一系列的数据包,并通过网络传输到目的地。在数据传输过程中,需要确保数据的完整性、准确性和安全性。常见的网络通信协议有T

    2024年02月10日
    浏览(53)
  • 中科大计网学习记录笔记(十四):多路复用与解复用 | 无连接传输:UDP

    前言: 学习视频:中科大郑烇、杨坚全套《计算机网络(自顶向下方法 第7版,James F.Kurose,Keith W.Ross)》课程 该视频是B站非常著名的计网学习视频,但相信很多朋友和我一样在听完前面的部分发现信息量过大,有太多无法理解的地方,在我第一次点开的时候也有相同的感受

    2024年02月21日
    浏览(33)
  • TCP/UDP/Socket 通俗讲解

    1.封包和拆包 封包,就是发送数据前把自己本地需要发送的数据包装一下,即把要发送的原始数据附加上接受者可以辨识到自己身份等一些额外信息。有点像寄一封信,信封上填写的寄件人和收件人以及地址。 拆包,是接收到对方封包后发送来的数据后,拆出原始信息和对方

    2023年04月18日
    浏览(27)
  • TCP 和 UDP 的 Socket 调用

    在网络层,Socket 函数需要指定到底是 IPv4 还是 IPv6,分别对应设置为 AF_INET 和 AF_INET6。另外,还要指定到底是 TCP 还是 UDP。TCP 协议是基于数据流的,所以设置为 SOCK_STREAM,而 UDP 是基于数据报的,因而设置为 SOCK_DGRAM。 TCP 的服务端要先监听一个端口,一般是先调用 bind 函数,

    2024年02月08日
    浏览(27)
  • Java——TCP UDP Socket编程

    目录 一、网络的相关概念 (一)网络通信 (二)网络 (三)ip地址 (四)ipv4地址分类 (五)域名 (六)网络通信协议 (七)TCP和UDP 二、InetAddress类 三、Socket 四、TCP网络编程 (一)案例一——使用字节流 (二)案例二——使用字节流  (三)案例三——使用字符流 (四

    2024年02月06日
    浏览(30)
  • 基于Socket简单的UDP网络程序

    ⭐ 小白苦学IT的博客主页 ⭐ 初学者必看:Linux操作系统入门 ⭐ 代码仓库:Linux代码仓库 ❤关注我一起讨论和学习Linux系统 网络编程前言 网络编程是连接数字世界的桥梁,它让计算机之间能够交流信息,为我们的生活和工作带来便利。从简单的网页浏览到复杂的分布式系统,

    2024年04月12日
    浏览(28)
  • 网络通信(Socket/TCP/UDP)

    Socket(又叫套接字)是通信的基石,是支持TCP/IP协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接协议,客户端的IP地址,客户端的端口,服务器的IP地址,服务器的端口。 一个Socket是一对IP地址和端口。 Socket可以看

    2024年01月22日
    浏览(45)
  • 基于TCP/UDP的Socket编程

    ---- socket概述: socket是在应用层和传输层之间的一个抽象层,它把TCP/IP层复杂的操作抽象为几个简单的接口供应用层调用已实现进程在网络中通信。 socket起源于UNIX,在Unix一切皆文件哲学的思想下,socket是一种\\\"打开—读/写—关闭\\\"模式的实现,服务器和客户端各自维护一个

    2024年02月10日
    浏览(28)
  • Socket套接字(UDP数据报)篇

    Socket套接字,是由系统提供用于网络通信的技术,是基于TCP/IP协议的网络通信的基本操作单元. 基于Socket套接字的网络程序开发就是网络编程. 使用的是UDP(User Datagram Protocol)协议,传输层协议. 无连接 不可靠传输 面向数据报 全双工 对于UDP协议来说,具有无连接,面向数据报的特征

    2024年04月29日
    浏览(27)
  • socket的基本使用,基于TCP/UDP

    OSI参考模型 Open System Interconnect 开放式系统 每层网络的由来 物理层:010101比特流,设备之间原始数据的传输,数模转换(发送端)和模数转换(接收端) -》传输过程可能出现错码和误码? 数据链路层:将原始比特流转换成逻辑传输符号,提供纠错编码,格式变为帧 -》出现

    2024年02月03日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包