flink 对每天的数据进行汇总

这篇具有很好参考价值的文章主要介绍了flink 对每天的数据进行汇总。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用flink 对kafka中每天的数据进行汇总,来一条数据统计一次结果,并将结果进行持久化

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("group.id", "test005");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", false);
        props.put("auto.offset.reset", "latest ");


        // 定义kafka服务器地址列表,不需要指定所有的broker
        props.setProperty("bootstrap.servers", "192.168.0.45:9092,192.168.0.46:9092,192.168.0.47:9092");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("audit_alarm", new SimpleStringSchema(), props);

        DataStream<String> stream = env.addSource(kafkaConsumer);

        DataStream<DailyData> dailyData = stream.map(new MapFunction<String, DailyData>() {
            @Override
            public DailyData map(String value) throws Exception {

  
                DailyData dailyData = new DailyData(day, message, timestamp);
                
                return dailyData;
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<DailyData>(Time.seconds(1)) {
            @Override
            public long extractTimestamp(DailyData element) {
                return element.getTimestamp();
            }
        });

        DataStream<AlarmGatherResponse> dailyDataCount = dailyData
                .keyBy(DailyData::getKey)
                .window(TumblingEventTimeWindows.of(Time.days(1)))
                //.trigger(EventTimeTrigger.create()) // 每来一条数据触发一次计算
                .trigger(CountTrigger.of(1))
                //.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
                .process(new DailyDataCountProcessProcess());

        dailyDataCount.addSink(new MySQLSink(url, username, password));
        env.execute("Daily Data Count");
    }

之前一直在trigger中想用时间进行触发,后面发现当环境中使用的是事件时间(EventTime)

事件时间(EventTime)语义,因此触发器应该使用 EventTimeTrigger,而不是 ProcessingTimeTrigger

因此,将 .trigger(ProcessingTimeTrigger.create()) 更改为 .trigger(EventTimeTrigger.create()) 可以使触发器生效。

但是,这样的话,窗口将不会每来一条数据触发一次计算,而是要等到该窗口的 Watermark 推进到窗口结束时间后才会触发计算。如果要每来一条数据就立即计算,可以考虑使用 CountTrigger 触发器,例如 .trigger(CountTrigger.of(1))。这将在每接收到一条数据时触发计算。注意,这样会大幅增加计算和数据传输的开销。

还有种触发条件可根据时间

trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))

设置多久触发一次算子

补充:文章来源地址https://www.toymoban.com/news/detail-597009.html

MySQLSink为将数据写入到mysql,进行了更新和新增操作,根据生成的key,对数据先进行修改动作,若修改失败,则进行新增,保证一天内,聚合的数据的唯一性
package utils;

import cn.hutool.json.JSONUtil;
import mode.AlarmGatherResponse;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public  class MySQLSink extends RichSinkFunction<AlarmGatherResponse> {
    private Connection connection;
    private PreparedStatement insertStatement;
    private PreparedStatement updateStatement;

    private String jdbcUrl;
    private String username;
    private String password;

    public MySQLSink(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(jdbcUrl, username, password);
        // 创建插入和更新的 PreparedStatement
        String insertSql = "insert into t_flink_dailyDataCount(uuid," +
                "start_time," +
                "end_time," +
                "src_list," +
                "dst_list," +
                "attacker_list," +
                "attacked_list," +
                "alarm_type," +
                "priority," +
                "attack_direct," +
                "attack_chain," +
                "update_time," +
                "related_uuid_list,group_key ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
        String updateSql = "UPDATE t_flink_dailyDataCount " +
                "SET end_time = ?," +
                "src_list = ?," +
                "dst_list = ?," +
                "attacker_list = ?," +
                "attacked_list = ?," +
                "alarm_type = ?," +
                "priority = ?," +
                "attack_direct = ?," +
                "attack_chain = ?," +
                "update_time = ? " +
                " WHERE group_key = ?";
        insertStatement = connection.prepareStatement(insertSql);
        updateStatement = connection.prepareStatement(updateSql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // 关闭连接和 PreparedStatement
        if (insertStatement != null) {
            insertStatement.close();
        }
        if (updateStatement != null) {
            updateStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(AlarmGatherResponse row, Context context) throws Exception {
        // 先尝试更新数据
        updateStatement.setDate(1, row.getEndTime());
        updateStatement.setString(2, JSONUtil.toJsonStr(row.getSrcList()));
        updateStatement.setString(3, JSONUtil.toJsonStr(row.getDstList()));
        updateStatement.setString(4, JSONUtil.toJsonStr(row.getAttackerList()));
        updateStatement.setString(5, JSONUtil.toJsonStr(row.getAttackedList()));
        updateStatement.setString(6, row.getAlarmType());
        updateStatement.setString(7, row.getPriority());
        updateStatement.setString(8, row.getAttackDirect());
        updateStatement.setInt(9, row.getAttackChain()==null?0:row.getAttackChain());
        updateStatement.setTimestamp(10, new java.sql.Timestamp(System.currentTimeMillis()));
        updateStatement.setString(11, row.getKey());
        int updatedRows = updateStatement.executeUpdate();

        if (updatedRows == 0) {
            // 如果更新失败,则插入数据
            insertStatement.setString(1, row.getUuid());
            insertStatement.setDate(2, row.getStartTime());
            insertStatement.setDate(3, row.getEndTime());
            insertStatement.setString(4, JSONUtil.toJsonStr(row.getSrcList()));
            insertStatement.setString(5, JSONUtil.toJsonStr(row.getDstList()));
            insertStatement.setString(6, JSONUtil.toJsonStr(row.getAttackerList()));
            insertStatement.setString(7, JSONUtil.toJsonStr(row.getAttackedList()));
            insertStatement.setString(8, row.getAlarmType());
            insertStatement.setString(9, row.getPriority());
            insertStatement.setString(10, row.getAttackDirect());
            insertStatement.setInt(11, row.getAttackChain()==null?0:row.getAttackChain());
            insertStatement.setDate(12, new Date(System.currentTimeMillis()));
            insertStatement.setString(13, JSONUtil.toJsonStr(row.getRelatedUuidList()));
            insertStatement.setString(14, row.getKey());
            insertStatement.executeUpdate();
        }
    }
}

到了这里,关于flink 对每天的数据进行汇总的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • PostGreSql中统计表中每天的数据,并统计每天的回复数,未回复数以及未回复占比(显示百分比)

    要在 PostgreSQL 中统计表中每天的数据,并统计每天的回复数、未回复数以及未回复占比,并以百分比形式显示,你可以使用以下 SQL 查询。假设你有一个名为 \\\"messages\\\" 的表,其中包含消息的时间戳列 \\\"timestamp\\\" 和一个指示消息是否已回复的列 \\\"replied\\\"(1 表示已回复,0 表示未回

    2024年02月07日
    浏览(41)
  • 编写sql统计一段时间内,每天、每月、每年的统计数据(PostgreSQL)

    前言 在做数据统计页面时,总会遇到统计某段时间内,每天、每月、每年的数据视图(柱状图、折线图等)。这些统计数据一眼看过去也简单呀,不就是按照时间周期(天、月、年)对统计数据进行分个组就完了嘛?但是会有一个问题,简单的写个sql对周期分组,获取到的统

    2024年02月12日
    浏览(40)
  • 共享单车之数据分析-统计共享单车每天的平均使用时间

    第1关:统计共享单车每天的平均使用时间 任务描述 相关知识 如何配置Hbase的MapReduce类 如何使用Hbase的MapReduce进行数据分析 编程要求 测试说明 任务描述 本关任务:使用 Hbase 的 MapReduce 对已经存在 Hbase 的共享单车运行数据进行分析,统计共享单车每天的平均使用时间,其中

    2024年02月03日
    浏览(60)
  • 实战Flink Java api消费kafka实时数据落盘HDFS

    在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件: 确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。 确保 Kafka Serve

    2024年01月24日
    浏览(51)
  • 每天几道面试题(第一天)

    友情提醒 背面试题很枯燥,加入一些戏剧场景故事人物来加深记忆。PS:点击文章目录可直接跳转到文章指定位置。 【门卫甲,门卫乙,面试者老王,路人等】 门卫甲:来者何人?报上名来。 老王:隔壁老王前来面试 门卫乙:现在面试Java的人很多,如果谁都放进去,恐怕总

    2024年02月09日
    浏览(39)
  • 2023-06-13:统计高并发网站每个网页每天的 UV 数据,结合Redis你会如何实现?

    2023-06-13:统计高并发网站每个网页每天的 UV 数据,结合Redis你会如何实现? 答案2023-06-13: 如果统计 PV (页面浏览量)那非常好办,可以考虑为每个网页创建一个独立的 Redis 计数器,并将日期添加为键(key)的后缀。当网页收到请求时,对应的计数器将被递增。对于每天的

    2024年02月08日
    浏览(90)
  • 使用Flink完成流数据统计

    所有流计算统计的流程都是: 1、接入数据源 2、进行多次数据转换操作(过滤、拆分、聚合计算等) 3、计算结果的存储 其中数据源可以是多个、数据转换的节点处理完数据可以发送到一个和多个下一个节点继续处理数据 Flink程序构建的基本单元是stream和transformation(DataSet实质

    2024年02月05日
    浏览(40)
  • 大数据-玩转数据-Flink 网站UV统计

    在实际应用中,我们往往会关注,到底有多少不同的用户访问了网站,所以另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)。 对于UserBehavior数据源来说,我们直接可以根据userId来区分不同的用户。 将userid放到SET集合里面,统计集合长度,便可以统计到网

    2024年02月11日
    浏览(48)
  • Arcgis小技巧【13】——数据统计(Statistics)相关工具汇总

    在Arcgis中可以通过属性表中字段的【统计】功能或使用统计相关的工具对属性表进行数据统计。 在Arcgis工具箱中有一组【统计分析】工具集,不仅包含对属性数据执行标准统计分析(例如平均值、最小值、最大值和标准差)的工具,也包含对重叠和相邻要素计算面积、长度和

    2024年02月06日
    浏览(45)
  • nginx 每天各IP访问次数记录统计

    此文章主要介绍了,在nginx代理的情况下,统计当天IP的出现次数,并且生成表格的相关步骤 1.nginx 配置的修改 在/etc/nginx/nginx.conf 2.定时执行的shell脚本 该脚本主要在每天的0点自动执行IPStatistics.py来记录昨天的IP访问次数,然后对昨天的日志进行备份,并且备份最长时间为m

    2024年02月12日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包