从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能

这篇具有很好参考价值的文章主要介绍了从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

/\*\*

* 设备ID
*/
private Integer deviceId;

/\*\*

* 监控的变量名称
*/
private String varName;

/\*\*

* 最小值
*/
private Double min;

/\*\*

* 最大值
*/
private Double max;

}


##### 报警事件



/**
* 报警消息
*/
@Data
public class AlarmMessage {

/\*\*

* 设备
*/
private Integer deviceId;

/\*\*

* 报警时间
*/
private Long timestamp;
/**
* 触发报警的采集变量名称
*/
private String alarmVar;

/\*\*

* 触发报警的采集值
*/
private Number alarmValue;
}


#### 开始实现



public class IotMonitorJob {

public static void main(String[] args) throws Exception {


    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);

    // 采集数据Stream
    DataStreamSource<IotData> iotDataStream = getIotStream(environment);
    // 报警规则Stream
    DataStreamSource<AlarmRule> ruleConfig = getRuleConfig(environment);
    // 缓存报警规则 并监控报警数据
    SingleOutputStreamOperator<AlarmMessage> alarmStream = iotDataStream.connect(ruleConfig)
            .keyBy(IotData::getDeviceId, AlarmRule::getDeviceId)
            .process(new CoProcessFunction<IotData, AlarmRule, AlarmMessage>() {

                // 用临时保存设备的报警规则 ,这里的状态交由flink维护
                private MapState<Integer, AlarmRule> alarmRuleValueState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    // 初始化 ValueState
                    alarmRuleValueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("alarm-rule-state", Integer.class, AlarmRule.class));
                }

                @Override
                public void processElement1(IotData iotData, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
                    Map<String, Double> data = iotData.getData();

                    // 遍历每个规则
                    alarmRuleValueState.values().forEach(rule -> {

                        String varName = rule.getVarName();
                        // 获取变量值
                        Double val = data.get(varName);
                        if (val == null) {
                            // 变量里没有值
                            return;
                        }

                        if (val <= rule.getMin() || val > rule.getMax()) {
                            // 超过限制,输出报警信息
                            AlarmMessage alarmMessage = new AlarmMessage();
                            alarmMessage.setDeviceId(iotData.getDeviceId());
                            alarmMessage.setTimestamp(iotData.getTimestamp());
                            alarmMessage.setAlarmVar(varName);
                            alarmMessage.setAlarmValue(val);
                            collector.collect(alarmMessage);
                        }
                    });


                }

                @Override
                public void processElement2(AlarmRule alarmRule, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
                    // 接收到AlarmRule, 仅更新 alarmRuleValueState
                    alarmRuleValueState.put(alarmRule.getId(), alarmRule);
                }
            });


    alarmStream.print();
    environment.execute();
}

/\*\*

* 获取物联采集数据
*
* @param environment
* @return
*/
private static DataStreamSource getIotStream(StreamExecutionEnvironment environment) {
return environment.addSource(new SourceFunction<>() {
private boolean running = true;

        @Override
        public void run(SourceContext<IotData> sourceContext) throws Exception {
            while (running) {

                // 模拟100个设备 每秒一次上报数据

                long ts = System.currentTimeMillis();
                ts = ts - ts % 1000;

                for (int i = 0; i < 100; i++) {
                    IotData iotData = new IotData();
                    iotData.setTimestamp(ts);
                    iotData.setDeviceId(i);

                    Map<String, Double> data = new HashMap<>();
                    data.put("var1", RandomUtils.nextDouble());
                    data.put("var2", RandomUtils.nextDouble());
                    iotData.setData(data);

                    sourceContext.collect(iotData);
                }

                Thread.sleep(1000 - ts % 1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    });
}


/\*\*

* 获取规则配置
*/
public static DataStreamSource getRuleConfig(StreamExecutionEnvironment environment) {
// 仅针对部分设备监控

    List<AlarmRule> ruleList = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
        AlarmRule alarmRule1 = new AlarmRule();
        alarmRule1.setDeviceId(i);
        alarmRule1.setVarName("var1");
        alarmRule1.setMax(20.0);
        alarmRule1.setMin(0.0);
        ruleList.add(alarmRule1);

        AlarmRule alarmRule2 = new AlarmRule();
        alarmRule2.setDeviceId(i);
        alarmRule2.setVarName("var2");
        alarmRule2.setMax(10.0);
        alarmRule2.setMin(0.0);
        ruleList.add(alarmRule2);

    }
    return environment.fromCollection(ruleList);
}

}


### 启动job



> 
> 实际运行基于 java 11 , flink 1.18.1
> 
> 
> 


输出结果:



AlarmMessage(deviceId=0, timestamp=1709732511000, alarmVar=var2, alarmValue=1.0408785873261203E308)
AlarmMessage(deviceId=1, timestamp=1709732511000, alarmVar=var2, alarmValue=8.409717342118433E306)
AlarmMessage(deviceId=2, timestamp=1709732511000, alarmVar=var2, alarmValue=6.955367711979709E307)
AlarmMessage(deviceId=3, timestamp=1709732511000, alarmVar=var2, alarmValue=2.5403069646236554E307)
AlarmMessage(deviceId=4, timestamp=1709732511000, alarmVar=var2, alarmValue=7.629789041713245E307)
AlarmMessage(deviceId=5, timestamp=1709732511000, alarmVar=var2, alarmValue=6.918664964996954E307)
AlarmMessage(deviceId=6, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1660434456728436E308)
AlarmMessage(deviceId=7, timestamp=1709732511000, alarmVar=var2, alarmValue=2.1272561368179368E307)
AlarmMessage(deviceId=8, timestamp=1709732511000, alarmVar=var2, alarmValue=2.8693117885744695E307)
AlarmMessage(deviceId=9, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1232501067396574E308)
AlarmMessage(deviceId=10, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6192738031099514E308)
AlarmMessage(deviceId=11, timestamp=1709732511000, alarmVar=var2, alarmValue=7.515829766654446E307)
AlarmMessage(deviceId=12, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6409410780574847E308)
AlarmMessage(deviceId=13, timestamp=1709732511000, alarmVar=var2, alarmValue=7.372363635115241E307)
AlarmMessage(deviceId=14, timestamp=1709732511000, alarmVar=var2, alarmValue=5.269385013806783E306)
AlarmMessage(deviceId=15, timestamp=1709732511000, alarmVar=var2, alarmValue=9.736804956554577E307)
AlarmMessage(deviceId=16, timestamp=1709732511000, alarmVar=var2, alarmValue=5.403962718372102E307)
AlarmMessage(deviceId=17, timestamp=1709732511000, alarmVar=var2, alarmValue=1.7957965318588386E308)
AlarmMessage(deviceId=18, timestamp=1709732511000, alarmVar=var2, alarmValue=6.546384330721207E307)
AlarmMessage(deviceId=19, timestamp=1709732511000, alarmVar=var2, alarmValue=1.2797848722222382E308)
AlarmMessage(deviceId=0, timestamp=1709732512000, alarmVar=var2, alarmValue=8.096850966966417E307)
AlarmMessage(deviceId=1, timestamp=1709732512000, alarmVar=var2, alarmValue=1.1459880504481993E308)
AlarmMessage(deviceId=2, timestamp=1709732512000, alarmVar=var2, alarmValue=1.6878563127635106E308)
AlarmMessage(deviceId=3, timestamp=1709732512000, alarmVar=var2, alarmValue=1.3431398337246118E308)
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数嵌入式工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年嵌入式&物联网开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能,程序员,嵌入式

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能,程序员,嵌入式

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能,程序员,嵌入式

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上嵌入式&物联网开发知识点,真正体系化!

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能,程序员,嵌入式

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能,程序员,嵌入式

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以+V:Vip1104z获取!!! (备注:嵌入式)

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能,程序员,嵌入式

最后

资料整理不易,觉得有帮助的朋友可以帮忙点赞分享支持一下小编~

你的支持,我的动力;祝各位前程似锦,offer不断,步步高升!!!

项目、讲解视频,并且后续会持续更新**

如果你觉得这些内容对你有帮助,可以+V:Vip1104z获取!!! (备注:嵌入式)

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能,程序员,嵌入式

最后

资料整理不易,觉得有帮助的朋友可以帮忙点赞分享支持一下小编~

你的支持,我的动力;祝各位前程似锦,offer不断,步步高升!!!

更多资料点击此处获qu!!文章来源地址https://www.toymoban.com/news/detail-848193.html

到了这里,关于从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink + MySQL 流式计算数据分析

    作者:禅与计算机程序设计艺术 大数据时代,海量的数据源源不断涌入到互联网、移动应用、企业数据库等各个领域,同时这些数据也逐渐成为各种业务场景中的主要输入数据。如何在短时间内对海量数据进行处理、分析并得出有价值的信息,已经成为当今社会越来越关注的

    2024年02月06日
    浏览(52)
  • 【Elasticsearch】从零开始搭建ES8集群并且集成到Springboot,更好的服务电商类等需要全文索引的项目(一)

    最近公司的电商项目越来越庞大,功能需求点也越来越多,各种C端对查询和检索的要求也越来越高,是时候在项目中引入全文检索了。 ElasticSearch 是一个基于 Lucene 的搜索服务器,它提供了一个分布式多用户能力的全文搜索引擎,并且是基于Java 开发的,我记得很久之前ES还不

    2024年02月15日
    浏览(48)
  • Flink流式计算状态检查点与恢复

    Flink流式计算状态检查点与恢复 Apache Flink是一个流处理框架,用于实时数据处理和分析。Flink可以处理大规模数据流,并提供一种高效、可靠的方法来处理和分析这些数据。Flink流式计算状态检查点与恢复是流处理的关键组件,它们确保Flink应用程序在故障时能够恢复并继续处

    2024年02月19日
    浏览(46)
  • 从零开始快速构建自己的Flink应用

    本文介绍如何在 mac 下快速构建属于自己的 Flink 应用。 在 mac 上使用homebrew安装 flink: 查看安装的位置: 进入安装目录,启动 flink 集群: 进入 web 页面:http://localhost:8081/ 基于模板直接构建一个项目: 在项目的 DataStreamJob 类实现如下计数的功能: 在上面的例子中,我们使用

    2024年02月20日
    浏览(51)
  • 从零开始搭建web组态

    成果展示:by组态[web组态插件] 目前只有两种选择,canvas和svg Canvas: 是一个基于像素的渲染引擎,使用JavaScript API在画布上绘制图像,它的优点包括: Canvas渲染速度快,适合处理大量图像和高度动态的图像。 可以直接操作像素,能够创建高质量、流畅的动画效果。 Canvas可用于

    2024年04月23日
    浏览(43)
  • 从零开始搭建群众权益平台(一)

    本次的平台我们名为群众权益维护平台,我们将讲解整体的思路,涉及到很多内容,我将给出一份简化的示例,包含了网页的基本结构、前端和后端代码,以及部署的基本步骤。 技术栈使用:HTML,CSS,JavaScript(前端),Node.js(后端),MongoDB(数据库),Heroku(部署)。 这

    2024年02月09日
    浏览(39)
  • 从零开始搭建群众权益平台(五)

    本篇博客我们将实现 验证新的用户名或电子邮件,文件上传,支付,通知等内容 验证新的用户名或电子邮件: 在更新用户信息的路由中,我们需要确保新的用户名或电子邮件还没有被其他用户使用: 输入验证: 对用户输入进行验证非常重要,以

    2024年02月09日
    浏览(36)
  • 从零开始学架构-计算高性能

            高性能是每个程序员的追求,无论做一个系统、还是写一组代码,都希望能够达到高性能的效果。而高性能又是最复杂的一环,磁盘、操作系统、CPU、内存、缓存、网络、编程语言、数据库、架构等,每个都可能影响系统的高性能,一行不恰当的 debug 日志,一个

    2023年04月24日
    浏览(93)
  • 从零开始搭建STM32CubeMX开发环境

    本文记录一下如何从零开始使用STM32CubeMX,包括软件的安装,环境的搭建,配置代码的生成等; 本文以STM32G030C8T6为例,如果你的单片机不是以STM32G030C8T6为例,换成你的单片机类型即可,过程都是通用的; STM32CubeMX 是意法半导体推出的针对STM32 系列芯片的图形化配置工具,通

    2024年02月12日
    浏览(53)
  • 从零开始搭建家庭网络:软路由实战经验分享(一)

    最近入门了软路由,研究了半个月,一步一步从网络小白到最后自己搭建了家庭局域网络,现在给大家分享一下我搭建软路由的经验。 既然有软路由,那么相对的肯定有硬路由:目前我们网上买到的路由器,就是硬路由,这种从一开始就是 按照路由器设计规范设计出来的硬

    2024年02月02日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包