flink 1.18 sql demo

这篇具有很好参考价值的文章主要介绍了flink 1.18 sql demo。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flink 1.18 sql demo

更换flink-table-planner 为 flink-table-planner-loader pom.xml

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-uber</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- 官网给的是flink-connector-kafka 但是flink on k8s 会缺包然后有个sql-connector jar 引入后正常 两个保留一个即可 -->
     <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka</artifactId>
            <version>3.0.2-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.0.2-1.18</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-table-planner_2.12</artifactId>-->
<!--            <version>1.18.0</version>-->
<!--        </dependency>-->
<!--         https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId> </artifactId>
            <version>1.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Replace this with the main class of your job 这里是你的主类地址-->
                                    <mainClass>com.cn.App</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

demo

package com.cn;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


/**
 * @Classname app
 * @Description TODO
 * @Date 2024/1/12 11:26
 * @Created by typezhou
 */
public class App {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000L);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        String str = "CREATE TABLE KafkaTable (\n" +
                "  `user_id` STRING,\n" +
                "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'aaaa',\n" +
                "  'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +
                "  'properties.group.id' = 'testGrou1p',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'csv'\n" +
                ")";
        tableEnv.executeSql(str);
        Table tableResult = tableEnv.sqlQuery("SELECT user_id  FROM KafkaTable group by user_id");
//        DataStream<ResultBean> tuple2DataStream = tableEnv.toDataStream(result, ResultBean.class);
//        SingleOutputStreamOperator<ResultBean> map = tuple2DataStream.map(new MapFunction<ResultBean, ResultBean>() {
//            @Override
//            public ResultBean map(ResultBean s) throws Exception {
//                Thread.sleep(3000L);
//                return s;
//            }
//        });
//        tuple2DataStream.print();
        String sqlPri = "CREATE TABLE print_table (\n" +
                "  `user_id` STRING \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'bbbb',\n" +
                "  'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +
                "  'format' = 'csv'\n" +
                ")";
        tableEnv.executeSql(sqlPri);
        tableEnv.executeSql("insert into  print_table SELECT user_id FROM KafkaTable");

    }


}

文章来源地址https://www.toymoban.com/news/detail-799540.html

到了这里,关于flink 1.18 sql demo的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • mORMot 1.18 第07章 简单的读写操作

    本章描述了典型的数据读写操作。首先,我们将注意力集中在数据上,而不是函数。 读取操作返回一个TID,它是一个32位或64位整数(取决于你的内存模型),反映了表的信息。TID在表中的每一行都是唯一的。 ORM的新手可能会感到惊讶,但通常你不需要创建SQL查询来过滤请求

    2024年04月28日
    浏览(41)
  • 数据结构_复杂度+之后的事-1.18

    本质是个 函数 ,表示复杂度的函数。 用 O 渐进粗略表示,如O(1), O(N)。(这个符号以前在学拓扑结构时见过,现在回想,也确实是算法相关的): 1)常数用O(1)表示; 2)保留最高阶项,并去掉系数。2N^3+N+10-----O(N^3)。 3)对于多情况复杂度,按最复杂情况的计算。 时间复

    2024年01月19日
    浏览(44)
  • k8s 1.18 VS 1.24

    Kubernetes是一个开源的容器编排平台,它致力于自动化容器的部署、扩展和管理。1.24和1.18是Kubernetes的两个版本,它们之间的区别包括以下几个方面: API版本:Kubernetes 1.24支持API版本为v1.22,而Kubernetes 1.18支持API版本为v1.17。 功能特性:Kubernetes 1.24相对于1.18增加了许多新的功

    2023年04月23日
    浏览(46)
  • 【Linux】在centos快速搭建K8S1.18集群

    使用 kubeadm 创建集群帮助文档 如果您需要以下几点,该工具是很好的选择:kubeadm 一种简单的方法,让你尝试 Kubernetes,可能是第一次。 现有用户自动设置群集并测试其应用程序的一种方式。 其他生态系统和/或安装程序工具中的构建块,具有更大的 范围。 一台或多台机器,

    2024年04月29日
    浏览(54)
  • flink1.18.0 macos sql-client.sh启动报错

    2024年01月23日
    浏览(37)
  • flink sql1.18.0连接SASL_PLAINTEXT认证的kafka3.3.1

    阅读此文默认读者对docker、docker-compose有一定了解。 docker-compose运行了一个jobmanager、一个taskmanager和一个sql-client。 如下: 注意三个容器都映射了/opt/flink目录。需要先将/opt/flink目录拷贝到跟docker-compose.yml同一目录下,并分别重命名,如下图: 三个文件夹内容是一样的,只是

    2024年02月03日
    浏览(27)
  • Kubernetes - CentOS7搭建k8s_v1.18集群高可用(kubeadm/二进制包部署方式)实测配置验证手册

    一、Kubernetes—k8s是什么 Kubernetes 这个名字源于希腊语,意为“舵手“或”飞行员\\\"。 Kubernetes,简称K8s,中间有8个字符用8代替缩写。 Google于2014年开源项目,为容器化应用提供集群和管理的开源工具,Kubernetes目标是让部署容器化的应用简单并且高效,提供了应用部署,规划,更

    2024年04月27日
    浏览(44)
  • 24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月06日
    浏览(50)
  • 24、Flink 的table api与sql之Catalogs(java api操作视图)-3

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月07日
    浏览(38)
  • 实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据

    目录 前言: 1、springboot引入依赖: 2、yml配置文件 3、创建SQL server CDC变更数据监听器 4、反序列化数据,转为变更JSON对象 5、CDC 数据实体类 6、自定义ApplicationContextUtil 7、自定义sink 交由spring管理,处理变更数据         我的场景是从SQL Server数据库获取指定表的增量数据,查

    2024年02月10日
    浏览(83)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包