flink-sql对kafka数据进行清洗过滤

这篇具有很好参考价值的文章主要介绍了flink-sql对kafka数据进行清洗过滤。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

今天这篇blog主要记录使用flink-sql对kafka中的数据进行过滤。

以前对kafka数据进行实时处理时都是使用java来进行flink开发,需要创建一个工程,并且打成jar包再提交,流程固定但对于简单任务来说还是比较繁琐的。

今天我们要对logstash采集到kafka中的数据进行过滤筛选,将筛选后的数据发送给另外一个kafka topic,由于处理逻辑比较简单,使用flink自带的sql函数就可以搞定,所以我们今天就用flink-sql来解决这问题。

问题描述

我们需要筛选出ServiceA、ServiceB、ServiceC、ServiceD四个类打印出来的日志信息,并将目标信息发送到另外一个kafka topic。logstash推送到kafka中的日志格式如下,日志信息均在message字段中。

{
    "@version": "1",
    "@timestamp": "2022-11-18T08:11:33.000Z",
    "host": "localhost",
    "message": "ServiceX XXXX",
    "uid": 3081609001,
    "type": "xxx"
}

环境说明

flink 1.13.6

重要文档

flink-sql内置函数官方文档

flink kafka connector官方文档文章来源地址https://www.toymoban.com/news/detail-599327.html

实现代码

--sourceTable
CREATE TABLE omg_log(
    message VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'source-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'group_id',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);

--sinkTable
CREATE TABLE omg_log_sink (
    message VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'target-topic',
    'properties.bootstrap.servers' = 'loaclhost:9093',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";',
    'format' = 'csv'
);

--filter and insert 
INSERT INTO omg_log_sink(message)
SELECT message
FROM omg_log
where REGEXP(message,'ServiceA|ServiceB|ServiceC|ServiceD')
;

到了这里,关于flink-sql对kafka数据进行清洗过滤的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink-SQL——时态表(Temporal Table)

    这里我们需要注意一下的是虽然我们介绍的是Flink 的 Temporal Table 但是这个概念最早是在数据库中提出的 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作

    2024年01月16日
    浏览(31)
  • Flink-SQL——动态表 (Dynamic Table)

    SQL 和关系代数在设计时并未考虑流数据。因此,在关系代数(和 SQL)之间几乎没有概念上的差异。 本文会讨论这种差异,并介绍 Flink 如何在无界数据集上实现与数据库引擎在有界数据上的处理具有相同的语义。 下表比较了传统的关系代数和流处理与输入数据、执行和输出结果

    2024年01月17日
    浏览(37)
  • 【flink-sql实战】flink 主键声明与upsert功能实战

    主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的 某个(些)列 是唯一的 并且不包含 Null 值 。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。 主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,

    2024年02月03日
    浏览(29)
  • Flink-SQL join 优化 -- MiniBatch + local-global

    问题1. 近期在开发flink-sql期间,发现数据在启动后,任务总是进行重试,运行一段时间后,container heartbeat timeout,内存溢出(GC overhead limit exceede) ,作业无法进行正常工作 问题2. 未出现container心跳超时的,作业运行缓慢,超过一天 ,作业仍存在反压情况 查看日志内容发现,出

    2024年02月06日
    浏览(34)
  • FlinkSQL-- sql-client及源码解析 -- flink-1.13.6

    本文基于flink-1.13.6 SQL Client: Init scripts and Statement Sets 这个版本极大地改进了 SQL 客户端的功能。现在 SQL Client 和 SQL 脚本都支持 通过Java 应用程序执行的几乎所有操作(从 TableEnvironment 以编程方式启动查询)。这意味着 SQL 用户在 SQL 部署中需要的代码少了很多。其中最核心的功能

    2023年04月27日
    浏览(31)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(42)
  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(40)
  • 数据清洗是什么?如何进行数据清洗?

    数据清洗是数据治理过程中非常重要的一环, 它指的是对数据进行清理、筛选、去重、格式化等操作,以确保数据质量和数据准确性。 。在本文中,我们将围绕数据清洗展开讨论,并介绍一些数据清洗 相关 技术。 一、 数据清洗的概念 数据清洗是指对数据进行处理和加工,

    2024年02月03日
    浏览(35)
  • 使用kettle进行数据清洗

    申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址 全文共计2175字,阅读大概需要3分钟 本实验任务主要完成基于ubuntu环境的使用kettle进行数据清洗的工作。通过完成本实验任务,要求学生熟练掌握使用kettle进行数据清洗的方法,为后续实验的开展奠定ETL平

    2023年04月20日
    浏览(26)
  • BDA初级分析——SQL清洗和整理数据

    一、数据处理 数据处理之类型转换 字符格式与数值格式存储的数据,同样是进行大小排序, 会有什么区别? 以rev为例,看看字符格式与数值格式存储时,排序会有什么区别? 用cast as转换为字符后进行排序 99.1982.1? 字符串比较大小是逐位来比较的 Cast as 按...分组 作用:对

    2024年02月12日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包