Writing Data from Flink to ClickHouse

This article walks through writing data from Flink into ClickHouse: creating the target table, setting up the Maven dependencies, implementing a JDBC-based sink function, and testing the pipeline end to end. If anything is wrong or incomplete, corrections are welcome.

1. Create the ClickHouse table

Create the target table in ClickHouse:

CREATE TABLE default.test_write
(
    id   UInt16,
    name String,
    age  UInt16
) ENGINE = TinyLog();
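
TinyLog is convenient for a demo because it needs no sorting key, but it offers no indexes or concurrent-write safety. For anything beyond a test, a MergeTree table is the usual choice; a sketch of an equivalent table (the `test_write_mt` name is illustrative):

    -- Production-leaning alternative: MergeTree requires an ORDER BY key
    CREATE TABLE default.test_write_mt
    (
        id   UInt16,
        name String,
        age  UInt16
    ) ENGINE = MergeTree()
    ORDER BY id;

The rest of the article works unchanged against either table; only the table name in the INSERT statement differs.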

2. Maven dependencies

Dependencies for Flink development and the ClickHouse JDBC driver:

    <properties>
        <flink.version>1.12.1</flink.version>
        <scala.version>2.12.13</scala.version>
        <clickhouse-jdbc.version>0.1.54</clickhouse-jdbc.version>
        <lombok.version>1.18.12</lombok.version>
    </properties>

    <dependencies>
        <!-- ClickHouse JDBC driver, used to write data to ClickHouse -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>${clickhouse-jdbc.version}</version>
        </dependency>
        <!-- Flink core APIs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

3. The bean entity class

User.java

package com.daniel.bean;

import lombok.Builder;
import lombok.Data;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 * @Description
 **/

@Data
@Builder
public class User {
    public int id;
    public String name;
    public int age;
}
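
For reference, Lombok's `@Data` and `@Builder` generate the builder pattern at compile time. A hand-written, dependency-free sketch of roughly what `@Builder` produces (the real Lombok output also adds `equals`, `hashCode`, and `toString` via `@Data`; the `UserSketch` name is illustrative):

```java
// Hand-written equivalent of what Lombok's @Builder generates (sketch)
public class UserSketch {
    public int id;
    public String name;
    public int age;

    public static Builder builder() { return new Builder(); }

    public static class Builder {
        private int id;
        private String name;
        private int age;

        public Builder id(int id) { this.id = id; return this; }
        public Builder name(String name) { this.name = name; return this; }
        public Builder age(int age) { this.age = age; return this; }

        public UserSketch build() {
            UserSketch u = new UserSketch();
            u.id = this.id;
            u.name = this.name;
            u.age = this.age;
            return u;
        }
    }
}
```

Usage mirrors the test class below: `UserSketch.builder().id(1).name("Daniel").age(25).build()`.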

4. The ClickHouse sink logic

ClickHouseSinkFunction.java

package com.daniel.util;

import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:36
 * @Description
 **/


public class ClickHouseSinkFunction extends RichSinkFunction<User> {
    Connection conn = null;
    String sql;

    public ClickHouseSinkFunction(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = getConn("localhost", 8123, "default");
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (conn != null) {
            conn.close();
        }
    }

    // Called once per incoming element: write the record to ClickHouse
    @Override
    public void invoke(User user, Context context) throws Exception {
        // Each invocation writes a single-row batch; the ClickHouse JDBC driver
        // runs in autocommit mode, so no explicit commit is needed.
        // try-with-resources closes the PreparedStatement and avoids a leak.
        try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            preparedStatement.setInt(1, user.id);
            preparedStatement.setString(2, user.name);
            preparedStatement.setInt(3, user.age);
            preparedStatement.addBatch();

            long startTime = System.currentTimeMillis();
            int[] ints = preparedStatement.executeBatch();
            long endTime = System.currentTimeMillis();
            System.out.println("Insert took " + (endTime - startTime) + "ms -- rows inserted: " + ints.length);
        }
    }

    public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        conn = DriverManager.getConnection(address);
        return conn;
    }
}
  • open(): called once per parallel subtask, after the SinkFunction is instantiated and before any records are processed; use it to initialize connections or other resources.

  • invoke(): defines the logic executed for each element that reaches the sink; implement it to write the data to the external storage system.

  • close(): called before the SinkFunction shuts down; use it to release resources and close connections.
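The lifecycle above can be illustrated with a dependency-free sketch (plain Java, no Flink classes; the `SinkLifecycleSketch` name and the in-memory buffer standing in for the JDBC connection are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal illustration of the sink lifecycle: open once, invoke per record, close once
public class SinkLifecycleSketch {
    private List<String> buffer;          // stands in for the JDBC connection/resources
    private boolean opened = false;

    public void open() {                  // called once per parallel subtask
        buffer = new ArrayList<>();
        opened = true;
    }

    public void invoke(String record) {   // called once per incoming element
        if (!opened) throw new IllegalStateException("open() must run first");
        buffer.add(record);
    }

    public List<String> close() {         // called once on shutdown; release resources here
        opened = false;
        return buffer;
    }
}
```

Flink drives these calls itself; user code only overrides them, as ClickHouseSinkFunction does above.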

5. The test write class

ClickHouseWriteTest.java

package com.daniel;

import com.daniel.bean.User;
import com.daniel.util.ClickHouseSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * @Author daniel
 * @Date: 2023/7/3 15:37
 * @Description
 **/

public class ClickHouseWriteTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Source
        DataStream<String> ds = env.socketTextStream("localhost", 9999);

        // Transform
        SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {
            String[] split = data.split(",");
            return User.builder()
                    .id(Integer.parseInt(split[0]))
                    .name(split[1])
                    .age(Integer.parseInt(split[2]))
                    .build();
        });

        // Sink
        String sql = "INSERT INTO default.test_write (id, name, age) VALUES (?,?,?)";
        ClickHouseSinkFunction jdbcSink = new ClickHouseSinkFunction(sql);
        dataStream.addSink(jdbcSink);
        env.execute("flink-clickhouse-write");
    }
}
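
In isolation, the map step above parses comma-separated lines such as "1,Daniel,25" into typed fields. A standalone sketch of that parsing logic (using a plain array instead of the User bean so it runs without Flink or Lombok; the `CsvParseSketch` name is illustrative):

```java
// Parses an "id,name,age" line into typed fields, as the map function above does
public class CsvParseSketch {
    public static Object[] parse(String line) {
        String[] split = line.split(",");
        return new Object[] {
            Integer.parseInt(split[0]),  // id
            split[1],                    // name
            Integer.parseInt(split[2])   // age
        };
    }
}
```

Note that a malformed line (missing fields, non-numeric id or age) throws from inside the map function and fails the job; a production pipeline would typically validate and route bad records to a side output.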

6. Send data

Use nc (or any similar tool) to send data to the port the job listens on. For example, on Windows (ncat):

nc -L -p 9999

On Linux/macOS the equivalent is nc -lk 9999. Then send the rows:

1,Daniel,25
2,David,38
3,James,16
4,Robert,27

Then start the ClickHouseWriteTest.java program.


Query the data:

SELECT *
FROM default.test_write;

Because rows are inserted by parallel sink subtasks, the output order is not guaranteed.
