flinksql kafka到mysql累计指标练习

这篇具有很好参考价值的文章主要介绍了flinksql kafka到mysql累计指标练习。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flinksql 累计指标练习

数据流向:kafka ->kafka ->mysql

模拟写数据到kafka topic:wxt中

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) throws Exception {
        // 设置kafka服务器地址和端口号
        String kafkaServers = "localhost:9092";

        // 设置producer属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka producer对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 发送消息
        String topic = "wxt";
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", 9);
        jsonObject.put("name", "王大大");
        jsonObject.put("age", 11);

        // 将JSON对象转换成字符串
        String jsonString = jsonObject.toString();

        // 输出JSON字符串
        System.out.println("JSON String: " + jsonString);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);
        producer.send(record);

        // 关闭producer
        producer.close();
    }
}

kafka topic :wxt1
flinksql kafka到mysql累计指标练习,flink,flink,sql

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;


public class KafkaToMysqlJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 定义Kafka连接属性
        String kafkaBootstrapServers = "localhost:9092";
        String kafkaTopic = "wxt";
        String groupId = "wxt1";

        // 注册Kafka表
        tEnv.executeSql("CREATE TABLE kafka_table (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT,\n" +
                "  proctime as PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + kafkaTopic + "',\n" +
                "  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +
                "  'properties.group.id' = '" + groupId + "',\n" +
                "  'format' = 'json',\n" +
                "  'scan.startup.mode' = 'earliest-offset'\n" +
                ")");

        // 注册Kafka表
        // latest-offset
        //earliest-offset
        tEnv.executeSql("CREATE TABLE kafka_table2 (\n" +
                "  window_start STRING,\n" +
                "  window_end STRING,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'wxt2',\n" +
                "  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +
                "  'properties.group.id' = 'kafka_table2',\n" +
                "  'format' = 'json',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.format' = 'csv'\n" +
                ")");


        tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +
                "  window_start String,\n" +
                "   window_end String,\n" +
                "    name String,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '12345678',\n" +
                "   'table-name' = 'leiji_age'\n" +
                ")");

         tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +
                 "from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +
                 "group by  window_start,window_end,name");

        tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");

        env.execute("KafkaToMysqlJob");
    }
}

kafka topic :wxt2
flinksql kafka到mysql累计指标练习,flink,flink,sql
mysql结果数据:
flinksql kafka到mysql累计指标练习,flink,flink,sql
pom文件文章来源地址https://www.toymoban.com/news/detail-715711.html

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinksql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.14.3</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.15</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.15</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>3.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.0.31</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.11.0</version>
        </dependency>

        <dependency>
            <groupId>org.antlr</groupId>
            <artifactId>antlr-runtime</artifactId>
            <version>3.5.2</version>
        </dependency>


        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>0.9.3</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.15</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <!-- put your configurations here -->
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

到了这里,关于flinksql kafka到mysql累计指标练习的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • FlinkSQL-- sql-client及源码解析 -- flink-1.13.6

    本文基于flink-1.13.6 SQL Client: Init scripts and Statement Sets 这个版本极大地改进了 SQL 客户端的功能。现在 SQL Client 和 SQL 脚本都支持 通过Java 应用程序执行的几乎所有操作(从 TableEnvironment 以编程方式启动查询)。这意味着 SQL 用户在 SQL 部署中需要的代码少了很多。其中最核心的功能

    2023年04月27日
    浏览(41)
  • 使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

    目录 1. 环境介绍 2. mysql建表 3. flinksql建表 3.1 进入flinksql客户端  ​3.2 配置输出格式 ​3.3 flink建表 3.4 任务流配置 4. 测试 4.1 插入测试数据 4.2 查看结果表数据​ 4.3 新增测试数据 4.4 再次查看结果表数据 服务 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 连接器

    2024年02月11日
    浏览(42)
  • Flink SQL和Table API实现消费kafka写入mysql

    1、构建 table环境 2、构建source kafka 方式一:API 方式二:Flink SQL 3、构建sink mysql  4、写入将source表写入sink表 方式一:API 方式二:Flink SQL 5、手动执行 6、测试 (1)连接kafka生产者 (2)造数据 (3)mysql查看入库情况

    2024年01月16日
    浏览(44)
  • 【Flink】FlinkSQL读取Mysql表中时间字段相差13个小时

    问题:Flink版本1.13,在我们使用FlinkSQL读取Mysql中数据的时候,发现读取出来的时间字段中的数据和Mysql表中的数据相差13个小时,Mysql建表语句及插入的数据如下; CREATE TABLE `mysql_example` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT \\\'自增ID\\\', `name` varchar(64) DEFAULT NULL COMMENT \\\'姓名\\\'

    2024年01月19日
    浏览(36)
  • 【flink sql】kafka连接器

    Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 前面已经介绍了flink sql创建表的语法及说明:【flink sql】创建表 这篇博客聊聊怎么通过flink sql连接kafka 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(

    2024年02月08日
    浏览(52)
  • Flink Upsert Kafka SQL Connector 介绍

    在某些场景中,比方GROUP BY聚合之后的后果,须要去更新之前的结果值。这个时候,须要将 Kafka 记录的 key 当成主键解决,用来确定一条数据是应该作为插入、删除还是更新记录来解决。在 Flink1.11 中,能够通过 flink-cdc-connectors 项目提供的 changelog-json format 来实现该性能。 在

    2024年02月20日
    浏览(45)
  • Flink 学习十 FlinkSQL

    flink sql 基于flink core ,使用sql 语义方便快捷的进行结构化数据处理的上层库; 类似理解sparksql 和sparkcore , hive和mapreduce 1.1 工作流程 整体架构和工作流程 数据流,绑定元数据 schema ,注册成catalog 中的表 table / view 用户使用table Api / table sql 来表达计算逻辑 table-planner利用 apache calci

    2024年02月10日
    浏览(46)
  • Flink 优化(六) --------- FlinkSQL 调优

    FlinkSQL 官网配置参数: https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆炸。列举两个场景: ➢ FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在状态里,不

    2024年02月14日
    浏览(42)
  • Flink系列之:Upsert Kafka SQL 连接器

    Scan Source: Unbounded 、 Sink: Streaming Upsert Mode Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。 作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一

    2024年01月16日
    浏览(45)
  • Flink系列之:Apache Kafka SQL 连接器

    Scan Source: Unbounded Sink: Streaming Append Mode Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 以下示例展示了如何创建 Kafka 表: 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(W)。 只读列必须声明为 VI

    2024年02月01日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包