flink-cdc,clickhouse写入,多路输出

这篇具有很好参考价值的文章主要介绍了flink-cdc,clickhouse写入,多路输出。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、场景

kafka日志数据从kafka读取

1、关联字典表:完善日志数据

2、判断日志内容级别:多路输出

低级:入clickhouse

高级:入clickhouse的同时推送到kafka供2次数据流程处理。文章来源地址https://www.toymoban.com/news/detail-703762.html

2、实现

package com.ws.kafka2clickhouse;

import cn.hutool.json.JSONUtil;
import com.ws.kafka2clickhouse.bean.CompanyInfo;
import com.ws.kafka2clickhouse.bean.LogEvent;
import com.ws.kafka2clickhouse.sink.MyClickHouseSink;
import org.apache.avro.data.Json;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class Kafka2ClickHouse {


    public static void main(String[] args) throws Exception {
        System.setProperty("java.net.preferIPv4Stack", "true");
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
//        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
//        env.getCheckpointConfig().setCheckpointStorage("file:///D:/out_test/ck");
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hdp01:8020/tmp/kafka2hdfs/");
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 1、读取主流日志数据
        KafkaSource<String> build = KafkaSource.<String>builder()
                .setTopics("dataSource")
                .setGroupId("group1")
                .setBootstrapServers("hdp01:6667")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafka = env.fromSource(build, WatermarkStrategy.noWatermarks(), "kafka");
        // 2、主流数据json转换成POJO对象
        SingleOutputStreamOperator<LogEvent> beans = kafka.map((MapFunction<String, LogEvent>) s -> JSONUtil.toBean(s, LogEvent.class));
        // 3、加载字典表cdc流
        tenv.executeSql(
                "CREATE TABLE dmpt_base_oper_log (\n" +
                        "id bigInt primary key," +
                        "title String" +
                        ") WITH (\n" +
                        "'connector' = 'mysql-cdc',\n" +
                        "'hostname' = 'localhost',\n" +
                        "'port' = '3306',\n" +
                        "'username' = 'root',\n" +
                        "'password' = 'root',\n" +
                        "'database-name' = 'test',\n" +
                        "'table-name' = 'test_recursive'\n" +
                        ")"
        );
        Table result = tenv.sqlQuery("select * from dmpt_base_oper_log");
        DataStream<Row> dict = tenv.toChangelogStream(result);
        dict.print();
        // 4、加工字典数据,并组装上 字典表更新类型
        SingleOutputStreamOperator<CompanyInfo> companyDict = dict.map(new RichMapFunction<Row, CompanyInfo>() {
            @Override
            public CompanyInfo map(Row row) throws Exception {
                Long id = (Long) row.getField("id");
                String title = (String) row.getField("title");
                // 携带上cdc数据的数据类型,《新增,删除,修改》
                RowKind kind = row.getKind();
                return new CompanyInfo(id, title, kind);
            }
        });
        // 5、对字典数据进行广播
        MapStateDescriptor<Long, CompanyInfo> company_info_desc = new MapStateDescriptor<>("company_info_dict", Long.class, CompanyInfo.class);
        BroadcastStream<CompanyInfo> broadcastStream = companyDict.broadcast(company_info_desc);
        // 6、创建测流
        OutputTag<String> tokafka = new OutputTag<String>("tokafka") {
        };


        SingleOutputStreamOperator<LogEvent> beans_company = beans.connect(broadcastStream).process(new BroadcastProcessFunction<LogEvent, CompanyInfo, LogEvent>() {
            @Override
            public void processElement(LogEvent logEvent, ReadOnlyContext readOnlyContext, Collector<LogEvent> collector) throws Exception {
                // 新来一条数据流,处理方法
                ReadOnlyBroadcastState<Long, CompanyInfo> broadcastState = readOnlyContext.getBroadcastState(company_info_desc);
                CompanyInfo companyInfo = broadcastState.get(logEvent.getMessageId());
                // 7、如果有单位信息,代表为高级用户数据,将消息同时吐到kafka,因此再输出到主流的同时往测流中也输出一份
                if (companyInfo != null) {
                    logEvent.setCompanyInfo(companyInfo);
                    readOnlyContext.output(tokafka, JSONUtil.toJsonStr(logEvent));
                }
                collector.collect(logEvent);
            }

            @Override
            public void processBroadcastElement(CompanyInfo companyInfo, Context context, Collector<LogEvent> collector) throws Exception {
                // 新来一条广播流,处理方法
                BroadcastState<Long, CompanyInfo> broadcastState = context.getBroadcastState(company_info_desc);
                // 新增
                if (companyInfo.getRowKind().name().equalsIgnoreCase(RowKind.INSERT.name())) {
                    broadcastState.put(companyInfo.getId(), companyInfo);
                } else if (companyInfo.getRowKind().name().equalsIgnoreCase(RowKind.DELETE.name())) {
                    // 删除
                    broadcastState.remove(companyInfo.getId());
                } else {
                    // 修改
                    broadcastState.remove(companyInfo.getId());
                    broadcastState.put(companyInfo.getId(), companyInfo);
                }
            }
        });

        //准备向ClickHouse中插入数据的sql
        String insetIntoCkSql = "insert into default.dns_logs values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
        //设置ClickHouse Sink
        SinkFunction<LogEvent> sink = JdbcSink.sink(
                //插入数据SQL
                insetIntoCkSql,
                //设置插入ClickHouse数据的参数
                new JdbcStatementBuilder<LogEvent>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, LogEvent logEvent) throws SQLException {
                        try {
                            preparedStatement.setString(1, logEvent.getMessageType());
                            preparedStatement.setLong(2, logEvent.getMessageId());
                            preparedStatement.setString(3, logEvent.getDeviceId());
                            preparedStatement.setString(4, logEvent.getCol1());
                            preparedStatement.setString(5, logEvent.getCol2());
                            preparedStatement.setString(6, logEvent.getCol3());
                            preparedStatement.setString(7, logEvent.getCol4());
                            preparedStatement.setString(8, logEvent.getHeaders().getDeviceTime());
                            preparedStatement.setLong(9, logEvent.getHeaders().get_uid());
                            preparedStatement.setString(10, logEvent.getHeaders().getProductId());
                            preparedStatement.setString(11, logEvent.getHeaders().getOrgId());
                            if (logEvent.getCompanyInfo() != null) {
                                preparedStatement.setString(12, logEvent.getCompanyInfo().getTitle());
                            } else {
                                preparedStatement.setString(12, null);
                            }
                            preparedStatement.setString(13, logEvent.getRegion());
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                },
                //设置批次插入数据
                new JdbcExecutionOptions.Builder()
                        // 批次大小,默认5000
                        .withBatchSize(10000)
                        // 批次间隔时间
                        .withBatchIntervalMs(5000).
                        withMaxRetries(3).build(),
                //设置连接ClickHouse的配置
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUrl("jdbc:clickhouse://192.16.10.118:1111")
                        .withUsername("default")
                        .withPassword("xxxx")
                        .build()
        );
        // 8、所有数据进入基础库
        beans_company.addSink(sink);
        beans_company.print("基础库clickhouse");
        // 9、高级用户同时推送到分析kafka
        DataStream<String> sideOutput = beans_company.getSideOutput(tokafka);
        sideOutput.print("增强分析kafka");
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hdp01:6667");
        // 10、构建kafka sink
        KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                return new ProducerRecord<>(
                        "dataZengQiang", // target topic
                        element.getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };

        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                "dataZengQiang",             // target topic
                serializationSchema,    // serialization schema
                properties,             // producer config
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
        // 11、写入kafka
        sideOutput.addSink(myProducer);
        env.execute();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink</artifactId>
    <properties>
        <flink.version>1.13.2</flink.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flinkSql 需要的依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>

        <!-- clickhouse驱动 -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>
        <!-- flink-cdc-mysql 连接器-->
        <dependency>
            <groupId>com.ws</groupId>
            <artifactId>mysql-cdc</artifactId>
            <version>2.2.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/flink-connector-mysql-cdc-2.3-SNAPSHOT.jar</systemPath>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <!-- 把依赖打进jar包 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

到了这里,关于flink-cdc,clickhouse写入,多路输出的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    我一开始是flink-cdc,oracle2Mysql,sql 我一开始直接用的oracle【date】类型,mysql【date】类型,sql的校验通过了,但是真正操作数据的时候报错,告诉我oracle的数据格式的日期数据,不可以直接插入到mysql格式的日期数据,说白了就是数据格式不一致导致的 我想的是既然格式不对

    2024年02月12日
    浏览(31)
  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

    最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka。 监听数据变化,进行异步通知,做系统内异步任务。 架构方案(懒得写了,看图吧): -- 创建pg 高线数据同步用

    2024年02月02日
    浏览(33)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、达梦等数据库开启日志方法

    目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 2.5达梦 2.4.1安装 2.4.2CDC配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验

    2024年02月05日
    浏览(38)
  • 【Flink-CDC】Flink CDC 介绍和原理概述

    CDC是( Change Data Capture 变更数据获取 )的简称。 核心思想是, 监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 主要分为基于查询和基于

    2024年01月20日
    浏览(33)
  • flink-cdc-学习笔记(一)

    Flink 1.11 引入了 CDC. Flink CDC 是一款基于 Flink 打造一系列数据库的连接器。Flink 是流处理的引擎,其主要消费的数据源是类似于一些点击的日志流、曝光流等数据,但在业务场景中,点击流的日志数据只是一部分,具有更大价值的数据隐藏在用户的业务数据库中。Flink CDC 弥补

    2024年04月10日
    浏览(67)
  • ApacheStreamPark2.1.0部署及执行flink-cdc任务同步mysql表的数据到es的实践

    ApacheStreamPark是流处理极速开发框架,流批一体 湖仓一体的云原生平台,一站式流处理计算平台。   特性中的简单易用和文档详尽这两点我也是深有体会的,部署一点都不简单,照着官方文档都不一定能搞出来,下面部署环节慢慢来吐槽吧。   之前我们写 Flink SQL 基本上

    2024年02月11日
    浏览(33)
  • Flinkx/Datax/Flink-CDC 优劣势对比

    Flinkx/Datax/Flink-CDC 优劣势对比_HiBoyljw的博客-CSDN博客        FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步,其由袋鼠云于2016年初步研发完成,目前有稳定的研发团队持续维护,已在Github上开源(开源地址详见文章末尾),并维

    2024年02月07日
    浏览(30)
  • 【Flink】【ClickHouse】写入流式数据到ClickHouse

    Flink 安装的教程就不在这里赘叙了,可以看一下以前的文章,这篇文章主要是把流式数据写入的OLAP(ClickHouse)中作查询分析 Flink 1.13.2, ClickHouse 22.1.3.7 这里直接使用docker安装,没有安装的同学可以使用homebreak来安装,执行下面的命令即可( 已经安装了docker的可以忽略 ) 四指

    2024年02月03日
    浏览(30)
  • Flink写入数据到ClickHouse

    1.ClickHouse建表 ClickHouse中建表 2.ClickHouse依赖 Flink开发相关依赖 3.Bean实体类 User.java 4.ClickHouse业务写入逻辑 ClickHouseSinkFunction.java open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。 invoke():定义了在每个元素到达Sink操

    2024年02月12日
    浏览(40)
  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包