PiflowX组件-ReadFromUpsertKafka

这篇具有很好参考价值的文章主要介绍了PiflowX组件-ReadFromUpsertKafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

ReadFromUpsertKafka组件

组件说明

upsert方式从Kafka topic中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称 展示名称 默认值 允许值 是否必填 描述 例子
kafka_host KAFKA_HOST “” 逗号分隔的Kafka broker列表。 127.0.0.1:9092
topic TOPIC “” 用于写入Kafka topic名称。 topic-1
tableDefinition TableDefinition “” Flink table定义。
key_format keyFormat “” Set(“json”, “csv”, “avro”) 用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。 json
value_format ValueFormat “” Set(“json”, “csv”, “avro”) 用于对Kafka消息中value部分反序列化的格式 json
value_fields_include ValueFieldsInclude ALL Set(“ALL”, “EXCEPT_KEY”) 控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。 ALL
key_fields_prefix KeyFieldsPrefix “” 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
properties PROPERTIES “” 该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

ReadFromUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{
  "flow": {
    "name": "ReadFromUpsertKafkaTest",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "5555",
        "name": "ReadFromUpsertKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "result_total_pv_uv_min",
          "key_format": "json",
          "value_format": "json",
          "value_fields_include": "ALL",
          "tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}",
          "properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"
        }
      },
      {
        "uuid": "6666",
        "name": "ShowChangeLogData1",
        "bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData",
        "properties": {
          "showNumber": "5000"
        }
      }
    ],
    "paths": [
      {
        "from": "ReadFromUpsertKafka1",
        "outport": "",
        "inport": "",
        "to": "ShowChangeLogData1"
      }
    ]
  }
}
示例说明
  1. 通过k.kafka.ReadFromUps从kafka的result_total_pv_uv_min topic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据);

  2. 通过ShowChangeLogData组件将数据输出到控制台。

tableDefinition属性结构
{
    "ifNotExists": true,
    "physicalColumnDefinition": [{
        "columnName": "do_date",
        "columnType": "STRING",
        "nullable": false,
        "primaryKey": true,
        "partitionKey": false,
        "comment": "统计日期"
    }, {
        "columnName": "do_min",
        "columnType": "STRING",
        "nullable": false,
        "primaryKey": true,
        "partitionKey": false,
        "comment": "统计分钟"
    }, {
        "columnName": "pv",
        "columnType": "BIGINT",
        "nullable": false,
        "primaryKey": false,
        "partitionKey": false,
        "comment": "点击量"
    }, {
        "columnName": "uv",
        "columnType": "BIGINT",
        "nullable": false,
        "primaryKey": false,
        "partitionKey": false,
        "comment": "一天内同个访客多次访问仅计算一个UV"
    }, {
        "columnName": "currenttime",
        "columnType": "TIMESTAMP",
        "nullable": false,
        "primaryKey": false,
        "partitionKey": false,
        "comment": "当前时间"
    }],
    "metadataColumnDefinition": null,
    "computedColumnDefinition": null
}

演示DEMO

PiflowX组件-ReadFromUpsertKafka,大数据,spark,big data,flink,hadoop
欢迎关注PiflowX公众号,谢谢支持!!!

PiflowX组件-ReadFromUpsertKafka,大数据,spark,big data,flink,hadoop

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客文章来源地址https://www.toymoban.com/news/detail-773130.html

到了这里,关于PiflowX组件-ReadFromUpsertKafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • PiflowX如何快速开发flink程序

    参考资料 Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码-腾讯云开发者社区-腾讯云 (tencent.com) Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计

    2024年01月16日
    浏览(32)
  • Vue组件使用(父组件监听子组件数据变化,子组件使用父组件的数据,并监听父组件的数据变化)

    父组件声明变量 父组件向子组件传递数据 Vue 数据类型 type 有以下几种: String:字符串类型。例如:“hello world”。 Number:数字类型。例如:12,1.5。 Boolean:布尔类型。例如:true,false。 Object:对象类型。例如:{name: ‘Tom’, age: 20}。 Array:数组类型。例如:[1, 2, 3]。 Fun

    2024年02月14日
    浏览(45)
  • 大数据组件有哪些?构建现代数据生态系统的组件一览

    随着数字时代的来临, 大数据 技术成为了企业获取、存储、处理和分析海量数据的关键工具。大数据组件构建了一个庞大而强大的数据生态系统,为企业提供了更深入的洞察和更智能的决策支持。本文将深入解析一些重要的大数据组件,揭示它们在现代数据处理中的关键角

    2024年04月10日
    浏览(43)
  • Vue前端框架10 组件的组成、组件嵌套关系、组件的注册方式、组件传递数据(props $emit)、数组传递多种数据类型、组件传递props校验、组件事件

    组件最大的优势就是可复用性 通常将组件定义在.vue中,也就是SFC单文件组件 组件的基本组成: 组件允许我们将UI划分为独立的、可重用的部分,并且对每个部分单独思考 实际应用中组件常常被组件成层层嵌套的树状结构 Vue组件使用前要注册,注册有两种方式:全局注册和

    2024年02月09日
    浏览(51)
  • vue父组件和子组件数据传递

    父组件: 子组件: 父组件向子组件传值方式: 1、父组件引入子组件,注册属性message 2、子组件通过props来获取到注册的属性message 页面显示:   转存失败重新上传取消 父组件: 子组件: 子组件向父组件传值方式: 1、父组件注册事件event 2、子组件由transmit事件方法,通过

    2024年02月15日
    浏览(40)
  • 【微信小程序】父组件修改子组件数据或调用子组件方法

    一、使用场景 页面中用到了自定义组件形成父子组件关系,在父组件某个特定时期想要操作子组件中的数据或方法,比如离开页面的时候清空子组件的数据。 二、方法 父组件可以通过 this.selectComponent 方法获取子组件实例对象,这样就可以直接访问组件的任意数据和方法 调

    2024年02月14日
    浏览(66)
  • Vue常用的组件库大全【前端工程师必备】【移动端、PC端(web端)、数据可视化组件库(数据大屏) 、动画组件库、3D组件库】

    1)Vant ui 🔸有赞移动 UI 组件库,支持 Vue2/3 微信小程序,支付宝小程序 https://vant-contrib.gitee.io/vant/v2/#/zh-CN/col Vant 是由有赞前端团队开发的一套基于 Vue.js 的移动端 UI 组件库,它包含了丰富的组件和功能,可以帮助开发者快速构建高质量的移动应用。Vant 以简洁易用和高质量

    2024年02月04日
    浏览(58)
  • 组件封装v-model .sync在父子组件中实现双向数据绑定 如何处理单向数据流 封装表单组件

    使用watch监听 父组件使用.sync进行数据的绑定 传值子组件时 把值赋值到data的变量中 然后监听该数据的变化 $emit抛出 父组件demo 子组件 展示效果 使用绑定对象的方式打破单向数据流实现 父组件 子组件 参考vue官方及各插件库的方案 使用计算属性来保证双向数据流 通过计算属

    2023年04月19日
    浏览(43)
  • BigTop3.2.0 大数据组件编译--组件编译

    接上篇环境准备,环境准备好以后就可以开始bigtop大数据组件编译了,建议使用科学上网方式,降低网络连接报错,主要网络报错出现在nodejs npm yarn bower等前端资源的下载上。搞定网络问题后,按本文的方式可以完成大数据相关组件的编译,编译后生成的rpm包在output目录中,

    2023年04月08日
    浏览(33)
  • 【数据可视化】(二)数据探索组件

    目录 0.简介 一、数据模式与数据组织 1、数据的定义 2、数据库的定义 3、什么是数据模式? 4、数据模式举例 5、什么是数据纲要? 6、数据组织的层次 二、矢量数据 1、什么是

    2024年02月14日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包