Springboot整合Netty实现RPC服务器

这篇具有很好参考价值的文章主要介绍了Springboot整合Netty实现RPC服务器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

try {

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(boss, worker)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new IdleStateHandler(0, 0, 60));

pipeline.addLast(new JsonDecoder());

pipeline.addLast(new JsonEncoder());

pipeline.addLast(new RpcInboundHandler(rpcServices));

}

})

.channel(NioServerSocketChannel.class);

ChannelFuture future = bootstrap.bind(port).sync();

log.info(“RPC 服务器启动, 监听端口:” + port);

future.channel().closeFuture().sync();

}catch (Exception e){

e.printStackTrace();

boss.shutdownGracefully();

worker.shutdownGracefully();

}

}).start();

}

}

RpcServerInboundHandler 负责处理RPC请求

@Slf4j

public class RpcServerInboundHandler extends ChannelInboundHandlerAdapter {

private Map<String, Object> rpcServices;

public RpcServerInboundHandler(Map<String, Object> rpcServices){

this.rpcServices = rpcServices;

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

log.info(“客户端连接成功,{}”, ctx.channel().remoteAddress());

}

public void channelInactive(ChannelHandlerContext ctx) {

log.info(“客户端断开连接,{}”, ctx.channel().remoteAddress());

ctx.channel().close();

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg){

RpcRequest rpcRequest = (RpcRequest) msg;

log.info(“接收到客户端请求, 请求接口:{}, 请求方法:{}”, rpcRequest.getClassName(), rpcRequest.getMethodName());

RpcResponse response = new RpcResponse();

response.setRequestId(rpcRequest.getRequestId());

Object result = null;

try {

result = this.handleRequest(rpcRequest);

response.setResult(result);

} catch (Exception e) {

e.printStackTrace();

response.setSuccess(false);

response.setErrorMessage(e.getMessage());

}

log.info(“服务器响应:{}”, response);

ctx.writeAndFlush(response);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

log.info(“连接异常”);

ctx.channel().close();

super.exceptionCaught(ctx, cause);

}

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent){

IdleStateEvent event = (IdleStateEvent)evt;

if (event.state()== IdleState.ALL_IDLE){

log.info(“客户端已超过60秒未读写数据, 关闭连接.{}”,ctx.channel().remoteAddress());

ctx.channel().close();

}

}else{

super.userEventTriggered(ctx,evt);

}

}

private Object handleRequest(RpcRequest rpcRequest) throws Exception{

Object bean = rpcServices.get(rpcRequest.getClassName());

if(bean == null){

throw new RuntimeException("未找到对应的服务: " + rpcRequest.getClassName());

}

Method method = bean.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());

method.setAccessible(true);

return method.invoke(bean, rpcRequest.getParameters());

}

}

四、RPC客户端

========

/**

  • RPC远程调用的客户端

*/

@Slf4j

@Component

public class RpcClient {

@Value(“${rpc.remote.ip}”)

private String remoteIp;

@Value(“${rpc.remote.port}”)

private int port;

private Bootstrap bootstrap;

// 储存调用结果

private final Map<String, SynchronousQueue> results = new ConcurrentHashMap<>();

public RpcClient(){

}

@PostConstruct

public void init(){

bootstrap = new Bootstrap().remoteAddress(remoteIp, port);

NioEventLoopGroup worker = new NioEventLoopGroup(1);

bootstrap.group(worker)

.channel(NioSocketChannel.class)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel channel) throws Exception {

ChannelPipeline pipeline = channel.pipeline();

pipeline.addLast(new IdleStateHandler(0, 0, 10));

pipeline.addLast(new JsonEncoder());

pipeline.addLast(new JsonDecoder());

pipeline.addLast(new RpcClientInboundHandler(results));

}

});

}

public RpcResponse send(RpcRequest rpcRequest) {

RpcResponse rpcResponse = null;

rpcRequest.setRequestId(UUID.randomUUID().toString());

Channel channel = null;

try {

channel = bootstrap.connect().sync().channel();

log.info(“连接建立, 发送请求:{}”, rpcRequest);

channel.writeAndFlush(rpcRequest);

SynchronousQueue queue = new SynchronousQueue<>();

results.put(rpcRequest.getRequestId(), queue);

// 阻塞等待获取响应

rpcResponse = queue.take();

results.remove(rpcRequest.getRequestId());

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

if(channel != null && channel.isActive()){

channel.close();

}

}

return rpcResponse;

}

}

RpcClientInboundHandler负责处理服务端的响应

@Slf4j

public class RpcClientInboundHandler extends ChannelInboundHandlerAdapter {

private Map<String, SynchronousQueue> results;

public RpcClientInboundHandler(Map<String, SynchronousQueue> results){

this.results = results;

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

RpcResponse rpcResponse = (RpcResponse) msg;

log.info(“收到服务器响应:{}”, rpcResponse);

if(!rpcResponse.isSuccess()){

throw new RuntimeException(“调用结果异常,异常信息:” + rpcResponse.getErrorMessage());

}

// 取出结果容器,将response放进queue中

SynchronousQueue queue = results.get(rpcResponse.getRequestId());

queue.put(rpcResponse);

}

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent){

IdleStateEvent event = (IdleStateEvent)evt;

if (event.state() == IdleState.ALL_IDLE){

log.info(“发送心跳包”);

RpcRequest request = new RpcRequest();

request.setMethodName(“heartBeat”);

ctx.channel().writeAndFlush(request);

}

}else{

super.userEventTriggered(ctx, evt);

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){

log.info(“异常:{}”, cause.getMessage());

ctx.channel().close();

}

}

接口代理

为了使客户端像调用本地方法一样调用远程服务,我们需要对接口进行动态代理。

代理类实现

@Component

public class RpcProxy implements InvocationHandler {

@Autowired

private RpcClient rpcClient;

@Override

public Object invoke(Object proxy, Method method, Object[] args){

RpcRequest rpcRequest = new RpcRequest();

rpcRequest.setClassName(method.getDeclaringClass().getName());

rpcRequest.setMethodName(method.getName());

rpcRequest.setParameters(args);

rpcRequest.setParameterTypes(method.getParameterTypes());

RpcResponse rpcResponse = rpcClient.send(rpcRequest);

return rpcResponse.getResult();

}

}

实现FactoryBean接口,将生产动态代理类纳入 Spring 容器管理。

public class RpcFactoryBean implements FactoryBean {

private Class interfaceClass;

@Autowired

private RpcProxy rpcProxy;

public RpcFactoryBean(Class interfaceClass){

this.interfaceClass = interfaceClass;

}

@Override

public T getObject(){

return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, rpcProxy);

}

@Override

public Class<?> getObjectType() {

return interfaceClass;

}

}

自定义类路径扫描器,扫描包下的RPC接口,动态生产代理类,纳入 Spring 容器管理

public class RpcScanner extends ClassPathBeanDefinitionScanner {

public RpcScanner(BeanDefinitionRegistry registry) {

super(registry);

}

@Override

protected Set doScan(String… basePackages) {

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。Springboot整合Netty实现RPC服务器,程序员,spring boot,rpc,服务器

Springboot整合Netty实现RPC服务器,程序员,spring boot,rpc,服务器

Springboot整合Netty实现RPC服务器,程序员,spring boot,rpc,服务器

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

Springboot整合Netty实现RPC服务器,程序员,spring boot,rpc,服务器

最后

Springboot整合Netty实现RPC服务器,程序员,spring boot,rpc,服务器

《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!
UkOm-1712205652089)]

[外链图片转存中…(img-g5loqjXc-1712205652090)]

[外链图片转存中…(img-1Z8zI5Qe-1712205652090)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

Springboot整合Netty实现RPC服务器,程序员,spring boot,rpc,服务器

最后

[外链图片转存中…(img-iFKL84EQ-1712205652091)]

《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!文章来源地址https://www.toymoban.com/news/detail-844920.html

到了这里,关于Springboot整合Netty实现RPC服务器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用

    🏷️ 个人主页 :鼠鼠我捏,要死了捏的主页  🏷️ 系列专栏 :Golang全栈-专栏 🏷️ 个人学习笔记,若有缺误,欢迎评论区指正   前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站AI学习网站。 目录 前言 快速上手

    2024年02月19日
    浏览(40)
  • springboot整合feign实现RPC调用,并通过Hystrix实现服务降级

    feign/openfeign和dubbo是常用的微服务RPC框架,由于feigin内部已经集成ribbon,自带了负载均衡的功能,当有多个同名的服务注册到注册中心时,会根据ribbon默认的负载均衡算法将请求分配到不同的服务。这篇文章就简单介绍一下怎么使用feign来调用远程的服务。 首先,需要有一个

    2024年02月16日
    浏览(50)
  • 基于Netty实现一个HTTP服务器

    一、序言 Netty因其易编程,高可靠性,高性能的网络IO,在分布式开发中被广泛用于网络通信,比如RocketMQ,Dubbo底层都能看到Netty的身影,高性能的本质是其Reactor线程模型以及异步的编程处理。Reactor有三种模型,常用的有主从 Reactor多线程模式,具体表现如下: 在日常开发中

    2023年04月25日
    浏览(53)
  • 用netty轻松实现一个高效稳定的TCP服务器

              随着物联网的发展,很多项目都开始涉及到了tcp连接这块,在这里我们轻松用netty去实现,站在巨人的肩膀上。 关于netty包引用: 实现TCP服务器代码 依赖netty只需几行代码tcp服务:  业务处理代码(参考) 以下是处理报文业务类可参考,注意代码未优化:  运行

    2024年02月22日
    浏览(45)
  • 服务器部署整合了elasticsearch的springboot项目后报错

            今天在服务器上面更新自己的项目的时候报错了 报错太长了,我提炼了一下,主要是说bean注入失败,各种service和controller全都寄了,后来看到里面有个elasticsearchRepository,又因为刚整合了elasticsearch,所以基本上可以确定问题就是出在elasticsearch上。         这

    2024年02月05日
    浏览(89)
  • Java使用Netty实现端口转发&Http代理&Sock5代理服务器

    这里总结整理了之前使用Java写的端口转发、Http代理、Sock5代理程序,放在同一个工程中,方便使用。 开发语言:Java 开发框架:Netty 端口转发: HTTP代理服务器,支持账号密码认证 Sock5代理服务器,支持账号密码认证 支持连接后端时直接连接或采用代理连接,也后端代理连接认

    2024年01月25日
    浏览(55)
  • 整合封装服务器模块设计实现

    服务器模块,是对当前所实现的所有模块的⼀个整合,并进⾏服务器搭建的⼀个模块,最终封装实现出⼀个gobang_server的服务器模块类,向外提供搭建五⼦棋对战服务器的接⼝。通过实例化的对象可以简便的完成服务器的搭建。 首先,我将采用websocketpp来搭建服务器,那么需要

    2024年02月13日
    浏览(41)
  • Springboot3.X整合Dubbo3.XSpringCloudAlibaba微服务 2022.0 + Springboot3.X 集成 Dubbo实现对外调用http内部调用RPC

    近期自己新开了一套SpringCloud Alibaba微服务项目,接口使用了对外HTTP,内部RPC的设计,具体点说就是外部用户或客户端通过Nginx访问到Gateway网关再分发到各个服务,内部各个服务之间统一使用Dubbo RPC进行通信。下面是Springboot3.x集成Dubbo的分享: 1. 需要的关键依赖 2. 启动程序入

    2024年02月15日
    浏览(37)
  • Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发

    前言:本文通过springBoot -maven 框架,对Rabbitmq 进行整合,完成客户端消息的发送和消费; 1 为什么要使用Rabbitmq: RabbitMQ 是一个可靠的、灵活的、开源的消息中间件,具有以下优点: 异步通信:RabbitMQ 支持异步通信,使得消息发送者和接收者能够异步处理,提高了系统性能和

    2024年02月07日
    浏览(57)
  • 如何用JAVA实现一款高可用的TCP数据传输服务器——【基于netty4.x】

    震惊!这可能是我与底层最接近的一次编程体验 首先netty是一款高性能、封装性良好且灵活、基于NIO(真·非阻塞IO)的开源框架。可以用来手写web服务器、TCP服务器等,支持的协议丰富,如:常用的HTTP/HTTPS/WEBSOCKET,并且提供的大量的方法,十分灵活,可以根据自己的需求量身

    2024年01月19日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包