SpringBoot自定义消息总线

这篇具有很好参考价值的文章主要介绍了SpringBoot自定义消息总线。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、前言

        在现代的分布式系统中,消息传递已成为一个非常流行的模式。它使得系统内的不同部分可以松耦合地通信,从而实现更高效、更可靠的应用程序。本博客将介绍SpringBoot如何提供简单易用的消息传递机制,并展示如何自定义消息总线以满足特定需求。

二、依赖引入

// gradle 自身需求资源库 放头部
buildscript {
    repositories {
        maven { url 'https://maven.aliyun.com/repository/public' }// 加载其他Maven仓库
        mavenCentral()
    }
    dependencies {
        classpath('org.springframework.boot:spring-boot-gradle-plugin:2.1.1.RELEASE')// 加载插件,用到里面的函数方法
    }
}


apply plugin: 'java'
apply plugin: 'idea'
// 使用spring boot 框架
apply plugin: 'org.springframework.boot'
// 使用spring boot的自动依赖管理
apply plugin: 'io.spring.dependency-management'

// 版本信息
group 'com.littledyf'
version '1.0-SNAPSHOT'

// 执行项目中所使用的的资源仓库
repositories {
    maven { url 'https://maven.aliyun.com/repository/public' }
    mavenCentral()
}

// 项目中需要的依赖
dependencies {
    // 添加 jupiter 测试的依赖
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
    // 添加 jupiter 测试的依赖
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

    // 添加 spring-boot-starter-web 的依赖 必须 排除了security 根据自身需求
    implementation('org.springframework.boot:spring-boot-starter-web') {
        exclude group: 'org.springframework.security', module: 'spring-security-config'
    }

    // 添加 spring-boot-starter-test 该依赖对于编译测试是必须的,默认包含编译产品依赖和编译时依赖
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    // 添加 junit 测试的依赖
    testImplementation group: 'junit', name: 'junit', version: '4.11'
    // 添加 lombok
    annotationProcessor 'org.projectlombok:lombok:1.18.22' // annotationProcessor代表main下代码的注解执行器
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'// testAnnotationProcessor代表test下代码的注解执行器
    compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.22' // compile代表编译时使用的lombok



}


test {
    useJUnitPlatform()
}

三、代码

        定义注册器实现类:

import org.springframework.context.ApplicationContext;
import org.springframework.core.GenericTypeResolver;

import java.util.HashMap;
import java.util.Map;

/**
 * @description 注册器
 */
public class Registry {

    /**
     * Query对象和命令提供者的对应关系
     */
    private Map<Class<? extends Query>,QueryProvider> queryProviderMap =  new HashMap<>();

    /**
     * Event对象和命令提供者的对应关系
     */
    private Map<Class<? extends Event>,EventProvider> eventProviderMap =  new HashMap<>();

    public Registry(ApplicationContext applicationContext){
        String[] names = applicationContext.getBeanNamesForType(QueryHandler.class);
        for (String name : names) {
            registerQuery(applicationContext,name);
        }
        names = applicationContext.getBeanNamesForType(EventHandler.class);
        for (String name : names) {
            registerEvent(applicationContext,name);
        }
    }

    private void registerQuery(ApplicationContext applicationContext, String name) {
        Class<QueryHandler<?,?>> handlerClass = (Class<QueryHandler<?,?>>) applicationContext.getType(name);
        Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(handlerClass, QueryHandler.class);
        Class<? extends Query> queryType  = (Class<? extends Query>) generics[1];
        queryProviderMap.put(queryType, new QueryProvider(applicationContext, handlerClass));
    }

    private void registerEvent(ApplicationContext applicationContext, String name) {
        Class<EventHandler<?>> handlerClass = (    Class<EventHandler<?>>) applicationContext.getType(name);
        Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(handlerClass, EventHandler.class);
        Class<? extends Event> eventType  = (Class<? extends Event>) generics[0];
        eventProviderMap.put(eventType, new EventProvider(applicationContext, handlerClass));
    }

    /**
     * 获取具体的QueryHandler   <R, Q extends Query<R>>定义R  Q的类型
     * @param queryClass
     * @param <R>
     * @param <Q>
     * @return
     */
    <R, Q extends Query<R>> QueryHandler<R,Q> getQuery(Class<Q> queryClass) {
        return queryProviderMap.get(queryClass).get();
    }

    /**
     * 获取具体的EventHandler
     * @param eventClass
     * @return
     */
    <E extends Event> EventHandler<E> getEvent(Class<? extends Event> eventClass) {
        return eventProviderMap.get(eventClass).get();
    }
}

        消息总线接口,定义两个方法,一个执行查询,一个执行事件:

/**
 * @description  消息总线
 */
public interface Bus {
    <R,Q extends Query<R>> R executeQuery(Q query);

    <E extends Event> void dispatchEvent(E event);
}

        消息总线实现类:

public class SpringBus implements Bus {

    private final Registry registry;

    public SpringBus(Registry registry) {
        this.registry = registry;
    }


    @Override
    public <R, Q extends Query<R>> R executeQuery(Q query) {
        QueryHandler<R, Q> queryHandler = (QueryHandler<R, Q>) registry.getQuery(query.getClass());
        return queryHandler.handle(query);
    }

    @Override
    public <E extends Event> void dispatchEvent(E event) {
        EventHandler<E> eventHandler = (EventHandler<E>) registry.getEvent(event.getClass());
        eventHandler.process(event);
    }
}

        Query接口:

public interface Query<R> {

}

        QueryHandler接口:

public interface QueryHandler<R, C extends Query<R>> {
    R handle(C query);
}

        QueryProvider类:

import org.springframework.context.ApplicationContext;

/**
 * query  提供者
 * @param <H>
 */
public class QueryProvider<H extends QueryHandler<?, ?>> {
    private final ApplicationContext applicationContext;
    private final Class<H> type;

    QueryProvider(ApplicationContext applicationContext, Class<H> type) {
        this.applicationContext = applicationContext;
        this.type = type;
    }

    public H get() {
        return applicationContext.getBean(type);
    }
}

        Event类似,Event接口:

public interface Event {

}

        EventHandler接口:

/**
 * @description  事件处理器
 */
public interface EventHandler<E extends Event> {
    /**
     *
     * @param event  事件
     */
    void process(E event);
}

        EventProvider类:

import org.springframework.context.ApplicationContext;

/**
 * event  提供者
 * @param <H>
 */
public class EventProvider<H extends EventHandler<?>> {
    private final ApplicationContext applicationContext;
    private final Class<H> type;

    EventProvider(ApplicationContext applicationContext, Class<H> type) {
        this.applicationContext = applicationContext;
        this.type = type;
    }

    public H get() {
        return applicationContext.getBean(type);
    }
}

        实体类:

import com.littledyf.cqs.Query;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
public class TestDto implements Serializable, Query<List<TestVo>> {
    private String name;
}
import lombok.Data;

@Data
public class TestVo {

    private String nameVo;
}

        Query具体实现类:

import com.littledyf.cqs.QueryHandler;
import com.littledyf.cqs.domain.TestDto;
import com.littledyf.cqs.domain.TestVo;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
@NoArgsConstructor
public class TestQueryHandler implements QueryHandler<List<TestVo>, TestDto> {

    @Override
    public List<TestVo> handle(TestDto testDto) {

        List<TestVo> testVos = new ArrayList<>();
        TestVo testVo = new TestVo();
        testVo.setNameVo(testDto.getName());

        testVos.add(testVo);
        return testVos;
    }
}

        Controller层:

import com.littledyf.cqs.Bus;
import com.littledyf.cqs.domain.TestDto;
import com.littledyf.cqs.domain.TestVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@RestController
@RequestMapping("/my-test/cqs")
public class CqsController {

    @Resource
    private Bus bus;

    @PostMapping(value = "/query-test")
    public List<TestVo> queryTest(@RequestBody TestDto testDto)  {
        return bus.executeQuery(testDto);
    }
}

        SpringBoot启动类,启动类中进行ApplicationContext的注入:

import com.littledyf.cqs.Bus;
import com.littledyf.cqs.Registry;
import com.littledyf.cqs.SpringBus;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MyTestApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyTestApplication.class, args);
	}

	/**
	 * 注册器
	 */
	@Bean
	public Registry registry(ApplicationContext applicationContext) {
		return new Registry(applicationContext);
	}

	/**
	 * 消息总线
	 */
	@Bean
	public Bus commandBus(Registry registry) {
		return new SpringBus(registry);
	}
}

        yml文件配置:

server:
  port: 8080
spring:
  application:
    name: my-test-service

四、测试

SpringBoot自定义消息总线,Java学习,微服务,spring boot,java

        这里只要模拟了查询,事件等与查询类似,需要实现具体的接口。整体实现就是在SpringBoot启动时加载注册类,注册类会根据具体的类注入相应的bean,在具体调用时,会根据不同的类实现调用相关的bean。文章来源地址https://www.toymoban.com/news/detail-690939.html

到了这里,关于SpringBoot自定义消息总线的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 集成 WebSocket 实现服务端推送消息到客户端

          假设有这样一个场景:服务端的资源经常在更新,客户端需要尽量及时地了解到这些更新发生后展示给用户,如果是 HTTP 1.1,通常会开启 ajax 请求询问服务端是否有更新,通过定时器反复轮询服务端响应的资源是否有更新。                         在长时间不更新

    2024年02月16日
    浏览(60)
  • 本地存储、自定义事件、全局事件总线、消息订阅与发布【Vue】

    存储内容大小一般支持5MB左右(不同浏览器可能还不一样) 浏览器通过 Window.sessionStorage 和 Window.localStorage 属性来实现本地存储机制 相关API: (1) xxxxStorage.setItem(\\\'key\\\', \\\'value\\\'); 该方法接受一个键和值作为参数,会把键值对添加到存储中,如果键名存在,则更新其对应的值。

    2023年04月21日
    浏览(40)
  • 【SpringBoot】Spring Boot 单体应用升级 Spring Cloud 微服务

    Spring Cloud 是在 Spring Boot 之上构建的一套微服务生态体系,包括服务发现、配置中心、限流降级、分布式事务、异步消息等,因此通过增加依赖、注解等简单的四步即可完成 Spring Boot 应用到 Spring Cloud 升级。 Spring Boot 应用升级为 Spring Cloud Cloud Native 以下是应用升级 Spring Clou

    2024年02月02日
    浏览(45)
  • Java 框架面试题-Spring Boot自定义配置与自动配置共存

    Spring Boot 是一个快速开发框架,可以简化 Spring 应用程序的开发,其中自定义配置是其中一个非常重要的特性。 在 Spring Boot 中,自定义配置允许开发者以自己的方式来配置应用程序。自定义配置可以用于覆盖默认配置,也可以用于添加新的配置项。本文将详细介绍 java框架面

    2023年04月11日
    浏览(54)
  • (零基础学习)Neo4j+Spring boot 自行定义属性

    节点和关系都可以设置自己的属性。 属性是由Key-Value键值对组成,键名是字符串。 属性值是要么是原始值,要么是原始值类型的一个数组 。比如+String+,+int+和i+int[]+都是合法的。 注意 null不是一个合法的属性值。 Nulls能代替模仿一个不存在的Key 。 …/_images/image3.9.png 属性值

    2024年01月15日
    浏览(44)
  • Spring Boot学习随笔- 实现AOP(JoinPoint、ProceedingJoinPoint、自定义注解类实现切面)

    学习视频:【编程不良人】2021年SpringBoot最新最全教程 问题 现有业务层开发存在问题 额外功能代码存在大量冗余 每个方法都需要书写一遍额外功能代码不利于项目维护 Spring中的AOP AOP:Aspect 切面 + Oriented 面向 Programmaing 面向切面编程 Aspect(切面) = Advice(通知) + Pointcut(

    2024年02月04日
    浏览(48)
  • spring boot学习第六篇:SpringBoot 集成WebSocket详解

    1、WebSocket简介 WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。 2、为什么需要WebSocket HTTP 是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接。 无状态:每次连

    2024年01月21日
    浏览(52)
  • Springboot学习:安装spring boot helper插件的相关问题

    在idea中安装在线插件spring boot helper后,在构建spring boot项目发现IDE严重报错:spring boot helper不是JetBrains的插件,解决方法是: 卸载刚才安装的插件,注意需要卸载插件后点击“应用”,然后重启idea即可卸载成功,不然会卸载不成功。 需要说明的是:安装自己搜索的“spring

    2024年02月14日
    浏览(42)
  • 消息总线在微服务中的应用

    上一篇文章介绍了 Spring Cloud 中的分布式配置组件 Config ,每个服务节点可以从 Config Server 拉取外部配置信息。但是似乎还有一个悬而未决的问题,那就是当服务节点数量非常庞大的时候,我们不可能一台一台服务器挨个去手工触发刷新,这时候就需要一个可以号令武林的角色

    2024年02月21日
    浏览(31)
  • 【SpringBoot系列】Spring Boot Bean Factory的生命周期 Spring Boot学习大纲,可以留言自己想了解的技术点

    继续完成既定的任务,走起 可以概括为以下步骤: 资源加载:Spring Boot 应用程序启动时,会扫描指定的路径,加载配置文件和类文件等资源。 配置加载:Spring Boot 会根据配置文件(比如 application.properties)中的配置,创建一个 Environment 对象,用于保存应用程序的配置信息。

    2024年02月05日
    浏览(60)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包