使用Flink1.16.0的SQLGateway迁移Hive SQL任务

这篇具有很好参考价值的文章主要介绍了使用Flink1.16.0的SQLGateway迁移Hive SQL任务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用Flink的SQL Gateway迁移Hive SQL任务

前言

我们有数万个离线任务,主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务,当然也有PySpark、打Jar包的Spark和打Jar包的Flink任务这种高成本的任务【Java和Scala都有】。毕竟SQL上手门槛极低,是个人都能写几下并且跑起来,还可以很容易看到run成功的数据长得像不像。其实HQL任务的性能并不会好到哪里去,主要是SQL Boy便宜,无脑堆人天就可以线性提升开发速度。DataPhin的底层基本可以确认就是beeline -f包了一层,而它本身作为二级队列,并不是真正意义上的网关。

我们之前做大数据基础平台时,也有为数据中台租户部署Kyuubi这个网关组件。

Apache Kyuubi:https://kyuubi.apache.org/

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

这货现在发育的灰常好:

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

已经不局限于一个霸占Yarn的资源锁定一个Session ID,然后提交Spark任务了。。。这货现在还可以支持Flink和Hudi。。。湖仓一体就需要这货。

燃鹅,新版Flink1.16.0新增了一个和Kyuubi、Spark、Tez抢饭碗的重磅功能:SQL Gateway:

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

众所周知,Flink的SQL和标准Hive SQL不太一样,新版Flink主动向Hive的dialect看齐:

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

从而提高了堆HQL的兼容性。官方号称可以97%的HQL任务无需修改直接迁移到Flink!!!还是比较唬人的。

常规的Spark SQL:https://lizhiyong.blog.csdn.net/article/details/120064874

只是让Spark去读Hive9083端口MetaStore的元数据,SQL解析AST、CBO优化和Task执行都是Spark的Catalyst负责。

Hive On Tez【或者MR、Spark】:https://lizhiyong.blog.csdn.net/article/details/123436630

这种方式只是Hive把解析完的任务提交给不同的计算引擎去具体运算。但是很少有听说过Hive On Flink【虽然翻Hive的源码好像可以去实现它】。

所以本文重点就是这个Hive On Flink。用流批一体的运算引擎去跑批也是个有趣的事情。有生之年有望看到Flink一统江湖了。。。

Hive On Flink原理

新增的支持

Hive任务能使用Flink来跑,Flink当然是做了很多支持:

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

Hive的MetaStore在大数据领域的地位相当于K8S在云原生容器编排领域的地位,或者Alluxio在云原生存算分离架构统一存储层的地位,都是事实上的标准了。能解析Hive的Metastore就可以管理Hadoop集群绝大多数的Hive表了。。。当然Hudi的一些表、Flink的一些SQL流式表也可能被管控到。

而支持Hive的UDF,天然就拥有了Hive的那几百个系统函数:https://lizhiyong.blog.csdn.net/article/details/127501392

当然就可以减少很多写UDF的平台组件二开攻城狮或者部分资深SQL Boy的工作量。UDF函数们是公司的资产,轻易不可以弃用的。

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

作为一个运算引擎,在Source端和Sink端都支持流式和批式操作Hive表,毫不意外。还可以自动小文件合并,有点像Hudi的Merge On Read这种写多读少的模式了。

SQL解析

在SQL Boy们眼里最重要的SQL,其实在Java和C#种也就是个普通的String字符串,走JDBC传参或者ADO.NET,如果是开发个AD Hoc即席查询平台,单从功能角度,其实都不需要关心租户们传的select语句的具体内容。但是执行引擎必须能把SQL字符串给解析成具体的执行计划或者底层任务。

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

Flink1.16.0使用了这么一个可插拔的插件,将HQL解析为Logical Plan逻辑计划。后续的ROB、CBO优化生成Physical Plan物理计划,还有转换为Flink最终的Job Graph都是与普通的Blink执行套路一致。

效果

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

可以满足大部分应用场景了。

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

命令行和API、运行时、底层资源调度,都可以实现一致,运维起来应该要方便不少。

Gateway

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

Flink自带了Flink SQL Gateway,显而易见的好处是平台和组件二开人员不需要去自己写Gateway去Dispatch分发任务了,甚至二级调度都可以省了。。。

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

本身后端就可以多租户了。。。还可以支持多种Cluster,K8S和Yarn或者Docker的Standalone混合云考虑一下???

前端支持Rest和Hive Server2,对Java开发人员和SQL Boy们都很友好。

HS2Endpoint

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

有点区别:

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

优势

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

尤其是处理异构数据源:

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

优势很明显。做联邦查询的改动也只是需要+个Catalog。

Demo

FFA2022的罗宇侠&方盛凯两位大佬带来个Demo,展示了Flink如何使用Hive和Flink的dialect分别按流式和批式跑任务。

为了方便查看,笔者手动敲出来了:

流式

建表:

--创建目标表
create table if not exists dwd_category_by_day(
	`i_category` string,
	`cate_sales` double,
	`cayehory_day_order_cnt` bigint
) 
partitioned by (
	`year` bigint,
	`day` bigint
)
TBLPROPERTIES(
	'sink.partition-commit.policy.kind'='metastore,success-file'
)
;

--创建源表
set table.sql-dialect=default;

create table if not exists s_dwd_store_sales(
	`ss_item_sk` bigint,
	`i_brand` string,
	`i_class` string,
	`i_category` string,
	`ss_sales_price` double,
	`d_date` date,
	`d_timestamp` as cast(d_date as timestamp(3)),
	watermark for `d_timestamp` as `d_timestamp`
) with (
	'connector'='kafka',
	'topic'='dwd_store_sales',
	'properties.bootstrap.servers'='192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092',
	'properties.group.id'='FFA',
	'key.fields'='ss_item_sk',
	'scan.startup_mode'='earlist-offset',
	'key.format'='json',
)
;

根据Demo的建表DDL,可以看出按照Hive语法建表时,Flink需要设置表的属性。

而使用传统Flink的语法建流式表时,反倒需要手动指定dialect。说明默认的dialect其实是:

set table.sql-dialect=hive;

每日类销量以及订单数统计:

set table.sql-dialect=default;
set execution.runtime-mode=streaming;
set table.cml-sync=false;--异步提交作业

--开启检查点
set execution.checkpointing.interval=30s;

insert into dwd_category_by_day
select
	i_category,
	sum(ss_sales_price) as month_sales,
	count(1) as order_cnt,
	year(window_start) as `year`,
	dayofyear(window_start) as `day`
from TABLE(
	TUMBLE(
		TABLE s_dwd_store_sales,DESCRIPTOR(d_timestamp),INTERVAL '1' DAY
	)
)
group by
	window_start,
	window_end,
	i_category
;

流式的SQL需要设置滑动的时间窗口,貌似没啥子毛病。

销量最佳Top3:

set table.sql_dialect=default;

select
	i_category,
	categoru_day_order_cnt,
	rownum
from(
	select
		i_category,
		categoru_day_order_cnt,
		row_number() over (order by categoru_day_order_cnt desc) as rownum
	from
		dwd_category_by_day
)
where
	rownum<=3
;

Flink的SQL不用像Hive的SQL那样每个子查询都要起别名【Spark SQL也不用】,太棒了!!!

可以看到流式的SQL任务,开发成本肯定比Java和Scala写DataStreaming算子低!!!利好SQL Boy。

批式

desc tpcds_bin_orc_2.dwd_store_sales;

这个表2位大佬已经灌过数据,根据表结构,笔者大概知道大概也是长这样:

create table if not exists tpcds_bin_orc_2.dwd_store_sales(
	`ss_item_sk` bigint,
	`i_brand` string,
	`i_class` string,
	`i_category` string,
	`ss_sales_price` double
)
partitioned by (
	`d_date` date
)
;

每日大类销量以及订单数统计:

insert overwrite dwd_category_by_day
select
	i_category,
	sum(ss_sales_price) as month_sales,
	count(1) as order_cnt,
	year(d_date) as `year`,
	datediff(d_date,concat(year(d_date)-1,'-12-31'))
from
	tpcds_bin_orc_2.dwd_store_sales
group by
	year(d_date),
	datediff(d_date,concat(year(d_date)-1,'-12-31')),
	i_category
;

销量最佳Top3:

select
	i_category,
	categoru_day_order_cnt,
	rownum
from(
	select
		i_category,
		categoru_day_order_cnt,
		row_number() over (order by categoru_day_order_cnt desc) as rownum
	from
		dwd_category_by_day
)
where
	rownum<=3
;

可以看到批式的SQL任务由于数据不会在运算时发生变化,不用考虑各种事件时间和水位线还有滑动时间窗口,直接替换即可,更简单!!!

宣传的97%HQL任务可以不加改动,直接迁移到Flink,还算有希望的。不过底层做了什么惊天地泣鬼神的大事,对于只会写业务脚本的SQL Boy们来说,也无关痛痒。

Github参考资料

Flink sql Gateway有个Github地址:https://github.com/ververica/flink-sql-gateway

作者Ververica:https://www.ververica.com/

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

它就是Flink的公司。

Github的这个Flink sql Gateway貌似很久没有更新了。。。但是它毕竟只是与BE交互的FE,还是可以参考。

启动Gateway

./bin/sql-gateway.sh -h

The following options are available:
     -d,--defaults <default configuration file>   The properties with which every new session is initialized. 
                                                  Properties might be overwritten by session properties.
     -h,--help                                    Show the help message with descriptions of all options.
     -j,--jar <JAR file>                          A JAR file to be imported into the session. 
                                                  The file might contain user-defined classes needed for 
                                                  statements such as functions, the execution of table sources,
                                                  or sinks. Can be used multiple times.
     -l,--library <JAR directory>                 A JAR file directory with which every new session is initialized. 
                                                  The files might contain user-defined classes needed for 
                                                  the execution of statements such as functions,
                                                  table sources, or sinks. Can be used multiple times.
     -p,--port <service port>                     The port to which the REST client connects to.

下Flink集群有这个角标。

典型的yaml

默认的配置文件:

# Define server properties.

server:
  bind-address: 127.0.0.1           # optional: The address that the gateway binds itself (127.0.0.1 by default)
  address: 127.0.0.1                # optional: The address that should be used by clients to connect to the gateway (127.0.0.1 by default)
  port: 8083                        # optional: The port that the client connects to  (8083 by default)
  jvm_args: "-Xmx2018m -Xms1024m"   # optional: The JVM args for SQL gateway process


# Define session properties.

session:
  idle-timeout: 1d                  # optional: Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero. The minimum unit is in milliseconds. (1d by default)
  check-interval: 1h                # optional: The check interval for session idle timeout, which can be disabled by setting to zero. The minimum unit is in milliseconds. (1h by default)
  max-count: 1000000                # optional: Max count of active sessions, which can be disabled by setting to zero. (1000000 by default)


# Define tables here such as sources, sinks, views, or temporal tables.

tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/path/to/something.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR
  - name: MyCustomView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"

# Define user-defined functions here.

functions:
  - name: myUDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false

# Define available catalogs

catalogs:
   - name: catalog_1
     type: hive
     property-version: 1
     hive-conf-dir: ...
   - name: catalog_2
     type: hive
     property-version: 1
     default-database: mydb2
     hive-conf-dir: ...
     hive-version: 1.2.1


# Properties that change the fundamental execution behavior of a table program.

execution:
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  current-catalog: catalog_1        # optional: name of the current catalog of the session ('default_catalog' by default)
  current-database: mydb1           # optional: name of the current database of the current catalog
                                    #   (default database of the current catalog by default)


# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000

支持的语法

statement comment
SHOW CATALOGS List all registered catalogs
SHOW DATABASES List all databases in the current catalog
SHOW TABLES List all tables and views in the current database of the current catalog
SHOW VIEWS List all views in the current database of the current catalog
SHOW FUNCTIONS List all functions
SHOW MODULES List all modules
USE CATALOG catalog_name Set a catalog with given name as the current catalog
USE database_name Set a database with given name as the current database of the current catalog
CREATE TABLE table_name … Create a table with a DDL statement
DROP TABLE table_name Drop a table with given name
ALTER TABLE table_name Alter a table with given name
CREATE DATABASE database_name … Create a database in current catalog with given name
DROP DATABASE database_name … Drop a database with given name
ALTER DATABASE database_name … Alter a database with given name
CREATE VIEW view_name AS … Add a view in current session with SELECT statement
DROP VIEW view_name … Drop a table with given name
SET xx=yy Set given key’s session property to the specific value
SET List all session’s properties
RESET ALL Reset all session’s properties set by SET command
DESCRIBE table_name Show the schema of a table
EXPLAIN PLAN FOR … Show string-based explanation about AST and execution plan of the given statement
SELECT … Submit a Flink SELECT SQL job
INSERT INTO … Submit a Flink INSERT INTO SQL job
INSERT OVERWRITE … Submit a Flink INSERT OVERWRITE SQL job

功能还算齐全。

Beeline

beeline> !connect jdbc:flink://localhost:8083?planner=blink

Beeline version 2.2.0 by Apache Hive
beeline> !connect jdbc:flink://localhost:8083?planner=blink
Connecting to jdbc:flink://localhost:8083?planner=blink
Enter username for jdbc:flink://localhost:8083?planner=blink: 
Enter password for jdbc:flink://localhost:8083?planner=blink: 
Connected to: Apache Flink (version 1.10.0)
Driver: Flink Driver (version 0.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:flink://localhost:8083> CREATE TABLE T(
. . . . . . . . . . . . . . . >   a INT,
. . . . . . . . . . . . . . . >   b VARCHAR(10)
. . . . . . . . . . . . . . . > ) WITH (
. . . . . . . . . . . . . . . >   'connector.type' = 'filesystem',
. . . . . . . . . . . . . . . >   'connector.path' = 'file:///tmp/T.csv',
. . . . . . . . . . . . . . . >   'format.type' = 'csv',
. . . . . . . . . . . . . . . >   'format.derive-schema' = 'true'
. . . . . . . . . . . . . . . > );
No rows affected (0.158 seconds)
0: jdbc:flink://localhost:8083> INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello');
No rows affected (4.747 seconds)
0: jdbc:flink://localhost:8083> SELECT * FROM T;
+----+--------+--+
| a  |   b    |
+----+--------+--+
| 1  | Hi     |
| 2  | Hello  |
+----+--------+--+
2 rows selected (0.994 seconds)
0: jdbc:flink://localhost:8083> 

这是比较老的语法了,传统的Flink SQL。

JDBC

当然可以使用Java走JDBC调用:

Jar包:https://github.com/ververica/flink-jdbc-driver/releases

Demo:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Sample {
	public static void main(String[] args) throws Exception {
		Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
		Statement statement = connection.createStatement();

		statement.executeUpdate("CREATE TABLE T(\n" +
			"  a INT,\n" +
			"  b VARCHAR(10)\n" +
			") WITH (\n" +
			"  'connector.type' = 'filesystem',\n" +
			"  'connector.path' = 'file:///tmp/T.csv',\n" +
			"  'format.type' = 'csv',\n" +
			"  'format.derive-schema' = 'true'\n" +
			")");
		statement.executeUpdate("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')");
		ResultSet rs = statement.executeQuery("SELECT * FROM T");
		while (rs.next()) {
			System.out.println(rs.getInt(1) + ", " + rs.getString(2));
		}

		statement.close();
		connection.close();
	}
}

传统的Flink SQL就是这么写。。。相当古老了。。。

Shell脚本

启动sql gateway的shell较新版本:

function usage() {
  echo "Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]"
  echo "  commands:"
  echo "    start               - Run a SQL Gateway as a daemon"
  echo "    start-foreground    - Run a SQL Gateway as a console application"
  echo "    stop                - Stop the SQL Gateway daemon"
  echo "    stop-all            - Stop all the SQL Gateway daemons"
  echo "    -h | --help         - Show this help message"
}

################################################################################
# Adopted from "flink" bash script
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

################################################################################
# SQL gateway specific logic
################################################################################

ENTRYPOINT=sql-gateway

if [[ "$1" = *--help ]] || [[ "$1" = *-h ]]; then
  usage
  exit 0
fi

STARTSTOP=$1

if [ -z "$STARTSTOP" ]; then
  STARTSTOP="start"
fi

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  usage
  exit 1
fi

# ./sql-gateway.sh start --help, print the message to the console
if [[ "$STARTSTOP" = start* ]] && ( [[ "$*" = *--help* ]] || [[ "$*" = *-h* ]] ); then
  FLINK_TM_CLASSPATH=`constructFlinkClassPath`
  SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
  "$JAVA_RUN"  -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.gateway.SqlGateway "${@:2}"
  exit 0
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${@:2}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${@:2}"
fi

有空的时候,可以从这个脚本找到入口类【org.apache.flink.table.gateway.SqlGateway】继续钻研。。。

Java类

入口类就是这个:

package org.apache.flink.table.gateway;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.cli.SqlGatewayOptions;
import org.apache.flink.table.gateway.cli.SqlGatewayOptionsParser;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.session.SessionManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/** Main entry point for the SQL Gateway. */
public class SqlGateway {

    private static final Logger LOG = LoggerFactory.getLogger(SqlGateway.class);

    private final List<SqlGatewayEndpoint> endpoints;
    private final Properties dynamicConfig;
    private final CountDownLatch latch;

    private SessionManager sessionManager;

    public SqlGateway(Properties dynamicConfig) {
        this.endpoints = new ArrayList<>();
        this.dynamicConfig = dynamicConfig;
        this.latch = new CountDownLatch(1);
    }

    public void start() throws Exception {
        DefaultContext context =
                DefaultContext.load(ConfigurationUtils.createConfiguration(dynamicConfig));
        sessionManager = new SessionManager(context);

        sessionManager.start();
        SqlGatewayService sqlGatewayService = new SqlGatewayServiceImpl(sessionManager);

        try {
            endpoints.addAll(
                    SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
                            sqlGatewayService, context.getFlinkConfig()));
            for (SqlGatewayEndpoint endpoint : endpoints) {
                endpoint.start();
            }
        } catch (Throwable t) {
            LOG.error("Failed to start the endpoints.", t);
            throw new SqlGatewayException("Failed to start the endpoints.", t);
        }
    }

    public void stop() {
        for (SqlGatewayEndpoint endpoint : endpoints) {
            stopEndpointSilently(endpoint);
        }
        if (sessionManager != null) {
            sessionManager.stop();
        }
        latch.countDown();
    }

    public void waitUntilStop() throws Exception {
        latch.await();
    }

    public static void main(String[] args) {
        startSqlGateway(System.out, args);
    }

    @VisibleForTesting
    static void startSqlGateway(PrintStream stream, String[] args) {
        SqlGatewayOptions cliOptions = SqlGatewayOptionsParser.parseSqlGatewayOptions(args);

        if (cliOptions.isPrintHelp()) {
            SqlGatewayOptionsParser.printHelpSqlGateway(stream);
            return;
        }

        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        SqlGateway gateway = new SqlGateway(cliOptions.getDynamicConfigs());
        try {
            Runtime.getRuntime().addShutdownHook(new ShutdownThread(gateway));
            gateway.start();
            gateway.waitUntilStop();
        } catch (Throwable t) {
            // User uses ctrl + c to cancel the Gateway manually
            if (t instanceof InterruptedException) {
                LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down.");
                return;
            }
            // make space in terminal
            stream.println();
            stream.println();

            if (t instanceof SqlGatewayException) {
                // Exception that the gateway can not handle.
                throw (SqlGatewayException) t;
            } else {
                LOG.error(
                        "SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.",
                        t);
                throw new SqlGatewayException(
                        "Unexpected exception. This is a bug. Please consider filing an issue.", t);
            }
        } finally {
            gateway.stop();
        }
    }

    private void stopEndpointSilently(SqlGatewayEndpoint endpoint) {
        try {
            endpoint.stop();
        } catch (Exception e) {
            LOG.error("Failed to stop the endpoint. Ignore.", e);
        }
    }

    // --------------------------------------------------------------------------------------------

    private static class ShutdownThread extends Thread {

        private final SqlGateway gateway;

        public ShutdownThread(SqlGateway gateway) {
            this.gateway = gateway;
        }

        @Override
        public void run() {
            // Shutdown the gateway
            System.out.println("\nShutting down the Flink SqlGateway...");
            LOG.info("Shutting down the Flink SqlGateway...");

            try {
                gateway.stop();
            } catch (Exception e) {
                LOG.error("Failed to shut down the Flink SqlGateway: " + e.getMessage(), e);
                System.out.println("Failed to shut down the Flink SqlGateway: " + e.getMessage());
            }

            LOG.info("Flink SqlGateway has been shutdown.");
            System.out.println("Flink SqlGateway has been shutdown.");
        }
    }
}

等有空的时候再研究。

总结

从Flink1.16.0开始,就可以使用Hive On Flink了,SQL Boy们可以依旧只关心所谓的逻辑,只写几个Join。平台和组件二开人员可以尝试下Sql Gateway的方式了,简化Spark的Thrift Server和Hive的Hive Server2,架构简单化以后,组件运维起来应该要容易一些。暂时不清楚Hive On Flink和Spark SQL在性能上的区别,还停留在Flink1.13老版本不敢吃螃蟹的公司也可以先吃瓜,看看大白鼠们直接上生产环境的稳定性来判断这个特性是否GA。

Apache Flink的公众号还是有不少干货,灰常适合笔者这样的学徒工观摩和学习。

Flink1.16.0基本是2022年的收官之作了。

转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/128195438

使用Flink1.16.0的SQLGateway迁移Hive SQL任务文章来源地址https://www.toymoban.com/news/detail-403506.html

到了这里,关于使用Flink1.16.0的SQLGateway迁移Hive SQL任务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink1.14 sql基础语法(二) flink sql表定义详解

    每一个表的标识由 3 部分组成: catalog name (常用于标识不同的“源”,比如 hive catalog,inner catalog 等) database name(通常语义中的“库”) table name (通常语义中的“表”) 1个flinksql程序在运行时,tableEnvironment 通过持有一个 map 结构来记录所注册的 catalog; Flinksql中的表,可以是 vi

    2024年02月14日
    浏览(39)
  • flink1.14 sql基础语法(一) flink sql表查询详解

    语法示例: flink从1.13开始,提供了时间窗口聚合计算的TVF语法。 表值函数的使用约束: (1)在窗口上做 分组聚合 ,必须带上window_start 和 window_end 作为分组的key; (2)在窗口上做 topn计算 ,必须带上window_start 和 window_end 作为partition的key; (3)带条件的 join ,必须包含

    2024年02月06日
    浏览(42)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月11日
    浏览(51)
  • Flink1.14提交任务报错classloader.check-leaked-classloader问题解决

    我的hadoop版本是3.1.3,Flink版本是1.14。不知道是hadoop版本的原因还是Flink版本更新的原因。当我运行一个简单的Flink测试时,虽然结果出来了但是后面还跟着一段报错信息。 测试命令: flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar 报错信息: Trying to acce

    2024年02月11日
    浏览(39)
  • Flink SQL Hive Connector使用场景

    目录 1.介绍 2.使用 2.1注册HiveCatalog 2.2Hive Read 2.2.1流读关键配置 2.2.2示例

    2024年02月06日
    浏览(42)
  • 实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

    实时数仓主要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的 OLAP 分析、实时的数据看板、业务指标实时监控等场景。虽然关于实时数仓的架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。本文会分享基于 Flink

    2024年02月16日
    浏览(42)
  • flink1.18.0 macos sql-client.sh启动报错

    2024年01月23日
    浏览(40)
  • 基于数据湖的流批一体:flink1.15.3与Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    前言:为实现基于数据湖的流批一体,采用业内主流技术栈hudi、flink、CDH(hive、spark)。flink使用sql client与hive的catalog打通,可以与hive共享元数据,使用sql client可操作hive中的表,实现批流一体;flink与hudi集成可以实现数据实时入湖;hudi与hive集成可以实现湖仓一体,用flink实

    2024年02月12日
    浏览(55)
  • iOS16.0:屏幕旋转

    此文写于2022年08月03日,距离 iOS16 正式版推出还有一个多月的时间, iOS16 beta 版本有很多API的修改,今天讨论的是屏幕旋转,基于 Xcode 14.0 beta4 。 之前的屏幕旋转会报错: [Orientation] BUG IN CLIENT OF UIKIT: Setting UIDevice.orientation is not supported. Please use UIWindowScene.requestGeometryUpdate(_

    2024年02月12日
    浏览(55)
  • 2、Flink1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式(前两种及session和per-job)验证详细步骤

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月05日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包