1. 本程序功能
1) 要有完整的request 和 response;
2) 支持多进程并行处理任务;
3)子进程任务结束后无僵尸进程
2.Apache Thrift C++库的编译和安装
见
步步详解:Apache Thrift C++库从编译到工作模式DEMO_北雨南萍的博客-CSDN博客
3.框架生成
数据字段定义:
cat Datainfo.thrift
# Datainfo.thrift
struct message
{
1:i32 seqId,
2:string content
}
struct response
{
1:i32 seqId,
2:string content
}
service serDemo
{
response put(1:message msg)
}
thrift -gen cpp Datainfo.thrift
4.Server端源码
serDemo_server.skeleton.cpp
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
#include <stdexcept>
#include <thread>
#include "serDemo.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::concurrency;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
void handler(int num) {
//我接受到了SIGCHLD的信号啦
int status;
int pid = waitpid(-1,&status,WNOHANG);
if(WIFEXITED(status)) {
printf("The child exit with code %d\n",WEXITSTATUS(status));
}
}
class serDemoHandler : virtual public serDemoIf {
public:
serDemoHandler() {
// 构造函数:用于初始任务处理句柄对象
}
void put(response& _return, const message& msg) {
// Your implementation goes here
printf("put starting ...\n");
printf("receive message: id: %d, content: %s\n", msg.seqId, msg.content.c_str());
char* ffmpeg_argv[]={"/ffmpeg",
"-i",
"/opt/videoroom-1234-user-161756615651626-1615963571436156-video.mp4",
"-t",
"60",
"-vcodec",
"libx264",
"-f",
"flv",
"-y",
"/opt/videoroom-1234-user-161756615651626-1615963571436156-video.flv",
NULL};
printf("ffmpeg_argv:\n");
for (int i = 0; ffmpeg_argv[i] != NULL; i++) {
printf("%s ", ffmpeg_argv[i]);
}
printf("\n");
pid_t pid = fork();
if (pid == 0) {
execvp(ffmpeg_argv[0], ffmpeg_argv);
exit(0);
} else if (pid > 0) {
printf("This is the parent process\n");
} else {
printf("Fork failed\n");
}
// 回复消息
_return.seqId = msg.seqId;
_return.content = "This is the response message.";
printf("put stopped.\n");
}
};
int main(int argc, char **argv) {
int port = 19090;
signal(SIGCHLD,handler);
/*** TThreadPoolServer工作模式 ***/
try {
//创建一个线程管理器
std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(16);
std::shared_ptr<ThreadFactory> threadFactory = std::shared_ptr<ThreadFactory>(new ThreadFactory());
threadManager->threadFactory(threadFactory);
threadManager->start();
std::shared_ptr<serDemoHandler> handler(new serDemoHandler());
std::shared_ptr<TProcessor> processor(new serDemoProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TThreadPoolServer server(processor,
serverTransport,
transportFactory,
protocolFactory,
threadManager);
// 启动服务
::std::cout << "Starting the TThreadPoolServer ..." << std::endl;
server.serve();
::std::cout << "Server stopped." << std::endl;
} catch (std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
return 0;
}
编译生成server
g++ -std=c++11 -g -Wall Datainfo_types.cpp serDemo.cpp serDemo_server.skeleton.cpp -o server -lthrift -lpthread
5. client端源码
client.cpp
// 在同级目录下创建 client.cpp 文件
// ----------替换成自己的头文件----------
#include "serDemo.h"
// --------------------------------------
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using boost::shared_ptr;
int main(int argc, char **argv) {
/* 不能用localhost, 否则会有运行时提示:"
* TSocket::open() connect() <Host: localhost Port: 19090>: Connection refused
*/
//std::shared_ptr<TSocket> socket(new TSocket("localhost", 19090));
std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 19090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
serDemoClient client(protocol);
transport->open();
// ----------------------------我们的代码写在这里------------------------------
message msg;
msg.seqId = 1;
msg.content = "client message";
response response_put;
client.put(response_put, msg);
printf("seqId: %d, content: %s\n", response_put.seqId, response_put.content.c_str());
//--------------------------------------------------------------------------
transport->close();
return 0;
}
编译生成client文章来源:https://www.toymoban.com/news/detail-626714.html
g++ -std=c++11 -g -Wall Datainfo_types.cpp serDemo.cpp client.cpp -o client -lthrift -lpthread文章来源地址https://www.toymoban.com/news/detail-626714.html
到了这里,关于Apache Thrift C++库的TThreadPoolServer模式的完整示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!