#include <fcntl.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
// Backlog value handed to listen() for the server's listening socket.
constexpr int MAX_CLIENTS{10};
// Number of bytes requested from recv() for a single client event.
constexpr int BUFFER_SIZE{1024};
// Number of worker threads main() asks the thread pool to start.
constexpr int MAX_THREADS{4};
// Payload describing one ready-to-read event that is passed to the handler.
struct EventData
{
    int clientSocket; // descriptor of the client socket the event refers to
};
// Fixed-size pool of worker threads that execute queued std::function<void()>
// tasks in FIFO order.
//
// The destructor sets the stop flag, wakes every worker and joins them; a
// worker only exits once the stop flag is set AND the queue is empty, so tasks
// that were already queued are still executed before destruction completes.
class ThreadPool
{
public:
    // Starts `numThreads` workers, each running WorkerLoop().
    ThreadPool(size_t numThreads)
    {
        for (size_t started = 0; started != numThreads; ++started)
        {
            threads_.emplace_back(&ThreadPool::WorkerLoop, this);
        }
    }

    // Requests shutdown and waits for every worker to finish.
    ~ThreadPool()
    {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto &worker : threads_)
        {
            worker.join();
        }
    }

    // Appends `func` to the task queue and wakes one waiting worker.
    void Enqueue(std::function<void()> func)
    {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            tasks_.push(std::move(func));
        }
        condition_.notify_one();
    }

private:
    // Body of every worker thread: pop one task under the lock, run it with
    // the lock released, repeat until shutdown has been requested and the
    // queue has been drained.
    void WorkerLoop()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> guard(mutex_);
                while (!stop_ && tasks_.empty())
                {
                    condition_.wait(guard);
                }
                // The wait above only ends with an empty queue when stop_ is set.
                if (tasks_.empty())
                {
                    return;
                }
                job = std::move(tasks_.front());
                tasks_.pop();
            }
            job();
        }
    }

    std::vector<std::thread> threads_;        // worker threads, joined in the destructor
    std::queue<std::function<void()>> tasks_; // pending tasks, guarded by mutex_
    std::mutex mutex_;                        // protects tasks_ and stop_
    std::condition_variable condition_;       // signalled on new task / shutdown
    bool stop_ = false;                       // set once by the destructor
};
// select()-based event loop that accepts connections on a listening socket and
// hands each readable client socket to a ThreadPool task, which echoes the
// received bytes back to the client.
//
// Concurrency model:
//  - Run() is meant to be executed by a single thread (the reactor thread).
//  - ProcessEvent() runs on ThreadPool worker threads.
//  - clientSockets_ and inFlight_ are shared between those threads and are
//    only touched while holding mutex_.
//  - A socket that has been handed to a worker is marked "in flight" and is
//    left out of the select() set until that worker is done. Without this the
//    level-triggered select() would report the same socket again on every loop
//    pass, queueing several tasks that recv()/close() the same descriptor.
//
// Registered descriptors are owned by the Reactor: Remove() closes them.
class Reactor
{
public:
    Reactor(ThreadPool &threadPool) : threadPool_(threadPool)
    {
    }

    // Adds `clientSocket` to the set of monitored sockets and takes ownership
    // of it. Safe to call from any thread.
    //
    // A descriptor that select() cannot represent (negative, or not below
    // FD_SETSIZE) is rejected; if it is a real descriptor it is closed so it
    // does not leak.
    void Register(int clientSocket)
    {
        if (clientSocket < 0 || clientSocket >= FD_SETSIZE)
        {
            std::cerr << "Socket " << clientSocket << " cannot be monitored by select(); rejected." << std::endl;
            if (clientSocket >= 0)
            {
                close(clientSocket);
            }
            return;
        }
        {
            std::lock_guard<std::mutex> lock(mutex_);
            clientSockets_.push_back(clientSocket);
        }
        std::cout << "Socket " << clientSocket << " registered." << std::endl;
    }

    // Stops monitoring `clientSocket` and closes it. Safe to call from any
    // thread. A descriptor that is not currently registered is ignored, so a
    // repeated Remove() can never close a descriptor number that the kernel has
    // already handed out again for a different connection.
    void Remove(int clientSocket)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            const auto newEnd = std::remove(clientSockets_.begin(), clientSockets_.end(), clientSocket);
            if (newEnd == clientSockets_.end())
            {
                return;
            }
            clientSockets_.erase(newEnd, clientSockets_.end());
        }
        close(clientSocket);
        std::cout << "Socket " << clientSocket << " removed." << std::endl;
    }

    // Runs the event loop on the calling thread.
    //
    // Each pass builds a fresh fd_set from the listening socket plus every
    // registered socket that is not in flight, waits up to kPollTimeoutUsec in
    // select(), accepts at most one pending connection and queues one
    // ThreadPool task per readable client socket.
    //
    // Returns only on a non-recoverable select()/accept() error or when
    // `mainSocket` cannot be used with select().
    void Run(int mainSocket)
    {
        if (mainSocket < 0 || mainSocket >= FD_SETSIZE)
        {
            std::cerr << "Listening socket " << mainSocket << " cannot be monitored by select()." << std::endl;
            return;
        }

        std::vector<int> watched; // sockets placed in the current select() set
        while (true)
        {
            // Local set: worker threads never see it, so no locking is needed
            // when it is inspected after select().
            fd_set readFds;
            FD_ZERO(&readFds);
            FD_SET(mainSocket, &readFds);
            int maxFd = mainSocket; // recomputed every pass so it can shrink

            watched.clear();
            {
                std::lock_guard<std::mutex> lock(mutex_);
                for (int socket : clientSockets_)
                {
                    if (inFlight_[socket])
                    {
                        continue; // a worker is still handling this socket
                    }
                    FD_SET(socket, &readFds);
                    maxFd = std::max(maxFd, socket);
                    watched.push_back(socket);
                }
            }

            // The timeout doubles as the re-arm latency: a socket released by
            // a worker is picked up again at the latest after this interval.
            struct timeval timeout;
            timeout.tv_sec = 0;
            timeout.tv_usec = kPollTimeoutUsec;
            const int selectResult = select(maxFd + 1, &readFds, nullptr, nullptr, &timeout);
            if (selectResult == -1)
            {
                if (errno == EINTR)
                {
                    continue; // interrupted by a signal, just retry
                }
                perror("select");
                return;
            }
            if (selectResult == 0)
            {
                // No descriptor became ready; start the next pass.
                continue;
            }

            if (FD_ISSET(mainSocket, &readFds))
            {
                // A new connection request is pending.
                struct sockaddr_in clientAddress;
                socklen_t clientAddressLength = sizeof(clientAddress);
                const int clientSocket = accept(mainSocket, (struct sockaddr *)&clientAddress, &clientAddressLength);
                if (clientSocket == -1)
                {
                    // Nothing to accept after all, or the peer gave up before
                    // we got to it: not an error for the server as a whole.
                    if (!IsRetryable(errno) && errno != ECONNABORTED)
                    {
                        perror("accept");
                        return;
                    }
                }
                else
                {
                    std::cout << "Accepted new connection." << std::endl;
                    Register(clientSocket);
                }
            }

            for (int socket : watched)
            {
                if (!FD_ISSET(socket, &readFds))
                {
                    continue;
                }
                if (!TryMarkInFlight(socket))
                {
                    continue; // already dispatched (e.g. registered twice)
                }
                threadPool_.Enqueue([this, socket]()
                                    {
                                        EventData eventData;
                                        eventData.clientSocket = socket;
                                        ProcessEvent(eventData);
                                        // Release only after ProcessEvent() (and a possible
                                        // Remove()) has finished, so the descriptor number is
                                        // never watched while this task still uses it.
                                        ClearInFlight(socket);
                                    });
            }
        }
    }

private:
#ifdef MSG_NOSIGNAL
    // Report a closed peer as an EPIPE error instead of raising SIGPIPE, whose
    // default action would terminate the whole server.
    static constexpr int kSendFlags = MSG_NOSIGNAL;
#else
    static constexpr int kSendFlags = 0;
#endif
    // Upper bound for one select() wait: 10 ms, expressed in microseconds.
    static constexpr int kPollTimeoutUsec = 10000;

    // True for errno values that mean "try again later" rather than failure.
    static bool IsRetryable(int error)
    {
        return error == EAGAIN || error == EWOULDBLOCK || error == EINTR;
    }

    // Writes all `length` bytes of `data` to `socket`, continuing after partial
    // writes and signal interruptions. If the socket is non-blocking and its
    // send buffer is full, waits until it becomes writable again.
    // Returns false when the connection can no longer be written to.
    static bool SendAll(int socket, const char *data, size_t length)
    {
        size_t sent = 0;
        while (sent < length)
        {
            const ssize_t written = send(socket, data + sent, length - sent, kSendFlags);
            if (written > 0)
            {
                sent += static_cast<size_t>(written);
                continue;
            }
            if (written == -1 && errno == EINTR)
            {
                continue;
            }
            if (written == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
            {
                fd_set writeFds;
                FD_ZERO(&writeFds);
                FD_SET(socket, &writeFds);
                if (select(socket + 1, nullptr, &writeFds, nullptr, nullptr) == -1 && errno != EINTR)
                {
                    return false;
                }
                continue;
            }
            return false;
        }
        return true;
    }

    // Marks `socket` as being handled by a worker. Returns false if it already
    // was, in which case the caller must not dispatch it again.
    bool TryMarkInFlight(int socket)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (inFlight_[socket])
        {
            return false;
        }
        inFlight_[socket] = true;
        return true;
    }

    // Makes `socket` eligible for select() again.
    void ClearInFlight(int socket)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        inFlight_[socket] = false;
    }

    // Handles one readable client socket on a worker thread: reads up to
    // BUFFER_SIZE bytes and echoes them back. The client is removed when the
    // peer closed the connection, when recv() failed with a non-retryable
    // error, or when the echo could not be written.
    void ProcessEvent(EventData eventData)
    {
        // One extra byte keeps the data NUL-terminated for the log line below
        // even when recv() fills all BUFFER_SIZE bytes.
        char buffer[BUFFER_SIZE + 1];
        memset(buffer, 0, sizeof(buffer));
        const ssize_t bytesRead = recv(eventData.clientSocket, buffer, BUFFER_SIZE, 0);
        // Capture errno right away; later calls may overwrite it.
        const int recvError = (bytesRead == -1) ? errno : 0;
        if (bytesRead > 0)
        {
            if (!SendAll(eventData.clientSocket, buffer, static_cast<size_t>(bytesRead)))
            {
                Remove(eventData.clientSocket);
            }
        }
        else if (bytesRead == 0 || !IsRetryable(recvError))
        {
            // Connection closed by the peer, or a real error: drop the client.
            Remove(eventData.clientSocket);
        }
        // Report that the event has been handled.
        std::cout << "Processed socket " << eventData.clientSocket << " event data " << buffer << "in Thread: " << std::this_thread::get_id() << std::endl;
    }

private:
    ThreadPool &threadPool_;         // executes ProcessEvent() tasks; not owned
    std::mutex mutex_;               // guards clientSockets_ and inFlight_
    std::vector<int> clientSockets_; // registered descriptors, all in [0, FD_SETSIZE)
    bool inFlight_[FD_SETSIZE] = {}; // indexed by descriptor: true while a worker handles it
};
class Server
{
public:
Server(ThreadPool &threadPool) : reactor_(threadPool)
{
}
bool Init(int port)
{
mainSocket_ = socket(AF_INET, SOCK_STREAM, 0);
if (mainSocket_ == -1)
{
std::cerr << "creat socket err" << std::endl;
return false;
}
int opt = 1;
if (setsockopt(mainSocket_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1)
{
std::cerr << "setsockopt err" << std::endl;
close(mainSocket_);
return false;
}
int flags = fcntl(mainSocket_, F_GETFL, 0);
if (flags == -1)
{
std::cerr << "Error getting socket flags." << std::endl;
close(mainSocket_);
return false;
}
if (fcntl(mainSocket_, F_SETFL, flags | O_NONBLOCK) == -1)
{
std::cerr << "Error setting socket to non-blocking mode." << std::endl;
close(mainSocket_);
return false;
}
struct sockaddr_in serverAddress;
serverAddress.sin_family = AF_INET;
serverAddress.sin_addr.s_addr = INADDR_ANY;
serverAddress.sin_port = htons(port);
if (bind(mainSocket_, (struct sockaddr *)&serverAddress, sizeof(serverAddress)) == -1)
{
perror("bind");
return false;
}
if (listen(mainSocket_, MAX_CLIENTS) == -1)
{
perror("listen");
return false;
}
port_ = port;
std::cout << "server init ok, listening on port: " << port_ << "。" << std::endl;
return true;
}
void Run()
{
std::thread reactorThread([&]()
{ reactor_.Run(mainSocket_); });
// 等待Reactor线程结束
reactorThread.join();
}
private:
int mainSocket_;
int port_;
Reactor reactor_;
};
// Entry point. Usage: <program> <port>
//
// Parses the port from argv[1], starts a pool of MAX_THREADS workers, then
// initializes and runs the server on the calling thread.
//
// Exit status: -1 when the port argument is missing or not a decimal number
// in [0, 65535]; 1 when the server fails to initialize; 0 when the server
// loop ends.
int main(int argc, char *argv[])
{
    if (argc < 2)
    {
        std::cerr << "please input the port of server。" << std::endl;
        return -1;
    }

    // strtol lets us tell "not a number" and trailing garbage apart from a
    // genuine value; atoi would quietly turn such input into port 0.
    errno = 0;
    char *parseEnd = nullptr;
    const long parsedPort = std::strtol(argv[1], &parseEnd, 10);
    const bool isNumber = parseEnd != argv[1] && *parseEnd == '\0' && errno != ERANGE;
    if (!isNumber || parsedPort < 0 || parsedPort > 65535)
    {
        std::cerr << "invalid port: " << argv[1] << std::endl;
        return -1;
    }
    const int port = static_cast<int>(parsedPort);

    ThreadPool threadPool(MAX_THREADS); // create the worker pool
    Server server(threadPool);          // the server dispatches work to that pool
    if (!server.Init(port))
    {
        std::cerr << "int server failed :" << port << std::endl;
        return 1;
    }
    server.Run();
    return 0;
}
// Debugging: on Linux, the nc command can be used as a client, for example:
//   nc 127.0.0.1 7777
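// Article source: https://www.toymoban.com/news/detail-694081.html
// This concludes the article on the server I/O-multiplexing reactor pattern.
// For more, search the earlier articles on the TOY模板网 site (search box in
// the top-right corner) or browse its related articles; thank you for
// supporting TOY模板网.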