flink学习35:flinkSQL查询mysql

这篇具有很好参考价值的文章主要介绍了flink学习35:flinkSQL查询mysql。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

总览:     import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions}

object sqlQueryTable {
  def main(args: Array[String]): Unit = {

    //create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //set parallelism
    env.setParallelism(1)

    //env setting
    val envSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //create table env
    val tableEnv = StreamTableEnvironment.create(env, envSettings)

    //create flink table
    val flink_table_sql =
      """
        |create table student2_flink_table
        |(
        |  code varchar(20) null,
        |  name varchar(20) null,
        | score int null
        |)with(
        |'connector.type'='jdbc',
        |'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
        |'connector.table'='student2',
        |'connector.driver'='com.mysql.jdbc.Driver',
        |'connector.username'='root',
        |'connector.password'='123456'
        |)
        |""".stripMargin

    //execute create flink table sql
    tableEnv.executeSql(flink_table_sql)

    //register table
    val myStudent_Flink_Table = tableEnv.from("student2_flink_table")

    //query table
    val result = tableEnv.sqlQuery(s"select * from $myStudent_Flink_Table where code < 10")

    //print
    //result.toRetractStream[(String,String)].print()
    result.toAppendStream[(String,String,Int)].print()

    //create view
    tableEnv.createTemporaryView("student2_flink_view", myStudent_Flink_Table)

    //query use sql
    val querySQL =
      """
        |select code,
        |name,
        |sum(score) as total_score
        |from student2_flink_view
        |group by code, name
        |""".stripMargin

    //query
    val total_score_resut = tableEnv.sqlQuery(querySQL)

    //print
    total_score_resut.toRetractStream[(String,String,Int)].print("view-")

    //execute
    env.execute()

  }

}

toAppendStream 和  toRetractStream区别

toAppendStream 的输出结果

flink学习35:flinkSQL查询mysql

 

toRetractStream的输出结果

flink学习35:flinkSQL查询mysql

 感觉 toRetractStream支持更新

  从输出得结果看,每条结果前都会有true,当接收到新得数据时会更新原先得数据,并在原先得数据前面标记false,也就是失效或者作废得意思,从而得到新得数据,到此应该也能很清晰得区分 toAppendStream与toRetractStream的区别了文章来源地址https://www.toymoban.com/news/detail-422033.html

到了这里,关于flink学习35:flinkSQL查询mysql的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink:FlinkSql解析嵌套Json

    日常开发中都是用的简便json格式,但是偶尔也会遇到嵌套json的时候,因此在用flinksql的时候就有点麻烦,下面用简单例子简单定义处理下 1,数据是网上摘抄,但包含里常用的大部分格式 {     \\\"afterColumns\\\": {         \\\"created\\\": \\\"1589186680\\\",         \\\"extra\\\": {             \\\"

    2023年04月09日
    浏览(30)
  • Flink实战-(6)FlinkSQL实现CDC

    FlinkSQL说明 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初

    2023年04月26日
    浏览(49)
  • 【Flink系列七】TableAPI和FlinkSQL初体验

    Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理

    2024年02月03日
    浏览(33)
  • 【Flink】FlinkSQL中执行计划以及如何用代码看执行计划

    FilnkSQL怎么查询优化 Apache Flink 使用并扩展了 Apache Calcite 来执行复杂的查询优化。 这包括一系列基于规则和成本的优化,例如: • 基于 Apache Calcite 的子查询解相关 • 投影剪裁 • 分区剪裁 • 过滤器下推 • 子计划消除重复数据以避免重复计算 • 特殊子查询重写,包括两部

    2023年04月11日
    浏览(47)
  • 一、Flink使用异步算子+线程池查询MySQL

    目录 Flink异步算子使用介绍 使用Flink异步算子+多线程异步查询MySQL 相关阅读 1 Flink使用异步算子请求高德地图获取位置信息 1、概述 1)Flink异步算子使用介绍 1.异步与同步概述 同步:向数据库发送一个请求然后一直等待,直到收到响应。在许多情况下,等待占据了函数运行的

    2024年02月14日
    浏览(37)
  • Flink程序富函数中使用定时任务查询mysql

    问题描述 具体案例 在open方法中的案例 说明 例如1: 例如2 (重点关注代码中的while循环那块,会产生内存堆积就在这块): 解释:

    2024年02月14日
    浏览(26)
  • FlinkSQL-- sql-client及源码解析 -- flink-1.13.6

    本文基于flink-1.13.6 SQL Client: Init scripts and Statement Sets 这个版本极大地改进了 SQL 客户端的功能。现在 SQL Client 和 SQL 脚本都支持 通过Java 应用程序执行的几乎所有操作(从 TableEnvironment 以编程方式启动查询)。这意味着 SQL 用户在 SQL 部署中需要的代码少了很多。其中最核心的功能

    2023年04月27日
    浏览(37)
  • flinksql kafka到mysql累计指标练习

    数据流向:kafka -kafka -mysql 模拟写数据到kafka topic:wxt中 kafka topic :wxt1 kafka topic :wxt2 mysql结果数据: pom文件

    2024年02月08日
    浏览(24)
  • 机器学习总览

    2024年02月13日
    浏览(37)
  • 机器学习-隐私保护总览

    这段时间有项目在进行,所以对斯坦福Dan Boneh密码学的阅读进度有所放缓,之后会继续更新,这段时间对当前机器学习领域隐私保护的方向做了一点小总结。 近年来,隐私保护机器学习的研究方向大致可以分为三类 : 一是以k -匿名 为代表的基于等价类的方法 。 二是以差分

    2023年04月15日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包