【开源项目】消息推送平台austin介绍

这篇具有很好参考价值的文章主要介绍了【开源项目】消息推送平台austin介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

项目介绍

核心功能:统一的接口发送各种类型消息,对消息生命周期全链路追踪。

意义:只要公司内部有发送消息的需求,都应该要有类似austin的项目。消息推送平台对各类消息进行统一发送处理,这有利于对功能的收拢,以及提高业务需求开发的效率。

项目地址https://github.com/ZhongFuCheng3y/austin

项目拆解

下发消息接口,分为群发和单发。接口参数主要有模板id(发送消息的内容模板),参数[用来替换模板参数,接收人],api可以存在多个,但是具体处理的方法只需要批量参数就可以。

单发的接口请求实体类

public class SendRequest {

    /**
     * 执行业务类型
     * send:发送消息
     * recall:撤回消息
     */
    private String code;

    /**
     * 消息模板Id
     * 【必填】
     */
    private Long messageTemplateId;


    /**
     * 消息相关的参数
     * 当业务类型为"send",必传
     */
    private MessageParam messageParam;


}

群发的接口实体类

public class BatchSendRequest {

    /**
     * 执行业务类型
     * 必传,参考 BusinessCode枚举
     */
    private String code;


    /**
     * 消息模板Id
     * 必传
     */
    private Long messageTemplateId;


    /**
     * 消息相关的参数
     * 必传
     */
    private List<MessageParam> messageParamList;

}

单个消息的实体

public class MessageParam {

    /**
     * @Description: 接收者
     * 多个用,逗号号分隔开
     * 【不能大于100个】
     * 必传
     */
    private String receiver;

    /**
     * @Description: 消息内容中的可变部分(占位符替换)
     * 可选
     */
    private Map<String, String> variables;

    /**
     * @Description: 扩展参数
     * 可选
     */
    private Map<String, String> extra;
}

消息模板中定义了发送渠道,消息渠道决定消息的处理器。

AssembleAction转换实体类。核心逻辑有

  • 将模板中的可变参数转化成文本内容
  • 根据消息渠道获取消息实体类
  • 生成业务编码

AssembleAction#getContentModelValueContentHolderUtil.replacePlaceHolder(originValue, variables);用来处理可替换变量。

private static ContentModel getContentModelValue(MessageTemplate messageTemplate, MessageParam messageParam) {

    // 得到真正的ContentModel 类型
    Integer sendChannel = messageTemplate.getSendChannel();
    Class<? extends ContentModel> contentModelClass = ChannelType.getChanelModelClassByCode(sendChannel);

    // 得到模板的 msgContent 和 入参
    Map<String, String> variables = messageParam.getVariables();
    JSONObject jsonObject = JSON.parseObject(messageTemplate.getMsgContent());


    // 通过反射 组装出 contentModel
    Field[] fields = ReflectUtil.getFields(contentModelClass);
    ContentModel contentModel = ReflectUtil.newInstance(contentModelClass);
    for (Field field : fields) {
        String originValue = jsonObject.getString(field.getName());

        if (StrUtil.isNotBlank(originValue)) {
            String resultValue = ContentHolderUtil.replacePlaceHolder(originValue, variables);
            Object resultObj = JSONUtil.isJsonObj(resultValue) ? JSONUtil.toBean(resultValue, field.getType()) : resultValue;
            ReflectUtil.setFieldValue(contentModel, field, resultObj);
        }
    }

    // 如果 url 字段存在,则在url拼接对应的埋点参数
    String url = (String) ReflectUtil.getFieldValue(contentModel, LINK_NAME);
    if (StrUtil.isNotBlank(url)) {
        String resultUrl = TaskInfoUtils.generateUrl(url, messageTemplate.getId(), messageTemplate.getTemplateType());
        ReflectUtil.setFieldValue(contentModel, LINK_NAME, resultUrl);
    }
    return contentModel;
}

SendMqAction负责发送消息,根据austin.mq.pipeline的值获取不同的消息发送者,有MQ发消息,有EventBus,有SpringEvent。

@Slf4j
@Service
public class SendMqAction implements BusinessProcess<SendTaskModel> {


    @Autowired
    private SendMqService sendMqService;

    @Value("${austin.business.topic.name}")
    private String sendMessageTopic;

    @Value("${austin.business.recall.topic.name}")
    private String austinRecall;
    @Value("${austin.business.tagId.value}")
    private String tagId;

    @Value("${austin.mq.pipeline}")
    private String mqPipeline;


    @Override
    public void process(ProcessContext<SendTaskModel> context) {
        SendTaskModel sendTaskModel = context.getProcessModel();
        try {
            if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) {
                String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});
                sendMqService.send(sendMessageTopic, message, tagId);
            } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) {
                String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName});
                sendMqService.send(austinRecall, message, tagId);
            }
        } catch (Exception e) {
            context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
            log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e)
                    , JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));
        }
    }

}

ConsumeServiceImpl#consume2Send,负责处理消息

    @Override
    public void consume2Send(List<TaskInfo> taskInfoLists) {
        String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
        for (TaskInfo taskInfo : taskInfoLists) {
            logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
            Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
            taskPendingHolder.route(topicGroupId).execute(task);
        }
    }

Task#run,负责任务的具体处理

    @Override
    public void run() {

        // 0. 丢弃消息
        if (discardMessageService.isDiscard(taskInfo)) {
            return;
        }
        // 1. 屏蔽消息
        shieldService.shield(taskInfo);

        // 2.平台通用去重
        if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {
            deduplicationRuleService.duplication(taskInfo);
        }

        // 3. 真正发送消息
        if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {
            handlerHolder.route(taskInfo.getSendChannel()).doHandler(taskInfo);
        }

    }

EmailHandler#handler,邮件处理器专门处理邮件的消息

    @Override
    public boolean handler(TaskInfo taskInfo) {
        EmailContentModel emailContentModel = (EmailContentModel) taskInfo.getContentModel();
        MailAccount account = getAccountConfig(taskInfo.getSendAccount());
        try {
            File file = StrUtil.isNotBlank(emailContentModel.getUrl()) ? AustinFileUtils.getRemoteUrl2File(dataPath, emailContentModel.getUrl()) : null;
            String result = Objects.isNull(file) ? MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true) :
                    MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true, file);
        } catch (Exception e) {
            log.error("EmailHandler#handler fail!{},params:{}", Throwables.getStackTraceAsString(e), taskInfo);
            return false;
        }
        return true;
    }

ShieldServiceImpl#shield,发送消息判断是否需要白天屏蔽。将消息存储到Redis中,开启xxl-job从redis中获取数据。

    @Override
    public void shield(TaskInfo taskInfo) {

        if (ShieldType.NIGHT_NO_SHIELD.getCode().equals(taskInfo.getShieldType())) {
            return;
        }

        /**
         * example:当消息下发至austin平台时,已经是凌晨1点,业务希望此类消息在次日的早上9点推送
         * (配合 分布式任务定时任务框架搞掂)
         */
        if (isNight()) {
            if (ShieldType.NIGHT_SHIELD.getCode().equals(taskInfo.getShieldType())) {
                logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD.getCode())
                        .businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
            }
            if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) {
                redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo,
                                SerializerFeature.WriteClassName),
                        (DateUtil.offsetDay(new Date(), 1).getTime() / 1000) - DateUtil.currentSeconds());
                logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
            }
            taskInfo.setReceiver(new HashSet<>());
        }
    }

    /**
     * 小时 < 8 默认就认为是凌晨(夜晚)
     *
     * @return
     */
    private boolean isNight() {
        return LocalDateTime.now().getHour() < 8;

    }

总结一下

  1. 下发消息接口,分为群发和单发。接口参数主要有模板id(发送消息的内容模板),模板中定义了消息渠道(消息类型决定消息的处理器),参数[用来替换模板参数,接收人],api可以存在多个,但是具体处理的方法只需要批量参数就可以。
  2. 根据模板组装数据,替换变量。
  3. 消息发送,使用监听器或者消息队列,进行异步解耦。消息发送的业务和消息接收的业务拆解开来。
  4. 根据发送渠道获取对应的消息处理器(用邮件还是短信),使用策略模式进行不同的消息渠道拆分,用具体的消息处理器进行处理消息。

【开源项目】消息推送平台austin介绍文章来源地址https://www.toymoban.com/news/detail-498903.html

到了这里,关于【开源项目】消息推送平台austin介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件

    (注:安装在虚拟机则填虚拟机地址,否则则为本机地址) 用户名和密码都为guest 看到如下页面则为RabbitMQ安装登录成功。 三、依赖注入 导入依赖坐标 四、配置yaml文件 配置yaml配置文件 (注:host为地址,如果安装在虚拟机则为虚拟机地址,安装在本机则本机地址。port为端

    2024年04月13日
    浏览(57)
  • Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)

    #spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 spring.kafka.bootstrap-servers=192.168.x.xxx:9092 #=============== producer生产者 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.ap

    2024年04月09日
    浏览(42)
  • spring boot +微信小程序项目,通过微信公众号实现指定用户消息长期推送

    用户登录小程序,后台记录用户的小程序openId和用户唯一的UnionId。然后用户触发公众号事件(关注公众号或者发送指定消息),后台获取到用户公众号的openId,再调用接口通过公众号的openId查询用户的UnionId,再和数据库里的UnionId进行匹配,将用户的公众号openId存入数据库。此

    2024年02月03日
    浏览(57)
  • 如何使用Git将本地项目推送至代码托管平台?【Gitee、GitLab、GitHub】

    查看当前Git邮箱 git config user.email 设置Git账户名 git config --global user.name = “王会称” ​ 设置Git邮箱 git config --global user.email “wanghuichen2003@163.com” 再次查看是否设置成功 进入git全局配置文件修改 vi ~/.gitconfig 登录Gitee官网,并注册账户 ===================================================

    2024年04月16日
    浏览(56)
  • 开源物联网平台推荐介绍

    RT-Thread RT-Thread是一个来自中国的开源物联网操作系统,它提供了非常强的可伸缩能力:从一个可以运行在ARM Cortex-M0芯片上的极小内核,到中等的ARM Cortex-M3/4/7系统,甚至是运行于MIPS32、ARM Cortex-A系列处理器上功能丰富系统。 Thingsboard Thingsboard是一个开源的物联网平台,由俄罗

    2024年02月04日
    浏览(42)
  • ThingsBoard开源物联网平台介绍

    视频教程:  ThingsBoard介绍_哔哩哔哩_bilibili ThingsBoard是一个基于Java的开源物联网平台,旨在实现物联网项目的快速开发、管理和扩展。本课程主要从0到1带你熟悉ThingsBoard,学习优秀的物联网变成思维与思想,主要有有一下几点:1、ThingsBoard相关介绍,以及为何在物联网平台

    2024年04月09日
    浏览(44)
  • 【小项目】微信定时推送天气预报Github项目使用及原理介绍-包含cron、天气预报、常用api...

    一、资料链接 1、github地址 https://github.com/qq1534774766/wx-push 2、教程地址 https://blog.csdn.net/qq15347747/article/details/126521774 3、易客云API(自动发送天气) https://yikeapi.com/account/index 4、apispace-各种接口(名人名言) https://www.apispace.com/console/api?orgId=6356 5、微信公众平台 https://mp.weixin.qq.com/d

    2024年02月02日
    浏览(42)
  • JVS开源基础框架:平台基本信息介绍

    JVS是面向软件开发团队可以快速实现应用的基础开发脚手架,主要定位于企业信息化通用底座,采用微服务分布式框架,提供丰富的基础功能,集成众多业务引擎,它灵活性强,界面化配置对开发者友好,底层容器化构建,集合持续化构建。 JVS是定位为辅助研发团队的快速脚

    2024年02月12日
    浏览(52)
  • 开源项目-数据可视化分析平台

    哈喽,大家好,今天给大家带来一个开源项目-数据可视化分析平台。项目通过SpringBoot实现 数据可视化分析平台主要有数据源管理,项目管理,数据集管理,图表管理,看板管理等功能 数据源管理功能可以添加MySQL,Oracle,PostgreSQL等类型的数据源信息 项目管理可以对项目名

    2024年02月10日
    浏览(53)
  • 【无标题】好消息 突破:IM开源项目OpenIM采用wasm技术实现jssdk

    OpenIM 客户端sdk用golang实现,同时采用sqlite存储本地聊天记录,通过gomobile生成sdk,供iOS Android 调用,达到了了一套代码多端复用的效果。最近融合wasm技术,让浏览器具备存储能力,本地聊天记录存储在浏览器,彻底放弃了之前jssdk server服务端。 WebAssembly 是一种运行在现代网

    2024年02月08日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包