42、Flink 的table api与sql之Hive Catalog

这篇具有很好参考价值的文章主要介绍了42、Flink 的table api与sql之Hive Catalog。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文以一个详细的示例介绍了Flink与hive的集成,其中涉及的版本在示例部分有消息的说明。
本文依赖有hadoop、hive、kafka、mysql、flink等所有环境可用。
本分分为6个部分,即hivecatalog介绍、依赖、怎么使用和详细示例、flink与hive的胡数据类型映射等。

一、Hive Catalog

多年来,Hive Metastore已经发展成为Hadoop生态系统中事实上的元数据中心。许多公司在其生产中只有一个 Hive Metastore service实例来管理其所有元数据(Hive 元数据或非 Hive 元数据)。

对于同时拥有 Hive 和 Flink 部署的用户,HiveCatalog 使他们能够使用 Hive Metastore 来管理 Flink 的元数据。

对于刚刚部署 Flink 的用户,HiveCatalog 是 Flink 提供的唯一开箱即用的持久目录。如果没有持久目录,使用 Flink SQL CREATE DDL 的用户必须在每个会话中重复创建像 Kafka 表这样的元对象,这会浪费大量时间。HiveCatalog 填补了这一空白,使用户能够只创建一次表和其他元对象,并在以后跨会话方便地引用和管理它们。

二、Set up HiveCatalog

1、Dependencies

在 Flink 中设置 HiveCatalog 需要与整个 Flink-Hive 集成相同的依赖关系。具体参考:16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

2、Configuration

在 Flink 中设置 HiveCatalog 需要与整个 Flink-Hive 集成相同的配置。
具体参考:16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

三、How to use HiveCatalog

正确配置后,HiveCatalog 应该可以开箱即用。用户可以用 DDL 创建 Flink 元对象,之后应该会立即看到它们。

HiveCatalog 可用于处理两种类型的表:与 Hive 兼容的表和泛型表(Hive-compatible tables and generic tables)。与 Hive 兼容的表是以 Hive 兼容方式存储的表,就存储层中的元数据和数据而言。因此,通过 Flink 创建的 Hive 兼容表可以从 Hive 端查询。

另一方面,Generic tables是特定于 Flink 的。使用 HiveCatalog 创建Generic tables时,我们只是使用 HMS 来保存元数据。虽然这些表对 Hive 可见,但 Hive 不太可能理解元数据。因此,在 Hive 中使用此类表会导致未定义的行为。

Flink 使用属性 ‘is_generic’ 来判断表是与 Hive 兼容还是泛型。使用 HiveCatalog 创建表时,默认情况下将其视为泛型表。如果要创建与 Hive 兼容的表,请确保在表属性中将 is_generic 设置为 false。

如上所述,不应从 Hive 使用泛型表。在 Hive CLI 中,可以为表调用描述格式,并通过检查 is_generic 属性来确定它是否为泛型。泛型表将具有 is_generic=true。

四、示例-Flink 集成 Hive

本示例是介绍flink集成hive的内容,主要体现的是flink和hive共用hive的元数据。
本示例中hive的版本是3.1.2
flink的版本是1.13.6
hadoop的版本是3.1.4
kafka的版本是2.12-3.0.0
mysql的版本是5+,非8

1、修改hive的配置文件

下面的配置在我们部署hive的时候已经配置过,可能内容不一样,详见1、apache-hive-3.1.2简介及部署(三种部署方式-内嵌模式、本地模式和远程模式)及验证详解

非常重要

网络上很多都是介绍jar文件的,没有该种情况介绍

如果hive的其他机器上如果hive-site.xml文件中配置的如下远程访问hiveserver2的,则需要按照如下方式配置,否则flink不能访问hive的metadata服务。
网络上很多都是介绍jar文件的,没有该种情况介绍

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    
    <!-- 远程模式部署metastore 服务地址 -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://server4:9083</value>
    </property>

</configuration>

不能访问元数据的错误提示

Flink SQL> show tables;
+----------------------+
|           table name |
+----------------------+
| alanchan_kafka_table |
|         source_table |
+----------------------+
2 rows in set

Flink SQL> desc source_table;
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hive.metastore.api.MetaException: Your client does not appear to support Hive tests. To skip capability checks, please set metastore.client.capability.check to false. This setting can be set globally, or on the client for the current metastore session. Note that this may lead to incorrect results, data loss, undefined behavior, etc. if your client is actually incompatible. You can also specify custom client capabilities via get_table_req API.

我的环境配置文件本地路径是 /usr/local/bigdata/apache-hive-3.1.2-bin/conf/hive-site.xml ,配置如下所示:

<configuration>
   <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
      <description>metadata is stored in a MySQL server</description>
   </property>

   <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
      <description>MySQL JDBC driver class</description>
   </property>

   <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>root</value>
      <description>user name for connecting to mysql server</description>
   </property>

   <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>123456</value>
      <description>password for connecting to mysql server</description>
   </property>

   <property>
       <name>hive.metastore.uris</name>
       <value>thrift://server4:9083</value>
       <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
   </property>

   <property>
       <name>hive.metastore.schema.verification</name>
       <value>true</value>
   </property>

</configuration>

将hive的配置文件配置好后,进行启动,然后验证hive运行情况。

  • 启动命令
nohup /usr/local/bigdata/apache-hive-3.1.2-bin/bin/hive --service metastore > /usr/local/bigdata/apache-hive-3.1.2-bin/logs/metastore.log --hiveconf hive.root.logger=WARN,console 2>&1 &
	nohup /usr/local/bigdata/apache-hive-3.1.2-bin/bin/hive --service hiveserver2 > /usr/local/bigdata/apache-hive-3.1.2-bin/logs/hiveserver2.log --hiveconf hive.root.logger=WARN,console 2>&1 &
	或
	hive-metastore service hive-metastore start
	hive-server2 service hive-server2 start

	! connect jdbc:hive2://server4:10000
  • 验证hive
[alanchan@server4 apache-hive-3.1.2-bin]$ beeline
Beeline version 3.1.2 by Apache Hive
beeline> ! connect jdbc:hive2://server4:10000
Connecting to jdbc:hive2://server4:10000
Enter username for jdbc:hive2://server4:10000: alanchan(根据自己当初配置的用户名和密码输入)
Enter password for jdbc:hive2://server4:10000: ********(根据自己当初配置的用户名和密码输入)
Connected to: Apache Hive (version 3.1.2)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://server4:10000> show databases;
+--------------------------+
|      database_name       |
+--------------------------+
| alan_hivecatalog_hivedb  |
| default                  |
| test                     |
| testhive                 |
+--------------------------+
4 rows selected (0.14 seconds)
0: jdbc:hive2://server4:10000> use test;
No rows affected (0.025 seconds)
0: jdbc:hive2://server4:10000> show tables;
+----------------+
|    tab_name    |
+----------------+
| dim_address    |
| dim_channel    |
| dim_date       |
| dim_product    |
| dim_region     |
| dim_user       |
| dms_content_t  |
| dw_sales       |
| fact_order     |
| fact_order2    |
+----------------+
10 rows selected (0.031 seconds)
0: jdbc:hive2://server4:10000> 

2、配置Flink集群和SQL cli

1)、将所有 Hive 依赖项添加到 Flink 发行版中的 /lib 文件夹中

注意版本和名称,flink不同的版本要求不同。我的环境下总计如下jar包,

antlr-runtime-3.5.2.jar
flink-connector-hive_2.12-1.13.6.jar
flink-connector-jdbc_2.11-1.13.6.jar
flink-csv-1.13.5.jar
flink-dist_2.11-1.13.5.jar
flink-json-1.13.5.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-elasticsearch7_2.11-1.13.6.jar
flink-sql-connector-hive-3.1.2_2.12-1.13.6.jar
flink-sql-connector-kafka_2.11-1.13.5.jar
flink-table_2.11-1.13.5.jar
flink-table-blink_2.11-1.13.5.jar
guava-27.0-jre.jar
hive-exec-3.1.2.jar
libfb303-0.9.3.jar
mysql-connector-java-6.0.6.jar

而集成hive的jar如下

antlr-runtime-3.5.2.jar
flink-connector-hive_2.12-1.13.6.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar
flink-sql-connector-hive-3.1.2_2.12-1.13.6.jar
guava-27.0-jre.jar
hive-exec-3.1.2.jar
libfb303-0.9.3.jar
mysql-connector-java-6.0.6.jar

2)、修改 SQL CLI 的 yaml 配置文件 sql-cli-defaults.yaml

  • 配置
    如下所示(仅示例部分):
execution:
    planner: blink
    type: streaming
    ...
    current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
    current-database: mydatabase
    
catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /opt/hive-conf  # contains hive-site.xml
     
-------------------具体示例---------------------------
# 定义 catalogs
catalogs:
   - name: alan_hivecatalog
     type: hive
     property-version: 1
     hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf  # 须包含 hive-site.xml


# 改变表程序基本的执行行为属性。
execution:
 planner: blink                            # 可选: 'blink' (默认)或 'old'
 type: streaming                           # 必选:执行模式为 'batch' 或 'streaming'
 result-mode: table                        # 必选:'table' 或 'changelog'
 max-table-result-rows: 1000000            # 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)
 time-characteristic: event-time           # 可选: 'processing-time' 或 'event-time' (默认)
 parallelism: 1                            # 可选:Flink 的并行数量(默认为 1)
 periodic-watermarks-interval: 200         # 可选:周期性 watermarks 的间隔时间(默认 200 ms)
 max-parallelism: 16                       # 可选:Flink 的最大并行数量(默认 128)
 min-idle-state-retention: 0               # 可选:表程序的最小空闲状态时间
 max-idle-state-retention: 0               # 可选:表程序的最大空闲状态时间
 current-catalog: alan_hivecatalog         # 可选:当前会话 catalog 的名称(默认为 'default_catalog')
 current-database: alan_hivecatalog_hivedb # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)
 restart-strategy:                         # 可选:重启策略(restart-strategy)
    type: fallback                          # 默认情况下“回退”到全局重启策略


# 用于调整和调优表程序的配置选项。
# 在专用的”配置”页面上可以找到完整的选项列表及其默认值。
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# 描述表程序提交集群的属性。
deployment:
  response-timeout: 5000
  • 验证配置内容
-- 1、启动flink sql(如果之前的sql cli启动中则需要重启,本示例使用的是yarn-session模式)
[alanchan@server1 bin]$ sql-client.sh -s yarn-session
2023-08-30 00:22:54,215 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
2023-08-30 00:22:54,215 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
No default environment specified.
Searching for '/usr/local/bigdata/flink-1.13.5/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/usr/local/bigdata/flink-1.13.5/conf/sql-client-defaults.yaml
Command history file path: /home/alanchan/.flink-sql-history
-- 2、验证配置的catalog和database
Flink SQL> SHOW CURRENT CATALOG;
+----------------------+
| current catalog name |
+----------------------+
|     alan_hivecatalog |
+----------------------+
1 row in set

Flink SQL> SHOW CURRENT DATABASE;
+-------------------------+
|   current database name |
+-------------------------+
| alan_hivecatalog_hivedb |
+-------------------------+
1 row in set

Flink SQL> SHOW TABLES;
Empty set


3、验证kafka集群生产-消费功能

前提是kafka集群功能正常,以下仅仅是简单的验证指定的主题test_kafka_hive收发消息功能。

1)、kafka发送消息

[alanchan@server3 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic test_kafka_hive --partitions 1 --replication-factor 1
Created topic test_kafka_hive.
[alanchan@server3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_kafka_hive
>hello alan
>hello alanchan
>good morning
>

2)、kafka接收消息

[alanchan@server3 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic test_kafka_hive --from-beginning
hello alan
hello alanchan
good morning

4、在Flink Cli创建kafka表

CREATE TABLE alanchan_kafka_table (
    `id` INT,
    name STRING,
    age BIGINT,
    t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_kafka_hive',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
    'format' = 'csv'
);


Flink SQL> CREATE TABLE alanchan_kafka_table (
>     `id` INT,
>     name STRING,
>     age BIGINT,
>     t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp',
>     WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'test_kafka_hive',
>     'scan.startup.mode' = 'earliest-offset',
>     'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>     'format' = 'csv'
> );
[INFO] Execute statement succeed.

Flink SQL> show tables;
+----------------------+
|           table name |
+----------------------+
| alanchan_kafka_table |
|         source_table |
+----------------------+
2 rows in set

Flink SQL> desc alanchan_kafka_table;
+---------------+------------------------+------+-----+---------------------------+---------------------------------------+
|          name |                   type | null | key |                    extras |                             watermark |
+---------------+------------------------+------+-----+---------------------------+---------------------------------------+
|            id |                    INT | true |     |                           |                                       |
|          name |                 STRING | true |     |                           |                                       |
|           age |                 BIGINT | true |     |                           |                                       |
| t_insert_time | TIMESTAMP(3) *ROWTIME* | true |     | METADATA FROM 'timestamp' | `t_insert_time` - INTERVAL '5' SECOND |
+---------------+------------------------+------+-----+---------------------------+---------------------------------------+
4 rows in set

0: jdbc:hive2://server4:10000> use alan_hivecatalog_hivedb;
No rows affected (0.049 seconds)
0: jdbc:hive2://server4:10000> show tables;
+-----------------------+
|       tab_name        |
+-----------------------+
| alanchan_kafka_table  |
| source_table          |
+-----------------------+
2 rows selected (0.041 seconds)
0: jdbc:hive2://server4:10000> describe formatted alanchan_kafka_table;
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
|           col_name            |                     data_type                      |                      comment                       |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name                    | data_type                                          | comment                                            |
|                               | NULL                                               | NULL                                               |
| # Detailed Table Information  | NULL                                               | NULL                                               |
| Database:                     | alan_hivecatalog_hivedb                            | NULL                                               |
| OwnerType:                    | USER                                               | NULL                                               |
| Owner:                        | null                                               | NULL                                               |
| CreateTime:                   | Wed Aug 30 08:28:11 CST 2023                       | NULL                                               |
| LastAccessTime:               | UNKNOWN                                            | NULL                                               |
| Retention:                    | 0                                                  | NULL                                               |
| Location:                     | hdfs://HadoopHAcluster/user/hive/warehouse/alan_hivecatalog_hivedb.db/alanchan_kafka_table | NULL                                               |
| Table Type:                   | MANAGED_TABLE                                      | NULL                                               |
| Table Parameters:             | NULL                                               | NULL                                               |
|                               | flink.connector                                    | kafka                                              |
|                               | flink.format                                       | csv                                                |
|                               | flink.properties.bootstrap.servers                 | 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092 |
|                               | flink.scan.startup.mode                            | earliest-offset                                    |
|                               | flink.schema.0.data-type                           | INT                                                |
|                               | flink.schema.0.name                                | id                                                 |
|                               | flink.schema.1.data-type                           | VARCHAR(2147483647)                                |
|                               | flink.schema.1.name                                | name                                               |
|                               | flink.schema.2.data-type                           | BIGINT                                             |
|                               | flink.schema.2.name                                | age                                                |
|                               | flink.schema.3.data-type                           | TIMESTAMP(3)                                       |
|                               | flink.schema.3.metadata                            | timestamp                                          |
|                               | flink.schema.3.name                                | t_insert_time                                      |
|                               | flink.schema.3.virtual                             | false                                              |
|                               | flink.schema.watermark.0.rowtime                   | t_insert_time                                      |
|                               | flink.schema.watermark.0.strategy.data-type        | TIMESTAMP(3)                                       |
|                               | flink.schema.watermark.0.strategy.expr             | `t_insert_time` - INTERVAL '5' SECOND              |
|                               | flink.topic                                        | test_kafka_hive                                    |
|                               | transient_lastDdlTime                              | 1693355291                                         |
|                               | NULL                                               | NULL                                               |
| # Storage Information         | NULL                                               | NULL                                               |
| SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL                                               |
| InputFormat:                  | org.apache.hadoop.mapred.TextInputFormat           | NULL                                               |
| OutputFormat:                 | org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat | NULL                                               |
| Compressed:                   | No                                                 | NULL                                               |
| Num Buckets:                  | -1                                                 | NULL                                               |
| Bucket Columns:               | []                                                 | NULL                                               |
| Sort Columns:                 | []                                                 | NULL                                               |
| Storage Desc Params:          | NULL                                               | NULL                                               |
|                               | serialization.format                               | 1                                                  |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
42 rows selected (0.279 seconds)

5、通过kafka发送消息,同时在flink中查询

1)、kafka发送消息

[alanchan@server3 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic test_kafka_hive --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic test_kafka_hive.
[alanchan@server3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_kafka_hive
>1,alan,15
>2,alanchan,20
>3,alanchanchn,25
>4,alan_chan,30
>5,alan_chan_chn,45
>

2)、Flink sql cli查询数据

Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

Flink SQL> select * from alanchan_kafka_table;

+----+-------------+--------------------------------+----------------------+-------------------------+
| op |          id |                           name |                  age |           t_insert_time |
+----+-------------+--------------------------------+----------------------+-------------------------+
| +I |           1 |                           alan |                   15 | 2023-08-30 09:29:33.993 |
| +I |           2 |                       alanchan |                   20 | 2023-08-30 09:29:48.793 |
| +I |           3 |                    alanchanchn |                   25 | 2023-08-30 09:29:54.795 |
| +I |           4 |                      alan_chan |                   30 | 2023-08-30 09:30:01.480 |
| +I |           5 |                  alan_chan_chn |                   45 | 2023-08-30 09:30:07.161 |

五、支持的数据类型

对于与 Hive 兼容的表,HiveCatalog 需要将 Flink 数据类型映射到相应的 Hive 类型,如下表所述:
42、Flink 的table api与sql之Hive Catalog,# Flink专栏,flink sql,flink 流批一体化,flink kafka,flink catalog,hivecatalog,flink hadoop,flink 实时计算
关于类型映射需要注意的事项:

  • Hive CHAR § 类型的最大长度为255
  • Hive VARCHAR§类型的最大长度为65535
  • Hive MAP类型的key仅支持基本类型,而Flink’s MAP 类型的key执行任意类型
  • Hive不支持联合数据类型,比如STRUCT
  • Hive TIMESTAMP 的精度是 9 , Hive UDFs函数只能处理 precision <= 9的 TIMESTAMP 值
  • Hive 不支持 Flink提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET类型
  • Flink INTERVAL 类型与 Hive INTERVAL 类型不一样

六、Scala Shell

注意:由于 Scala Shell 目前不支持 blink planner,因此不建议在 Scala Shell 中使用 Hive 连接器。
以上,以一个详细的示例介绍了Flink与hive的集成,其中有很多的坑都解决了。文章来源地址https://www.toymoban.com/news/detail-685433.html

到了这里,关于42、Flink 的table api与sql之Hive Catalog的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

    flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

    ⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。 ⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。 经过测试 在fl

    2024年02月22日
    浏览(16)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月11日
    浏览(12)
  • Flink流批一体计算(14):PyFlink Tabel API之SQL查询

    举个例子 查询 source 表,同时执行计算 Table API 查询 Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。 Table API 文档描述了流和批

    2024年02月12日
    浏览(11)
  • Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用来: ·创建 Table ·将 Table 注册成临时表 ·执行 SQL 查询 ·注册用户自定义的 (标量,表值,或者聚合) 函数 ·配置作业 ·管理 Python 依赖 ·提交作业执行 创建 source 表 创建 sink

    2024年02月12日
    浏览(11)
  • Flink流批一体计算(15):PyFlink Tabel API之SQL写入Sink

    目录 举个例子 写入Sink的各种情况 1. 将结果数据收集到客户端 2. 将结果数据转换为Pandas DataFrame,并收集到客户端 3. 将结果写入到一张 Sink 表中 4. 将结果写入多张 Sink 表中 举个例子 将计算结果写入给 sink 表 写入Sink的各种情况 1. 将结果数据收集到客户端 你可以使用 TableR

    2024年02月11日
    浏览(9)
  • Flink集成Hive之Hive Catalog

    流程流程: Flink消费Kafka,逻辑处理后将实时流转换为表视图,利用HiveCataLog创建Hive表,将实时流 表insert进Hive,注意分区时间字段需要为 yyyy-MM-dd形式,否则抛出异常:java.time.format.DateTimeParseException: Text \\\'20240111\\\' could not be parsed 写入到hive分区表 streamEnv需要开启checkpoint,保证flink写入

    2024年01月16日
    浏览(8)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(12)
  • Flink Hive Catalog操作案例

    Flink Hive Catalog操作案例

    在此对Flink读写Hive表操作进行逐步记录,需要指出的是,其中操作Hive分区表和非分区表的DDL有所不同,以下分别记录。 Hive-3.1.3 Flink-1.17.1 1、上传依赖jar包到flink/lib目录下 2、更换planner依赖(Hive集成的推荐设置) 3、启动Hive MetaStore 4、启动flink集群和sql-client 5、在flink sql-cl

    2024年02月08日
    浏览(7)
  • 《十堂课学习 Flink》第五章:Table API 以及 Flink SQL 入门

    《十堂课学习 Flink》第五章:Table API 以及 Flink SQL 入门

    第四章中介绍了 DataStream API 以及 DataSet API 的入门案例,本章开始介绍 Table API 以及基于此的高层应用 Flink SQL 的基础。 Flink 提供了两个关系API——Table API 和 SQL——用于统一的流和批处理。Table API 是一种针对Java、Scala和Python的语言集成查询API,它允许以非常直观的方式组合来

    2024年02月03日
    浏览(22)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(19)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包