Spring Integration Ip 一个好用的TCP/UDP开发框架

这篇具有很好参考价值的文章主要介绍了Spring Integration Ip 一个好用的TCP/UDP开发框架。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

基于我的上一篇文章《Spring Integration超详细解读》,相信大家对Spring Integration已经有了基本的认识。

因此本文中,着重讲解Spring Integration Ip的实际应用。

导入依赖

如果是在POM中,则导入以下依赖(由于spring-integration-ip实际会引入spring-integration的依赖,因此无需再添加相关的依赖了)

<dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-ip</artifactId> 
    <version>5.5.18</version> 
</dependency>

如果是在gradle中,则导入以下依赖

compile "org.springframework.integration:spring-integration-ip:5.5.18"

配置之前

Spring Integration Ip提供了对TCP和UDP的支持。

其中TcpSendingMessageHandler类用于发送TCP消息,TcpReceivingChannelAdapter类用于配置接收TCP消息的相关参数。

而UnicastSendingMessageHandler类用于发送UDP单播消息,UnicastReceivingChannelAdapter类用于配置接收UDP单播消息的相关参数。

另外,MulticastSendingMessageHandler类用于发送UDP组播消息,MulticastReceivingChannelAdapter类用于配置接收UDP组播消息的相关参数。

开始配置

UDP

以UDP为例,官方支持三种配置方式,XML配置方式、Java配置方式、Java DSL配置方式

UDP Outbound

UDP Outbound(UDP 出站)就是udp发送消息。以单播为例。

XML方式
<int-ip:udp-outbound-channel-adapter id="udpOut" 
    host="somehost" 
    port="11111" 
    multicast="false" 
    socket-customizer="udpCustomizer" 
    channel="exampleChannel"/>
Java方式
@Bean
public UnicastSendingMessageHandler handler() {
    return new UnicastSendingMessageHandler("localhost", 11111); 
}
Java DSL方式
@Bean
public IntegrationFlow udpOutFlow() {
    return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
                    .configureSocket(socket -> socket.setTrafficClass(0x10)))
                .get();
}

UDP Inbound

UDP Inbound(UDP 入站)就是udp接收消息。以独播为例。

XML方式
<int-ip:udp-inbound-channel-adapter id="udpReceiver" 
    channel="udpOutChannel" 
    port="11111" 
    receive-buffer-size="500" 
    multicast="false" 
    socket-customizer="udpCustomizer" 
    check-length="true"/>
Java方式
@Bean
public UnicastReceivingChannelAdapter udpIn() {
    UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
    adapter.setOutputChannelName("udpChannel");
    return adapter;
}
Java DSL方式
@Bean
public IntegrationFlow udpIn() {
    return IntegrationFlows.from(Udp.inboundAdapter(11111))
            .channel("udpChannel")
            .get();
}

项目实战

本文以UDP为例,分别配置客户端与服务端

客户端

定义用于配置UDP参数的类型

定义基础UDP参数类型

@Data
@NoArgsConstructor
@AllArgsConstructor
public class BaseUdpProp {
    private String host;
    private Integer port;
}

定义用于配置UDP单播入站参数的类型

@Component
@ConfigurationProperties(prefix = "custom.udp.unicast.inbound")
public class UnicastInboundUdpProp extends BaseUdpProp {

}

定义用于配置UDP单播出站参数的类型

@Component
@ConfigurationProperties(prefix = "custom.udp.unicast.outbound")
public class UnicastOutboundUdpProp extends BaseUdpProp {

}

定义用于配置UDP组播出站参数的类型

@Component
@ConfigurationProperties(prefix = "custom.udp.multicast.outbound")
public class MulticastOutboundUdpProp extends BaseUdpProp {

}

自定义的命令

public class ClusterCommand {
    public static final String FINDING_COMMAND = "Finding cluster";
    public static final String JOINING_COMMAND = "Joining cluster";
    public static final String RESET_COMMAND = "Reset node";
}

配置UDP单播入站

配置了Message Router和Transformer消息组件,用来实现按内容进行不同通道分发的业务逻辑

@Configuration
public class UnicastInboundUdpConfig {

    @Resource
    private UnicastInboundUdpProp prop;
    @Resource
    private ObjectMapper objectMapper;
    
    /**
     * 用于配置UDP单播的适配器,对应前文中提到的Channel Adapter的概念。
     * 该实例创建后,会自动产生对于目标端口的监听线程
     * 这里我们只定义了通道的名称,没有创建通道,所以会使用默认的DirectChannel,
     * 如果希望自定义通道,则可以调用adapter.setOutputChannel方法
     *
     * @author huangji
    */
    @Bean
    public UnicastReceivingChannelAdapter unicastAdapter() {
        UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(prop.getPort());
        adapter.setOutputChannelName("UnicastInboundUdpChannel");

        return adapter;
    }

    /**
     * 填入inputChannel,用来接收从apdater输出的数据,
     * 之后根据我们定义的判断逻辑,来分发给对应的目标通道
     * 
     * @param payload 负载消息,会被通道自动注入到变量中
     * @param headers 加了@Headers注解,会被通道自动注入消息头
     * @return 返回通道名称,即最终分发的目标通道的名称
     * @author huangji
     */
    @Router(inputChannel = "UnicastInboundUdpChannel")
    public String router(String payload, @Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        System.out.println("received unicast inbound from " + host);
        System.out.println("content is " + payload);
        try {
            objectMapper.readValue(payload, new TypeReference<List<String>>() {
            });
            return "NodeNameListChannel";
        } catch (JsonProcessingException ignored) {

        }
        try {
            objectMapper.readValue(payload, NodeJoiningParameter.class);
            return "JoiningParameterChannel";
        } catch (JsonProcessingException ignored) {

        }
        if (payload.equals(ClusterCommand.RESET_COMMAND)) {
            return "JoinedListeningChannel";
        }


        return "OtherChannel";
    }

    /**
     * 这里用@Transformer注解定义了一个转换器组件,将负载类型转化成了目标类型
     * 
     * @param payload 负载消息,会被通道自动注入到变量中
     * @return 返回转化后的结果List<String>
     * @throws JsonProcessingException
     * @author huangji
     */
    @Transformer(inputChannel = "NodeNameListChannel", outputChannel = "NodeNameTransformerChannel")
    public List<String> nodeNameListTransformer(String payload) throws JsonProcessingException {
        return objectMapper.readValue(payload, new TypeReference<>() {
        });
    }

    /**
     * 这里用@Transformer注解定义了一个转换器组件,将负载类型转化成了目标类型
     * 
     * @param payload 负载消息,会被通道自动注入到变量中
     * @return 返回转化后的结果
     * @throws JsonProcessingException
     * @author huangji
     */
    @Transformer(inputChannel = "JoiningParameterChannel", outputChannel = "ParameterTransformerChannel")
    public NodeJoiningParameter parameterTransformer(String payload) throws JsonProcessingException {
        return objectMapper.readValue(payload, NodeJoiningParameter.class);
    }
}

配置消息最终接收的通道

在这里进行自身业务逻辑的处理

    @ServiceActivator(inputChannel = "NodeNameTransformerChannel")
    public void nodeInfoListHandler(List<String> list, @Headers Map<String, Object> headers) {
          ...
    }

    @ServiceActivator(inputChannel = "ParameterTransformerChannel")
    public void joiningParameterHandler(NodeJoiningParameter parameter, @Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        String udpHost = udpConfig.getMessageHandler().getHost();
        if (host.equals(udpHost)) {
           ...
        }
    }

    @ServiceActivator(inputChannel = "JoinedListeningChannel")
    public void joinedListeningHandler(@Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        String udpHost = udpConfig.getMessageHandler().getHost();
        if (host.equals(udpHost)) {
           ...
        }
    }

    @ServiceActivator(inputChannel = "OtherChannel")
    public void otherHandler(String payload) {
        System.out.println("other payload: " + payload);
    }

发送消息

有效负载的内容在代码中必须是String类型或者byte[]类型,否则会在发送消息时抛出异常

发送UDP广播消息

MulticastSendingMessageHandler multicastHandler= SpringUtil.getBean("multicastHandler");

multicastHandler.handleMessage(MessageBuilder.withPayload(ClusterCommand.FINDING_COMMAND).build());

发送UDP单播消息

UnicastSendingMessageHandler messageHandler = outboundUdp.getMessageHandler();

messageHandler.handleMessage(MessageBuilder.withPayload(ClusterCommand.JOINING_COMMAND).build());

服务端

配置UDP组播入站

@Configuration
public class MulticastInboundUdpConfig {
    @Value("${custom.udp.multicast.inbound.host}")
    private String host;
    @Value("${custom.udp.multicast.inbound.port}")
    private Integer port;

    /**
     * 与单播的配置类似,这里也是配置组播的ChannelAdapter
     *
    */
    @Bean
    public MulticastReceivingChannelAdapter multicastAdapter() {
        MulticastReceivingChannelAdapter adapter = new MulticastReceivingChannelAdapter(host, port);
        adapter.setOutputChannelName("MulticastInboundUdpChannel");

        return adapter;
    }

    /**
     * 这里也是配置消息的Router组件
     *
    */
    @Router(inputChannel = "MulticastInboundUdpChannel")
    public String router(String payload, @Headers Map<String, Object> headers) {
        String remoteHost = headers.get(IpHeaders.IP_ADDRESS).toString();
        System.out.println("received multicast inbound from " + remoteHost);
        System.out.println("content is " + payload);
        if (payload.equals(ClusterCommand.FINDING_COMMAND)) {
            return "FindingChannel";
        }

        return "OtherChannel";
    }
}

配置UDP单播入站

@Configuration
public class UnicastInboundUdpConfig {
    @Value("${custom.udp.unicast.inbound.port}")
    private Integer port;

    @Bean
    public UnicastReceivingChannelAdapter unicastAdapter() {
        UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(port);
        adapter.setOutputChannelName("UnicastInboundUdpChannel");

        return adapter;
    }

    @Router(inputChannel = "UnicastInboundUdpChannel")
    public String router(String payload, @Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        System.out.println("received unicast inbound from " + host);
        if (payload.equals(ClusterCommand.JOINING_COMMAND)) {
            return "JoiningChannel";
        }

        return "OtherChannel";
    }
}

配置消息最终接收的通道

    @ServiceActivator(inputChannel = "FindingChannel")
    public void listenFindingHandler(@Headers Map<String, Object> headers) throws JsonProcessingException {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        UnicastSendingMessageHandler messageHandler = new UnicastSendingMessageHandler(host, udpOutboundPort);
        String payload = objectMapper.writeValueAsString(fetchNodeNameList());
        messageHandler.handleMessage(MessageBuilder.withPayload(payload).build());
    }

    @ServiceActivator(inputChannel = "JoiningChannel")
    public void listenJoiningHandler(@Headers Map<String, Object> headers) throws IOException, InterruptedException {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        UnicastSendingMessageHandler messageHandler = new UnicastSendingMessageHandler(host, udpOutboundPort);
       ...
    }

    @ServiceActivator(inputChannel = "OtherChannel")
    public void otherHandler(String payload) {
        System.out.println("other payload: " + payload);
    }

附录

消息头

Spring Integration Ip包含以下的头部信息,可以根据需要去获取自己想要的头部信息。文章来源地址https://www.toymoban.com/news/detail-792712.html

Header Name IpHeaders Constant Description
ip_hostname HOSTNAME The host name from which a TCP message or UDP packet was received. If lookupHost is false, this contains the IP address.
ip_address IP_ADDRESS The IP address from which a TCP message or UDP packet was received.
ip_port PORT The remote port for a UDP packet.
ip_localInetAddress IP_LOCAL_ADDRESS The local InetAddress to which the socket is connected (since version 4.2.5).
ip_ackTo ACKADDRESS The remote IP address to which UDP application-level acknowledgments are sent. The framework includes acknowledgment information in the data packet.
ip_ackId ACK_ID A correlation ID for UDP application-level acknowledgments. The framework includes acknowledgment information in the data packet.
ip_tcp_remotePort REMOTE_PORT The remote port for a TCP connection.
ip_connectionId CONNECTION_ID A unique identifier for a TCP connection. Set by the framework for inbound messages. When sending to a server-side inbound channel adapter or replying to an inbound gateway, this header is required so that the endpoint can determine the connection to which to send the message.
ip_actualConnectionId ACTUAL_CONNECTION_ID For information only. When using a cached or failover client connection factory, it contains the actual underlying connection ID.
contentType MessageHeaders. CONTENT_TYPE An optional content type for inbound messages Described after this table. Note that, unlike the other header constants, this constant is in the MessageHeaders class, not the IpHeaders class.

到了这里,关于Spring Integration Ip 一个好用的TCP/UDP开发框架的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • TCP/IP、UDP和TELNET

           TCP/IP是一个Protocol Stack,包括TCP、IP、UDP、ICMP、RIP、TELNET、FTP、SMTP、ARP等许多协议。        最早发源于1969年美国国防部(缩写为DoD)的因特网的前身ARPA网络项目,1983年1月1日,TCP/IP取代了旧的网络控制协议NCP,成为今天的互联网和局域网的基石和标准,由互联网工

    2024年01月25日
    浏览(46)
  • HTTP/UDP/TCP/IP网络协议

    OSI模型定义了网络互连的七层框架(物理层、数据链路层、网络层、传输层、会话层、表示层、应用层),每一层实现各自的功能和协议,并完成与相邻层的接口通信。OSI模型各层的通信协议,大致举例如下表所示: 层次 常见协议 应用层 HTTP、SMTP、SNMP、FTP、Telnet、SIP、SS

    2024年04月11日
    浏览(43)
  • TCP、UDP、IP、RTP头长度

    各种协议的数据包头长度如下: UDP 头(8 字节) 源端口(2 字节) 目标端口(2 字节) UDP 数据包长度(2 字节) 校验和(2 字节) RTP 头(12 字节或者24字节) 版本号(2位) 填充位(1位) 扩展位(1位) CSRC计数器(4位) 标记位(1位) 负载类型(7位) 序列号(16位) 时

    2024年02月11日
    浏览(44)
  • TCP/IP UDP广播无法发送或接收

    在看《TCP/IP 网络编程》这本书的时候,看到广播那一节,跟着书上写代码,怎么写都不行,广播就是没法发送/接收,发送端一直在发送数据,接收端就是没有反应。 对了好几遍源码,没有问题。实在是愁人。 最后查了很多资料,确定是网卡的问题。 现在的计算机都是多网

    2024年02月04日
    浏览(51)
  • SCTP, TCP, UDP, IP, ICMP都在哪一层?(TCP/IP网络通信协议学习)

    TCP/IP网络通信协议最早是由 罗伯特·卡恩 (Robert E. Kahn)和 文顿·瑟夫 (Vinton G. Cerf)于1972年提出的,它是一个实际的协议栈。 OSI七层网络通信协议最早是 由国际标准化组织 (ISO)于1977年提出的,它是一个理论模型。TCP/IP网络通信协议由于其简单性和实用性,成为 事实上

    2024年01月22日
    浏览(74)
  • TCP、UDP、IP以及ensp基本配置

    目录 一、TCP报文段 二、UDP报文段 三、IP报文段 四、ensp基本配置​          源端口号: 表示发数据那个进程的端口号。          目的端口号: 表示收数据那个进程的端口号。          校验和: 验证数据的传输是否是正确的。          选项 :扩展Tcp功能时

    2024年02月02日
    浏览(52)
  • IP、ICMP、UDP、TCP 校验和算法分享

    以前看计算机网络相关的书,每次看到IP或者UDP报头校验和时,都一瞥而过,以为相当简单,不就是16bit数据的相加吗。最近在研究《TCP/IP详解 卷1:协议》这本书,看到校验和是 16bit字的二进制反码和 (晕,以前都没注意原来是反码和,看来以前看书不仔细啊!罪过,罪过

    2024年02月06日
    浏览(42)
  • IP、ICMP、TCP和UDP校验和计算

    一. 前言         计算网络数据包的校验和是机器自动完成,不需要手动计算。但是正因为如此,我们往往不会去深究校验和到底是怎么计算的,留下这一块盲区。虽然书上有大致介绍计算的方法,但是,“纸上得来终觉浅,绝知此事要躬行”,本文将详细演示IP、ICMP、TCP、

    2024年02月10日
    浏览(46)
  • TCP/IP协议族之TCP、UDP协议详解(小白也能看懂)

            在进行网络编程之前,我们必须要对网络通信的基础知识有个大概的框架,TCP/IP协议族涉及到多种网络协议,一般说TCP/IP协议,它不是指某一个具体的网络协议,而是一个协议族。本篇章主要针对IP协议、TCP和UDP协议记录总结。 OSI七层参考模型是国际标准化组织(

    2024年02月02日
    浏览(47)
  • 【业务领域】以太Mac/IP/UDP/TCP报文格式简介

    长度/类型域段: VLAN (Virtual Local Area Network)意为虚拟局域网,是在交换机实现过程中涉及到的概念,由802.1Q标准所定义。由于交换机是工作在链路层的网络设备,连接在同一台交换机的终端处于同一个三层网中,同时也处于同一个广播域。当交换机接入较多的终端时,任意一

    2024年01月22日
    浏览(60)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包