[Flink Extras] 21. Registering a Catalog in Flink via the SQL Client and the Table API: Examples


Flink Series Articles

I. The Flink Column

The Flink column systematically introduces individual topics and illustrates them with concrete examples.

  • 1. Flink deployment series
    Covers the basics of deploying and configuring Flink.

  • 2. Flink fundamentals series
    Covers the fundamentals of Flink, such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.

  • 3. Flink Table API and SQL fundamentals series
    Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.

  • 4. Flink Table API and SQL advanced and application series
    Application-oriented Table API and SQL content, more closely tied to real production use and somewhat more demanding to develop.

  • 5. Flink monitoring series
    Related to day-to-day operations and monitoring work.

II. The Flink Examples Column

The Flink examples column is a companion to the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. This column has no sub-categories; each article's title indicates what it covers.

For the full index of articles in both columns, see: Flink 系列文章汇总索引 (Flink series article index)



This article demonstrates registering tables in a Flink catalog: the SQL Client is used to connect to MySQL through a JdbcCatalog, and the Table API and SQL are used to register tables in a HiveCatalog.

For more systematic and up-to-date coverage, see my Flink column.

Besides the Maven dependencies, this article relies on the following environment:

Hadoop version: 3.1.4
Hive version: 3.1.2
Flink version: 1.13.6

I. Creating a Flink Table and Registering It in a Catalog

1. Using SQL DDL

Users can create tables in a catalog with DDL, either through the Table API or through the SQL Client.

Note that the JdbcCatalog does not support creating databases or tables (the official example is not explicit about this); the HiveCatalog does support creating tables.

This example is based on MySQL; the Flink version used here is 1.17.

// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
----- JdbcCatalog cannot create tables; HiveCatalog can -----
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);

Flink SQL> SHOW TABLES;
mytable

----------------------- A concrete example follows -----------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = 'root',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in set

Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
|          azkaban |
|          cdhhive |
|           cdhhue |
......
| spring_boot_plus |
|   springbootmall |
|             test |
|           zipkin |
+------------------+
29 rows in set

Flink SQL> use test;
[INFO] Execute statement succeed.

Flink SQL> show tables;

+------------------------------+
|                   table name |
+------------------------------+
|                  permissions |
|                       person |
|                   personinfo |
|                         role |
|                         user |
......
+------------------------------+
34 rows in set

Flink SQL> select * from person;

Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.

Flink SQL> select * from person;

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 测试修改go语言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再试一试 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     测试go语言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows
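
The same catalog can also be registered programmatically through the Table API instead of the SQL Client DDL shown above. The following is a minimal sketch (not part of the original example); it assumes the flink-connector-jdbc dependency and the MySQL driver are on the classpath, and the class name and connection values simply mirror the SQL Client example above.

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestJdbcCatalogDemo {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// same connection settings as the SQL Client example above; adjust to your environment
		String name = "alan_catalog";
		String defaultDatabase = "test";
		String username = "root";
		String password = "root";
		String baseUrl = "jdbc:mysql://192.168.10.44:3306";

		JdbcCatalog jdbcCatalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
		tenv.registerCatalog("alan_catalog", jdbcCatalog);
		tenv.useCatalog("alan_catalog");

		// the existing MySQL tables are now visible through the catalog
		tenv.executeSql("SHOW TABLES").print();
		tenv.sqlQuery("SELECT * FROM person").execute().print();
	}
}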

2. Maven Dependencies

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.13.6</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>jdk.tools</groupId>
		<artifactId>jdk.tools</artifactId>
		<version>1.8</version>
		<scope>system</scope>
		<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-bridge_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<!-- Blink planner, the default since Flink 1.11+ -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner-blink_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-common</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<!-- Flink connectors -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-sql-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-jdbc_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-hive_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-metastore</artifactId>
		<version>2.1.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-exec</artifactId>
		<version>3.1.2</version>
		<scope>provided</scope>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-shaded-hadoop-2-uber</artifactId>
		<version>2.7.5-10.0</version>
		<scope>provided</scope>
	</dependency>

	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.38</version>
		<scope>provided</scope>
		<!--<version>8.0.20</version> -->
	</dependency>

	<!-- logging -->
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.7</version>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.17</version>
		<scope>runtime</scope>
	</dependency>

	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.44</version>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.2</version>
		<scope>provided</scope>
	</dependency>

</dependencies>

3. Creating a Hive Table with the Table API and Registering It in a HiveCatalog

Catalog tables can also be created programmatically, using Java or Scala.

The example below uses a HiveCatalog; the HiveCatalog itself is covered in more depth in other articles.

Note that to run this example, /usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar from the Hadoop installation must be copied into Flink's lib directory (/usr/local/bigdata/flink-1.13.5/lib). This is necessary because my Hadoop environment is configured with LZO compression.

Hadoop version: 3.1.4
Hive version: 3.1.2
Flink version: 1.13.6

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 *
 */
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 * @throws DatabaseAlreadyExistException
	 * @throws TableAlreadyExistException
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// name of an existing Hive database (testhive)
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// use the registered catalog
		tenv.useCatalog("alan_hive");

		List<String> tables = hiveCatalog.listTables(defaultDatabase);
		for (String table : tables) {
			System.out.println("Database:testhive  tables:" + table);
		}
// create a database
//	    public CatalogDatabaseImpl(Map<String, String> properties, @Nullable String comment) {
//	        this.properties = checkNotNull(properties, "properties cannot be null");
//	        this.comment = comment;
//	    }
		Map<String, String> properties = new HashMap<>();
//		properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
//		properties.put("connector", "COLLECTION");
		CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
		String newDatabaseName = "alan_hivecatalog_hivedb";

		hiveCatalog.createDatabase(newDatabaseName, cd, true);
// create a table
		String tableName = "alan_hivecatalog_hivedb_testTable";
		// public ObjectPath(String databaseName, String objectName)
		ObjectPath path = new ObjectPath(newDatabaseName, tableName);

//		public CatalogTableImpl( TableSchema tableSchema, Map<String, String> properties, String comment) 
		TableSchema schema = TableSchema.builder()
				.field("id", DataTypes.INT())
				.field("name", DataTypes.STRING())
				.field("age", DataTypes.INT())
				.build();
		
//		public CatalogTableImpl(TableSchema tableSchema, Map<String, String> properties, String comment) 
		CatalogTable catalogTable = new CatalogTableImpl(schema, properties, "this is table comment");
		hiveCatalog.createTable(path, catalogTable, true);

		List<String> newTables = hiveCatalog.listTables(newDatabaseName);
		for (String table : newTables) {
			System.out.println("Database:alan_hivecatalog_hivedb  tables:" + table);
		}
		
// insert data
		String insertSQL = "insert into " + newDatabaseName + "." + tableName + " values (1,'alan',18)";
		tenv.executeSql(insertSQL);
// query data
		String selectSQL = "select * from " + newDatabaseName + "." + tableName;
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();
		env.execute();
	}

}
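
If the example is rerun, the database and table already exist; the boolean true passed to createDatabase and createTable ("ignore if exists") keeps those calls from failing. As a rough sketch (not part of the original article), the same Catalog API can also be used to clean the test objects up again; the methods used below come from the org.apache.flink.table.catalog.Catalog interface:

import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CleanupHiveCatalogDemo {

	public static void main(String[] args) throws Exception {
		// same HiveCatalog settings as in TestHiveCatalogDemo above
		HiveCatalog hiveCatalog = new HiveCatalog("alan_hive", "testhive", "/usr/local/bigdata/apache-hive-3.1.2-bin/conf");
		hiveCatalog.open();

		ObjectPath path = new ObjectPath("alan_hivecatalog_hivedb", "alan_hivecatalog_hivedb_testTable");

		// drop the test table first (second argument: ignore if the table does not exist)
		hiveCatalog.dropTable(path, true);
		// then drop the database (arguments: ignore if missing, cascade-drop remaining tables)
		hiveCatalog.dropDatabase("alan_hivecatalog_hivedb", true, true);

		hiveCatalog.close();
	}
}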

4. Creating a Hive Table with SQL and Registering It in a HiveCatalog

This example does the same thing as the previous one; the difference is the implementation: the previous example creates the table through the API, while this one creates it with SQL.


import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 *
 */
public class TestCreateHiveTable {
	public static final String tableName = "alan_hivecatalog_hivedb_testTable";
	public static final String hive_create_table_sql = "CREATE  TABLE  " + tableName +  " (\n" + 
																					  "  id INT,\n" + 
																					  "  name STRING,\n" + 
																					  "  age INT" + ") " + 
																					  "TBLPROPERTIES (\n" + 
																					  "  'sink.partition-commit.delay'='5 s',\n" + 
																					  "  'sink.partition-commit.trigger'='partition-time',\n" + 
																					  "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";

	/**
	 * @param args
	 * @throws DatabaseAlreadyExistException
	 * @throws CatalogException
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
		String name = "alan_hive";
		// name of the default Hive database
		String defaultDatabase = "default";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		tenv.useCatalog("alan_hive");

//		Map<String, String> properties = new HashMap();
//		CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
		String newDatabaseName = "alan_hivecatalog_hivedb";
//		if (hiveCatalog.databaseExists(newDatabaseName)) {
//			hiveCatalog.dropDatabase(newDatabaseName, true);
//		}
//		hiveCatalog.createDatabase(newDatabaseName, cd, true);
		tenv.useDatabase(newDatabaseName);

		// create the table (using the Hive dialect)
		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
//		if(hiveCatalog.tableExists( new ObjectPath(newDatabaseName, tableName))) {
//			hiveCatalog.dropTable( new ObjectPath(newDatabaseName, tableName), true);
//		}
		tenv.executeSql(hive_create_table_sql);

		// insert data
//		String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
		String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
		tenv.executeSql(insertSQL);

		// query data
//		String selectSQL = "select * from " + tableName;
		String selectSQL = "select * from alan_hivecatalog_hivedb_testTable" ;
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();
		env.execute();
	}

}
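
One detail worth noting: the example switches the SQL dialect to Hive before the CREATE TABLE and leaves it there for the subsequent INSERT and SELECT. If you also want to run Flink-specific SQL in the same program, the usual pattern is to switch back to the default dialect once the Hive DDL is done. A short sketch (assuming the tenv and the hive_create_table_sql constant from the class above):

		// Hive-dialect statements: only the Hive-specific DDL
		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tenv.executeSql(hive_create_table_sql);

		// switch back to the default dialect for regular Flink SQL (DML, queries)
		tenv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
		tenv.executeSql("insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)");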

5. Verification

In this example the job is submitted to the Flink cluster from the command line; submitting it through the web UI works just as well and is not repeated here.

Prerequisites:
1. A working Hadoop environment
2. A working Hive environment
3. Flink-Hive integration set up and working
4. A running Flink cluster; in this article it is started as a yarn-session

1) Packaging and Uploading

Configure the packaging plugins in pom.xml:

<build>
	<sourceDirectory>src/main/java</sourceDirectory>
	<plugins>
		<!-- compiler plugin -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.5.1</version>
			<configuration>
				<source>1.8</source>
				<target>1.8</target>
				<!--<encoding>${project.build.sourceEncoding}</encoding> -->
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-surefire-plugin</artifactId>
			<version>2.18.1</version>
			<configuration>
				<useFile>false</useFile>
				<disableXmlReport>true</disableXmlReport>
				<includes>
					<include>**/*Test.*</include>
					<include>**/*Suite.*</include>
				</includes>
			</configuration>
		</plugin>
		<!-- shade plugin (bundles all dependencies) -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.3</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<filters>
							<filter>
								<artifact>*:*</artifact>
								<excludes>
									<!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
									<exclude>META-INF/*.SF</exclude>
									<exclude>META-INF/*.DSA</exclude>
									<exclude>META-INF/*.RSA</exclude>
								</excludes>
							</filter>
						</filters>
						<transformers>
							<transformer
								implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
								<!-- set the jar's main class (optional) -->
								<mainClass> org.table_sql.TestCreateHiveTable</mainClass>
							</transformer>
						</transformers>
					</configuration>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

Package the project either from the command line or from your IDE; here it is packaged from the command line.


mvn package  -Dmaven.test.skip=true

# wait until you see
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  18.304 s

Upload the packaged jar to the Flink cluster and run it.

2) Submitting the Job

# jar location: /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar
# If the Flink environment variables are configured, run the command below directly; otherwise, switch to Flink's bin directory first and then run it
flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar  org.table_sql.TestHiveCatalogDemo

3) Verifying the Result

# 1. Job output after submission
[alanchan@server1 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar  org.table_sql.TestHiveCatalogDemo
2023-08-31 00:18:01,185 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
2023-08-31 00:18:01,185 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
Hive Session ID = 4c3ab8b5-d99e-4e2f-9362-fcbcae8047fa
Hive Session ID = d3fc6679-9b60-47a9-b9e7-d125e3240196
2023-08-31 00:18:07,578 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/usr/local/bigdata/flink-1.13.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-08-31 00:18:07,778 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:07,787 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:07,860 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 2161b431ad0310df06417a3232ca5e60
Hive Session ID = 90444eb0-7fc9-4ac9-adb1-44df145739c7
(
  `id` INT,
  `name` STRING,
  `age` INT
)
2023-08-31 00:18:17,806 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2023-08-31 00:18:17,871 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 0
2023-08-31 00:18:18,115 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:18,116 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:18,119 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 16a85c80862dac9035c62563b39a9fb7
Program execution finished
Job with JobID 16a85c80862dac9035c62563b39a9fb7 has finished.
Job Runtime: 6652 ms

# 2. Query the table and its data in the Flink SQL CLI
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

Flink SQL> select * from alan_hivecatalog_hivedb_testtable;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                           alan |          18 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row

# Verification complete

To sum up, this article demonstrated registering tables in a Flink catalog: the SQL Client was used to connect to MySQL through a JdbcCatalog, and the Table API and SQL were used to register tables in a HiveCatalog.
