目录
一、案例项目设计概括
二,案例整体设计
三、TCP/Socket 的服务端与客户端通信代码设计
3.1 tcp/socket通信
3.2 客户端信息管理
3.3 数据的读取与写入
四、tcp/socket通信扩展
4.1 线程类及消息队列类设计
4.2 读取及写入数据处理类
4.3 更上层集成的Socket-API接口
五、tcp/socket业务数据通信处理
5.1 数据编解码设计(序列化及反序列化)
5.2 结构化数据传递
六、项目最终呈现
6.1 增加日志记录模块
6.2 项目完整目录结构
6.3 程序编译如下
6.4 测试
6.4 结语
七、源码附录
7.1 commom文件夹
7.2 srv_IO目录
7.4 client_IO目录
7.4 main.cpp
一、案例项目设计概括
我们c/c++开发人员在实际开发过程中,不可不知TCP通信,也不可不知标准库针对TCP通信提供的Socket实现。也行在很多稍大一些的项目中,我们会借用其他通信中间件来实现TCP通信,但是在日常很多小项目、原型验证项目、本地化局部通信项目等项目中,TCP/Socket通信都是优先想到和使用的方法。
在本文中,将抛开关于TCP/Socket通信的请求、握手等通信过程和各种socket相关函数及结构体的原理说明,这种技术文档太多太多了,本文将用实际使用的角度,直接设计一个项目使用TCP/Socket来阐述其如何通信、如何应用细化、如何切换项目需要的由简入繁的项目开发工程。
假定一:
现在需要做一个简单的TCP/Socket服务端程序,和一个TCP/Socket客户端的程序,服务端和客户端均支持win/linux平台,客户端链接上服务端,并发送一串字符串。
假定二:
基于假定一,实现一个支持多个客户端链接的TCP/Socket服务端程序,和一个TCP/Socket客户端程序,各个客户端链接上服务端,并发送一串字符串到服务端,服务端也可以给各个客户端统一发送字符串或给特定客户端发送字符串。
假定三:
基于假定二,实现服务端接收数据和处理数据分离,为其建立独立线程,为了减少socket读写数据等待处理,为写入数据和读取数据建立消息队列缓存。
假定四:
基于假定三,实现通信数据的编码、解码设计,并实现结构化数据传输。
二,案例整体设计
上面的假设就是开发案例的业务需求描述,较简单,会有欠缺很多细节业务描述,其实正如我们实际项目中来自客户简单口述的需求一样。
按上述假设,先进行初步展开,其整体概念如下图,
1)核心是基于TCP/Socket建立服务端和客户端的接口,实现TCP连接及通信;
2)基于该接口,分别创建服务端和客户端的业务数据处理的数据写入socket和从socket读取数据的独立线程,可以为写入数据和读取数据建立缓存消息队列,该队列可以视业务复杂性,放置独立线程内或单独建立一个数据集处理类;
3)为了应对不断连接或断开连接的客户端,还需要建立一个客户端端管理功能类或模块;这些类或模块集成在一起,构成服务端或客户端的socket-API(外观模式),统一给业务应用模块调用实现TCP通信;
4)服务端和客户端的Socket-API基于共同的功能模块如线程、互斥锁、消息队列、编解码、日志等功能模块,以及共同的结构化数据模块,尤其是服务端和客户端在进行数据结构化传输的共同达成一致的数据结构。
本文采用vs2015+cmake(win)和g++ +cmake(Linux)进行代码编译,按上述设计,构建开发项目的目录框架如下:
#
TCP_StructData
bin #编译输出结果
client_IO #客户端Socket的API功能源码
client_test #客户端业务功能源码
build_win #客户端windows编辑中间文件输出目录
build_linux #客户端linux编辑中间文件输出目录
main.cpp
CMakeLists.txt #cmake配置文件
common #共同功能模块或数据结构源码
srv_IO #服务端Socket的API功能源码
svr_test #服务端业务功能源码
build_win #客户端windows编辑中间文件输出目录
build_linux #客户端linux编辑中间文件输出目录
main.cpp #
CMakeLists.txt #cmake配置文件
三、TCP/Socket 的服务端与客户端通信代码设计
Socket 通常是服务端先启动,服务端启动后建立socket结构体,然后初始化就进行bind函数调用绑定,然后调用listen函数建立客户端连接侦听能力,而实际获取客户端连接信息,是通过调用accept函数来实现的。
3.1 tcp/socket通信
服务端:
先构建两个函数,onConnect函数和Accept函数,onConnect函数用于建立及初始化socket结构体,并进行bind绑定和listen侦听。Accept函数则是监测是否由新客户端连接进来。部分代码如下图:
int MySocketPrivate::onConnect()
{
if (m_OnListen) //服务器Socket是否已经创建
{
//your code
return 1;
}
else {
#ifdef WIN32
m_SSocket = socket(AF_INET, SOCK_STREAM, 0);
SOCKADDR_IN addrServ;
addrServ.sin_family = AF_INET;
addrServ.sin_port = htons(m_Port);
addrServ.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
bind(m_SSocket, (SOCKADDR*)&addrServ, sizeof(SOCKADDR));
//如果创建Socket失败则提示,成功则开始监听
if (listen(m_SSocket, 20) == SOCKET_ERROR)
{
closesocket(m_SSocket);
//your code
return -1;
}
else {
//your code
m_OnListen = true;
return 1;
}
#else
m_SSocket = socket(AF_INET, SOCK_STREAM, 0);
if (MY_SOCKET_NULL == m_SSocket)
{
//your code
return -1;
}
struct sockaddr_in s_add;
bzero(&s_add, sizeof(struct sockaddr_in));
s_add.sin_family = AF_INET;
s_add.sin_addr.s_addr = htonl(INADDR_ANY);
s_add.sin_port = htons(m_Port);
if (-1 == bind(m_SSocket, (struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
{
//your code
return -1;
}
else {
//your code
}
if (-1 == listen(m_SSocket, 5))
{
//your code
return -1;
}
else {
//your code
}
m_OnListen = true;
return 1;
#endif
}
}
bool MySocketPrivate::Accept()
{
bool bRet = true;
if (m_OnListen)
{
#ifdef WIN32
SOCKADDR_IN cliAddr;
int length = sizeof(SOCKADDR);
SOCKET cliSock = accept(m_SSocket, (SOCKADDR*)&cliAddr, &length);
if (INVALID_SOCKET == cliSock)
{
closesocket(cliSock);
//your code
bRet = false;
}
else {
char _ipport[64] = { 0 };
sprintf_s(_ipport, "%s:%d", (char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
int nNetTimeout = 100; //1秒
setsockopt(cliSock, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));//这是对客户端句柄进行设置
//your code
}
#else
int sin_size = sizeof(struct sockaddr_in);
struct sockaddr_in c_add;
int nfp = accept(m_SSocket, (struct sockaddr *)(&c_add), (socklen_t*)&sin_size);
if (-1 == nfp)
{
//your code
bRet = false;
}
else {
char _ipport[64] = { 0 };
std::string _ipStr = inet_ntoa((*(in_addr*)&(c_add.sin_addr)));
int _port = static_cast<int>(htons(c_add.sin_port));
//your code
}
#endif
}
else {
bRet = false;
//your code
}
return bRet;
};
通常需要根据项目实际情况调用setsockopt函数设置相关链接参数,需要在注意的是,在onConnect内对服务端部分进行配置,在Accept函数可对成功链接进来的客户端部分进行配置:
/*
*windows 参数设置描述
*3.在send(),recv()过程中有时由于网络状况等原因,收发不能预期进行,可以设置收发时限:
*int nNetTimeout = 1000; //1秒
*发送时限
*setsockopt( socket, SOL_SOCKET, SO_SNDTIMEO, ( char * )&nNetTimeout, sizeof( int ) );
*接收时限
*setsockopt( socket, SOL_SOCKET, SO_RCVTIMEO, ( char * )&nNetTimeout, sizeof( int ) );
*4.在send()的时候,返回的是实际发送出去的字节(同步)或发送到socket缓冲区的字节(异步);系统默认的状态发送和接收一次为8688字节(约
*为8.5K);在实际的过程中如果发送或是接收的数据量比较大,可以设置socket缓冲区,避免send(),recv()不断的循环收发:
* 接收缓冲区
*int nRecvBufLen = 32 * 1024; //设置为32K
*setsockopt( s, SOL_SOCKET, SO_RCVBUF, ( const char* )&nRecvBufLen, sizeof( int ) );
*发送缓冲区
*int nSendBufLen = 32*1024; //设置为32K
*setsockopt( s, SOL_SOCKET, SO_SNDBUF, ( const char* )&nSendBufLen, sizeof( int ) );
*5.在发送数据的时,不执行由系统缓冲区到socket缓冲区的拷贝,以提高程序的性能:
*int nZero = 0;
*setsockopt( socket, SOL_SOCKET, SO_SNDBUF, ( char * )&nZero, sizeof( nZero ) );
*6.在接收数据时,不执行将socket缓冲区的内容拷贝到系统缓冲区:
*int nZero = 0;
*setsockopt( s, SOL_SOCKET, SO_RCVBUF, ( char * )&nZero, sizeof( int ) );
*/
客户端部分:
客户端是通过指定的ip和port链接到服务端的,在socket通信这方面,仅需要建立及初始化sokcet结构体(配置ip及port),调用sokcet的connect api链接服务端。
int MySocketClient::onConnect()
{
if (m_OnConnect) //
{
//your code
return 0;
}
//防止链接冲突调用
if (m_OnConnecting)
{
return 0;
}
try {
m_OnConnecting = true;
#ifdef WIN32
sock_fd = static_cast<int>(socket(AF_INET, SOCK_STREAM, 0));
SOCKADDR_IN ser_addr;
memset(&ser_addr, 0, sizeof(ser_addr));
ser_addr.sin_family = AF_INET;
ser_addr.sin_addr.s_addr = inet_addr(m_IpAddress.c_str());
ser_addr.sin_port = htons(static_cast<unsigned short>(m_Port));
if (connect(sock_fd, (struct sockaddr *)&ser_addr, sizeof(ser_addr)) < 0)
{
//your code
m_OnConnecting = false;
return -1;
}
int nNetTimeout = 10; //10毫秒
setsockopt(sock_fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&nNetTimeout, sizeof(int));
setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));
//KeepAlive
bool bKeepAlive = true;
int nRet = setsockopt(sock_fd, SOL_SOCKET, SO_KEEPALIVE,(char*)&bKeepAlive, sizeof(bKeepAlive));
if (nRet == SOCKET_ERROR)
{
//your code
}
// set KeepAlive parameter
tcp_keepalive alive_in;
tcp_keepalive alive_out;
alive_in.keepalivetime = 1000; // 1s
alive_in.keepaliveinterval = 3000; //3s
alive_in.onoff = true;
unsigned long ulBytesReturn = 0;
nRet = WSAIoctl(sock_fd, SIO_KEEPALIVE_VALS, &alive_in, sizeof(alive_in),
&alive_out, sizeof(alive_out), &ulBytesReturn, NULL, NULL);
if (nRet == SOCKET_ERROR)
{
//your code
}
m_OnConnect = true;
m_OnConnecting = false;
m_ConnectONLog = true;
return 1;
#else
sock_fd = socket(PF_INET, SOCK_STREAM, 0);
if (-1 == sock_fd)
{
//your code
m_OnConnecting = false;
return -1;
}
struct sockaddr_in s_add;
bzero(&s_add, sizeof(struct sockaddr_in));
s_add.sin_family = PF_INET;
s_add.sin_addr.s_addr = inet_addr(m_IpAddress.c_str());
s_add.sin_port = htons(m_Port);
if (-1 == connect(sock_fd, (struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
{
//your code
m_OnConnecting = false;
return -1;
}
int x = fcntl(sock_fd, F_GETFL, 0);
fcntl(sock_fd, F_SETFL, x | O_NONBLOCK);
//your code
m_OnConnect = true;
m_OnConnecting = false;
return 1;
#endif
}
catch (...) {
//your code
m_OnConnecting = false;
return -2;
}
}
3.2 客户端信息管理
在服务端,针对多客户端链接,服务端是无法预知那些客户端连接进来的,只有在accept函数成功返回时,才能获得客户端句柄及具体信息,对于程序而言和那个客户端通信,知道客户端句柄就可以,而对于业务应用(客户)而言,需要知道的是客户端的IP和端口。因此就需要将客户端句柄和网络ip及端口映射起来,并将这些相关信息进行缓存管理。
accept函数返回的客户端句柄数据类型在windows和linux系统是不一致的,因此需要统一
#ifdef WIN32
#define MY_SOCKET SOCKET
#define MY_SOCKET_NULL NULL
#endif
#ifdef __linux__
#define MY_SOCKET int
#define MY_SOCKET_NULL (-1)
#endif
构建和管理网络信息(IP、port)和客户端句柄的映射关系,本人采用了std::map管理,将网络信息集成到一个KeyObj_Client类里面,在common文件夹建立hashmap.h/cpp源码文件,实现KeyObj_Client类,并将该类实例作为map的key,因此为该类添加相关的比较运算操作符函数,以满足作为map的key及排序需要。
class KeyObj_Client
{
public:
KeyObj_Client(std::string _ipStr, int _port);
//
static long cmp_Key(const KeyObj_Client &obj1, const KeyObj_Client &obj2);
std::string m_ipStr;
int m_port;
int linkFlag;
long m_ip; //网络地址整型表述
private:
};
inline bool operator==(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) == 0; }
inline bool operator!=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) != 0; }
inline bool operator>=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) >= 0; }
inline bool operator<=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) <= 0; }
inline bool operator>(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) > 0; }
inline bool operator<(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) < 0; }
/*------------------------------------------------------------------------*/
std::map<KeyObj_Client,MY_SOCKET> m_CSockets; //绑定客户端
在实现通信逻辑处理时,字符串处理相对不方便,为此还将字符串ip信息转换为整型IP信息。关于网络ip的字符串和整型之间的互换功能,在common文件夹建立myFunc.h/cpp源文件,实现该ip的转换功能函数。
//myFunc.h
namespace PFunc
{
//
bool ipCheck(std::string ip_str);
long ipToInt(std::string ip_str);
std::string intToIp(long ip_int);
};
//myFunc.cpp
bool PFunc::ipCheck(std::string ip_str)
{
if (INADDR_NONE != inet_addr(ip_str.c_str()))
{
return true;
}
return false;
};
long PFunc::ipToInt(std::string ip_str)
{
if (INADDR_NONE != inet_addr(ip_str.c_str()))
{
return ntohl(inet_addr(ip_str.c_str()));
}
else {
CLogger::createInstance()->Log(eConfigError
, "ip format [%s] error: %s %s %d,please check the file format and code!"
, ip_str.c_str(), __FILE__, __FUNCTION__, __LINE__);
return 0;
}
};
std::string PFunc::intToIp(long ip_int)
{
char ip[64] = { 0 };
#ifdef WIN32
strcpy_s(ip, inet_ntoa(*(in_addr*)&ip_int));
#else
strcpy(ip, inet_ntoa(*(in_addr*)&ip_int));
#endif
return std::string(ip);
};
因此,KeyObj_Client类在显式实例时,就调用ip的转换功能函数,并将字符串ip和整型ip信息都存储起来,便于使用
//hashmap.cpp
KeyObj_Client::KeyObj_Client(std::string _ipStr, int _port)
: m_ipStr(_ipStr), m_port(_port), linkFlag(0)
{
m_ip = PFunc::ipToInt(_ipStr);
};
//
long KeyObj_Client::cmp_Key(const KeyObj_Client &obj1, const KeyObj_Client &obj2)
{
long diff = obj1.m_ip - obj2.m_ip;
if (diff != 0) return diff;
diff = obj1.m_port - obj2.m_port;
if (diff != 0) return diff;
return 0;
};
因此,在accept函数成功返回及获得客户端句柄及信息,就将按这些信息创建KeyObj_Client实例,并存入到std::map的m_CSockets缓存容器中,Accept-API函数的部分代码调整如下。
bool MySocketPrivate::Accept()
{
bool bRet = true;
if (m_OnListen)
{
#ifdef WIN32
SOCKADDR_IN cliAddr;
int length = sizeof(SOCKADDR);
SOCKET cliSock = accept(m_SSocket, (SOCKADDR*)&cliAddr, &length);
if (INVALID_SOCKET == cliSock)
{
closesocket(cliSock);
//your code
bRet = false;
}
else {
//cliAddr.sin_addr.S_un.S_addr;
char _ipport[64] = { 0 };
sprintf_s(_ipport, "%s:%d", (char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
KeyObj_Client _linkInfo((char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
int nNetTimeout = 100; //1秒
setsockopt(cliSock, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));
m_MyMutex.Lock();
m_CSockets[_linkInfo] = cliSock;//添加客户端
m_MyMutex.Unlock();
}
#else
int sin_size = sizeof(struct sockaddr_in);
struct sockaddr_in c_add;
// printf("MySocketPrivate::Accept 1\n");
int nfp = accept(m_SSocket, (struct sockaddr *)(&c_add), (socklen_t*)&sin_size);
if (-1 == nfp)
{
//your code
bRet = false;
}
else {
char _ipport[64] = { 0 };
std::string _ipStr = inet_ntoa((*(in_addr*)&(c_add.sin_addr)));
int _port = static_cast<int>(htons(c_add.sin_port));
sprintf(_ipport, "%s:%d", _ipStr.c_str(), _port);
int x = fcntl(nfp, F_GETFL, 0);
fcntl(nfp, F_SETFL, x | O_NONBLOCK);
KeyObj_Client _linkInfo(_ipStr, _port);
m_MyMutex.Lock();
m_CSockets[_linkInfo] = nfp;
m_MyMutex.Unlock();
//your code
}
#endif
}
else {
bRet = false;
//your code
}
return bRet;
};
如上述代码,除了Accept成功时操作m_CSockets缓存容器外,在发送、读取数据失败,客户端端主动断开、异常断开等都会操作m_CSockets缓存容器,为了确保m_CSockets缓存容器的一致性,因此添加了互斥锁对m_CSockets缓存容器进行一致性保护。
PYMutex m_MyMutex;
std::map<KeyObj_Client,MY_SOCKET> m_CSockets; //绑定客户端
因此在common文件夹建立Mutex.h/cpp源文件,创建了PYMutex类,实现互斥锁。
//Mutex.h
typedef void *HANDLE;
class IMutex
{
public:
virtual ~IMutex() {}
virtual void Lock() const = 0;
virtual bool TryLock() const = 0;
virtual void Unlock() const = 0;
};
class PYMutex : public IMutex
{
public:
PYMutex();
~PYMutex();
virtual void Lock() const;
virtual bool TryLock() const;
virtual void Unlock() const;
private:
#ifdef _WIN32
HANDLE m_mutex;
#else
mutable pthread_mutex_t m_mutex;
#endif
};
因此,需要提供对基于m_CSockets缓存容器的写入,删除,清空等操作实现对接入客户端的管理:
//实例销毁时调用
void MySocketPrivate::disConnect()
{
deleteCSocket();//删除客户端
deleteSSocket();//删除服务端
#ifdef WIN32
WSACleanup();
#endif
}
//删除所有客户端
void MySocketPrivate::deleteCSocket()
{
m_MyMutex.Lock();
std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
while (it != m_CSockets.end())
{
//删除链接
deleteCSocket(it->second);
#ifdef WIN32
it = m_CSockets.erase(it);
#else
std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
m_CSockets.erase(ittemp);
#endif
}
m_MyMutex.Unlock();
}
//删除指定客户端
void MySocketPrivate::deleteCSocket(MY_SOCKET m_CSocket)
{
try {
if (MY_SOCKET_NULL != m_CSocket)
{
#ifdef WIN32
closesocket(m_CSocket);
#else
close(m_CSocket);
#endif
m_CSocket = MY_SOCKET_NULL;
}
}
catch (...) {
//your code
}
}
//删除服务端
void MySocketPrivate::deleteSSocket()
{
m_OnListen = false;
try {
if (MY_SOCKET_NULL != m_SSocket)
{
#ifdef WIN32
closesocket(m_SSocket);
#else
close(m_SSocket);
#endif
m_SSocket = MY_SOCKET_NULL;
}
}
catch (...) {
//your code
}
};
3.3 数据的读取与写入
数据读取和写入类似于管道或文件操作,主要是基于socket句柄,调用read/wirte函数或recv/send函数来实现,如一下代码,可以指定客户端读取数据,也可以指定向客户端发送数据,在出现读取数据或写入数据异常时,需要删除该客户端。
//MySocketPrivate.cpp
//return success read count
int MySocketPrivate::Read(std::map<KeyObj_Client, RDClient> &bufs)
{
int ret = 0;
m_MyMutex.Lock();
std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
while (it != m_CSockets.end())
{
char _buf[512] = { 0 };
#ifdef WIN32
int re_one = recv(it->second, _buf, 512, 0);
if (re_one <= 0)
{
int _error = GetLastError();
if (_error != 10060)
{
//other code
//删除链接
deleteCSocket(it->second);
it = m_CSockets.erase(it);
continue;
}
else {
re_one = 0;
}
}
#else
int re_one = recv(it->second, _buf, 256, MSG_DONTWAIT);
if (re_one <= 0)
{
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
usleep(10);
re_one = 0;
}
else {
//other code
//删除连接
deleteCSocket(it->second);
std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
m_CSockets.erase(ittemp);
continue;
}
}
#endif
if (re_one>0)
{
ret += 1;
std::map<KeyObj_Client, RDClient>::iterator itrd = bufs.find(it->first);
if (itrd != bufs.end())
{
itrd->second.add((unsigned char*)_buf, re_one);
}
else {
bufs[it->first] = RDClient((unsigned char*)_buf, re_one);
}
}
it++;
}
m_MyMutex.Unlock();
return ret;
};
//return success count
int MySocketPrivate::Write(const char* buf, int size)//向全部客户端发送数据
{
int ret = 0;
m_MyMutex.Lock();
std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
while (it != m_CSockets.end())
{
// printf_s("write data %d to client is started!\r\n",size);
#ifdef WIN32
int re = send(it->second, buf, size, 0);
if (re <= 0)
{
int _error = GetLastError();
if (_error != 10060)
{
//other code
//删除连接
deleteCSocket(it->second);
it = m_CSockets.erase(it);
continue;
}
else {
re = 0;
}
}
#else
int re = send(it->second, buf, size, MSG_DONTWAIT);
if (re <= 0)
{
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
usleep(10);
re = 0;
}
else {
//other code
//删除连接
deleteCSocket(it->second);
std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
m_CSockets.erase(ittemp);
continue;
}
}
#endif
else{
ret += 1;
}
it++;
}
m_MyMutex.Unlock();
return ret;
};
//return success count
int MySocketPrivate::Write(unsigned long long ipInt, const char* buf, int size)
{
int ret = 0;
m_MyMutex.Lock();
std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
while (it != m_CSockets.end())
{
//当前版本只针对网络地址做判断,即一台电脑多个客户端连接,都会被发送数据
if ((unsigned long long)it->first.m_ip == ipInt)
{
#ifdef WIN32
int re = send(it->second, buf, size, 0);
if (re < 0)
{
int _error = GetLastError();
if (_error != 10060)
{
//other code
//删除连接
deleteCSocket(it->second);
it = m_CSockets.erase(it);
continue;
}
else {
re = 0;
}
}
#else
int re = send(it->second, buf, size, MSG_DONTWAIT);
if (re <= 0)
{
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
usleep(10);
re = 0;
}
else {
//other code
//删除连接
deleteCSocket(it->second);
std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
m_CSockets.erase(ittemp);
continue;
}
}
#endif
else {
ret += 1;
}
}
it++;
}
m_MyMutex.Unlock();
return ret;
}
由于Read-API是支持从多个客户端读取数据,因此其数据采用一个RDClient结构体来缓存,需要注意的是,每次Read调用时,每个客户端都可能会一次性读取多帧数据,也有可能读取非完整帧数据,这是在实际项目中受到读取的效率、读取间隔、处理效率等影响。在common目录下创建DataDef.h源文件。
//DataDef.h
#define RDCSIZE 1024
struct RDClient
{
RDClient()
: len(0)
{
memset(Buf,0,RDCSIZE);
};
RDClient(unsigned char *buf,int nlen)
{
memset(Buf,0,RDCSIZE);
memcpy(Buf,buf,nlen);
len = nlen;
};
~RDClient()
{
};
RDClient& operator=(const RDClient &rval)
{
if (this!=&rval)
{
memset(Buf,0,RDCSIZE);
memcpy(Buf,rval.Buf,rval.len);
len = rval.len;
}
return *this;
};
int add(unsigned char *buf,int nlen)
{
try{
memset(Buf+len,0,RDCSIZE-len);
memcpy(Buf+len,buf,nlen);
len += nlen;
}catch(...)
{
printf("RDClient::add error \r\n");
}
return len;
};
unsigned char Buf[RDCSIZE];
int len;
};
至此,整个服务端的tcp/socket通信接口MySocketPrivate类实现为,在srv_IO目录下,创建MySocketPrivate.h/cpp源文件,类设计如下(完整的MySocketPrivate.h/cpp见附录):
//MySocketPrivate.h
class MySocketPrivate
{
public:
MySocketPrivate(unsigned int port)
: m_Port(port)
, m_OnListen(false)
{
m_SSocket = MY_SOCKET_NULL;
//m_CSockets.clear();
#ifdef WIN32
/*
* This function should be called once in each secondary thread
* before the first socket is created in the new thread.
*/
SocketThreadInit();
#endif
};
~MySocketPrivate(){
disConnect();
};
public:
int onConnect();
void disConnect();
int Read(std::map<KeyObj_Client,RDClient> &bufs);
int Write(const char* buf, int size);
int Write(unsigned long long ipInt,const char* buf, int size);
bool Accept();
bool get_ipInt_list(std::set<long> &ipintlist); //获取在线端的整型IP
#ifdef WIN32
private:
void SocketThreadInit();
#endif
private:
void deleteSSocket(); //删除服务端
void deleteCSocket(); //删除所有客户端
void deleteCSocket(MY_SOCKET m_CSocket);//删除指定客户端
private:
MY_SOCKET m_SSocket; //服务端
unsigned int m_Port; //端口变量
bool m_OnListen; //用于标注侦听
PYMutex m_MyMutex;
std::map<KeyObj_Client,MY_SOCKET> m_CSockets; //绑定客户端
};
类似地,实现客户端的socket句柄管理以及读取及写入函数,整个客户端的tcp/socket通信接口MySocketClient类实现为,在client_IO目录下创建MySocketClient.h/cpp源文件,类设计如下(完整的MySocketClient.h/cpp见附录)。
//MySocketClient.h
class MySocketClient
{
public:
MySocketClient(std::string ip, UINT port);
~MySocketClient(void);
int onConnect();
void disConnect();
bool isConnect(){ return m_OnConnect; };
int reSetSocket();
int Read(RDClient &bufs);
int Read(char* buf, int size);
int Write(const char* buf, int size);
private:
#ifdef WIN32
void SocketThreadInit();
#endif
private:
int sock_fd;
//fd_set read_flags,write_flags; // you know what these are
std::string m_IpAddress;
UINT m_Port; //
bool m_OnConnect; //
/*
*当前写入失败及读取线程都可重新建立链接,m_OnConnecting设置防止冲突
*/
bool m_OnConnecting; //
bool m_ConnectONLog; //防止链接错误日志反复记录,切换状态时必定记录
unsigned int m_log_Interval; //防止链接错误日志长时间不记录,2018-10-08
};
四、tcp/socket通信扩展
按上述设计,MySocketPrivate.h/cpp接口可以实现tcp/socket服务端,MySocketClient.h/cpp接口可实现tcp/socket服务端,其调用伪代码如下所示。
//server
MySocketPrivate *my_PrivateData = new MySocketPrivate(port);
if (my_PrivateData->onConnect() > 0)
{
myPDataPrt->Accept();
//读写数据
}
//client
MySocketClient *my_PrivateData = new MySocketClient(netarg.ipStr, netarg.port);
if (my_PrivateData->onConnect() > 0)
{
//读写数据,管理客户端
}
前面所述应基本实现了假定一、二,先再来看看假定三功能。MySocketPrivate.h/cpp接口和MySocketClient.h/cpp接口提供了tcp/socket功能,但是若我们按上述伪代码调用,免不了同时处理读写数据、客户端管理,在支撑多客户端通信的情况下,数据处理效率就会差,进而造成数据堵塞。因此将读取数据和写入数据采用专用线程来处理。
4.1 线程类及消息队列类设计
首先创建MyThread类,以此为基类,创建读取数据和写入数据类。由于windows-vs和linux-g++对于thread的调用API有所差异,因此本文分开设计win和linux的线程类。
linux系统,在common文件加下创建myThread.h/cpp源文件,实现linux线程类MyThread。
//myThread.h
class MyThread
{
private:
//current thread ID
pthread_t tid;
//thread status
int threadStatus;
//get manner pointer of execution
static void* run0(void* pVoid);
//manner of execution inside
void* run1();
public:
//threadStatus-new create
static const int THREAD_STATUS_NEW = 0;
//threadStatus-running
static const int THREAD_STATUS_RUNNING = 1;
//threadStatus-end
static const int THREAD_STATUS_EXIT = -1;
// constructed function
MyThread();
~MyThread();
//the entity for thread running
virtual int Run()=0;
//start thread
bool start();
//gte thread ID
pthread_t getThreadID();
//get thread status
int getState();
//wait for thread end
void join();
//wait for thread end in limit time
void join(unsigned long millisTime);
};
winows系统,在common文件加下创建win32Thread.h/cpp源文件,实现linux线程类MyThread。
//win32Thread.h
typedef void *HANDLE;
class MyThread
{
public:
MyThread();
~MyThread();
void start();
virtual int Run();
HANDLE getThread();
private:
HANDLE hThread;
static void agent(void *p);
};
在数据读取和数据写入时,通过设计缓存消息队列,实现读写数据和数据业务处理分离。因此采用std::deque来实现消息传递。在common文件夹创建queuedata.h源文件,设计消息队列类QueueData如下。
//queuedata.h
template <class T>
class QueueData
{
public:
QueueData(std::string desc = "thread_queue");
~QueueData();
//
/**
* 获取队列大小
* @return {int } 队列大小
*/
int size();
/**
* 判定队列是否为空
* @return {bool } 是否为空队列
*/
bool isEmpty();
/**
* 获取队列头元素
* @param it {T&} 头元素
* @return {bool } 是否成功
*/
bool getFirst(T &it);
/**
* 删除元素
* @return {bool } 是否成功
*/
bool removeFirst();
/**
* 获取队列头元素,并从队列终删除
* @param it {T&} 头元素
* @return {bool } 是否成功
*/
bool pop(T &it);
/**
* 从队列头开始逐步获取多个元素,并剔除
* @param its {queue<T>&} 获取到的元素集
* @param sizel {int} 一次获取多少个
* @return {bool } 至少获取一个元素以上则成功
*/
bool getList(std::queue<T> &its,unsigned int sizel=5);
/**
* 从队列尾部添加元素
* @param it {T} 被添加元素
* @return {void } 无返回
*/
void add(T it);
/**
* 从队列头部添加元素
* @param it {T} 被添加元素
* @return {void } 无返回
*/
void add_front(T it);
/**
* 清空元素
* @return {void }
*/
void clear();
private:
void init();
QueueData& operator=(const QueueData&) {return this;};
protected:
std::string queue_desc;
private:
/点集转发
//协议解析结果缓存
std::deque<T> datacache_queue; //队列容器
PYMutex m_Mutex; //线程锁,或者如果更彻底采用acl库,采用acl::thread_mutex替代
//
static unsigned int QSize; //队列大小约束,超出是会从队列头剔除旧数据腾出空位在对末添加数据
//
int queue_overS; //队列溢出次数计数
};
线程类和消息队列类准备好后,就可以实现读取数据和写入数据类了。
4.2 读取及写入数据处理类
服务端部分,写入数据处理类MySocketWD,基于消息缓存队列QueueData存储需要发送的数据,和MySocketPrivate实例实现发送数据,即将MySocketPrivate类的实例传入MySocketWD,然后该类循环地从消息缓存队列中读取数据,在内部调用MySocketPrivate的write函数完成数据发送。对外提供AddData函数接收外部传递数据进入消息缓存队列。在srv_IO目录下,创建MySocketWD.h/cpp源文件,实现MySocketWD类。
//MySocketWD.h
class MySocketPrivate;
class MySocketWD : public MyThread
{
public:
MySocketWD(MySocketPrivate* myPDataPrt_,int netType_=1);
virtual ~MySocketWD(void);
int Run();
int AddData(const char* buf, int len);
int getBuffer(unsigned long long &_ipInt, unsigned char* _buf);
private:
bool running;
int netType;
MySocketPrivate *myPDataPrt;
QueueData<WDS> WriteData;
};
写入缓存数据结构体WDS设计如下(在DataDef.h源文件实现):
//DataDef.h
struct WDS
{
WDS() : ipInt(0), data()
{
};
WDS(unsigned long long _ipInt,TCP_Data _data)
: ipInt(_ipInt), data(_data)
{
};
WDS& operator=(const WDS &rval)
{
if (this == &rval) {
return *this;
}
ipInt = rval.ipInt;
data = rval.data;
return *this;
};
unsigned long long ipInt;
TCP_Data data;
};
其中WDS包含的TCP_Data结构体和RDClient结构体几乎一致,除了缓存区域大小,但暂时还是把他们区分开来,TCP_Data代表应用层面数据缓存结构体,RDClient代表socket通信逻辑层面数据结构体,在代码优化迭代阶段再合并也不迟。(备注,在开发中过程中,过早合并和优化代码,有时反而可能会失去真正设计过程及真实业务逻辑)
//DataDef.h
struct TCP_Data
{
TCP_Data() : len(0)
{
memset(Buf, 0, 512);
};
TCP_Data(unsigned char *buf, int nlen)
{
memset(Buf, 0, 512);
memcpy(Buf, buf, nlen);
len = nlen;
};
TCP_Data& operator=(const TCP_Data &rval)
{
if (this != &rval) {
memset(Buf, 0, 512);
if (rval.len < 512) {
memcpy(Buf, rval.Buf, rval.len);
len = rval.len;
}
else {
memcpy(Buf, rval.Buf, 512);
len = 512;
}
}
return *this;
};
unsigned char Buf[512];
int len;
};
该类最主要的就是循环体的实现
//MySocketWD.cpp
int MySocketWD::Run()
{
if (NULL == myPDataPrt)
{
//your code
return 0;
}
while(running)
{
try {
unsigned long long _ipInt = 0;
unsigned char buf[512] = { 0 };
int len = this->getBuffer(_ipInt, buf);//从缓存获取数据
if (len > 0)
{
int ret = -1;
ret = myPDataPrt->Write(_ipInt, (const char*)buf, len);//发送数据
if (ret <=0)
{
//your code
}
}
}
catch (const std::exception& e)
{
//your code
}
catch (...) {
//your code
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
类似地,读取数据处理类MySocketRD如下,在srv_IO目录下,创建MySocketRD.h/cpp源文件。
//MySocketRD.h
class MySocketPrivate;
class MySocketRD : public MyThread
{
public:
MySocketRD(MySocketPrivate* myPDataPrt_, int netType_=1);
virtual ~MySocketRD(void);
int Run();
//从缓存中读取帧数据处理,请按需自行处理该函数
int AddFrame(const std::string link, const unsigned char *buf, int len);
private:
bool running;
int netType;//数据读写处理类型
MySocketPrivate *myPDataPrt;
QueueData<RDS> ReadData;
};
缓存数据结构体RDS如下,其也包含TCP_Data结构体。
//DataDef.h
struct RDS
{
RDS() : data(),flag("")
{
};
RDS(TCP_Data _data,std::string _f = "")
: data(_data),flag(_f)
{
};
RDS& operator=(const RDS &rval)
{
if (this == &rval) {
return *this;
}
data = rval.data;
flag = rval.flag;
return *this;
};
TCP_Data data;
std::string flag;
};
该类最主要的就是循环体的实现大致如下,在该版本实现中,假定每次读取到一个完整帧数据,
//MySocketRD.cpp
int MySocketRD::Run()
{
if (NULL == myPDataPrt )
{
//your code
return 0;
}
std::map<KeyObj_Client, RDClient> bufs; //从socket读取出的缓存数据
RDS rdataGx;
while (running)
{
int re = myPDataPrt->Read(bufs);//从socket接口读取数据
if (re <= 0)
{
//your code
}else {
try {
std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
while (it != bufs.end())
{
if (it->second.len > 0)
{
RDS rdata(TCP_Data(it->second.Buf, it->second.len), it->first.m_ipStr);
ReadData.add(rdata);
}
it++;
}
bufs.clear();
}
catch (const std::exception& e)
{
//your code
}
catch (...) {
//your code
}
while (ReadData.getFirst(rdataGx))//处理缓存中的数据
{
this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
ReadData.removeFirst();
}
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
注意到本文直接在读取数据循环体内同时处理了缓存的数据(如下),也可以再采用一条独立线程来处理缓存数据,使得MySocketRD类就仅不间断循环从socket接口读取数据。当然这可视实际项目需要来设计。
while (ReadData.getFirst(rdataGx))//处理缓存中的数据
{
this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
ReadData.removeFirst();
}
客户端部分读取及写入数据处理类设计类似
客户端的写入处理类先命名和服务端的一样,也是MySocketWD,因为基本大部分功能都一样,可以便于后面进行代码优化设计。当前先在client_IO目录下,创建MySocketWD.h/cpp源文件来实现。
//client_IO/MySocketWD.h
class MySocketClient;
class MySocketWD : public MyThread
{
public:
MySocketWD(void);
virtual ~MySocketWD(void);
void setPrivateDataPtr(MySocketClient *myPData, int _netType=1);
int Run();
int add_data(const char* buf, int len);
int getBuffer(unsigned char * _buf);
int getHeartBeatBuffer(unsigned char * buf);
private:
bool running;
int netType;//数据读写处理类型
unsigned int heartBeatWrite;
MySocketClient *myPDataPrt;
QueueData<TCP_Data> WriteData;
};
其处理循环函数如下,注意的是,在循环体内除了写入数据外,还同时处理链接状态判定及重新连接情况,当然该功能也在后面优化设计时可以独立出去。另外客户端的写入线程还负责发送心跳信息,即在长时间没有从消息缓存队列读取到数据时,生成心跳数据来发送
//client_IO/MySocketWD.cpp
int MySocketWD::Run()
{
if (NULL == myPDataPrt )
{
//
return 0;
}
while(running)
{
if (!myPDataPrt->isConnect())
{
myPDataPrt->reSetSocket();//read or write thread do it
if (!myPDataPrt->isConnect())
{
#ifdef WIN32
Sleep(1000);
#else
usleep(1000000);
#endif
}
}
else {
//由读取进程去重新建立链接,写入线程只判定链接状态,进行数据写入
unsigned char buf[512] = { 0 };
int len = this->getBuffer(buf);
if (len <= 0 && (heartBeatWrite+heartBeat_interval)<static_cast<unsigned int>(time(NULL)))
{
len = this->getHeartBeatBuffer(buf);//生成心跳数据
}
if (len > 0) {
int ret = myPDataPrt->Write((const char*)buf, len);
if (ret != len) {
//
}
else {
heartBeatWrite = static_cast<unsigned int>(time(NULL));
}
}
}
#ifdef WIN32
Sleep(1);
#else
usleep(1000);
#endif
}
return 0;
}
生成心跳数据函数实现如下,至于具体发送什么心跳数据,那就依据项目要求而定,或许从配置信息读取来生成是不错的选择,就不需调整代码:
//cient_IO/MySocketWD.cpp
int MySocketWD::getHeartBeatBuffer(unsigned char * buf)
{
if (NULL != buf)
{
int idx = 0;
std::string cur_time_str = PFunc::getCurrentTime();
char buf_[64]={0};
sprintf(buf_,"HeartBeat:%s",cur_time_str.c_str());
idx = (int)strlen(buf_);
memcpy(buf,buf_,idx);
return idx;
}
else
{
return 0;
}
};
客户端读取数据处理类,在 client_IO目录下,创建MySocketRD.h/cpp源文件。
//client_IO/MySocketRD.h
class MySocketClient;
class MySocketRD : public MyThread
{
public:
MySocketRD(void);
virtual ~MySocketRD(void);
void setPrivateDataPtr(MySocketClient *myPData, int _netType=1);
int Run();
//从缓存中读取帧数据处理,请按需自行处理该函数
int AddFrame(const unsigned char *buf, int len);
private:
bool running;
int netType;//数据读写处理类型
MySocketClient *myPDataPrt;
QueueData<TCP_Data> ReadData;
};
其处理循环函数如下,同样处理链接状态判定及重新连接情况(哈哈,冗余设计),另外还同时处理缓存的数据。反正循环体内就做了三件事,连接服务端,从socket读取数据并写入缓存,从缓存读取数据来处理,至于是否放在其他地方实现,呵呵,就看项目需要咯。
//client_IO/MySocketRD.cpp
int MySocketRD::Run()
{
if (NULL == myPDataPrt )
{
//
return 0;
}
RDClient rdc_data;
TCP_Data rddata;
while (running)
{
if (!myPDataPrt->isConnect())
{
myPDataPrt->reSetSocket();//read or write thread do it
if (!myPDataPrt->isConnect())
{
#ifdef WIN32
Sleep(1000);
#else
usleep(1000000);
#endif
}
}
else
{
//读取帧数据
char buf[256] = { 0 };
int len = myPDataPrt->Read(buf, 256);
if (len > 0)
{
TCP_Data rdata((unsigned char*)buf, len);
ReadData.add(rdata);
}
//数据帧解析
if (ReadData.getFirst(rddata))
{
this->AddFrame(rddata.Buf, rddata.len);
ReadData.removeFirst();
}
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
4.3 更上层集成的Socket-API接口
还需要不间断监听新客户端链接情况,为了更体现独立性,把该功能也独立出来,采用专用线程类MySocketSrv去侦听客户端的链接,类似读取、写入数据处理类那样,也将MySocketPrivate类的实例传入MySocketWD。在srv_IO目录下,创建MySocketSrv.h/cpp源文件来实现。
//MySocketSrv.h
class MySocketPrivate;
class MySocketSrv : public MyThread
{
public:
MySocketSrv();
virtual ~MySocketSrv();
void setPDataPtr(MySocketPrivate *myPData);
int Run();
private:
MySocketPrivate *myPDataPrt;
};
其循环体处理逻辑很简单,
//MySocketSrv.cpp
int MySocketSrv::Run()
{
if (NULL == myPDataPrt)
{
//your code
return 0;
}
while(1)
{
myPDataPrt->Accept();
#ifdef WIN32
Sleep(300);
#else
usleep(300000);
#endif
}
return 0;
}
为了更体现是一个整体的 TCP/Socket业务接口,在srv_IO目录下创建一个MySocket.h/cpp源文件,用于从业务层面上提供一个Socket-IO接口(是否有点向外观模式,呵呵),它将socket接口,读取及写入线程,侦听线程集成进来,对外提供简单易用的socket功能:初始化及读写数据。
//srv_IO/MySocket.h
class MySocketPrivate;
class MySocketSrv;
class MySocketRD;
class MySocketWD;
class MySocket
{
public:
MySocket(unsigned int port,int netType_=1);
~MySocket(void);
public:
int Read(){ return -1; };
int Write(){ return -1; };
//
//int Read(char* buf, int size);
int Write(const char* buf, int size);
private:
MySocketPrivate *my_PrivateData;
MySocketSrv *my_SocketSrv;
MySocketRD *m_MySocketRD;
MySocketWD *m_MySocketWD;
};
MySocket类的初始化如下,初始化底层真正的Socket接口MySocketPrivate,然后依据是否listen成功而进一步将MySocketPrivate传入监测客户端线程类及读写线程类并初始化及启动线程。
//srv_IO/MySocket.cpp
MySocket::MySocket(unsigned int port,int netType_)
{
try {
my_PrivateData = new MySocketPrivate(port);
if (my_PrivateData->onConnect() > 0)
{
my_SocketSrv = new MySocketSrv();
my_SocketSrv->setPDataPtr(my_PrivateData);
my_SocketSrv->start();
m_MySocketRD = new MySocketRD(my_PrivateData,netType_);
m_MySocketRD->start();
m_MySocketWD = new MySocketWD( my_PrivateData,netType_);
m_MySocketWD->start();
}
else {
my_SocketSrv = NULL;
m_MySocketRD = NULL;
m_MySocketWD = NULL;
}
}
catch (...) {
//以下处理时防止初始化半途而废(异常)
delete my_SocketSrv;
my_SocketSrv = NULL;
delete m_MySocketRD;
m_MySocketRD = NULL;
delete m_MySocketWD;
m_MySocketWD = NULL;
delete my_PrivateData;
my_PrivateData = NULL;
}
};
同样地,在客户端层面,也创建一个MySocket类,集成socket-API接口及读写线程类,在client_IO目录,创建MySocket.h/cpp源文件来实现。
//client_IO/MySocket.h
class MySocketClient;
class MySocketWD;
class MySocketRD;
class MySocket
{
public:
MySocket(int _tranid, NetArg _netarg);
virtual ~MySocket(void);
public:
virtual int Read(){ return -1; };
virtual int Write(){ return -1; };
int Write(const char* buf, int size);
private:
int tranid;
NetArg netarg;
MySocketClient *my_PrivateData;
MySocketWD *m_MySocketWD;
MySocketRD *m_MySocketRD;
};
客户端的MySocket类的初始化如下,初始化底层真正的Socket接口MySocketPrivate,然后将MySocketPrivate传入读写线程类并初始化及启动线程。
//client_IO/MySocket.cpp
MySocket::MySocket(int _tranid, NetArg _netarg)
: tranid(_tranid)
, netarg(_netarg)
{
try {//防止构造时异常出现内存泄漏
//TCP/IP客户端,连接监控服务或其他平台
my_PrivateData = new MySocketClient(netarg.ipStr, netarg.port);
if (my_PrivateData->onConnect() <= 0)
{
//do something
}
//数据协议编码解码 序列化及反序列化
//数据发送线程
m_MySocketWD = new MySocketWD();
m_MySocketWD->setPrivateDataPtr(my_PrivateData, netarg.type);
m_MySocketWD->start();
//数据接收线程
m_MySocketRD = new MySocketRD();
m_MySocketRD->setPrivateDataPtr(my_PrivateData, netarg.type);
m_MySocketRD->start();
}
catch (...)
{
delete m_MySocketRD;
m_MySocketRD = NULL;
delete m_MySocketWD;
m_MySocketWD = NULL;
delete my_PrivateData;
my_PrivateData = NULL;
}
}
通过集成MySocket类提供接口服务后,那么调用就更简单了,如下构造socket接口及发送数据:
//srv_test/main.cpp
MySocket server_test(70001,2);
char buf[]="hello, this is server!";
server_test.Write((const char*)buf,(int)strlen(buf));
//client_test/main.cpp
MySocket client_test(1,_netarg);
char buf[]="hello, this is client 01!";
client_test.Write((const char*)buf,(int)strlen(buf));
至于读取数据,前面已经提过,在读取线程里进行类处理了,其处理如下(打印信息而已,哈哈):
//client_IO/MySocketRD.cpp
//srv_IO/MySocketRD.cpp
int MySocketRD::AddFrame(const unsigned char *buf, int len)
{
if(NULL==buf)
return 0;
printf("rev:%s\r\n",(char*)buf);
return 0;
};
五、tcp/socket业务数据通信处理
前面在读取数据时就提到,暂且当每次读取数据时是一个完整帧,但是我们回想一下,在实际使用过程中,尤其是服务端,同时读取多个客户端的数据时,可能会出现数据堆叠在一起,即有可能一次从socket句柄信道哪里读取到多帧数据,或者读取到不完整帧数据,那该如何处理呢。
5.1 数据编解码设计(序列化及反序列化)
那么就需要通信有识别帧的能力,在本文,通过一个简要设计来阐述这方面的应用。
假设编码时:
1)在编码时,逐步遍历消息中的每个字节的数据(8bit),如果有大于0XF0需要做转义处理,例如将0XF3转义为0XF0 0X03
2)在转移好的消息前加上一个字节数据0XF1作为帧头标记,在消息尾加上一个字节数据0XFF作为帧尾标记。
加上解码时
1)在解码时,先查找到0XF1标记和0XFF标记,进行帧数据分割,取出它们之间的数据作为接收到的消息
2)将接收到的消息进行转义,如果发现某字节等于0XF0,与其后续字节合并转义,例如0XF0 0X0A转义0XFA。
因此给出编解码实现函数code、uncode,并将其声明及定义加入到common目录下的myFunc.h/cpp内。
//common/myFunc.h
namespace PFunc
{
//frame code
int code(const unsigned char *buff, const int len, unsigned char *outbuf);
//frame uncode
int uncode(const unsigned char *buff, int len, unsigned char *outbuf);
};
//common/myFunc.cpp
int PFunc::code(const unsigned char *buff, const int len, unsigned char *outbuf)
{
char ch = 0;
int nLen = 0;
unsigned char * buf = (unsigned char *)buff;
*outbuf++ = 0XF1;//头字节
nLen+=1;
for (int i = 0; i < len; i++, nLen++)
{
ch = buf[i];
if ((buf[i] | 0x0f) == 0xff && i > 0 && i < (len - 1))
{
*outbuf++ = 0xf0 & buf[i];
*outbuf++ = 0x0f & buf[i];
nLen += 1;
}
else {
*outbuf++ = ch;
}
}
*outbuf++ = 0XFF;//末字节
nLen+=1;
buf = NULL;
return nLen;
}
int PFunc::uncode(const unsigned char *buff, int len, unsigned char *outbuf)
{
char ch = 0;
int nLen = 0;
unsigned char * buf = (unsigned char *)buff;
//头、末尾字节判断
if(len<=2&&0XF1!=buf[0]&&0XFF!=buf[len-1]){
printf("uncode func, start bit or end bit Error!\r\n");
return 0;
}
for (int i = 1; i < (len-1); i++, nLen++)
{
ch = buf[i];
if (buf[i] == 0xf0)
{
#ifdef _DEBUG
if (i > len - 2)
printf("Error!\r\n");
if (buf[i + 1] > 0x0f)
printf("Error!\r\n");
#endif
ch = 0xf0 | buf[++i];
}
*outbuf++ = ch;
}
buf = NULL;
return nLen;
}
那么在服务端和客户端的数据读取及写入线程就可以调用code和uncode来实现数据的编解码。
在/srv_IO/MySocketWD.cpp内除了原来的直接数据写入外,增加新的除了方式,从缓存消息队列读取数据,并将数据进行编码(序列化)后得到的新数据写入socket内(发送)。如下文,netType是数据处理类型(1,直接写入;2,code后写入)
int MySocketWD::Run()
{
if (NULL == myPDataPrt)
{
//
return 0;
}
while(running)
{
try {
unsigned long long _ipInt = 0;
unsigned char buf[512] = { 0 };
int len = this->getBuffer(_ipInt, buf);
if (len > 0)
{
int ret = -1;
switch (netType)
{
case 1:
{
ret = myPDataPrt->Write(_ipInt, (const char*)buf, len);
break;
}
case 2:
{
// printf("send data: %s\r\n",buf);
unsigned char* _buf = new unsigned char[2 * len + 1];
memset(_buf, 0, 2 * len + 1);
len = PFunc::code(buf, len, _buf);//序列化处理
printf("send data: %d\r\n",len);
ret = myPDataPrt->Write(_ipInt, (const char*)_buf, len);//发送
delete[] _buf;
_buf = NULL;
break;
}
default:
{
char warBuf[128] = { 0 };
sprintf(warBuf, "MySocketWD::Run For Unkown NetType(%02X)", netType);
#ifdef WIN32
throw std::exception(warBuf);
#else
throw std::domain_error(warBuf);
#endif
break;
}
}
if (ret <=0)
{
//printf("send data: %d, buf %d\n",len,ret);
}
//else{
// printf("send data: %d, and real send %d\n",len,ret);
//}
}
}
catch (const std::exception& e)
{
//your code
}
catch (...) {
//your code
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
修改/srv_IO/MySocketRD.cpp内的循环函数,相比写入处理要复杂些,如下文,netType是数据处理类型(1,直接写入;2,从socket读取数据后,进行数据分帧处理,将识别出的帧数据写入缓存中)。由于前面提到过可能会一次从某个socket句柄的读取多帧数据或不完整数据,因此在分帧处理时就查找到帧的开始及结束标记,才能算得到一帧完整的数据。如果只找到开始标记,没结束标记,这部分数据就需要移动到缓存区域前面,与后面继续读取的数据拼在一起来处理。
int MySocketRD::Run()
{
if (NULL == myPDataPrt )
{
return 0;
}
std::map<KeyObj_Client, RDClient> bufs;
RDS rdataGx;
while (running)
{
int re = myPDataPrt->Read(bufs);
if (re <= 0)
{
#ifdef _DEBUG
printf_s("Read Data Failed or NULL\n!");
#else
;
#endif
}else {
switch (netType)
{
case 1:
{
try {
std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
while (it != bufs.end())
{
if (it->second.len > 0)
{
RDS rdata(TCP_Data(it->second.Buf, it->second.len), it->first.m_ipStr);
ReadData.add(rdata);
}
it++;
}
bufs.clear();
}
catch (const std::exception& e)
{
//your code
}
catch (...) {
//your code
}
while (ReadData.getFirst(rdataGx))
{
this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
ReadData.removeFirst();
}
break;
}
case 2:
{
try {
std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
while (it != bufs.end())
{
unsigned char * buff = it->second.Buf;
int start_frame = 0;
unsigned char ctype = 0;
for (int i = 0; i < it->second.len; i++)
{
//printf_s("%02X ",buff[i]);
if (buff[i] > 0xf0) {
if (buff[i] == 0xff)
{
if (ctype)
{
ctype = 0;
int re_len = i - start_frame + 1;
// RDS rdata(TCP_Data(buff + start_frame, i - start_frame + 1), it->first.m_ipStr);
unsigned char * pBuf = new unsigned char[re_len];
//
int nLen = PFunc::uncode(buff + start_frame, re_len, pBuf);
RDS rdata(TCP_Data(pBuf, nLen), it->first.m_ipStr);
// printf("rev01:%s\r\n",(char*)pBuf);
printf("rev01:%d\r\n",nLen);
ReadData.add(rdata);
start_frame = i + 1;
delete[] pBuf;
pBuf = NULL;
}
}
else {
ctype = buff[i];
start_frame = i;
}
}
}
buff = NULL;
if (start_frame < it->second.len)
{
RDClient _newrd(it->second.Buf + start_frame, it->second.len - start_frame);
it->second = _newrd;
it++;
}
else {
#ifdef WIN32
it = bufs.erase(it);
#else
std::map<KeyObj_Client, RDClient>::iterator ittemp = it++;
bufs.erase(ittemp);
#endif
}
}
}
catch (const std::exception& e)
{
//your code
}
catch (...) {
//your code
}
while (ReadData.getFirst(rdataGx))
{
this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
ReadData.removeFirst();
}
break;
}
default:
break;
}
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
因此,在通过socket-API 读取到数据后,是调用缓存RDClient结构体的add函数在末尾追加写入的
//common/DataDef.h
int RDClient::add(unsigned char *buf,int nlen)
{
try{
memset(Buf+len,0,RDCSIZE-len);
memcpy(Buf+len,buf,nlen);
len += nlen;
}catch(...)
{
printf("RDClient::add error \r\n");
}
return len;
};
对于客户端来说,也是类似的
client_IO/MySocketWD.cpp-Run函数修改如下:
int MySocketWD::Run()
{
if (NULL == myPDataPrt )
{
return 0;
}
while(running)
{
if (!myPDataPrt->isConnect())
{
myPDataPrt->reSetSocket();//read or write thread do it
if (!myPDataPrt->isConnect())
{
#ifdef WIN32
Sleep(1000);
#else
usleep(1000000);
#endif
}
}
else {
//由读取进程去重新建立链接,写入线程只判定链接状态,进行数据写入
unsigned char buf[512] = { 0 };
int len = this->getBuffer(buf);
if (len <= 0 && (heartBeatWrite+heartBeat_interval)<static_cast<unsigned int>(time(NULL)))
{
len = this->getHeartBeatBuffer(buf);
}
if (len > 0) {
switch (netType)
{
case 1:
{
int ret = myPDataPrt->Write((const char*)buf, len);
if (ret != len) {
//printf("send data: %d, buf %d\n",len,ret);
}
else {
heartBeatWrite = static_cast<unsigned int>(time(NULL));
}
}
break;
case 2:
{
int cacheLen = 2 * len + 1;
unsigned char* _buf = new unsigned char[cacheLen];
memset(_buf, 0, cacheLen);
int nLen = PFunc::code(buf, len, _buf);//序列化处理
int ret = myPDataPrt->Write((const char*)_buf, nLen);
if (ret != nLen) {
//printf("send data: %d, buf %d\n",len,ret);
}
else {
heartBeatWrite = static_cast<unsigned int>(time(NULL));
}
delete[] _buf;
_buf = NULL;
}
break;
default:
break;
}
}
}
#ifdef WIN32
Sleep(1);
#else
usleep(1000);
#endif
}
return 0;
}
client_IO/MySocketRD.cpp-Run函数修改如下:
int MySocketRD::Run()
{
if (NULL == myPDataPrt )
{
return 0;
}
RDClient rdc_data;
TCP_Data rddata;
while (running)
{
if (!myPDataPrt->isConnect())
{
myPDataPrt->reSetSocket();//read or write thread do it
if (!myPDataPrt->isConnect())
{
#ifdef WIN32
Sleep(1000);
#else
usleep(1000000);
#endif
}
}
else
{
//读取帧数据
switch (netType)
{
case 1:
{
//直接读取,不用做分帧处理,ACSII字段
char buf[256] = { 0 };
int len = myPDataPrt->Read(buf, 256);
if (len > 0)
{
TCP_Data rdata((unsigned char*)buf, len);
ReadData.add(rdata);
}
//数据帧解析
if (ReadData.getFirst(rddata))
{
this->AddFrame(rddata.Buf, rddata.len);
ReadData.removeFirst();
}
}
break;
case 2:
{
//数据有特定帧头和结尾,做分帧处理
int ret = myPDataPrt->Read(rdc_data);
if (ret > 0)
{
//printf("read(%d) from pcs_server\n",ret);
unsigned char * buff = rdc_data.Buf;
int frame_start = 0;
unsigned char ctype = 0;
for (int i = 0; i < rdc_data.len; ++i)
{
//printf("%02X ",buff[i]);
if (buff[i] > 0xf0)
{
if (buff[i] == 0xff)
{
if (ctype)
{
ctype = 0;
// TCP_Data rdata(buff + frame_start, i - frame_start + 1);
unsigned char * pBuf = new unsigned char[i - frame_start + 1];
int nLen = PFunc::uncode(buff + frame_start, i - frame_start + 1, pBuf);//反序列化处理
TCP_Data rdata(pBuf, nLen);
ReadData.add(rdata);
frame_start = i + 1;
delete[] pBuf;
pBuf = NULL;
}
}
else
{
ctype = buff[i];
frame_start = i;
}
}
}
buff = NULL;
if (frame_start < rdc_data.len)
{
RDClient _newrd(rdc_data.Buf + frame_start, rdc_data.len - frame_start);
rdc_data = _newrd;
}
else
{
rdc_data.len = 0;
}
}
//数据帧解析
while (ReadData.getFirst(rddata))
{
this->AddFrame(rddata.Buf, rddata.len);
ReadData.removeFirst();
}
}
break;
default:
break;
}
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
5.2 结构化数据传递
在实现项目中,通过TCP数据传递除了发送字符串外,还需要将数据以包的形式直接发送及接收,而包数据有固定的格式,在c/c++领域,我们通常采用struct关键字定义包数据。例如定义一下结构体:
//common/DataDef.h
typedef struct TCP_Data_Model
{
unsigned type;
int len;
float val;
char desc[32];
}TcpSocket,*PTcpSocket;
按前面所述的业务应用时数据发送及接收处理方式,那么发送时,如下,和发送字符串无多少区别:
//main.cpp
MySocket server_test(70001,2);
char buf[]="hello, this is server!";
TcpSocket ts;
ts.len=(int)strlen(buf);
memcpy(ts.desc,buf,ts.len);
ts.type = 1;
ts.val = 10.0;
//server_test.Write((const char*)buf,(int)strlen(buf));
server_test.Write((const char*)&ts,(int)sizeof(TcpSocket));
接收数据时,处理如下,直接将数据进行块拷贝即可:
//MySocketRD.cpp
int MySocketRD::AddFrame(const std::string link, const unsigned char *buf, int len)
{
if(NULL == buf)
return 0;
// printf("rev:%s\r\n",(char*)buf);
TcpSocket ts;
memcpy(&ts,buf,len);
printf("rev:%u,%d,%0.2f,%s\r\n",ts.type,ts.len,ts.val,ts.desc);
return 0;
};
六、项目最终呈现
6.1 增加日志记录模块
涉及到TCP/Socket通信,一般大多数都会被设计成后台服务长期运行,那么日志记录就是用户需求不提及也需要提供功能服务。日志服务对于长期运行中发现异常、bug、调优等都很有帮助,并也可以为用户提供日志运行信息。
通常大多数公司都有自己应有的及长时间实践过的或大或小的日志系统,尤其是对于平台级的系统而言。但是对于项目项目,原型验证,就没必要给出一个系统级的日志系统。
本文给出一个基类为MyThread的单体模式的日志类CLogger,在common目录下,创建Log.h/cpp源文件,实现CLogger类,如下文。在该类中,有一个日志信息缓存队列mylogs(类似前面的数据读取及写入消息队列),用来缓存各业务模块推送的日志。循环函数Run还不间断从日志缓存队列读取日志并写入日志文件中。
//common/Log.h
enum eLogType
{
eHardError = 1,
eSoftError = 2,
eConfigError = 3,
eParameterError = 4,
eReadError = 5,
eWriteError = 6,
eControlMessage = 7,
eResponseMessage = 8,
eTipMessage = 9
};
struct MyLogStruct
{
MyLogStruct():type(0)
{
memset(szBuffer, 0, 1024);
};
int type;
char szBuffer[1024];
};
class CLogger : public MyThread
{
public:
CLogger();
~CLogger();
int Run();
public:
void Log(const eLogType type, const char* lpszFormat, ...);
static CLogger* createInstance( void );
private:
bool getFirstLog(MyLogStruct &it);
void addLog(MyLogStruct it);
private:
static CLogger* m_pLogInstance;
bool running;
//for cache
std::queue<MyLogStruct> mylogs;
PYMutex m_Mutex_Log;
int i_log_over;
};
//common/Log.cpp
int CLogger::Run()
{
MyLogStruct log_;
while (running) {
if (getFirstLog(log_))
{
WriteLog(log_.type, log_.szBuffer);
#ifndef WIN32
printf("Log::[%d]-->%s\n", getpid(), log_.szBuffer);
#else
printf("Log::-->%s\n", log_.szBuffer);
#endif
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
6.2 项目完整目录结构
至此,基本实现了前面提到的四个假设内容(业务需求),并增加了一些额外功能,构建一个实现基本功能点的程序开发。
#
TCP_StructData
bin #编译输出结果
client_test
client_test.exe
server_test
server_test.exe
client_IO #客户端Socket的API功能源码
MySocket.h
MySocket.cpp
MySocketClient.h
MySocketClient.cpp
MySocketRD.h
MySocketRD.cpp
MySocketWD.h
MySocketWD.cpp
client_test #客户端业务功能源码
build_win #客户端windows编辑中间文件输出目录
build_linux #客户端linux编辑中间文件输出目录
main.cpp
CMakeLists.txt #cmake配置文件
common #共同功能模块或数据结构源码
DataDef.h #结构化数据
hashmap.h #结构化数据作为容器Key实现
hashmap.cpp
Log.h #日志模块
Log.cpp
Mutex.h #互斥锁
Mutex.cpp
myThread.h #linux系统下线程类实现
myThread.cpp
queuedata.h
win32Thread.h #windows系统下线程类实现
win32Thread.cpp
srv_IO #服务端Socket的API功能源码
MySocket.h
MySocket.cpp
MySocketPrivate.h
MySocketPrivate.cpp
MySocketRD.h
MySocketRD.cpp
MySocketSrv.h
MySocketSrv.cpp
MySocketWD.h
MySocketWD.cpp
svr_test #服务端业务功能源码
build_win #客户端windows编辑中间文件输出目录
build_linux #客户端linux编辑中间文件输出目录
main.cpp #
CMakeLists.txt #cmake配置文件
其中/srv_test/CMakeLists.txt 如下:
# CMake 最低版本号要求
cmake_minimum_required (VERSION 3.2)
# 项目信息
project (server_test)
#
if(WIN32)
message(STATUS "windows compiling...")
add_definitions(-D_PLATFORM_IS_WINDOWS_)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /MT")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /MTd")
set(WIN_OS true)
else(WIN32)
message(STATUS "linux compiling...")
add_definitions( -D_PLATFORM_IS_LINUX_)
set(UNIX_OS true)
set(_DEBUG true)
endif(WIN32)
#
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/../bin)
# 指定源文件的目录,并将名称保存到变量
SET(source_h
#
#${PROJECT_SOURCE_DIR}/../common/pfunc_print.h
${PROJECT_SOURCE_DIR}/../common/queuedata.h
#${PROJECT_SOURCE_DIR}/../common/conf.h
${PROJECT_SOURCE_DIR}/../common/Mutex.h
${PROJECT_SOURCE_DIR}/../common/hashmap.h
${PROJECT_SOURCE_DIR}/../common/myFunc.h
${PROJECT_SOURCE_DIR}/../common/Log.h
${PROJECT_SOURCE_DIR}/../srv_IO/MySocket.h
${PROJECT_SOURCE_DIR}/../srv_IO/MySocketRD.h
${PROJECT_SOURCE_DIR}/../srv_IO/MySocketWD.h
${PROJECT_SOURCE_DIR}/../srv_IO/MySocketPrivate.h
${PROJECT_SOURCE_DIR}/../srv_IO/MySocketSrv.h
)
SET(source_cpp
#
${PROJECT_SOURCE_DIR}/../common/Mutex.cpp
${PROJECT_SOURCE_DIR}/../common/hashmap.cpp
${PROJECT_SOURCE_DIR}/../common/myFunc.cpp
${PROJECT_SOURCE_DIR}/../common/Log.cpp
${PROJECT_SOURCE_DIR}/../srv_IO/MySocket.cpp
${PROJECT_SOURCE_DIR}/../srv_IO/MySocketRD.cpp
${PROJECT_SOURCE_DIR}/../srv_IO/MySocketWD.cpp
${PROJECT_SOURCE_DIR}/../srv_IO/MySocketPrivate.cpp
${PROJECT_SOURCE_DIR}/../srv_IO/MySocketSrv.cpp
${PROJECT_SOURCE_DIR}/main.cpp
)
#头文件目录
include_directories(
${PROJECT_SOURCE_DIR}
${PROJECT_SOURCE_DIR}/../common
${PROJECT_SOURCE_DIR}/../srv_IO
)
if (${UNIX_OS})
SET(source_h_linux
${PROJECT_SOURCE_DIR}/../common/myThread.h
)
SET(source_cpp_linux
${PROJECT_SOURCE_DIR}/../common/myThread.cpp
)
add_definitions(
"-W"
"-fPIC"
"-Wall"
# "-Wall -g"
"-Werror"
"-Wshadow"
"-Wformat"
"-Wpointer-arith"
"-D_REENTRANT"
"-D_USE_FAST_MACRO"
"-Wno-long-long"
"-Wuninitialized"
"-D_POSIX_PTHREAD_SEMANTICS"
"-DACL_PREPARE_COMPILE"
"-Wno-unused-parameter"
"-fexceptions"
)
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0")
link_directories()
# 指定生成目标
add_executable(server_test ${source_h} ${source_cpp} ${source_h_linux} ${source_cpp_linux})
#link
target_link_libraries(server_test
-lpthread -pthread -lz -lrt -ldl
)
endif(${UNIX_OS})
if (${WIN_OS})
SET(source_h_win
${PROJECT_SOURCE_DIR}/../common/win32Thread.h
)
SET(source_cpp_win
${PROJECT_SOURCE_DIR}/../common/win32Thread.cpp
)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4819")
add_definitions(
"-D_CRT_SECURE_NO_WARNINGS"
"-D_WINSOCK_DEPRECATED_NO_WARNINGS"
"-DNO_WARN_MBCS_MFC_DEPRECATION"
"-DWIN32_LEAN_AND_MEAN"
)
link_directories()
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_DEBUG ${PROJECT_SOURCE_DIR}/../bin)
# 指定生成目标
add_executable(server_testd ${source_h} ${source_cpp} ${source_h_win} ${source_cpp_win})
#target_link_libraries(server_testd *.lib)
else(CMAKE_BUILD_TYPE)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_RELEASE ${PROJECT_SOURCE_DIR}/../bin)
# 指定生成目标
add_executable(server_test ${source_h} ${source_cpp} ${source_h_win} ${source_cpp_win})
#target_link_libraries(server_test *.lib)
endif (CMAKE_BUILD_TYPE)
endif(${WIN_OS})
/client_test/CMakeLists.txt 如下:
# CMake 最低版本号要求
cmake_minimum_required (VERSION 2.8)
# 项目信息
project (client_test)
#
if(WIN32)
message(STATUS "windows compiling...")
add_definitions(-D_PLATFORM_IS_WINDOWS_)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /MT")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /MTd")
set(WIN_OS true)
else(WIN32)
message(STATUS "linux compiling...")
add_definitions( -D_PLATFORM_IS_LINUX_)
add_definitions("-Wno-invalid-source-encoding")
set(UNIX_OS true)
set(_DEBUG true)
endif(WIN32)
#
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/../bin)
# 指定源文件的目录,并将名称保存到变量
SET(source_h
#
${PROJECT_SOURCE_DIR}/../common/DataDef.h
${PROJECT_SOURCE_DIR}/../common/queuedata.h
${PROJECT_SOURCE_DIR}/../common/Mutex.h
${PROJECT_SOURCE_DIR}/../common/myFunc.h
${PROJECT_SOURCE_DIR}/../common/Log.h
${PROJECT_SOURCE_DIR}/../client_IO/MySocket.h
${PROJECT_SOURCE_DIR}/../client_IO/MySocketClient.h
${PROJECT_SOURCE_DIR}/../client_IO/MySocketRD.h
${PROJECT_SOURCE_DIR}/../client_IO/MySocketWD.h
)
SET(source_cpp
#
${PROJECT_SOURCE_DIR}/../common/Mutex.cpp
${PROJECT_SOURCE_DIR}/../common/myFunc.cpp
${PROJECT_SOURCE_DIR}/../common/Log.cpp
${PROJECT_SOURCE_DIR}/../client_IO/MySocket.cpp
${PROJECT_SOURCE_DIR}/../client_IO/MySocketClient.cpp
${PROJECT_SOURCE_DIR}/../client_IO/MySocketRD.cpp
${PROJECT_SOURCE_DIR}/../client_IO/MySocketWD.cpp
${PROJECT_SOURCE_DIR}/main.cpp
)
#头文件目录
include_directories(
${PROJECT_SOURCE_DIR}
${PROJECT_SOURCE_DIR}/../common
${PROJECT_SOURCE_DIR}/../client_IO
)
if (${UNIX_OS})
SET(source_h_linux
${PROJECT_SOURCE_DIR}/../common/myThread.h
)
SET(source_cpp_linux
${PROJECT_SOURCE_DIR}/../common/myThread.cpp
)
add_definitions(
"-W"
"-fPIC"
"-Wall"
# "-Wall -g"
"-Werror"
"-Wshadow"
"-Wformat"
"-Wpointer-arith"
"-D_REENTRANT"
"-D_USE_FAST_MACRO"
"-Wno-long-long"
"-Wuninitialized"
"-D_POSIX_PTHREAD_SEMANTICS"
"-DACL_PREPARE_COMPILE"
"-Wno-unused-parameter"
"-fexceptions"
)
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0")
link_directories()
# 指定生成目标
add_executable(client_test ${source_h} ${source_cpp} ${source_h_linux} ${source_cpp_linux})
#link
target_link_libraries(client_test
-lpthread -pthread -lz -lrt -ldl
)
endif(${UNIX_OS})
if (${WIN_OS})
SET(source_h_win
${PROJECT_SOURCE_DIR}/../common/win32Thread.h
)
6.3 程序编译如下
server:
cd srv_test && mkdir build_win && cd build_win
cmake -G "Visual Studio 14 2015 Win64" -DCMAKE_BUILD_TYPE=Release ..
#vs 命令窗口,或者配置了msbuild路径的命令窗口
msbuild server_test.sln /p:Configuration="Release" /p:Platform="x64"
linux:
cd srv_test && mkdir build_linux && cd build_linux
cmake ..
make
编译:
client
win:
cd client_test && mkdir build_win && cd build_win
cmake -G "Visual Studio 14 2015 Win64" -DCMAKE_BUILD_TYPE=Release .. -Wno-dev
#vs 命令窗口
msbuild client_test.sln /p:Configuration="Release" /p:Platform="x64"
linux:
cd client_test && mkdir build_linux && cd build_linux
cmake ..
make
编译:
6.4 测试
netType=2,win:server_test.exe client_test.exe 127.0.0.1 70001
netType=2,Linux:./server_test ./client_test 127.0.0.1 70001
netType=1,在main.cpp内调整输入类型,注释掉模块化传递部分,以及在MySocketRD.cpp的AddFrame函数注释掉模块传递部分,win运行指令:server_test.exe client_test.exe 127.0.0.1 70001,netType=1时,支持通用客户端连接,例如借助串口助手连接测试。
6.4 结语
基本功能达成,但是整入前面所述那样,项目还有很多需要调整的空间:
1)客户端及服务端的读取、写入处理类以及集成类如此相似,可提炼优化
2)DataDef.h内的结构体如此相似,也可以提炼优化
3)win、linux源码还有很多可以进一步一致性优化的空间
4)业务逻辑可以进行调整,例如TCP/Socket服务接口提供更多功能
5)性能优化,读取间隔、读取长度、处理数据的先后次序等
6)...
如何优化呢,且看本专栏的下一篇博文:
c/c++开发,无可避免的代码重构实战(基于前文TCP/Socket通信开发案例)_py_free的博客-CSDN博客
温馨提示:本文采用socket通信设计在管理客户端上采用缓存容器管理,是支持少量客户端链接的情况,若有大量客户端链接情况,就建议采用其他方式,例如采用epoll:
利用epoll创建自己的后台服务,实现对各个客户端网络通信(含示例代码)_epollpri 带外数据_py_free的博客-CSDN博客
七、源码附录
7.1 commom文件夹
DataDef.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _DATA_DEF_H_
#define _DATA_DEF_H_
#include <string>
#ifdef __linux__
#include <string.h>
#include <stdio.h>
#endif
struct NetArg
{
NetArg()
: ipStr("127.0.0.1"),port(70001),type(1)
{
};
std::string ipStr; //
int port; //
int type; //Protocol
};
#define RDCSIZE 1024
struct RDClient
{
RDClient()
: len(0)
{
memset(Buf,0,RDCSIZE);
};
RDClient(unsigned char *buf,int nlen)
{
memset(Buf,0,RDCSIZE);
memcpy(Buf,buf,nlen);
len = nlen;
};
~RDClient()
{
};
RDClient& operator=(const RDClient &rval)
{
if (this!=&rval)
{
memset(Buf,0,RDCSIZE);
memcpy(Buf,rval.Buf,rval.len);
len = rval.len;
}
return *this;
};
int add(unsigned char *buf,int nlen)
{
try{
memset(Buf+len,0,RDCSIZE-len);
memcpy(Buf+len,buf,nlen);
len += nlen;
}catch(...)
{
printf("RDClient::add error \r\n");
}
return len;
};
unsigned char Buf[RDCSIZE];
int len;
};
typedef struct TCP_Data_Model
{
unsigned type;
int len;
float val;
char desc[32];
}TcpSocket,*PTcpSocket;
struct TCP_Data
{
TCP_Data() : len(0)
{
memset(Buf, 0, 512);
};
TCP_Data(unsigned char *buf, int nlen)
{
memset(Buf, 0, 512);
memcpy(Buf, buf, nlen);
len = nlen;
};
TCP_Data& operator=(const TCP_Data &rval)
{
if (this != &rval) {
memset(Buf, 0, 512);
if (rval.len < 512) {
memcpy(Buf, rval.Buf, rval.len);
len = rval.len;
}
else {
memcpy(Buf, rval.Buf, 512);
len = 512;
}
}
return *this;
};
unsigned char Buf[512];
int len;
};
struct RDS
{
RDS() : data(),flag("")
{
};
RDS(TCP_Data _data,std::string _f = "")
: data(_data),flag(_f)
{
};
RDS& operator=(const RDS &rval)
{
if (this == &rval) {
return *this;
}
data = rval.data;
flag = rval.flag;
return *this;
};
TCP_Data data;
std::string flag;
};
/
struct WDS
{
WDS() : ipInt(0), data()
{
};
WDS(unsigned long long _ipInt,TCP_Data _data)
: ipInt(_ipInt), data(_data)
{
};
WDS& operator=(const WDS &rval)
{
if (this == &rval) {
return *this;
}
ipInt = rval.ipInt;
data = rval.data;
return *this;
};
unsigned long long ipInt;
TCP_Data data;
};
#endif
hashmap.h
#pragma once
#ifndef HASH_MAP_H
#define HASH_MAP_H
/*
*自定义map容器的Key
*/
#include <map>
#include <iostream>
//
class KeyObj_Client
{
public:
KeyObj_Client(std::string _ipStr, int _port);
//
static long cmp_Key(const KeyObj_Client &obj1, const KeyObj_Client &obj2);
std::string m_ipStr;
int m_port;
int linkFlag;
long m_ip; //网络地址整型表述
private:
};
inline bool operator==(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) == 0; }
inline bool operator!=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) != 0; }
inline bool operator>=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) >= 0; }
inline bool operator<=(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) <= 0; }
inline bool operator>(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) > 0; }
inline bool operator<(const KeyObj_Client& obj1, const KeyObj_Client& obj2) { return KeyObj_Client::cmp_Key(obj1, obj2) < 0; }
#endif //HASH_MAP_H
hashmap.cpp
#include "hashmap.h"
#ifdef WIN32
#include <Winsock2.h>
#else
#include <netinet/in.h>
#include <arpa/inet.h>
#endif
#include "myFunc.h"
#include "Log.h"
KeyObj_Client::KeyObj_Client(std::string _ipStr, int _port)
: m_ipStr(_ipStr), m_port(_port), linkFlag(0)
{
m_ip = PFunc::ipToInt(_ipStr);
};
//
long KeyObj_Client::cmp_Key(const KeyObj_Client &obj1, const KeyObj_Client &obj2)
{
long diff = obj1.m_ip - obj2.m_ip;
if (diff != 0) return diff;
diff = obj1.m_port - obj2.m_port;
if (diff != 0) return diff;
return 0;
};
Log.h
#ifndef CHANNELLOG_H
#define CHANNELLOG_H
#include <stdio.h>
#include <stdarg.h>
#include <string>
//#ifndef LOGUDPPORT
//#define LOGUDPPORT 989
//#endif
//#define MAXDATASIZE 10240
//extern bool m_bDebug;
#ifdef WIN32
#include "win32Thread.h"
#endif
#ifdef linux
#include "myThread.h"
#include <string.h>
#endif
#include <queue>
#include "Mutex.h"
#ifndef WIN32
#include <string>
#include <sstream>
namespace std
{
template < typename T > std::string to_string(const T & n)
{
std::ostringstream stm;
stm << n;
return stm.str();
}
}
int GetPrivateProfileString(const char *AppName, const char *key, const char *defaultvalue, char *lpReturnedString, const int nSize, const char *lpFileName);
int GetPrivateProfileInt(const char *AppName,const char *key,const int defaultvalue,const char *lpFileName);
#endif
enum eLogType
{
eHardError = 1,
eSoftError = 2,
eConfigError = 3,
eParameterError = 4,
eReadError = 5,
eWriteError = 6,
eControlMessage = 7,
eResponseMessage = 8,
eTipMessage = 9
};
struct MyLogStruct
{
MyLogStruct():type(0)
{
memset(szBuffer, 0, 1024);
};
int type;
char szBuffer[1024];
};
class CLogger : public MyThread
{
public:
CLogger();
~CLogger();
int Run();
public:
void Log(const eLogType type, const char* lpszFormat, ...);
static CLogger* createInstance( void );
private:
bool getFirstLog(MyLogStruct &it);
void addLog(MyLogStruct it);
private:
static CLogger* m_pLogInstance;
bool running;
//for cache
std::queue<MyLogStruct> mylogs;
PYMutex m_Mutex_Log;
int i_log_over;
};
#endif
Log.cpp
#include "Log.h"
#include <time.h>
#include <sys/timeb.h>
#ifdef __linux__
#ifndef sprintf_s
#define sprintf_s sprintf
#endif
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#endif
CLogger* CLogger::m_pLogInstance = NULL;
//extern void WriteLog( const int iMsgType, const std::string strMsg);
extern char LOG_FILE_NAME[128]; //LOGFILE Name, should be defined
extern std::string logdir;
void WriteLog1( const int iMsgType, const char * strMsg)
{
try {
time_t tt;
struct timeb tm0;
struct tm tm1;
char buf[512];
FILE * fpLog = NULL;
//time(&tt);
ftime(&tm0);
tt = tm0.time;
#ifdef WIN32
localtime_s(&tm1, &tt);
#else
localtime_r(&tt, &tm1);
#endif
sprintf_s(buf, "%04d-%02d-%02d %02d:%02d:%02d.%03d "
, tm1.tm_year + 1900, tm1.tm_mon + 1, tm1.tm_mday
, tm1.tm_hour, tm1.tm_min, tm1.tm_sec, tm0.millitm);
std::string strTime = buf;
buf[10] = '\0';
//file name
std::string strPath = logdir;
#ifdef WIN32
strPath += "\\";
#else
strPath += "/";
#endif
strPath += buf;
strPath += "_";
//
switch (iMsgType)
{
case eHardError:
strTime += "[HardErrorIMsg] ";
break;
case eSoftError:
strTime += "[SoftErrorIMsg] ";
break;
case eConfigError:
strTime += "[ConfErrorIMsg] ";
break;
case eParameterError:
strTime += "[ParamErrorMsg] ";
break;
case eReadError:
strTime += "[ReadErrorIMsg] ";
break;
case eWriteError:
strTime += "[WriteErrorMsg] ";
break;
case eControlMessage:
strTime += "[ControlExeMsg] ";
//strPath += "chain_";
break;
case eResponseMessage:
strTime += "[ResponseUpMsg] ";
//strPath += "chain_";
break;
case eTipMessage:
strTime += "[NoticeTipIMsg] ";
break;
default:
strTime += "[PromptUnNoMsg] ";
break;
}
strPath += LOG_FILE_NAME;
//open
#ifdef WIN32
fopen_s(&fpLog, strPath.c_str(), "a+");
#else
fpLog = fopen(strPath.c_str(), "a+");
#endif
if (NULL != fpLog)
{
fseek(fpLog, 0, SEEK_END);
fwrite(strTime.c_str(), strTime.length(), 1, fpLog);
fwrite(strMsg, strlen(strMsg), 1, fpLog);
fwrite("\n", 1, 1, fpLog);
fclose(fpLog);
}
}
catch (...) {
printf("write log[%d]{%s}error\n", iMsgType, strMsg);
}
}
#ifdef WIN32
#include <windows.h>
#include <atlcomtime.h>
extern char SVCNAME[128];
void WriteLog( const int iMsgType, const char * strMsg)
{
try {
if (iMsgType < int(eConfigError))
{
HANDLE hEventSource;
LPCTSTR lpszStrings[2];
hEventSource = RegisterEventSource(NULL, SVCNAME);
if (NULL != hEventSource)
{
lpszStrings[0] = SVCNAME;
lpszStrings[1] = strMsg;
ReportEvent(hEventSource, // event log handle
EVENTLOG_ERROR_TYPE, // event type
0, // event category
0, // event identifier
NULL, // no security identifier
2, // size of lpszStrings array
0, // no binary data
lpszStrings, // array of strings
NULL); // no binary data
DeregisterEventSource(hEventSource);
}
}
}
catch (...) {
printf("WriteLog[%d]{%s}for Evnet error\n", iMsgType, strMsg);
}
WriteLog1(iMsgType, strMsg);
}
#else
void WriteLog( const int iMsgType, const char * strMsg)
{
WriteLog1(iMsgType, strMsg);
}
#endif
CLogger::CLogger()
: running(true)
, i_log_over(0)
{
char buf[256] = {0};
sprintf_s(buf,"mkdir %s",logdir.c_str());
system(buf);
this->start();
};
CLogger::~CLogger()
{
running = false;
};
CLogger* CLogger::createInstance( void )
{
if (m_pLogInstance == NULL)
{
m_pLogInstance = new CLogger();
return m_pLogInstance;
}
else
return m_pLogInstance;
};
int CLogger::Run()
{
MyLogStruct log_;
while (running) {
if (getFirstLog(log_))
{
WriteLog(log_.type, log_.szBuffer);
#ifndef WIN32
printf("Log::[%d]-->%s\n", getpid(), log_.szBuffer);
#else
printf("Log::-->%s\n", log_.szBuffer);
#endif
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
void CLogger::Log(const eLogType type, const char* lpszFormat, ...)
{
va_list args;
//char szBuffer[2048] = {0};
MyLogStruct log_;
log_.type = static_cast<int>(type);
va_start(args, lpszFormat);
#ifdef WIN32
vsnprintf_s(log_.szBuffer,sizeof(log_.szBuffer), lpszFormat, args);
#else
vsnprintf(log_.szBuffer, sizeof(log_.szBuffer), lpszFormat, args);
#endif
va_end(args);
addLog(log_);
}
bool CLogger::getFirstLog(MyLogStruct &it) {
bool ret = false;
m_Mutex_Log.Lock();
if (!mylogs.empty()) {
it = mylogs.front();
mylogs.pop();
ret = true;
}
m_Mutex_Log.Unlock();
return ret;
}
void CLogger::addLog(MyLogStruct it) {
m_Mutex_Log.Lock();
if (mylogs.size() > 100) {
i_log_over++;
mylogs.pop();
}
mylogs.push(it);
m_Mutex_Log.Unlock();
if (i_log_over >= 100) {//每溢出100次,报告一次
MyLogStruct log_;
log_.type = static_cast<int>(eTipMessage);
sprintf(log_.szBuffer,"the size of mylogs queue is up to limmit size[100],[%s %s %d]."
, __FILE__, __FUNCTION__, __LINE__);
m_Mutex_Log.Lock();
mylogs.push(log_);
m_Mutex_Log.Unlock();
i_log_over = 0;
}
}
#ifndef WIN32
#include <sys/vfs.h>
#include <mntent.h>
//int GetPrivateProfileInt(const char * lpAppName, const char *lpKeyName, int nDefault, const char * lpFileName)
//{
// return nDefault;
//}
//int GetPrivateProfileString(const char * lpAppName, const char * lpKeyName, const char * szDefault, char * szOut, const int nLen ,const char * lpFileName)
//{
// strncpy(szOut, szDefault, nLen);
// return 0;
//}
int GetPrivateProfileString(const char *AppName, const char *key, const char *defaultvalue, char *lpReturnedString, const int nSize, const char *lpFileName)
{
bool bFindAppName = false;
char tem[1000];
char *ptr;
FILE *fp;
char name[150];
snprintf(name, sizeof(name), "[%s]", AppName);
strncpy(lpReturnedString,defaultvalue, nSize);
if( (lpFileName == NULL) || ((fp=fopen(lpFileName,"rt"))==NULL) )// || fgets(tem,len,fp) == NULL)
{
return strlen(lpReturnedString);
}
while(fgets(tem,sizeof(tem),fp))
{
if(tem[0] == '[')
{
bFindAppName = false;
if(strstr(tem,name)==tem)
bFindAppName = true;
}
else
{
if(bFindAppName == true)
{
unsigned int n =strcspn(tem,"=");
if(static_cast<unsigned int>(strlen(key)) == n
&& static_cast<int>(strncasecmp(tem,key,n)) == 0)
{
strncpy(lpReturnedString, tem+n+1,nSize);
if( (ptr = strchr(lpReturnedString, '\n')) != NULL)
*ptr = '\0';
if( (ptr = strchr(lpReturnedString, '\r')) != NULL)
*ptr = '\0';
break;
}
}
}
}
fclose(fp);
return strlen(lpReturnedString);
}
int GetPrivateProfileInt(const char *AppName,const char *key,const int defaultvalue,const char *lpFileName)
{
char str[20];
int nRet = defaultvalue;
if(GetPrivateProfileString(AppName, key, "", str, sizeof(str), lpFileName) > 0)
{
nRet = atoi(str);
}
return nRet;
}
#endif
Mutex.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef PYMUTEX_H
#define PYMUTEX_H
#ifdef WIN32
//#include <windows.h>
#else
#include <pthread.h>
#endif
typedef void *HANDLE;
class IMutex
{
public:
virtual ~IMutex() {}
virtual void Lock() const = 0;
virtual bool TryLock() const = 0;
virtual void Unlock() const = 0;
};
class PYMutex : public IMutex
{
public:
PYMutex();
~PYMutex();
virtual void Lock() const;
virtual bool TryLock() const;
virtual void Unlock() const;
private:
#ifdef _WIN32
HANDLE m_mutex;
#else
mutable pthread_mutex_t m_mutex;
#endif
};
#endif //PYMUTEX_H
Mutex.cpp
#include "Mutex.h"
#ifdef WIN32
#include <windows.h>
#endif
//#include <iostream>
#include <stdio.h>
PYMutex::PYMutex()
{
#ifdef _WIN32
m_mutex = ::CreateMutex(NULL, FALSE, NULL);
#else
pthread_mutex_init(&m_mutex, NULL);
#endif
}
PYMutex::~PYMutex()
{
#ifdef _WIN32
::CloseHandle(m_mutex);
#else
pthread_mutex_destroy(&m_mutex);
#endif
}
void PYMutex::Lock() const
{
#ifdef _WIN32
//DWORD d = WaitForSingleObject(m_mutex, INFINITE);
WaitForSingleObject(m_mutex, INFINITE);
/// \todo check 'd' for result
#else
pthread_mutex_lock(&m_mutex);
#endif
}
bool PYMutex::TryLock() const
{
#ifdef _WIN32
DWORD dwWaitResult = WaitForSingleObject(m_mutex, 0);
if (dwWaitResult != WAIT_OBJECT_0 && dwWaitResult != WAIT_TIMEOUT) {
printf("thread WARNING: bad result from try-locking mutex\n");
}
return (dwWaitResult == WAIT_OBJECT_0) ? true : false;
#else
return (0==pthread_mutex_trylock(&m_mutex))?true:false;
#endif
};
void PYMutex::Unlock() const
{
#ifdef _WIN32
::ReleaseMutex(m_mutex);
#else
pthread_mutex_unlock(&m_mutex);
#endif
}
myFunc.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _MYFUNC_H_
#define _MYFUNC_H_
/*
public function
*/
#include <string>
#include <vector>
namespace PFunc
{
//字符串分割
bool string_divide(std::vector<std::string> &_strlist, const std::string src, const std::string div);
//hex to ascii
int string2bytes(const char* pSrc, unsigned char* pDst, int nSrcLength);
//ascii to hex
int bytes2string(const unsigned char* pSrc, char* pDst, int nSrcLength);
//frame code
int code(const unsigned char *buff, const int len, unsigned char *outbuf);
//frame uncode
int uncode(const unsigned char *buff, int len, unsigned char *outbuf);
//
bool ipCheck(std::string ip_str);
long ipToInt(std::string ip_str);
std::string intToIp(long ip_int);
//crc
unsigned int crc16(unsigned char *ptr, unsigned int len);
//年-月-日 时:分:秒
std::string getCurrentTime();
};
#endif
myFunc.cpp
#include "myFunc.h"
#include <stdlib.h>
#ifdef WIN32
#include <Winsock2.h>
#else
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#endif
#include <time.h>
#include "Log.h"
bool PFunc::string_divide(std::vector<std::string> &_strlist, const std::string src, const std::string div)
{
std::string _src = src;
std::string::size_type _pos = _src.find(div);
while (std::string::npos != _pos)
{
std::string _buf = "";
_buf = _src.substr(0, _pos);
_strlist.push_back(_buf);
_src = _src.erase(0, _pos + div.size());
_pos = _src.find(div.c_str());
}
if (!_src.empty())
{
_strlist.push_back(_src);
}
return true;
};
int PFunc::string2bytes(const char* pSrc, unsigned char* pDst, int nSrcLength)
{
for (int i = 0; i < nSrcLength; i += 2)
{
if (*pSrc >= '0' && *pSrc <= '9')
*pDst = (*pSrc - '0') << 4;
else
*pDst = (*pSrc - 'A' + 10) << 4;
pSrc++;
// 输出低4位
if (*pSrc >= '0' && *pSrc <= '9')
*pDst |= *pSrc - '0';
else
*pDst |= *pSrc - 'A' + 10;
pSrc++;
pDst++;
}
return (nSrcLength / 2);
};
int PFunc::bytes2string(const unsigned char* pSrc, char* pDst, int nSrcLength)
{
const char tab[] = "0123456789ABCDEF"; // 0x0-0xf的字符查找表
for (int i = 0; i < nSrcLength; i++)
{
*pDst++ = tab[*pSrc >> 4];
*pDst++ = tab[*pSrc & 0x0f];
pSrc++;
}
*pDst = '\0';
return nSrcLength * 2;
};
int PFunc::code(const unsigned char *buff, const int len, unsigned char *outbuf)
{
char ch = 0;
int nLen = 0;
unsigned char * buf = (unsigned char *)buff;
*outbuf++ = 0XF1;//头字节
nLen+=1;
for (int i = 0; i < len; i++, nLen++)
{
ch = buf[i];
if ((buf[i] | 0x0f) == 0xff && i > 0 && i < (len - 1))
{
*outbuf++ = 0xf0 & buf[i];
*outbuf++ = 0x0f & buf[i];
nLen += 1;
}
else {
*outbuf++ = ch;
}
}
*outbuf++ = 0XFF;//末字节
nLen+=1;
buf = NULL;
return nLen;
}
int PFunc::uncode(const unsigned char *buff, int len, unsigned char *outbuf)
{
char ch = 0;
int nLen = 0;
unsigned char * buf = (unsigned char *)buff;
//头、末尾字节判断
if(len<=2&&0XF1!=buf[0]&&0XFF!=buf[len-1]){
printf("uncode func, start bit or end bit Error!\r\n");
return 0;
}
for (int i = 1; i < (len-1); i++, nLen++)
{
ch = buf[i];
if (buf[i] == 0xf0)
{
#ifdef _DEBUG
if (i > len - 2)
printf("Error!\r\n");
if (buf[i + 1] > 0x0f)
printf("Error!\r\n");
#endif
ch = 0xf0 | buf[++i];
}
*outbuf++ = ch;
}
buf = NULL;
return nLen;
}
bool PFunc::ipCheck(std::string ip_str)
{
if (INADDR_NONE != inet_addr(ip_str.c_str()))
{
return true;
}
return false;
};
long PFunc::ipToInt(std::string ip_str)
{
if (INADDR_NONE != inet_addr(ip_str.c_str()))
{
return ntohl(inet_addr(ip_str.c_str()));
}
else {
CLogger::createInstance()->Log(eConfigError
, "ip format [%s] error: %s %s %d,please check the file format and code!"
, ip_str.c_str(), __FILE__, __FUNCTION__, __LINE__);
return 0;
}
};
std::string PFunc::intToIp(long ip_int)
{
char ip[64] = { 0 };
#ifdef WIN32
strcpy_s(ip, inet_ntoa(*(in_addr*)&ip_int));
#else
strcpy(ip, inet_ntoa(*(in_addr*)&ip_int));
#endif
return std::string(ip);
};
//CRC校验
unsigned int PFunc::crc16(unsigned char *ptr, unsigned int len)
{
unsigned int wcrc = 0XFFFF;//预置16位crc寄存器,初值全部为1
unsigned char temp;//定义中间变量
unsigned int i = 0, j = 0;//定义计数
for (i = 0; i < len; i++)//循环计算每个数据
{
temp = *ptr & 0X00FF;//将八位数据与crc寄存器亦或
ptr++;//指针地址增加,指向下个数据
wcrc ^= temp;//将数据存入crc寄存器
for (j = 0; j < 8; j++)//循环计算数据的
{
if (wcrc & 0X0001)//判断右移出的是不是1,如果是1则与多项式进行异或。
{
wcrc >>= 1;//先将数据右移一位
wcrc ^= 0XA001;//与上面的多项式进行异或
}
else//如果不是1,则直接移出
{
wcrc >>= 1;//直接移出
}
}
}
temp = wcrc;//crc的值
return wcrc;
};
std::string PFunc::getCurrentTime()
{
time_t _t = time(NULL);
struct tm _tt;
#ifdef WIN32
localtime_s(&_tt, &_t);
#else
localtime_r(&_t, &_tt);
#endif
_tt.tm_year += 1900;
_tt.tm_mon += 1;
char buf[32] = { 0 };
sprintf(buf, "%04d-%02d-%02d %02d:%02d:%02d"
, _tt.tm_year, _tt.tm_mon, _tt.tm_mday, _tt.tm_hour, _tt.tm_min, _tt.tm_sec);
return std::string(buf);
}
myThread.h
#ifndef _MYTHREAD_H
#define _MYTHREAD_H
#include <pthread.h>
#include <unistd.h>
class MyThread
{
private:
//current thread ID
pthread_t tid;
//thread status
int threadStatus;
//get manner pointer of execution
static void* run0(void* pVoid);
//manner of execution inside
void* run1();
public:
//threadStatus-new create
static const int THREAD_STATUS_NEW = 0;
//threadStatus-running
static const int THREAD_STATUS_RUNNING = 1;
//threadStatus-end
static const int THREAD_STATUS_EXIT = -1;
// constructed function
MyThread();
~MyThread();
//the entity for thread running
virtual int Run()=0;
//start thread
bool start();
//gte thread ID
pthread_t getThreadID();
//get thread status
int getState();
//wait for thread end
void join();
//wait for thread end in limit time
void join(unsigned long millisTime);
};
#endif /* _MYTHREAD_H */
myThread.cpp
#include "myThread.h"
#include <stdio.h>
void* MyThread::run0(void* pVoid)
{
MyThread* p = (MyThread*) pVoid;
p->run1();
return p;
}
void* MyThread::run1()
{
threadStatus = THREAD_STATUS_RUNNING;
tid = pthread_self();
Run();
threadStatus = THREAD_STATUS_EXIT;
tid = 0;
pthread_exit(NULL);
}
MyThread::MyThread()
{
tid = 0;
threadStatus = THREAD_STATUS_NEW;
}
MyThread::~MyThread()
{
join(10);
}
int MyThread::Run()
{
while(true){
printf("thread is running!\n");
sleep(100);
}
return 0;
}
bool MyThread::start()
{
return pthread_create(&tid, NULL, run0, this) == 0;
}
pthread_t MyThread::getThreadID()
{
return tid;
}
int MyThread::getState()
{
return threadStatus;
}
void MyThread::join()
{
if (tid > 0)
{
pthread_join(tid, NULL);
}
}
void MyThread::join(unsigned long millisTime)
{
if (tid == 0)
{
return;
}
if (millisTime == 0)
{
join();
}else
{
unsigned long k = 0;
while (threadStatus != THREAD_STATUS_EXIT && k <= millisTime)
{
usleep(100);
k++;
}
}
}
queuedata.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _QUEUE_DATA_H_
#define _QUEUE_DATA_H_
#include <deque>
#include <stdio.h>
#include <string.h>
#include "Mutex.h"
#include "Log.h"
template <class T>
class QueueData
{
public:
QueueData(std::string desc = "thread_queue");
~QueueData();
//
/**
* 获取队列大小
* @return {int } 队列大小
*/
int size();
/**
* 判定队列是否为空
* @return {bool } 是否为空队列
*/
bool isEmpty();
/**
* 获取队列头元素
* @param it {T&} 头元素
* @return {bool } 是否成功
*/
bool getFirst(T &it);
/**
* 删除元素
* @return {bool } 是否成功
*/
bool removeFirst();
/**
* 获取队列头元素,并从队列终删除
* @param it {T&} 头元素
* @return {bool } 是否成功
*/
bool pop(T &it);
/**
* 从队列头开始逐步获取多个元素,并剔除
* @param its {queue<T>&} 获取到的元素集
* @param sizel {int} 一次获取多少个
* @return {bool } 至少获取一个元素以上则成功
*/
bool getList(std::queue<T> &its,unsigned int sizel=5);
/**
* 从队列尾部添加元素
* @param it {T} 被添加元素
* @return {void } 无返回
*/
void add(T it);
/**
* 从队列头部添加元素
* @param it {T} 被添加元素
* @return {void } 无返回
*/
void add_front(T it);
/**
* 清空元素
* @return {void }
*/
void clear();
private:
void init();
QueueData& operator=(const QueueData&) {return this;};
protected:
std::string queue_desc;
private:
/点集转发
//协议解析结果缓存
std::deque<T> datacache_queue; //队列容器
PYMutex m_Mutex; //线程锁,或者如果更彻底采用acl库,采用acl::thread_mutex替代
//
static unsigned int QSize; //队列大小约束,超出是会从队列头剔除旧数据腾出空位在对末添加数据
//
int queue_overS; //队列溢出次数计数
};
template <class T>
unsigned int QueueData<T>::QSize = 100;
template <class T>
QueueData<T>::QueueData(std::string desc)
: queue_desc(desc)
{
init();
};
template <class T>
void QueueData<T>::init()
{
queue_overS = 0;
};
template <class T>
QueueData<T>::~QueueData()
{
}
//
template <class T>
int QueueData<T>::size()
{
int ret = 0;
m_Mutex.Lock();
ret = static_cast<int>(datacache_queue.size());
m_Mutex.Unlock();
return ret;
}
template <class T>
bool QueueData<T>::isEmpty()
{
bool ret = false;
m_Mutex.Lock();
ret = datacache_queue.empty();
m_Mutex.Unlock();
return ret;
}
template <class T>
bool QueueData<T>::getFirst(T &it)
{
bool ret = false;
m_Mutex.Lock();
if (!datacache_queue.empty())
{
it = datacache_queue.front();
ret = true;
}
m_Mutex.Unlock();
return ret;
}
template <class T>
bool QueueData<T>::removeFirst()
{
bool ret = false;
m_Mutex.Lock();
if (!datacache_queue.empty())
{
datacache_queue.pop_front();
ret = true;
}
m_Mutex.Unlock();
return ret;
}
template <class T>
bool QueueData<T>::pop(T &it)
{
bool ret = false;
m_Mutex.Lock();
if (!datacache_queue.empty())
{
it = datacache_queue.front();
datacache_queue.pop_front();
ret = true;
}
m_Mutex.Unlock();
return ret;
};
template <class T>
bool QueueData<T>::getList(std::queue<T> &its,unsigned int sizel)
{
m_Mutex.Lock();
while (!datacache_queue.empty())
{
its.push(datacache_queue.front());
datacache_queue.pop_front();
if (its.size() >= sizel)
{
break;
}
}
m_Mutex.Unlock();
return !its.empty();
};
template <class T>
void QueueData<T>::add(T it)
{
m_Mutex.Lock();
if (datacache_queue.size() > QSize)
{
queue_overS++;
datacache_queue.pop_front();
}
datacache_queue.push_back(it);
m_Mutex.Unlock();
if (queue_overS >= 10)
{
//每溢出10次,报告一次
CLogger::createInstance()->Log(eSoftError
,"add item to queue %s at end,but the size of QueueData is up to limmit size: %d.\n"
, queue_desc.c_str(), QSize);
queue_overS = 0;
}
}
template <class T>
void QueueData<T>::add_front(T it)
{
m_Mutex.Lock();
if (datacache_queue.size() > QSize)
{
queue_overS++;
datacache_queue.pop_front();
}
datacache_queue.push_front(it);
m_Mutex.Unlock();
if (queue_overS >= 10)
{
//每溢出10次,报告一次
CLogger::createInstance()->Log(eSoftError,
"add item to queue %s at first,but the size of QueueData is up to limmit size: %d.\n"
, queue_desc.c_str(), QSize);
queue_overS = 0;
}
}
template <class T>
void QueueData<T>::clear()
{
m_Mutex.Lock();
datacache_queue.clear();
m_Mutex.Unlock();
queue_overS = 0;
}
#endif //_QUEUE_DATA_H_
win32Thread.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef WIN32THREAD_H
#define WIN32THREAD_H
#include <process.h>
#include <iostream>
typedef void *HANDLE;
class MyThread
{
public:
MyThread();
~MyThread();
void start();
virtual int Run();
HANDLE getThread();
private:
HANDLE hThread;
static void agent(void *p);
};
#endif
win32Thread.cpp
#include "win32Thread.h"
#include <windows.h>
MyThread::MyThread()
{
}
MyThread::~MyThread()
{
WaitForSingleObject(hThread, INFINITE);
}
void MyThread::start()
{
hThread =(HANDLE)_beginthread(agent, 0, (void *)this);
}
int MyThread::Run()
{
printf("Base Thread\n");
return 0;
}
void MyThread::agent(void *p)
{
MyThread *agt = (MyThread *)p;
agt->Run();
}
HANDLE MyThread::getThread()
{
return hThread;
}
7.2 srv_IO目录
MySocket.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _MySocket_H_
#define _MySocket_H_
/*
*建立socket服务端
*/
#include "DataDef.h"
class MySocketPrivate;
class MySocketSrv;
class MySocketRD;
class MySocketWD;
class MySocket
{
public:
MySocket(unsigned int port,int netType_=1);
~MySocket(void);
public:
int Read(){ return -1; };
int Write(){ return -1; };
//
//int Read(char* buf, int size);
int Write(const char* buf, int size);
private:
MySocketPrivate *my_PrivateData;
MySocketSrv *my_SocketSrv;
MySocketRD *m_MySocketRD;
MySocketWD *m_MySocketWD;
};
#endif //_MYSOCKET_H_
MySocket.cpp
#include "MySocket.h"
#include "MySocketPrivate.h"
#include "MySocketSrv.h"
#include "MySocketRD.h"
#include "MySocketWD.h"
#include "Log.h"
*MySocketExx*///
MySocket::MySocket(unsigned int port,int netType_)
{
try {
my_PrivateData = new MySocketPrivate(port);
if (my_PrivateData->onConnect() > 0)
{
my_SocketSrv = new MySocketSrv();
my_SocketSrv->setPDataPtr(my_PrivateData);
my_SocketSrv->start();
m_MySocketRD = new MySocketRD(my_PrivateData,netType_);
m_MySocketRD->start();
m_MySocketWD = new MySocketWD( my_PrivateData,netType_);
m_MySocketWD->start();
}
else {
my_SocketSrv = NULL;
m_MySocketRD = NULL;
m_MySocketWD = NULL;
CLogger::createInstance()->Log(eSoftError,
"listen port(%u) error, [%s %s %d]"
, port
, __FILE__, __FUNCTION__, __LINE__);
}
}
catch (...) {
delete my_SocketSrv;
my_SocketSrv = NULL;
delete m_MySocketRD;
m_MySocketRD = NULL;
delete m_MySocketWD;
m_MySocketWD = NULL;
delete my_PrivateData;
my_PrivateData = NULL;
CLogger::createInstance()->Log(eSoftError,
"MySocketGx init error, [%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
}
};
MySocket::~MySocket(void)
{
if (NULL != my_SocketSrv)
{
delete my_SocketSrv;
my_SocketSrv = NULL;
}
if (NULL != m_MySocketRD)
{
delete m_MySocketRD;
m_MySocketRD = NULL;
}
if (NULL != m_MySocketWD)
{
delete m_MySocketWD;
m_MySocketWD = NULL;
}
if(NULL!=my_PrivateData)
{
delete my_PrivateData;
my_PrivateData = NULL;
}
};
int MySocket::Write(const char* buf, int size)
{
if (NULL != m_MySocketWD&&NULL!=buf)
{
return m_MySocketWD->AddData(buf, size);
}
else {
return -1;
}
}
MySocketPrivate.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _MYSOCKETPRIVATE_H_
#define _MYSOCKETPRIVATE_H_
#include <map>
#include <queue>
#include <set>
#ifdef WIN32
#include "afxsock.h"
#define MY_SOCKET SOCKET
#define MY_SOCKET_NULL NULL
#endif
#ifdef __linux__
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define printf_s printf
#define sprintf_s sprintf
#define MY_SOCKET int
#define MY_SOCKET_NULL (-1)
#endif
#include "Mutex.h"
#include "hashmap.h"
#include "DataDef.h"
class MySocketPrivate
{
public:
MySocketPrivate(unsigned int port)
: m_Port(port)
, m_OnListen(false)
{
m_SSocket = MY_SOCKET_NULL;
m_CSockets.clear();
#ifdef WIN32
/*
* This function should be called once in each secondary thread
* before the first socket is created in the new thread.
*/
SocketThreadInit();
#endif
};
~MySocketPrivate(){
disConnect();
};
public:
int onConnect();
void disConnect();
int Read(std::map<KeyObj_Client,RDClient> &bufs);
int Write(const char* buf, int size);
int Write(unsigned long long ipInt,const char* buf, int size);
bool Accept();
bool get_ipInt_list(std::set<long> &ipintlist); //获取在线端的整型IP
#ifdef WIN32
private:
void SocketThreadInit();
#endif
private:
void deleteSSocket(); //删除服务端
void deleteCSocket(); //删除所有客户端
void deleteCSocket(MY_SOCKET m_CSocket);//删除指定客户端
private:
MY_SOCKET m_SSocket; //服务端
unsigned int m_Port; //端口变量
bool m_OnListen; //用于标注侦听
PYMutex m_MyMutex;
std::map<KeyObj_Client,MY_SOCKET> m_CSockets; //绑定客户端
};
#endif //
MySocketPrivate.cpp
#include "MySocketPrivate.h"
#include "myFunc.h"
#include "Log.h"
#ifdef __linux__
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#define printf_s printf
#define sprintf_s sprintf
#pragma message("def printf_s from printf And def sprintf_s from sprintf")
#endif
#ifdef WIN32
void MySocketPrivate::SocketThreadInit()
{
WORD wVersionRequested;
WSADATA wsaData;
int err;
wVersionRequested = MAKEWORD(2, 2);
err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0)
{
//printf("WSAStartup failed with error: %d\n", err);
CLogger::createInstance()->Log(eSoftError,
"WSAStartup failed with error: %d, [%s %s %d]"
, err
, __FILE__, __FUNCTION__, __LINE__);
return;
}
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
{
/* Tell the user that we could not find a usable */
/* WinSock DLL. */
//printf("Could not find a usable version of Winsock.dll\n");
CLogger::createInstance()->Log(eSoftError,
"Could not find a usable version of Winsock.dll: [%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
WSACleanup();
return;
}
else {
printf("The Winsock 2.2 dll was found okay\n");
}
}
#endif
int MySocketPrivate::onConnect()
{
if (m_OnListen) //服务器Socket是否已经创建
{
//printf_s("it's has been Listten! \r\n");
CLogger::createInstance()->Log(eTipMessage,
"it's has been Listten, [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
return 1;
}
else {
#ifdef WIN32
m_SSocket = socket(AF_INET, SOCK_STREAM, 0);
SOCKADDR_IN addrServ;
addrServ.sin_family = AF_INET;
addrServ.sin_port = htons(m_Port);
addrServ.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
bind(m_SSocket, (SOCKADDR*)&addrServ, sizeof(SOCKADDR));
/*
*3.在send(),recv()过程中有时由于网络状况等原因,收发不能预期进行,可以设置收发时限:
*int nNetTimeout = 1000; //1秒
*发送时限
*setsockopt( socket, SOL_SOCKET, SO_SNDTIMEO, ( char * )&nNetTimeout, sizeof( int ) );
*接收时限
*setsockopt( socket, SOL_SOCKET, SO_RCVTIMEO, ( char * )&nNetTimeout, sizeof( int ) );
*4.在send()的时候,返回的是实际发送出去的字节(同步)或发送到socket缓冲区的字节(异步);系统默认的状态发送和接收一次为8688字节(约
*为8.5K);在实际的过程中如果发送或是接收的数据量比较大,可以设置socket缓冲区,避免send(),recv()不断的循环收发:
* 接收缓冲区
*int nRecvBufLen = 32 * 1024; //设置为32K
*setsockopt( s, SOL_SOCKET, SO_RCVBUF, ( const char* )&nRecvBufLen, sizeof( int ) );
*发送缓冲区
*int nSendBufLen = 32*1024; //设置为32K
*setsockopt( s, SOL_SOCKET, SO_SNDBUF, ( const char* )&nSendBufLen, sizeof( int ) );
*5.在发送数据的时,不执行由系统缓冲区到socket缓冲区的拷贝,以提高程序的性能:
*int nZero = 0;
*setsockopt( socket, SOL_SOCKET, SO_SNDBUF, ( char * )&nZero, sizeof( nZero ) );
*6.在接收数据时,不执行将socket缓冲区的内容拷贝到系统缓冲区:
*int nZero = 0;
*setsockopt( s, SOL_SOCKET, SO_RCVBUF, ( char * )&nZero, sizeof( int ) );
*/
//如果创建Socket失败则提示,成功则开始监听
if (listen(m_SSocket, 20) == SOCKET_ERROR)
{
closesocket(m_SSocket);
//printf_s("ServerSocket Create failed! error:%d \r\n",static_cast<int>(GetLastError()));
CLogger::createInstance()->Log(eParameterError,
"ServerSocket Create failed! error:%d, [%s %s %d]"
, static_cast<int>(GetLastError())
, __FILE__, __FUNCTION__, __LINE__);
return -1;
}
else {
CLogger::createInstance()->Log(eTipMessage, "on listen port(%d)", m_Port);
m_OnListen = true;
return 1;
}
#else
m_SSocket = socket(AF_INET, SOCK_STREAM, 0);
if (MY_SOCKET_NULL == m_SSocket)
{
CLogger::createInstance()->Log(eSoftError,
"socket create fail ![%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
return -1;
}
struct sockaddr_in s_add;
bzero(&s_add, sizeof(struct sockaddr_in));
s_add.sin_family = AF_INET;
s_add.sin_addr.s_addr = htonl(INADDR_ANY);
s_add.sin_port = htons(m_Port);
if (-1 == bind(m_SSocket, (struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
{
CLogger::createInstance()->Log(eSoftError,
"socket bind fail, %d ![%s %s %d]",m_Port
, __FILE__, __FUNCTION__, __LINE__);
return -1;
}
else {
CLogger::createInstance()->Log(eTipMessage,
"bind success, %d! [%s %s %d]!",m_Port
, __FILE__, __FUNCTION__, __LINE__);
}
if (-1 == listen(m_SSocket, 5))
{
CLogger::createInstance()->Log(eSoftError,
"listen %d fail ![%s %s %d]",m_Port
, __FILE__, __FUNCTION__, __LINE__);
return -1;
}
else {
CLogger::createInstance()->Log(eTipMessage,
"listen success, %d ! [%s %s %d]!",m_Port
, __FILE__, __FUNCTION__, __LINE__);
}
m_OnListen = true;
return 1;
#endif
}
}
bool MySocketPrivate::Accept()
{
bool bRet = true;
if (m_OnListen)
{
#ifdef WIN32
SOCKADDR_IN cliAddr;
int length = sizeof(SOCKADDR);
SOCKET cliSock = accept(m_SSocket, (SOCKADDR*)&cliAddr, &length);
if (INVALID_SOCKET == cliSock)
{
closesocket(cliSock);
//printf_s("Connect Accept Failed: %d! \r\n",static_cast<int>(GetLastError()));
CLogger::createInstance()->Log(eSoftError,
"Connect Accept Failed: %d! , [%s %s %d]"
, static_cast<int>(GetLastError())
, __FILE__, __FUNCTION__, __LINE__);
bRet = false;
}
else {
//cliAddr.sin_addr.S_un.S_addr;
char _ipport[64] = { 0 };
//sprintf_s(_ipport,"%s",(char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))));
sprintf_s(_ipport, "%s:%d", (char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
//std::string _linkInfo = _ipport;
KeyObj_Client _linkInfo((char*)inet_ntoa((*(in_addr*)&(cliAddr.sin_addr))), cliAddr.sin_port);
int nNetTimeout = 100; //1秒
//setsockopt(cliSock, SOL_SOCKET, SO_SNDTIMEO, (char *)&nNetTimeout, sizeof(int));
setsockopt(cliSock, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));
m_MyMutex.Lock();
m_CSockets[_linkInfo] = cliSock;//添加客户端
m_MyMutex.Unlock();
//printf_s("Connect Accept Success: %s \r\n", _ipport);
CLogger::createInstance()->Log(eTipMessage,
"Connect Accept Success: %s,[%s %s %d]"
, _ipport
, __FILE__, __FUNCTION__, __LINE__);
}
#else
int sin_size = sizeof(struct sockaddr_in);
struct sockaddr_in c_add;
// printf("MySocketPrivate::Accept 1\n");
int nfp = accept(m_SSocket, (struct sockaddr *)(&c_add), (socklen_t*)&sin_size);
if (-1 == nfp)
{
//printf("accept fail !\r\n");
CLogger::createInstance()->Log(eParameterError,
"accept fail![%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
bRet = false;
}
else {
char _ipport[64] = { 0 };
std::string _ipStr = inet_ntoa((*(in_addr*)&(c_add.sin_addr)));
//std::string _ipStr = PFunc::intToIp(htonl(c_add.sin_addr.s_addr));
int _port = static_cast<int>(htons(c_add.sin_port));
sprintf(_ipport, "%s:%d", _ipStr.c_str(), _port);
/*
struct timeval timeout = {3,0};
//setsockopt(nfp,SOL_SOCKET,SO_SNDTIMEO,(char *)&timeout,sizeof(struct timeval));
setsockopt(nfp,SOL_SOCKET,SO_RCVTIMEO,(char *)&timeout,sizeof(struct timeval));
KeyObj_Client _linkInfo((char*)inet_ntoa((*(in_addr*)&(c_Addr.sin_addr))), cliAddr.sin_port);
*/
int x = fcntl(nfp, F_GETFL, 0);
fcntl(nfp, F_SETFL, x | O_NONBLOCK);
KeyObj_Client _linkInfo(_ipStr, _port);
m_MyMutex.Lock();
//nfps.push_back(nfp);
//nfps[KeyObj_Client(_ipStr, _port)] = nfp;
m_CSockets[_linkInfo] = nfp;
m_MyMutex.Unlock();
//printf("accept ok!\r\nServer start get connect from %s\r\n", _ipport);
CLogger::createInstance()->Log(eTipMessage,
"accept ok!\r\nServer start get connect from %s.[%s %s %d]"
, _ipport
, __FILE__, __FUNCTION__, __LINE__);
}
#endif
}
else {
bRet = false;
//printf_s("m_OnListen is false, please check Listen state, Accept error \r\n");
CLogger::createInstance()->Log(eTipMessage,
"m_OnListen is false, please check Listen state, Accept error,[%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
}
return bRet;
};
bool MySocketPrivate::get_ipInt_list(std::set<long> &ipintlist)
{
for (std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();it!= m_CSockets.end();it++)
{
ipintlist.insert(it->first.m_ip);
}
return !ipintlist.empty();
}
void MySocketPrivate::disConnect()
{
deleteCSocket();//删除客户端
deleteSSocket();//删除服务端
#ifdef WIN32
WSACleanup();
#endif
CLogger::createInstance()->Log(eTipMessage,
"socket disConnect success and exit: [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
}
void MySocketPrivate::deleteCSocket()
{
m_MyMutex.Lock();
std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
while (it != m_CSockets.end())
{
//删除链接
deleteCSocket(it->second);
#ifdef WIN32
it = m_CSockets.erase(it);
#else
std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
m_CSockets.erase(ittemp);
#endif
}
m_MyMutex.Unlock();
}
void MySocketPrivate::deleteCSocket(MY_SOCKET m_CSocket)
{
try {
if (MY_SOCKET_NULL != m_CSocket)
{
#ifdef WIN32
closesocket(m_CSocket);
#else
close(m_CSocket);
#endif
m_CSocket = MY_SOCKET_NULL;
}
}
catch (...) {
CLogger::createInstance()->Log(eSoftError,
"socket deleteCSocket exception and failed! [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
}
}
void MySocketPrivate::deleteSSocket()
{
m_OnListen = false;
try {
if (MY_SOCKET_NULL != m_SSocket)
{
#ifdef WIN32
closesocket(m_SSocket);
#else
close(m_SSocket);
#endif
m_SSocket = MY_SOCKET_NULL;
}
}
catch (...) {
CLogger::createInstance()->Log(eSoftError,
"socket deleteSSocket exception and failed! [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
}
};
//return success read count
int MySocketPrivate::Read(std::map<KeyObj_Client, RDClient> &bufs)
{
int ret = 0;
m_MyMutex.Lock();
std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
while (it != m_CSockets.end())
{
char _buf[512] = { 0 };
#ifdef WIN32
int re_one = recv(it->second, _buf, 512, 0);
if (re_one <= 0)
{
int _error = GetLastError();
if (_error != 10060)
{
//printf_s("read data failed from %s! return val is %d. \r\n", it->first.c_str(), re);
CLogger::createInstance()->Log(eReadError,
"read data failed from %s! return val is %d,error(%d).[%s %s %d]"
, it->first.m_ipStr.c_str(), re_one, _error
, __FILE__, __FUNCTION__, __LINE__);
//删除链接
deleteCSocket(it->second);
it = m_CSockets.erase(it);
continue;
}
else {
re_one = 0;
}
}
#else
int re_one = recv(it->second, _buf, 256, MSG_DONTWAIT);
if (re_one <= 0)
{
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
usleep(10);
re_one = 0;
}
else {
CLogger::createInstance()->Log(eReadError,
"read data failed from %s! return val is %d.[%s %s %d]"
, it->first.m_ipStr.c_str(), re_one
, __FILE__, __FUNCTION__, __LINE__);
//删除连接
deleteCSocket(it->second);
std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
m_CSockets.erase(ittemp);
continue;
}
}
#endif
if (re_one>0)
{
ret += 1;
std::map<KeyObj_Client, RDClient>::iterator itrd = bufs.find(it->first);
if (itrd != bufs.end())
{
itrd->second.add((unsigned char*)_buf, re_one);
}
else {
bufs[it->first] = RDClient((unsigned char*)_buf, re_one);
}
}
it++;
}
m_MyMutex.Unlock();
return ret;
};
//return success count
int MySocketPrivate::Write(const char* buf, int size)
{
int ret = 0;
m_MyMutex.Lock();
std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
while (it != m_CSockets.end())
{
// printf_s("write data %d to client is started!\r\n",size);
#ifdef WIN32
int re = send(it->second, buf, size, 0);
if (re <= 0)
{
int _error = GetLastError();
if (_error != 10060)
{
CLogger::createInstance()->Log(eWriteError,
"socket write data failed! return val is %d,%s.[%s %s %d]"
, re, buf
, __FILE__, __FUNCTION__, __LINE__);
//删除连接
deleteCSocket(it->second);
it = m_CSockets.erase(it);
continue;
}
else {
re = 0;
}
}
#else
int re = send(it->second, buf, size, MSG_DONTWAIT);
if (re <= 0)
{
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
usleep(10);
re = 0;
}
else {
CLogger::createInstance()->Log(eWriteError,
"Write Data Failed! error(%d,%s)! [%s %s %d]"
, errno, strerror(errno)
, __FILE__, __FUNCTION__, __LINE__);
//删除连接
deleteCSocket(it->second);
std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
m_CSockets.erase(ittemp);
continue;
}
}
#endif
else{
ret += 1;
}
it++;
}
m_MyMutex.Unlock();
return ret;
};
//return success count
int MySocketPrivate::Write(unsigned long long ipInt, const char* buf, int size)
{
int ret = 0;
m_MyMutex.Lock();
std::map<KeyObj_Client, MY_SOCKET>::iterator it = m_CSockets.begin();
while (it != m_CSockets.end())
{
// printf_s("write data %d to client is started!\r\n",size);
//当前版本只针对网络地址做判断,即一台电脑多个客户端连接,都会被发送数据
if ((unsigned long long)it->first.m_ip == ipInt)
{
#ifdef WIN32
int re = send(it->second, buf, size, 0);
if (re < 0)
{
int _error = GetLastError();
if (_error != 10060)
{
CLogger::createInstance()->Log(eWriteError,
"socket write data failed! return val is %d,%s.[%s %s %d]"
, re, buf
, __FILE__, __FUNCTION__, __LINE__);
//删除连接
deleteCSocket(it->second);
it = m_CSockets.erase(it);
continue;
}
else {
re = 0;
}
}
#else
int re = send(it->second, buf, size, MSG_DONTWAIT);
if (re <= 0)
{
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
usleep(10);
re = 0;
}
else {
CLogger::createInstance()->Log(eWriteError,
"Write Data Failed! error(%d,%s)! [%s %s %d]"
, errno, strerror(errno)
, __FILE__, __FUNCTION__, __LINE__);
//删除连接
deleteCSocket(it->second);
std::map<KeyObj_Client, MY_SOCKET>::iterator ittemp = it++;
m_CSockets.erase(ittemp);
continue;
}
}
#endif
else {
ret += 1;
}
}
it++;
}
m_MyMutex.Unlock();
return ret;
}
MySocketRD.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _MYSOCKETGXRD_H_
#define _MYSOCKETGXRD_H_
/*
该线程从各个客户端读取数据,并将数据分帧
*/
#ifdef WIN32
#include "win32Thread.h"
#endif
#ifdef linux
#include "myThread.h"
#endif
#include "DataDef.h"
#include "queuedata.h"
class MySocketPrivate;
class MySocketRD : public MyThread
{
public:
MySocketRD(MySocketPrivate* myPDataPrt_, int netType_=1);
virtual ~MySocketRD(void);
int Run();
//从缓存中读取帧数据处理,请按需自行处理该函数
int AddFrame(const std::string link, const unsigned char *buf, int len);
private:
bool running;
int netType;//数据读写处理类型
MySocketPrivate *myPDataPrt;
QueueData<RDS> ReadData;
};
#endif
MySocketRD.cpp
#include "MySocketRD.h"
#include "MySocketPrivate.h"
#include "myFunc.h"
#include "Log.h"
MySocketRD::MySocketRD( MySocketPrivate* myPDataPrt_, int netType_)
: running(true)
, netType(netType_)
, myPDataPrt(myPDataPrt_)
{
}
MySocketRD::~MySocketRD(void)
{
running = false;
};
int MySocketRD::Run()
{
if (NULL == myPDataPrt )
{
CLogger::createInstance()->Log(eSoftError,
"MySocketRD start fail for myPDataPrt is NULL,[%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
return 0;
}
std::map<KeyObj_Client, RDClient> bufs;
RDS rdataGx;
while (running)
{
int re = myPDataPrt->Read(bufs);
if (re <= 0)
{
#ifdef _DEBUG
printf_s("Read Data Failed or NULL\n!");
#else
;
#endif
}else {
switch (netType)
{
case 1:
{
try {
std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
while (it != bufs.end())
{
if (it->second.len > 0)
{
RDS rdata(TCP_Data(it->second.Buf, it->second.len), it->first.m_ipStr);
ReadData.add(rdata);
}
it++;
}
bufs.clear();
}
catch (const std::exception& e)
{
CLogger::createInstance()->Log(eSoftError,
"Exception for Reading and Parsing Error[%s],NetType(%d), [%s %s %d]"
, e.what()
, netType
, __FILE__, __FUNCTION__, __LINE__);
}
catch (...) {
CLogger::createInstance()->Log(eSoftError,
"Exception for Reading and Parsing Error,NetType(%d),[%s %s %d]!"
, netType
, __FILE__, __FUNCTION__, __LINE__);
}
while (ReadData.getFirst(rdataGx))
{
this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
ReadData.removeFirst();
}
break;
}
case 2:
{
try {
std::map<KeyObj_Client, RDClient>::iterator it = bufs.begin();
while (it != bufs.end())
{
unsigned char * buff = it->second.Buf;
int start_frame = 0;
unsigned char ctype = 0;
for (int i = 0; i < it->second.len; i++)
{
//printf_s("%02X ",buff[i]);
if (buff[i] > 0xf0) {
if (buff[i] == 0xff)
{
if (ctype)
{
ctype = 0;
int re_len = i - start_frame + 1;
// RDS rdata(TCP_Data(buff + start_frame, i - start_frame + 1), it->first.m_ipStr);
unsigned char * pBuf = new unsigned char[re_len];
//
int nLen = PFunc::uncode(buff + start_frame, re_len, pBuf);
RDS rdata(TCP_Data(pBuf, nLen), it->first.m_ipStr);
// printf("rev01:%s\r\n",(char*)pBuf);
printf("rev01:%d\r\n",nLen);
ReadData.add(rdata);
start_frame = i + 1;
delete[] pBuf;
pBuf = NULL;
}
}
else {
ctype = buff[i];
start_frame = i;
}
}
}
buff = NULL;
if (start_frame < it->second.len)
{
RDClient _newrd(it->second.Buf + start_frame, it->second.len - start_frame);
it->second = _newrd;
it++;
}
else {
#ifdef WIN32
it = bufs.erase(it);
#else
std::map<KeyObj_Client, RDClient>::iterator ittemp = it++;
bufs.erase(ittemp);
#endif
}
}
}
catch (const std::exception& e)
{
CLogger::createInstance()->Log(eSoftError,
"Data Deserialize false[%s],[%s %s %d]"
, e.what()
, __FILE__, __FUNCTION__, __LINE__);
}
catch (...) {
CLogger::createInstance()->Log(eSoftError,
"Data Deserialize false,[%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
}
while (ReadData.getFirst(rdataGx))
{
this->AddFrame(rdataGx.flag, rdataGx.data.Buf, rdataGx.data.len);
ReadData.removeFirst();
}
break;
}
default:
break;
}
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
int MySocketRD::AddFrame(const std::string link, const unsigned char *buf, int len)
{
if(NULL == buf)
return 0;
printf("rev:%s\r\n",(char*)buf);
// TcpSocket ts;
// memcpy(&ts,buf,len);
// printf("rev:%u,%d,%0.2f,%s\r\n",ts.type,ts.len,ts.val,ts.desc);
return 0;
};
MySocketSrv.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _MYSOCKETSRV_H_
#define _MYSOCKETSRV_H_
#ifdef WIN32
#include "win32Thread.h"
#endif
#ifdef linux
#include "myThread.h"
#endif
class MySocketPrivate;
class MySocketSrv : public MyThread
{
public:
MySocketSrv();
virtual ~MySocketSrv();
void setPDataPtr(MySocketPrivate *myPData);
int Run();
private:
MySocketPrivate *myPDataPrt;
};
#endif
MySocketSrv.cpp
#include "MySocketSrv.h"
#include "MySocketPrivate.h"
#include "Log.h"
MySocketSrv::MySocketSrv(void)
: myPDataPrt(NULL)
{
}
MySocketSrv::~MySocketSrv(void)
{
}
void MySocketSrv::setPDataPtr(MySocketPrivate *myPData)
{
myPDataPrt=myPData;
};
int MySocketSrv::Run()
{
if (NULL == myPDataPrt)
{
CLogger::createInstance()->Log(eSoftError,
"MySocketSrv start fail for myPDataPrt is NULL,[%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
return 0;
}
while(1)
{
myPDataPrt->Accept();
#ifdef WIN32
Sleep(300);
#else
usleep(300000);
#endif
}
return 0;
}
MySocketWD.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _MySocketWD_H_
#define _MySocketWD_H_
/*
该线程向指定客户端写入数据,将数据协议编码并序列化
*/
#ifdef WIN32
#include "win32Thread.h"
#endif
#ifdef linux
#include "myThread.h"
#endif
#include "DataDef.h"
#include "queuedata.h"
class MySocketPrivate;
class MySocketWD : public MyThread
{
public:
MySocketWD(MySocketPrivate* myPDataPrt_,int netType_=1);
virtual ~MySocketWD(void);
int Run();
int AddData(const char* buf, int len);
int getBuffer(unsigned long long &_ipInt, unsigned char* _buf);
private:
bool running;
int netType;
MySocketPrivate *myPDataPrt;
QueueData<WDS> WriteData;
};
#endif
MySocketWD.cpp
#include "MySocketWD.h"
#include "MySocketPrivate.h"
#include "myFunc.h"
#include "Log.h"
#include <set>
#ifdef __linux__
#include <stdexcept>
#endif
MySocketWD::MySocketWD( MySocketPrivate* myPDataPrt_,int netType_)
: running(true)
, netType(netType_)
, myPDataPrt(myPDataPrt_)
{
}
MySocketWD::~MySocketWD(void)
{
running = false;
}
int MySocketWD::Run()
{
if (NULL == myPDataPrt)
{
CLogger::createInstance()->Log(eSoftError,
"MySocketWD start fail for myPDataPrt is NULL,[%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
return 0;
}
while(running)
{
try {
unsigned long long _ipInt = 0;
unsigned char buf[512] = { 0 };
int len = this->getBuffer(_ipInt, buf);
if (len > 0)
{
int ret = -1;
switch (netType)
{
case 1:
{
ret = myPDataPrt->Write(_ipInt, (const char*)buf, len);
break;
}
case 2:
{
// printf("send data: %s\r\n",buf);
unsigned char* _buf = new unsigned char[2 * len + 1];
memset(_buf, 0, 2 * len + 1);
len = PFunc::code(buf, len, _buf);//序列化处理
printf("send data: %d\r\n",len);
ret = myPDataPrt->Write(_ipInt, (const char*)_buf, len);
delete[] _buf;
_buf = NULL;
break;
}
default:
{
char warBuf[128] = { 0 };
sprintf(warBuf, "MySocketWD::Run For Unkown NetType(%02X)", netType);
#ifdef WIN32
throw std::exception(warBuf);
#else
throw std::domain_error(warBuf);
#endif
break;
}
}
if (ret <=0)
{
//printf("send data: %d, buf %d\n",len,ret);
CLogger::createInstance()->Log(eTipMessage,
"MySocketWD send data(%s,%d) fail. [%s %s %d]"
,buf,len
, __FILE__, __FUNCTION__, __LINE__);
}
//else{
// printf("send data: %d, and real send %d\n",len,ret);
//}
}
}
catch (const std::exception& e)
{
CLogger::createInstance()->Log(eSoftError,
"MySocketWD Run for data writing exception[%s],[%s %s %d]"
, e.what()
, __FILE__, __FUNCTION__, __LINE__);
}
catch (...) {
CLogger::createInstance()->Log(eSoftError,
"MySocketWD Run for data writing exception, [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
int MySocketWD::AddData(const char* buf, int len)
{
if(len>0&&NULL!=buf){
if(len>=512){
printf("buf len is >=512!\r\n");
}else{
std::set<long> ipintlist;
if(!myPDataPrt->get_ipInt_list(ipintlist))
{
return 0;
}
std::set<long>::iterator it = ipintlist.begin();
while (it != ipintlist.end())
{
/* code */
WDS wdata(*it,TCP_Data((unsigned char*)buf, len));
WriteData.add(wdata);
it++;
}
return len;
}
}
return 0;
}
int MySocketWD::getBuffer(unsigned long long &_ipInt, unsigned char* _buf)
{
if(NULL == _buf)
return 0;
int ret = 0;
WDS wdata;
if(WriteData.getFirst(wdata))
{
try{
if (!WriteData.removeFirst())
{
#ifdef WIN32
throw std::exception("removeFirst WData failed!");
#else
throw std::logic_error("removeFirst WData failed!");
#endif
}
_ipInt = wdata.ipInt;
memcpy(_buf,wdata.data.Buf,wdata.data.len);
ret = wdata.data.len;
}
catch (const std::exception& e)
{
CLogger::createInstance()->Log(eSoftError,
"write item info to socket failed! have error[%s]. [%s %s %d]"
, e.what()
, __FILE__, __FUNCTION__, __LINE__);
ret = -1;
}
catch (...) {
//printf_s("write item info to socket failed! have error. \r\n");
CLogger::createInstance()->Log(eSoftError,
"write item info to socket failed! have error. [%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
ret = -2;
}
}
return ret;
}
7.4 client_IO目录
MySocket.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _MYSOCKET_H_
#define _MYSOCKET_H_
/*
*建立socket客户端
*/
#include <string>
#include "DataDef.h"
class MySocketClient;
class MySocketWD;
class MySocketRD;
class MySocket
{
public:
MySocket(int _tranid, NetArg _netarg);
virtual ~MySocket(void);
public:
virtual int Read(){ return -1; };
virtual int Write(){ return -1; };
int Write(const char* buf, int size);
private:
int tranid;
NetArg netarg;
MySocketClient *my_PrivateData;
MySocketWD *m_MySocketWD;
MySocketRD *m_MySocketRD;
};
#endif //_MYSOCKET_H_
MySocket.cpp
#include "MySocket.h"
#include "MySocketClient.h"
#include "MySocketWD.h"
#include "MySocketRD.h"
#include "Log.h"
*MySocket*///
MySocket::MySocket(int _tranid, NetArg _netarg)
: tranid(_tranid)
, netarg(_netarg)
{
try {//防止构造时异常出现内存泄漏
//TCP/IP客户端,连接监控服务或其他平台
my_PrivateData = new MySocketClient(netarg.ipStr, netarg.port);
if (my_PrivateData->onConnect() <= 0)
{
CLogger::createInstance()->Log(eSoftError,
"connect server[%s,%d] error,please check it,[%s %s %d]!"
, netarg.ipStr.c_str(), netarg.port
, __FILE__, __FUNCTION__, __LINE__);
}
//数据协议编码解码 序列化及反序列化
//数据发送线程
m_MySocketWD = new MySocketWD();
m_MySocketWD->setPrivateDataPtr(my_PrivateData, netarg.type);
m_MySocketWD->start();
//数据接收线程
m_MySocketRD = new MySocketRD();
m_MySocketRD->setPrivateDataPtr(my_PrivateData, netarg.type);
m_MySocketRD->start();
}
catch (...)
{
delete m_MySocketRD;
m_MySocketRD = NULL;
delete m_MySocketWD;
m_MySocketWD = NULL;
delete my_PrivateData;
my_PrivateData = NULL;
CLogger::createInstance()->Log(eSoftError,
"MySocket init error,please check it,[%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
}
}
MySocket::~MySocket(void)
{
if(NULL!= m_MySocketRD)
{
delete m_MySocketRD;
m_MySocketRD = NULL;
}
if (NULL != m_MySocketWD)
{
delete m_MySocketWD;
m_MySocketWD = NULL;
}
if (NULL != my_PrivateData)
{
delete my_PrivateData;
my_PrivateData = NULL;
}
};
int MySocket::Write(const char* buf, int size)
{
if (size <= 0)
{
return -1;
}
if (NULL != m_MySocketWD && NULL != buf)
{
return m_MySocketWD->add_data(buf, size);
}
else
{
return -1;
}
};
MySocketClient.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef MY_SOCKET_CLIENT_H
#define MY_SOCKET_CLIENT_H
#ifdef WIN32
#include "afxsock.h"
#else
#define UINT unsigned int
#endif
#include <string>
#ifdef __linux__
#include <string.h>
#include <stdio.h>
#endif
#include "DataDef.h"
class MySocketClient
{
public:
MySocketClient(std::string ip, UINT port);
~MySocketClient(void);
int onConnect();
void disConnect();
bool isConnect(){ return m_OnConnect; };
int reSetSocket();
int Read(RDClient &bufs);
int Read(char* buf, int size);
int Write(const char* buf, int size);
private:
#ifdef WIN32
void SocketThreadInit();
#endif
private:
int sock_fd;
//fd_set read_flags,write_flags; // you know what these are
std::string m_IpAddress;
UINT m_Port; //
bool m_OnConnect; //
/*
*当前写入失败及读取线程都可重新建立链接,m_OnConnecting设置防止冲突
*/
bool m_OnConnecting; //
bool m_ConnectONLog; //防止链接错误日志反复记录,切换状态时必定记录
unsigned int m_log_Interval; //防止链接错误日志长时间不记录,2018-10-08
};
#endif
MySocketClient.cpp
#include "MySocketClient.h"
#ifdef __linux__
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#define printf_s printf
#define sprintf_s sprintf
#endif
#include "Log.h"
#ifdef WIN32
#include <mstcpip.h>
void MySocketClient::SocketThreadInit()
{
WORD wVersionRequested;
WSADATA wsaData;
int err;
wVersionRequested = MAKEWORD(2, 2);
err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0)
{
//printf("WSAStartup failed with error: %d\n", err);
CLogger::createInstance()->Log(eSoftError,
"WSAStartup failed with error: %d, [%s %s %d]"
, err, __FILE__, __FUNCTION__, __LINE__);
return;
}
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
{
/* Tell the user that we could not find a usable */
/* WinSock DLL. */
//printf("Could not find a usable version of Winsock.dll\n");
CLogger::createInstance()->Log(eSoftError,
"Could not find a usable version of Winsock.dll: [%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
WSACleanup();
return;
}
else
{
printf("The Winsock 2.2 dll was found okay\n");
}
}
#endif
MySocketClient::MySocketClient(std::string ip, UINT port)
: m_IpAddress(ip)
, m_Port(port)
{
sock_fd = -1;
#ifdef WIN32
/*initf = true;
if(!AfxWinInit(::GetModuleHandle(NULL), NULL, ::GetCommandLine(), 0)){
CLogger::createInstance()->Log(eTipMessage,"AfxWinInit initial failed!");
initf = false;
}
if (!AfxSocketInit())
{
CLogger::createInstance()->Log(eTipMessage,"WindowSocket initial failed!");
initf = false;
}
*/
SocketThreadInit();
#endif
m_OnConnect = false;
m_OnConnecting = false;
m_ConnectONLog = true;
m_log_Interval = static_cast<unsigned int>(time(NULL));
}
MySocketClient::~MySocketClient(void)
{
disConnect();
}
int MySocketClient::onConnect()
{
if (m_OnConnect) //
{
CLogger::createInstance()->Log(eTipMessage,
"it is on connecting,Please disconnect!, [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
return 0;
}
//防止链接冲突调用
if (m_OnConnecting)
{
return 0;
}
try {
m_OnConnecting = true;
#ifdef WIN32
sock_fd = static_cast<int>(socket(AF_INET, SOCK_STREAM, 0));
SOCKADDR_IN ser_addr;
memset(&ser_addr, 0, sizeof(ser_addr));
ser_addr.sin_family = AF_INET;
ser_addr.sin_addr.s_addr = inet_addr(m_IpAddress.c_str());
ser_addr.sin_port = htons(static_cast<unsigned short>(m_Port));
if (connect(sock_fd, (struct sockaddr *)&ser_addr, sizeof(ser_addr)) < 0)
{
//printf("%s:%d, connect socket failed,%s:%d \r\n", __FILE__, __LINE__,m_IpAddress.c_str(),m_Port);
if (m_ConnectONLog|| m_log_Interval < static_cast<unsigned int>(time(NULL)))
{
m_ConnectONLog = false;
m_log_Interval = static_cast<unsigned int>(time(NULL)) + 3600;
CLogger::createInstance()->Log(eConfigError,
"connect socket failed,%s:%d, [%s %s %d]"
, m_IpAddress.c_str(), m_Port
, __FILE__, __FUNCTION__, __LINE__);
}
m_OnConnecting = false;
return -1;
}
printf("connect socket %s:%d !\r\n", m_IpAddress.c_str(), m_Port);
CLogger::createInstance()->Log(eTipMessage,
"connect socket %s:%d, [%s %s %d]!"
, m_IpAddress.c_str(), m_Port
, __FILE__, __FUNCTION__, __LINE__);
int nNetTimeout = 10; //10毫秒
setsockopt(sock_fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&nNetTimeout, sizeof(int));
setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&nNetTimeout, sizeof(int));
//KeepAlive
bool bKeepAlive = true;
int nRet = setsockopt(sock_fd, SOL_SOCKET, SO_KEEPALIVE,(char*)&bKeepAlive, sizeof(bKeepAlive));
if (nRet == SOCKET_ERROR)
{
CLogger::createInstance()->Log(eTipMessage
, "connect socket %s:%d and setsockopt(SO_KEEPALIVE=true) fail!"
, m_IpAddress.c_str(), m_Port);
}
// set KeepAlive parameter
tcp_keepalive alive_in;
tcp_keepalive alive_out;
alive_in.keepalivetime = 1000; // 1s
alive_in.keepaliveinterval = 3000; //3s
alive_in.onoff = true;
unsigned long ulBytesReturn = 0;
nRet = WSAIoctl(sock_fd, SIO_KEEPALIVE_VALS, &alive_in, sizeof(alive_in),
&alive_out, sizeof(alive_out), &ulBytesReturn, NULL, NULL);
if (nRet == SOCKET_ERROR)
{
CLogger::createInstance()->Log(eTipMessage
, "connect socket %s:%d and setsockopt(tcp_keepalive) fail!"
, m_IpAddress.c_str(), m_Port);
}
m_OnConnect = true;
m_OnConnecting = false;
m_ConnectONLog = true;
return 1;
#else
sock_fd = socket(PF_INET, SOCK_STREAM, 0);
//sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == sock_fd)
{
CLogger::createInstance()->Log(eTipMessage,
"socket fail!, [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
m_OnConnecting = false;
return -1;
}
//printf("socket ok !\r\n");
CLogger::createInstance()->Log(eTipMessage,
"socket ok !, [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
struct sockaddr_in s_add;
bzero(&s_add, sizeof(struct sockaddr_in));
s_add.sin_family = PF_INET;
s_add.sin_addr.s_addr = inet_addr(m_IpAddress.c_str());
s_add.sin_port = htons(m_Port);
printf("s_addr = %#x ,port : %#x\r\n", s_add.sin_addr.s_addr, s_add.sin_port);
CLogger::createInstance()->Log(eTipMessage,
"s_addr = %#x ,port : %#x, [%s %s %d]"
, s_add.sin_addr.s_addr, s_add.sin_port
, __FILE__, __FUNCTION__, __LINE__);
//int x=fcntl(sock_fd,F_GETFL,0);
//fcntl(sock_fd,F_SETFL,x | O_NONBLOCK);
if (-1 == connect(sock_fd, (struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
{
if (m_ConnectONLog)
{
m_ConnectONLog = false;
CLogger::createInstance()->Log(eConfigError,
"connect fail !, [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
}
m_OnConnecting = false;
return -1;
}
int x = fcntl(sock_fd, F_GETFL, 0);
fcntl(sock_fd, F_SETFL, x | O_NONBLOCK);
//signal(SIGCHLD, SIG_IGN);
//FD_ZERO(&read_flags); // Zero the flags ready for using
//FD_ZERO(&write_flags);
//FD_SET(sock_fd, &read_flags);
//FD_SET(sock_fd, &write_flags);
/*
struct timeval timeout = {0,100};
setsockopt(sock_fd,SOL_SOCKET,SO_SNDTIMEO,(char *)&timeout,sizeof(struct timeval));
int tSet = setsockopt(sock_fd,SOL_SOCKET,SO_RCVTIMEO,(char *)&timeout,sizeof(struct timeval));
socklen_t len=sizeof(timeout);
getsockopt(sock_fd,SOL_SOCKET,SO_RCVTIMEO,&timeout,&len);
printf("setsockopt(%d),socklen_t(%d)!\r\n",tSet,len);
KeepAlive实现,单位秒
//下面代码要求有ACE,如果没有包含ACE,则请把用到的ACE函数改成linux相应的接口
int keepAlive = 1;//设定KeepAlive
int keepIdle = 5;//开始首次KeepAlive探测前的TCP空闭时间
int keepInterval = 5;//两次KeepAlive探测间的时间间隔
int keepCount = 3;//判定断开前的KeepAlive探测次数
if(setsockopt(s,SOL_SOCKET,SO_KEEPALIVE,(void*)&keepAlive,sizeof(keepAlive)) == -1)
{
CLogger::createInstance()->Log(eTipMessage,"setsockopt SO_KEEPALIVE error!");
}
if(setsockopt(s,SOL_TCP,TCP_KEEPIDLE,(void *)&keepIdle,sizeof(keepIdle)) == -1)
{
CLogger::createInstance()->Log(eTipMessage,"setsockopt TCP_KEEPIDLE error!");
}
if(setsockopt(s,SOL_TCP,TCP_KEEPINTVL,(void *)&keepInterval,sizeof(keepInterval)) == -1)
{
CLogger::createInstance()->Log(eTipMessage,setsockopt TCP_KEEPINTVL error!");
}
if(setsockopt(s,SOL_TCP,TCP_KEEPCNT,(void *)&keepCount,sizeof(keepCount)) == -1)
{
CLogger::createInstance()->Log(eTipMessage,setsockopt TCP_KEEPCNT error!");
}
*/
CLogger::createInstance()->Log(eTipMessage,
"connect ok !, [%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
m_OnConnect = true;
m_OnConnecting = false;
return 1;
#endif
}
catch (...) {
#ifdef WIN32
CLogger::createInstance()->Log(eSoftError,
"ClientSocket::onConnect error: %d.[%s %s %d]", static_cast<int>(GetLastError())
, __FILE__, __FUNCTION__, __LINE__);
#else
CLogger::createInstance()->Log(eSoftError,
"ClientSocket::onConnect error: %s. [%s %s %d]", strerror(errno)
, __FILE__, __FUNCTION__, __LINE__);
#endif
m_OnConnecting = false;
return -2;
}
}
void MySocketClient::disConnect()
{
m_OnConnect = false;
if (-1 != sock_fd)
{
#ifdef WIN32
closesocket(sock_fd);
#else
close(sock_fd);
#endif
sock_fd = -1;
}
}
int MySocketClient::reSetSocket()
{
disConnect();
return onConnect();
}
int MySocketClient::Read(RDClient &bufs)
{
try {
if (m_OnConnect)
{
char buf[256] = { 0 };
#ifdef WIN32
int re = recv(sock_fd, buf, 256, 0);
if (re <= 0)
{
int _error = GetLastError();
if (_error != 10060 && 0 != _error)
//if (_error != 10060)
{
CLogger::createInstance()->Log(eReadError,
"Read Datas Failed! ret(%d) error(%d)! [%s %s %d]"
, re, _error
, __FILE__, __FUNCTION__, __LINE__);
disConnect();
}
else
{
re = 0;
}
}
#else
//int re = read(sock_fd, buf, 256);
int re = recv(sock_fd, buf, 256, MSG_DONTWAIT);
if (re <= 0)
{
if(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
usleep(10);
re = 0;
}
else
{
CLogger::createInstance()->Log(eReadError,
"Read Data Failed! error(%d,%d,%s)! [%s %s %d]"
, re, errno, strerror(errno)
, __FILE__, __FUNCTION__, __LINE__);
disConnect();
}
//static int index = 0;
//printf("..%d..%d,%s\n",index++,errno,strerror(errno));
}
#endif
if (re > 0)
{
/*
for(int j=0; j<re; j++){
printf("%02X ",buf[j]);
}
printf("\n");
*/
bufs.add((unsigned char*)buf, re);
return bufs.len;
}
return re;
}
else
{
printf("Read Data Failed!unconnect!");
return -2;
}
}
catch (...)
{
disConnect();
#ifdef WIN32
CLogger::createInstance()->Log(eSoftError,
"Read Data Failed!unknown error: %d! [%s %s %d]", static_cast<int>(GetLastError())
, __FILE__, __FUNCTION__, __LINE__);
#else
CLogger::createInstance()->Log(eSoftError,
"Read Data Failed!unknown error: %s! [%s %s %d]", strerror(errno)
, __FILE__, __FUNCTION__, __LINE__);
#endif
return -3;
}
};
int MySocketClient::Read(char* buf, int size)
{
try {
if (m_OnConnect)
{
#ifdef WIN32
//int re = m_CSocket->Receive(buf, size);
int re = recv(sock_fd, buf, size, 0);
if (re <= 0)
{
int _error = GetLastError();
if (_error != 10060 && 0 != _error)
//if (_error != 10060)
{
CLogger::createInstance()->Log(eReadError,
"Read Data Failed!ret(%d),error(%d)! [%s %s %d]"
, re, _error
, __FILE__, __FUNCTION__, __LINE__);
disConnect();
}
else
{
re = 0;
}
}
#else
//int re = read(sock_fd, buf, size);
int re = recv(sock_fd, buf, size, MSG_DONTWAIT);
if (re <= 0)
{
if(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
usleep(10);
re = 0;
}
else
{
CLogger::createInstance()->Log(eReadError,
"Read Data Failed! error(%d,%d,%s)! [%s %s %d]"
,re , errno, strerror(errno)
, __FILE__, __FUNCTION__, __LINE__);
disConnect();
}
}
#endif
return re;
}
else
{
printf("Read Data Failed! unconnect! \n");
return -2;
}
}
catch (...)
{
disConnect();
#ifdef WIN32
CLogger::createInstance()->Log(eSoftError,
"Read Data Failed!unknown error: %d! [%s %s %d]", static_cast<int>(GetLastError())
, __FILE__, __FUNCTION__, __LINE__);
#else
CLogger::createInstance()->Log(eSoftError,
"Read Data Failed!unknown error: %s! [%s %s %d]", strerror(errno)
, __FILE__, __FUNCTION__, __LINE__);
#endif
return -3;
}
}
int MySocketClient::Write(const char* buf, int size)
{
try {
if (m_OnConnect)
{
#ifdef WIN32
int re = send(sock_fd, buf, size, 0);
if (re <= 0)
{
int _error = GetLastError();
if (_error != 10060 && 0 != _error)
{
CLogger::createInstance()->Log(eWriteError,
"Write Data Failed! ret(%d)! error(%d)! [%s %s %d]"
, re, _error
, __FILE__, __FUNCTION__, __LINE__);
disConnect();
}
}
#else
//int re = write(sock_fd, buf, size);
int re = send(sock_fd, buf, size, MSG_DONTWAIT);
if (re <= 0)
{
if(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
usleep(10);
re = 0;
}
else
{
CLogger::createInstance()->Log(eWriteError,
"Write Data Failed! error(%d,%d,%s)! [%s %s %d]"
, re, errno, strerror(errno)
, __FILE__, __FUNCTION__, __LINE__);
disConnect();
}
}
#endif
return re;
}
else {
//CLogger::createInstance()->Log(eWriteError,
// "Write Data Failed! unconnect! [%s %s %d]\r\n"
// , __FILE__, __FUNCTION__, __LINE__);
printf("Write Data Failed! unconnect!");
if (!m_OnConnecting)
{
reSetSocket();
}
return -2;
}
}
catch (...)
{
disConnect();
#ifdef WIN32
CLogger::createInstance()->Log(eSoftError,
"Write Data Failed! unknown error: %d! [%s %s %d]", static_cast<int>(GetLastError())
, __FILE__, __FUNCTION__, __LINE__);
#else
CLogger::createInstance()->Log(eSoftError,
"Write Data Failed! unknown error: %d! [%s %s %d]", strerror(errno)
, __FILE__, __FUNCTION__, __LINE__);
#endif
return -3;
}
}
MySocketRD.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _MYSOCKETRD_H_
#define _MYSOCKETRD_H_
/*
*从服务端获取数据
*/
#ifdef WIN32
#include "win32Thread.h"
#endif
#ifdef linux
#include "myThread.h"
#endif
#include "DataDef.h"
#include "queuedata.h"
class MySocketClient;
class MySocketRD : public MyThread
{
public:
MySocketRD(void);
virtual ~MySocketRD(void);
void setPrivateDataPtr(MySocketClient *myPData, int _netType=1);
int Run();
//从缓存中读取帧数据处理,请按需自行处理该函数
int AddFrame(const unsigned char *buf, int len);
private:
bool running;
int netType;//数据读写处理类型
MySocketClient *myPDataPrt;
QueueData<TCP_Data> ReadData;
};
#endif
MySocketRD.cpp
#include "MySocketRD.h"
#include "MySocketClient.h"
#include "myFunc.h"
#include "Log.h"
MySocketRD::MySocketRD(void)
: running(true)
, netType(1)
, myPDataPrt(NULL)
{
}
MySocketRD::~MySocketRD(void)
{
running = false;
}
void MySocketRD::setPrivateDataPtr(MySocketClient *myPData, int _netType)
{
netType = _netType;
if (NULL != myPData)
{
myPDataPrt = myPData;
}
}
int MySocketRD::Run()
{
if (NULL == myPDataPrt )
{
CLogger::createInstance()->Log(eSoftError,
"MySocketRD start fail for myPDataPrt is NULL"
",[%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
return 0;
}
RDClient rdc_data;
TCP_Data rddata;
while (running)
{
if (!myPDataPrt->isConnect())
{
myPDataPrt->reSetSocket();//read or write thread do it
if (!myPDataPrt->isConnect())
{
#ifdef WIN32
Sleep(1000);
#else
usleep(1000000);
#endif
}
}
else
{
//读取帧数据
switch (netType)
{
case 1:
{
//直接读取,不用做分帧处理,ACSII字段
char buf[256] = { 0 };
int len = myPDataPrt->Read(buf, 256);
if (len > 0)
{
TCP_Data rdata((unsigned char*)buf, len);
ReadData.add(rdata);
}
//数据帧解析
if (ReadData.getFirst(rddata))
{
this->AddFrame(rddata.Buf, rddata.len);
ReadData.removeFirst();
}
}
break;
case 2:
{
//数据有特定帧头和结尾,做分帧处理
int ret = myPDataPrt->Read(rdc_data);
if (ret > 0)
{
//printf("read(%d) from pcs_server\n",ret);
unsigned char * buff = rdc_data.Buf;
int frame_start = 0;
unsigned char ctype = 0;
for (int i = 0; i < rdc_data.len; ++i)
{
//printf("%02X ",buff[i]);
if (buff[i] > 0xf0)
{
if (buff[i] == 0xff)
{
if (ctype)
{
ctype = 0;
// TCP_Data rdata(buff + frame_start, i - frame_start + 1);
unsigned char * pBuf = new unsigned char[i - frame_start + 1];
int nLen = PFunc::uncode(buff + frame_start, i - frame_start + 1, pBuf);//反序列化处理
TCP_Data rdata(pBuf, nLen);
ReadData.add(rdata);
frame_start = i + 1;
delete[] pBuf;
pBuf = NULL;
}
}
else
{
ctype = buff[i];
frame_start = i;
}
}
}
buff = NULL;
if (frame_start < rdc_data.len)
{
RDClient _newrd(rdc_data.Buf + frame_start, rdc_data.len - frame_start);
rdc_data = _newrd;
}
else
{
rdc_data.len = 0;
}
}
//数据帧解析
while (ReadData.getFirst(rddata))
{
this->AddFrame(rddata.Buf, rddata.len);
ReadData.removeFirst();
}
}
break;
default:
CLogger::createInstance()->Log(eSoftError,
"Exception for Reading and Parsing is undef NetType(%d),[%s %s %d]!"
, netType
, __FILE__, __FUNCTION__, __LINE__);
break;
}
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
int MySocketRD::AddFrame(const unsigned char *buf, int len)
{
if(NULL==buf)
return 0;
printf("rev:%s\r\n",(char*)buf);
// TcpSocket ts;
// memcpy(&ts,buf,len);
// printf("rev:%u,%d,%0.2f,%s\r\n",ts.type,ts.len,ts.val,ts.desc);
return 0;
};
MySocketWD.h
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _MYSOCKETWD_H_
#define _MYSOCKETWD_H_
/*
*从缓存采集数据,向服务端发送数据
*/
#ifdef WIN32
#include "win32Thread.h"
#endif
#ifdef linux
#include "myThread.h"
#endif
#include "DataDef.h"
#include "queuedata.h"
class MySocketClient;
class MySocketWD : public MyThread
{
public:
MySocketWD(void);
virtual ~MySocketWD(void);
void setPrivateDataPtr(MySocketClient *myPData, int _netType=1);
int Run();
int add_data(const char* buf, int len);
int getBuffer(unsigned char * _buf);
int getHeartBeatBuffer(unsigned char * buf);
private:
bool running;
int netType;//数据读写处理类型
unsigned int heartBeatWrite;
MySocketClient *myPDataPrt;
QueueData<TCP_Data> WriteData;
};
#endif
MySocketWD.cpp
#include "MySocketWD.h"
#include "MySocketClient.h"
#include "myFunc.h"
#include "Log.h"
#ifdef __linux__
#include <stdexcept>
#endif
#define heartBeat_interval 10
MySocketWD::MySocketWD(void)
: running(true)
, netType(1)
, heartBeatWrite(static_cast<unsigned int>(time(NULL)))
, myPDataPrt(NULL)
{
}
MySocketWD::~MySocketWD(void)
{
running = false;
}
void MySocketWD::setPrivateDataPtr(MySocketClient *myPData, int _netType)
{
netType = _netType;
if (NULL != myPData)
{
myPDataPrt = myPData;
}
}
int MySocketWD::Run()
{
if (NULL == myPDataPrt )
{
CLogger::createInstance()->Log(eSoftError,
"MySocketWD start fail for myPDataPrt or m_MonitorData is NULL"
",[%s %s %d]!"
, __FILE__, __FUNCTION__, __LINE__);
return 0;
}
while(running)
{
if (!myPDataPrt->isConnect())
{
myPDataPrt->reSetSocket();//read or write thread do it
if (!myPDataPrt->isConnect())
{
#ifdef WIN32
Sleep(1000);
#else
usleep(1000000);
#endif
}
}
else {
//由读取进程去重新建立链接,写入线程只判定链接状态,进行数据写入
unsigned char buf[512] = { 0 };
int len = this->getBuffer(buf);
if (len <= 0 && (heartBeatWrite+heartBeat_interval)<static_cast<unsigned int>(time(NULL)))
{
len = this->getHeartBeatBuffer(buf);
}
if (len > 0) {
switch (netType)
{
case 1:
{
int ret = myPDataPrt->Write((const char*)buf, len);
if (ret != len) {
//printf("send data: %d, buf %d\n",len,ret);
CLogger::createInstance()->Log(eTipMessage,
"send point data: %d, buf %d. [%s %s %d]"
, len, ret
, __FILE__, __FUNCTION__, __LINE__);
}
else {
heartBeatWrite = static_cast<unsigned int>(time(NULL));
}
}
break;
case 2:
{
int cacheLen = 2 * len + 1;
unsigned char* _buf = new unsigned char[cacheLen];
memset(_buf, 0, cacheLen);
int nLen = PFunc::code(buf, len, _buf);//序列化处理
int ret = myPDataPrt->Write((const char*)_buf, nLen);
if (ret != nLen) {
//printf("send data: %d, buf %d\n",len,ret);
CLogger::createInstance()->Log(eTipMessage,
"send point data: %d, buf %d. [%s %s %d]"
, nLen, ret
, __FILE__, __FUNCTION__, __LINE__);
}
else {
heartBeatWrite = static_cast<unsigned int>(time(NULL));
}
delete[] _buf;
_buf = NULL;
}
break;
default:
CLogger::createInstance()->Log(eConfigError,
"Exception for Write data and unkown NetType(%d),[%s %s %d]!"
, netType
, __FILE__, __FUNCTION__, __LINE__);
break;
}
}
}
#ifdef WIN32
Sleep(1);
#else
usleep(1000);
#endif
}
return 0;
}
int MySocketWD::add_data(const char* buf, int len)
{
if(len>0&&NULL!=buf){
if(len>=512){
printf("buf len is >=512!\r\n");
}
else
{
TCP_Data rdata((unsigned char*)buf, len);
WriteData.add(rdata);
return len;
}
}
return 0;
}
int MySocketWD::getBuffer(unsigned char * _buf)
{
int ret = 0;
TCP_Data data;
if(WriteData.getFirst(data))
{
try{
if (!WriteData.removeFirst())
{
#ifdef WIN32
throw std::exception("removeFirst WData failed!");
#else
throw std::logic_error("removeFirst WData failed!");
#endif
}
memcpy(_buf,data.Buf,data.len);
ret = data.len;
printf("send:%s\r\n",_buf);
}
catch (const std::exception& e)
{
CLogger::createInstance()->Log(eSoftError,
"write item info to socket failed! have error[%s]. [%s %s %d]"
, e.what()
, __FILE__, __FUNCTION__, __LINE__);
ret = -1;
}
catch (...)
{
//printf_s("write item info to socket failed! have error. \r\n");
CLogger::createInstance()->Log(eSoftError,
"write item info to socket failed! have error. [%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
ret = -2;
}
}
return ret;
};
int MySocketWD::getHeartBeatBuffer(unsigned char * buf)
{
if (NULL != buf)
{
int idx = 0;
std::string cur_time_str = PFunc::getCurrentTime();
char buf_[64]={0};
sprintf(buf_,"HeartBeat:%s",cur_time_str.c_str());
idx = (int)strlen(buf_);
memcpy(buf,buf_,idx);
return idx;
}
else
{
return 0;
}
};
7.4 main.cpp
srv_test/main.cpp文章来源:https://www.toymoban.com/news/detail-574634.html
#include "MySocket.h"
#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#endif
const unsigned int PORT = 60008;
//log conf
char LOG_FILE_NAME[128] = "server_test.log";
std::string logdir = "log";
char SVCNAME[128] = "TCPServer_Srv";
int main(int argc, char *argv[])
{
MySocket server_test(70001,1);
char buf[]="hello, this is server!";
// TcpSocket ts;
// ts.len=(int)strlen(buf);
// memcpy(ts.desc,buf,ts.len);
// ts.type = 1;
// ts.val = 10.0;
while(1)
{
server_test.Write((const char*)buf,(int)strlen(buf));
// server_test.Write((const char*)&ts,(int)sizeof(TcpSocket));
#ifdef WIN32
Sleep(10000);
#else
usleep(10000000);
#endif
}
return 0;
}
/client_test/main.cpp文章来源地址https://www.toymoban.com/news/detail-574634.html
#include "MySocket.h"
#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#endif
//log conf
char LOG_FILE_NAME[128] = "server_test.log";
std::string logdir = "log";
char SVCNAME[128] = "TCPServer_Srv";
int main(int argc, char *argv[])
{
if(3!=argc)
{
printf("CMD prompt: client_test ip_addr port\r\n");
}
NetArg _netarg;
_netarg.ipStr = std::string(argv[1]);
_netarg.port = (int)atoi(argv[2]);
_netarg.type = 1;
MySocket client_test(1,_netarg);
char buf[]="hello, this is client 01!";
// TcpSocket ts;
// ts.len=(int)strlen(buf);
// memcpy(ts.desc,buf,ts.len);
// ts.type = 1;
// ts.val = 10.0;
while(1)
{
client_test.Write((const char*)buf,(int)strlen(buf));
// client_test.Write((const char*)&ts,(int)sizeof(TcpSocket));
#ifdef WIN32
Sleep(10000);
#else
usleep(10000000);
#endif
}
return 0;
}
到了这里,关于c/c++开发,c++无可避免的TCP/Socket通信开发实战案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!