【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置

这篇具有很好参考价值的文章主要介绍了【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。

SQL 提示一般可以用于以下:

  • 增强 planner:没有完美的 planner, SQL 提示让用户更好地控制执行;
  • 增加元数据(或者统计信息):如"已扫描的表索引"和"一些混洗键(shuffle keys)的倾斜信息"的一些统计数据对于查询来说是动态的,用提示来配置它们会非常方便,因为我们从 planner
    获得的计划元数据通常不那么准确;
  • 算子(Operator)资源约束:在许多情况下,我们会为执行算子提供默认的资源配置,即最小并行度或托管内存(UDF 资源消耗)或特殊资源需求(GPU 或 SSD 磁盘)等,可以使用 SQL 提示非常灵活地为每个查询(非作业)配置资源

 文章来源地址https://www.toymoban.com/news/detail-858073.html

一. create table hints

动态表选项允许动态地指定或覆盖表选项,不同于用 SQL DDL 或 连接 API 定义的静态表选项,这些选项可以在每个查询的每个表范围内灵活地指定。

因此,它非常适合用于交互式终端中的特定查询,例如,在 SQL-CLI 中,你可以通过添加动态选项/*+ OPTIONS('csv.ignore-parse-errors'='true') */来指定忽略 CSV 源的解析错误。

 

1. 语法

为了不破坏 SQL 兼容性,我们使用 Oracle 风格的 SQL hints 语法:

table_path /*+ OPTIONS(key=val [, key=val]*) */

key: string字符
val: string字符

 

2. 示例


CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);

-- `覆盖`查询语句中源表的选项
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

-- 覆盖 join 中源表的选项
select * from
    kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
    join
    kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
    on t1.id = t2.id;

-- 覆盖插入语句中结果表的选项
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;

 

3. 注意

create table hints 传递的连接器中catalog的相关参数,即create table with下参数,具体到源代码是:context.getCatalogTable().getOptions()

 

如果传参无效且在日志中看到参数已经设置成功,那

可能将context.getConfiguration()中的参数传递到with参数下,比如:
hive连接器下:table.exec.hive.sink.statistic-auto-gather.enable 参数由DefaultDynamicTableContext的configuration来接收。此参数为flink sql的全局参数,此时可以通过set table.exec.hive.sink.statistic-auto-gather.enable=false 语法来设定参数。

 

二. 实战:简化hive连接器参数设置

对于hive连接器,Flink实现了通过catalog的方式来管理hive表,在使用hive表时需要使用hive相关语法,此时需要声明,hive dialect,如下:


CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'aaa',
    'hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);

SET table.sql-dialect=hive;

-- 因为需要使用hive连接器中的写特性,所以需要create table ,此时sql语法为hive语法
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- 对于某些框架例如chunjun,此处不能很好的适配:
--
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列声明 watermark。
) WITH (...);

-- streaming sql, insert into hive table
INSERT INTO TABLE myhive.aaa.hive_table 
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

如下可以把写hive的一些行为通过sql hint方式,放到Flink sql语句中,如下整个Flink sql 会清爽很多。

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'database_name',
    'hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);


CREATE TABLE source_kafka (

    `pv` string,
    `uv` string,
    `p_day_id` string
) WITH (
      'connector' = 'kafka-x'
      ,'topic' = 'hive_kafka'
      ,'properties.bootstrap.servers' = 'xxx:9092'
      ,'properties.group.id' = 'luna_g'
      ,'scan.startup.mode' = 'earliest-offset'
      ,'json.timestamp-format.standard' = 'SQL'
      ,'json.ignore-parse-errors' = 'true'
      ,'format' = 'json'
      ,'scan.parallelism' = '1'
      );

insert into 
 myhive.database_name.table_name /*+ OPTIONS('partition.time-extractor.timestamp-pattern'='$p_day_id:00:00','sink.partition-commit.policy.kind'='metastore,success-file','sink.partition-commit.success-file.name'='_SUCCESS_gao111') */
    select *  from source_kafka; 

 

三. select hints(ing)

查询提示(Query Hints)用于为优化器修改执行计划提供建议,该修改只能在当前查询提示所在的查询块中生效(Query block, 什么是查询块)。 目前,Flink 查询提示只支持联接提示(Join Hints)。

具体见:官网

https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/sql/queries/hints/#%E6%9F%A5%E8%AF%A2%E6%8F%90%E7%A4%BA

 

到了这里,关于【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【flink-sql实战】flink 主键声明与upsert功能实战

    主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的 某个(些)列 是唯一的 并且不包含 Null 值 。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。 主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,

    2024年02月03日
    浏览(42)
  • 【SQL开发实战技巧】系列(八):聊聊如何插入数据时比约束更灵活的限制数据插入以及怎么一个insert语句同时插入多张表

    【SQL开发实战技巧】系列(一):关于SQL不得不说的那些事 【SQL开发实战技巧】系列(二):简单单表查询 【SQL开发实战技巧】系列(三):SQL排序的那些事 【SQL开发实战技巧】系列(四):从执行计划讨论UNION ALL与空字符串UNION与OR的使用注意事项 【SQL开发实战技巧】系列

    2024年01月15日
    浏览(53)
  • (增加细粒度资源管理)深入理解flink的task slot相关概念

    之前对flink的task slot的理解太浅了,重新捋一下相关知识点 我们知道,flink中每个TaskManager都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask(线程)。 但是TaskManager 的计算资源是有限的,并不是所有任务都可以放在同一个 TaskManager 上并行执行。并行的任务越多

    2024年03月11日
    浏览(41)
  • 达梦sql执行计划、HINT、索引简单应用

    目录 收集统计信息. 3 1. 通过DBMS_STATS包中的方法. 3 2、删除指定表的统计信息. 3 执行计划. 3 常用执行计划操作符. 4 统计指定sql 执行号的所有操作符的执行时间 . 5 HINT 5 并行操作:. 6 查询计划重用、结果集重用. 7 示例. 8 1、收集统计信息:. 8 3、对sql搜集统计信息. 9 2、添加

    2024年02月15日
    浏览(33)
  • 细粒度图像分类模型(含实战代码)

    来源:投稿 作者:lsc  编辑:学姐 1.1细粒度图片分类特点 可判别区域往往只是在图像中很小的一块区域内。 1.2细粒度图像分类数据集 1.3细粒度图像分类竞赛 1.4细粒度图像分类模型分类: (1)强监督模型: 需要类别以外的标签进行监督 (2)弱监督模型: 不需要类别以外的标签 P

    2024年02月07日
    浏览(45)
  • 大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N)

    每隔30min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中。 1.统计每个商品的点击量, 开窗 2.分组窗口分组 3.over窗口 3.1、创建数据源示例 input/UserBehavior.csv 3.2、创建目标表 3.3、导入JDBC Connector依赖 3.4、代码实现 执行结果: Flink 使用 OVER 窗口条件和过滤条件相结合

    2024年02月07日
    浏览(37)
  • Flink Table/Sql自定义Kudu Sink实战(其它Sink可参考)

    使用第三方的org.apache.bahir » flink-connector-kudu,batch模式写入数据到Kudu会有FlushMode相关问题 具体可以参考我的这篇博客通过Flink SQL操作创建Kudu表,并读写Kudu表数据 Flink的Dynamic table能够统一处理batch和streaming 实现自定义Source或Sink有两种方式: 通过对已有的connector进行拓展。比

    2024年02月14日
    浏览(46)
  • flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

    ⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。 ⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。 经过测试 在fl

    2024年02月22日
    浏览(51)
  • 【设计模式与范式:行为型】62 | 职责链模式(上):如何实现可灵活扩展算法的敏感信息过滤框架?

    前几节课中,我们学习了模板模式、策略模式,今天,我们来学习职责链模式。这三种模式具有相同的作用:复用和扩展,在实际的项目开发中比较常用,特别是框架开发中,我们可以利用它们来提供框架的扩展点,能够让框架的使用者在不修改框架源码的情况下,基于扩展

    2024年02月10日
    浏览(45)
  • 【Oracle】设置FGA(Fine-Grained Audit)细粒度审计

    【声明】文章仅供学习交流,观点代表个人,与任何公司无关。 编辑|SQL和数据库技术(ID:SQLplusDB) 收集Oracle数据库内存相关的信息 【Oracle】ORA-32017和ORA-00384错误处理 FGA(Fine-Grained Audit)细粒度审计是Oracle提供的一种数据库审计方法,用于创建定制的审计设置。 可以通过调用Or

    2024年01月24日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包