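Flink article series
I. Flink column
The Flink column introduces one topic at a time in a systematic way, with concrete examples for each.
1. Flink deployment series
Covers the basics of deploying and configuring Flink.
2. Flink fundamentals series
Covers the fundamentals of Flink: terminology, architecture, the programming model, programming guides, basic DataStream API usage, the four cornerstones, and so on.
3. Flink Table API and SQL fundamentals series
Covers basic usage of the Flink Table API and SQL: creating databases and tables, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL advanced and applied series
Covers applied use of the Table API and SQL: topics that are closer to real production work and somewhat harder to develop.
5. Flink monitoring series
Covers topics related to day-to-day operations and monitoring.
II. Flink examples column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides ready-to-use examples. This column is not divided into sections; the link titles indicate what each article covers.
All articles in both columns are listed in: Flink series article index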
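This article demonstrates how to register tables in a Flink catalog: the SQL Client is used to connect to MySQL through a JDBC catalog, and the Table API and SQL are used to register tables in a HiveCatalog.
For more, see the author's Flink column, which covers the material more systematically.
Apart from the Maven dependencies, this article relies on the following environment:
Hadoop version: 3.1.4
Hive version: 3.1.2
Flink version: 1.13.6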
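I. Creating a Flink table and registering it in a catalog
1. Using SQL DDL
Users can create tables in a catalog with DDL, through either the Table API or the SQL Client.
JdbcCatalog cannot create databases or tables (the official example is not explicit about this); HiveCatalog can create tables.
This example is based on MySQL, with Flink 1.17.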
// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
----- JdbcCatalog cannot create tables; HiveCatalog can -----
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
Flink SQL> SHOW TABLES;
mytable
----------------------- A concrete example follows -----------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
> 'type' = 'jdbc',
> 'default-database' = 'test?useSSL=false',
> 'username' = 'root',
> 'password' = 'root',
> 'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| alan_catalog |
| default_catalog |
+-----------------+
2 rows in set
Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.
Flink SQL> show databases;
+------------------+
| database name |
+------------------+
| azkaban |
| cdhhive |
| cdhhue |
......
| spring_boot_plus |
| springbootmall |
| test |
| zipkin |
+------------------+
29 rows in set
Flink SQL> use test;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+------------------------------+
| table name |
+------------------------------+
| permissions |
| person |
| personinfo |
| role |
| user |
+------------------------------+
34 rows in set
Flink SQL> select * from person;
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.
Flink SQL> select * from person;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 11 | 测试修改go语言 | 30 |
| +I | 13 | NameUpdate | 22 |
| +I | 14 | updatejson | 23 |
| +I | 189 | 再试一试 | 12 |
| +I | 191 | test-full-update | 3333 |
| +I | 889 | zhangsanswagger2 | 88 |
| +I | 892 | update | 189 |
| +I | 1001 | testupdate | 19 |
| +I | 1002 | 测试go语言 | 23 |
| +I | 1013 | slene | 0 |
| +I | 1014 | testing | 0 |
| +I | 1015 | testing | 18 |
| +I | 1016 | astaxie | 19 |
| +I | 1017 | alan | 18 |
| +I | 1018 | chan | 19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows
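The CREATE CATALOG statement from the session above can also be issued from a Java program by passing it to `TableEnvironment.executeSql`. Below is a minimal sketch that only assembles the DDL text from its options. The class `JdbcCatalogDdl` and its `build` method are hypothetical helpers, not part of Flink, and the Flink call appears only in a comment so that the snippet runs without Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not a Flink API): builds a CREATE CATALOG statement.
final class JdbcCatalogDdl {

    // Renders CREATE CATALOG <name> WITH ('k' = 'v', ...), keeping option order.
    static String build(String catalogName, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n  ", "(\n  ", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            // A single quote inside a SQL string literal is escaped by doubling it.
            String value = e.getValue().replace("'", "''");
            with.add("'" + e.getKey() + "' = '" + value + "'");
        }
        return "CREATE CATALOG " + catalogName + " WITH " + with;
    }

    public static void main(String[] args) {
        // Same options as in the SQL Client session above.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("type", "jdbc");
        options.put("default-database", "test?useSSL=false");
        options.put("username", "root");
        options.put("password", "root");
        options.put("base-url", "jdbc:mysql://192.168.10.44:3306");

        String ddl = build("alan_catalog", options);
        System.out.println(ddl);
        // With Flink on the classpath, the statement would be run as:
        // tableEnv.executeSql(ddl);
    }
}
```

Keeping the options in a `LinkedHashMap` preserves their order, so the generated statement reads the same as the hand-written one.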
2. Maven dependencies
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.13.6</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Blink planner, the default since 1.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink connectors -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<scope>provided</scope>
<!--<version>8.0.20</version> -->
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
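3. Creating a Hive table with the Table API and registering it in a HiveCatalog
Users can create catalog tables programmatically in Java or Scala.
The example below uses HiveCatalog; HiveCatalog is covered in more detail in other articles of the series.
Note that to run this example, /usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar from the Hadoop environment must be copied into Flink's lib directory (/usr/local/bigdata/flink-1.13.5/lib), because the author's Hadoop environment is configured with LZO compression.
Hadoop version: 3.1.4
Hive version: 3.1.2
Flink version: 1.13.6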
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.types.Row;
/**
* @author alanchan
*
*/
public class TestHiveCatalogDemo {
/**
* @param args
* @throws DatabaseNotExistException
* @throws CatalogException
* @throws DatabaseAlreadyExistException
* @throws TableAlreadyExistException
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String name = "alan_hive";
// "testhive" is the database name
String defaultDatabase = "testhive";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog("alan_hive", hiveCatalog);
// use the registered catalog
tenv.useCatalog("alan_hive");
List<String> tables = hiveCatalog.listTables(defaultDatabase);
for (String table : tables) {
System.out.println("Database:testhive tables:" + table);
}
// create the database
// public CatalogDatabaseImpl(Map<String, String> properties, @Nullable String comment) {
// this.properties = checkNotNull(properties, "properties cannot be null");
// this.comment = comment;
// }
Map<String, String> properties = new HashMap<>();
// properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
// properties.put("connector", "COLLECTION");
CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
String newDatabaseName = "alan_hivecatalog_hivedb";
hiveCatalog.createDatabase(newDatabaseName, cd, true);
// create the table
String tableName = "alan_hivecatalog_hivedb_testTable";
// public ObjectPath(String databaseName, String objectName)
ObjectPath path = new ObjectPath(newDatabaseName, tableName);
// public CatalogTableImpl( TableSchema tableSchema, Map<String, String> properties, String comment)
TableSchema schema = TableSchema.builder()
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.build();
// public CatalogTableImpl(TableSchema tableSchema, Map<String, String> properties, String comment)
CatalogTable catalogTable = new CatalogTableImpl(schema, properties, "this is table comment");
hiveCatalog.createTable(path, catalogTable, true);
List<String> newTables = hiveCatalog.listTables(newDatabaseName);
for (String table : newTables) {
System.out.println("Database:alan_hivecatalog_hivedb tables:" + table);
}
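// insert data; await() blocks until the INSERT job finishes, so the query below can see the row
String insertSQL = "insert into " + newDatabaseName + "." + tableName + " values (1,'alan',18)";
tenv.executeSql(insertSQL).await();
// query the data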
String selectSQL = "select * from " + newDatabaseName + "." + tableName;
Table table = tenv.sqlQuery(selectSQL);
table.printSchema();
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
result.print();
env.execute();
}
}
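The example above builds its INSERT and SELECT statements by concatenating the database name and the table name. A small sketch of the same idea with backtick-quoted identifiers follows. `SqlNames` is a hypothetical helper, not a Flink class, and escaping a backtick by doubling it is stated here as an assumption about Flink's default SQL dialect.

```java
// Hypothetical helper (not a Flink API) for building qualified table names.
final class SqlNames {

    // Wraps one identifier in backticks, doubling any backtick it contains.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // Joins database and table into `db`.`table`.
    static String qualified(String database, String table) {
        return quote(database) + "." + quote(table);
    }

    public static void main(String[] args) {
        // Names taken from the example above.
        String target = qualified("alan_hivecatalog_hivedb", "alan_hivecatalog_hivedb_testTable");
        String insertSql = "insert into " + target + " values (1,'alan',18)";
        String selectSql = "select * from " + target;
        System.out.println(insertSql);
        System.out.println(selectSql);
    }
}
```

Quoting matters mainly when a name collides with a reserved word or contains unusual characters; for the plain names used in this article the unquoted form works as well.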
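4. Creating a Hive table with SQL and registering it in a HiveCatalog
This example does the same thing as the previous one; the only difference is the implementation: the previous one creates the table through the API, this one through SQL.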
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
/**
* @author alanchan
*
*/
public class TestCreateHiveTable {
public static final String tableName = "alan_hivecatalog_hivedb_testTable";
public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT" + ") " +
"TBLPROPERTIES (\n" +
" 'sink.partition-commit.delay'='5 s',\n" +
" 'sink.partition-commit.trigger'='partition-time',\n" +
" 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
/**
* @param args
* @throws DatabaseAlreadyExistException
* @throws CatalogException
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
String name = "alan_hive";
// "default" is the database name
String defaultDatabase = "default";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog("alan_hive", hiveCatalog);
tenv.useCatalog("alan_hive");
// Map<String, String> properties = new HashMap();
// CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
String newDatabaseName = "alan_hivecatalog_hivedb";
// if (hiveCatalog.databaseExists(newDatabaseName)) {
// hiveCatalog.dropDatabase(newDatabaseName, true);
// }
// hiveCatalog.createDatabase(newDatabaseName, cd, true);
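// assumes the database already exists, e.g. created by the previous example
tenv.useDatabase(newDatabaseName);
// create the table using the Hive dialect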
tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
// if(hiveCatalog.tableExists( new ObjectPath(newDatabaseName, tableName))) {
// hiveCatalog.dropTable( new ObjectPath(newDatabaseName, tableName), true);
// }
tenv.executeSql(hive_create_table_sql);
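// insert data; await() blocks until the INSERT job finishes, so the query below can see the row
// String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
tenv.executeSql(insertSQL).await();
// query the data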
// String selectSQL = "select * from " + tableName;
String selectSQL = "select * from alan_hivecatalog_hivedb_testTable" ;
Table table = tenv.sqlQuery(selectSQL);
table.printSchema();
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
result.print();
env.execute();
}
}
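5. Verification
In this example the job is submitted to the Flink cluster from the command line; submitting it through the web UI works the same way and is not covered here.
Prerequisites:
1. A working Hadoop environment
2. A working Hive environment
3. A complete, working Flink-Hive integration
4. A running Flink cluster; this article starts it as a yarn-session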
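Part of these prerequisites can be checked before a job is submitted. HiveCatalog reads hive-site.xml from the configuration directory passed to its constructor, so it is worth confirming that the file is there. A minimal sketch, with `HiveConfCheck` as a hypothetical helper and the default directory taken from the examples above:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical pre-flight check (not a Flink API).
final class HiveConfCheck {

    // Returns true if hiveConfDir contains a regular file named hive-site.xml.
    static boolean hasHiveSite(String hiveConfDir) {
        Path hiveSite = Paths.get(hiveConfDir, "hive-site.xml");
        return Files.isRegularFile(hiveSite);
    }

    public static void main(String[] args) {
        // Directory used in the examples above; pass another one as the first argument.
        String hiveConfDir = args.length > 0
                ? args[0]
                : "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
        System.out.println(hiveConfDir + " contains hive-site.xml: " + hasHiveSite(hiveConfDir));
    }
}
```

This only checks that the file exists on the machine where the program runs; it does not verify that the metastore it points to is reachable.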
1) Package and upload
Configure the packaging plugins in pom.xml:
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding> -->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- packaging plugin (bundles all dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- entry class of the jar (optional) -->
<mainClass>org.table_sql.TestCreateHiveTable</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Package from the command line or from an IDE; here the command line is used:
mvn package -Dmaven.test.skip=true
# until you see
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 18.304 s
Upload the packaged jar to the Flink cluster and run it.
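2) Submit the job
# jar location: /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar
# If Flink's bin directory is on the PATH, run the command below directly; otherwise change to Flink's bin directory first.
# Note: without -c, flink run starts the Main-Class from the jar manifest (org.table_sql.TestCreateHiveTable as configured in pom.xml) and passes the trailing class name as a program argument. To choose the entry class explicitly, put -c <class> before the jar path.
flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar org.table_sql.TestHiveCatalogDemo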
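When the submission is scripted, the entry class can be made explicit with the `-c` (`--class`) option of `flink run`. The sketch below only assembles the argument list and prints it; `FlinkRunCommand` is a hypothetical helper, not part of Flink, and nothing is executed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): assembles a `flink run` command line.
final class FlinkRunCommand {

    // Options such as -c go before the jar path; program arguments follow it.
    static List<String> build(String entryClass, String jarPath, String... programArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add("flink");
        cmd.add("run");
        if (entryClass != null && !entryClass.isEmpty()) {
            cmd.add("-c");
            cmd.add(entryClass);
        }
        cmd.add(jarPath);
        for (String arg : programArgs) {
            cmd.add(arg);
        }
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = build(
                "org.table_sql.TestHiveCatalogDemo",
                "/usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar");
        System.out.println(String.join(" ", cmd));
        // The list could be handed to new ProcessBuilder(cmd) to launch the client.
    }
}
```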
3) Verify
# 1. Job output after submission
[alanchan@server1 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar org.table_sql.TestHiveCatalogDemo
2023-08-31 00:18:01,185 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
2023-08-31 00:18:01,185 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
Hive Session ID = 4c3ab8b5-d99e-4e2f-9362-fcbcae8047fa
Hive Session ID = d3fc6679-9b60-47a9-b9e7-d125e3240196
2023-08-31 00:18:07,578 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/usr/local/bigdata/flink-1.13.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-08-31 00:18:07,778 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:07,787 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:07,860 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 2161b431ad0310df06417a3232ca5e60
Hive Session ID = 90444eb0-7fc9-4ac9-adb1-44df145739c7
(
`id` INT,
`name` STRING,
`age` INT
)
2023-08-31 00:18:17,806 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2023-08-31 00:18:17,871 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 0
2023-08-31 00:18:18,115 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:18,116 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:18,119 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 16a85c80862dac9035c62563b39a9fb7
Program execution finished
Job with JobID 16a85c80862dac9035c62563b39a9fb7 has finished.
Job Runtime: 6652 ms
# 2. Query the table and its data in the Flink SQL CLI
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.
Flink SQL> select * from alan_hivecatalog_hivedb_testtable;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 1 | alan | 18 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row
# Verification complete.
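The query above uses `alan_hivecatalog_hivedb_testtable` although the Java code created `alan_hivecatalog_hivedb_testTable`: Hive stores table names in lower case. A minimal sketch of that normalization, with `HiveNames` as a hypothetical helper rather than the catalog's actual code:

```java
import java.util.Locale;

// Hypothetical helper (not a Flink or Hive API): mimics lower-casing of table names.
final class HiveNames {

    // Trims the name and lower-cases it independently of the JVM's default locale.
    static String normalize(String name) {
        return name.trim().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Name as written in the Java examples above.
        String created = "alan_hivecatalog_hivedb_testTable";
        // Prints alan_hivecatalog_hivedb_testtable
        System.out.println(normalize(created));
    }
}
```

`Locale.ROOT` is used so that the result does not depend on the machine's locale settings.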
To summarize, this article demonstrated how to register tables in a Flink catalog: the SQL Client was used to connect to MySQL through a JDBC catalog, and the Table API and SQL were used to register tables in a HiveCatalog.