ClickHouse--11--ClickHouse API操作

这篇具有很好参考价值的文章主要介绍了ClickHouse--11--ClickHouse API操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


1.Java 读写 ClickHouse API

1.1 首先需要加入 maven 依赖

<!-- 连接 ClickHouse 需要驱动包-->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
     <artifactId>clickhouse-jdbc</artifactId>
     <version>0.2.4</version>
</dependency>

1.2 Java 读取 ClickHouse 集群表数据

JDBC–01–简介

ClickHouse--11--ClickHouse API操作,数据库,clickhouse

public class Test01 {

    public static void main(String[] args) throws Exception {
        //1.注册数据库驱动
        Class.forName("com.mysql.jdbc.Driver");
        //2.获取数据库连接
        Connection conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/jt_db?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8",
                "root", "root");

        //3.获取传输器
        Statement stat = conn.createStatement();
        //4.发送SQL到服务器执行并返回执行结果
        String sql = "select * from account";
        ResultSet rs = stat.executeQuery( sql );
        //5.处理结果
        while( rs.next() ) {
            int id = rs.getInt("id");
            String name = rs.getString("name");
            double money = rs.getDouble("money");
            System.out.println(id+" : "+name+" : "+money);
        }
        //6.释放资源
        rs.close();
        stat.close();
        conn.close();
        System.out.println("TestJdbc.main()....");
    }

}
                                            

ClickHouse java代码


import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.ResultSet;
import java.sql.SQLException;

public class test01 {
    public static void main(String[] args) throws SQLException {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("");
        //1.注册数据库驱动配置
        BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123,node2:8123,node3:8123/default", props);
        //2.获取数据库连接
        ClickHouseConnection conn = dataSource.getConnection();
        //3.获取传输器
        ClickHouseStatement statement = conn.createStatement();
        //4.发送SQL到服务器执行并
        ResultSet rs = statement.executeQuery("select id,name,age from test");
        //5.处理结果
        while (rs.next()) {
            int id = rs.getInt("id");
            String name = rs.getString("name");
            int age = rs.getInt("age");
            System.out.println("id = " + id + ",name = " + name + ",age = " + age);
        }

        //6.释放资源
        conn.close();
        statement.close();
        rs.close();
    }
}

1.3 Java 向 ClickHouse 表中写入数据

package com.cy.demo;

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.ResultSet;
import java.sql.SQLException;

public class test01 {
    public static void main(String[] args) throws SQLException {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("");
        //1.注册数据库驱动配置
        BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123/default", props);
        //2.获取数据库连接
        ClickHouseConnection conn = dataSource.getConnection();
        //3.获取传输器
        ClickHouseStatement statement = conn.createStatement();
        //4.发送SQL到服务器执行并
        statement.execute("insert into test values (100,'王五',30)");//可以拼接批量插入多条
       
        //6.释放资源
        conn.close();
        statement.close();
        rs.close();
    }
}

ClickHouse--11--ClickHouse API操作,数据库,clickhouse

2.Spark 写入 ClickHouse API

  • SparkCore 写入 ClickHouse,可以直接采用写入方式。下面案例是使用 SparkSQL 将结果存入 ClickHouse对应的表中。在 ClickHouse 中需要预先创建好对应的结果表

2.1 导入依赖

        <!-- 连接 ClickHouse 需要驱动包-->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
            <!-- 去除与 Spark 冲突的包 -->
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Spark-core -->
        <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL ON Hive-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>

2.2 代码编写

val session: SparkSession =
SparkSession.builder().master("local").appName("test").getOrCreate()
val jsonList = List[String](
"{\"id\":1,\"name\":\"张三\",\"age\":18}",
"{\"id\":2,\"name\":\"李四\",\"age\":19}",
"{\"id\":3,\"name\":\"王五\",\"age\":20}"
)
//将 jsonList 数据转换成 DataSet
import session.implicits._
val ds: Dataset[String] = jsonList.toDS()
val df: DataFrame = session.read.json(ds)
df.show()
//将结果写往 ClickHouse
val url = "jdbc:clickhouse://node1:8123/default"
val table = "test"
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "default")
properties.put("password", "")
properties.put("socket_timeout", "300000")
df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url,
table, properties)

3.Flink 写入 ClickHouse API

  • 可以通过 Flink 原生 JDBC Connector 包将 Flink 结果写入 ClickHouse 中,Flink 在1.11.0 版本对其 JDBC Connnector 进行了重构:

ClickHouse--11--ClickHouse API操作,数据库,clickhouse文章来源地址https://www.toymoban.com/news/detail-832694.html

3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API

  1. maven 中需要导入以下包:
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码:
/**
* 通过 flink-jdbc API 将 Flink 数据结果写入到 ClickHouse 中,只支持 Table API
*
* 注意:
* 1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
* 2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数
据。
*/
case class PersonInfo(id:Int,name:String,age:Int)
object FlinkWriteToClickHouse1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1,后期每个并行度满批次需要的条数时,会插入 click 中
env.setParallelism(1)
val settings: EnvironmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取 Socket 中的数据
val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
val ds: DataStream[PersonInfo] = sourceDS.map(line => {
val arr: Array[String] = line.split(",")
PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
})
//将 ds 转换成 table 对象
import org.apache.flink.table.api.scala._
val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)
//将 table 对象写入 ClickHouse 中
//需要在 ClickHouse 中创建表:create table flink_result(id Int,name String,age Int) engine =
MergeTree() order by id;
val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//准备 ClickHouse table sink
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://node1:8123/default")
.setUsername("default")
.setPassword("")
.setQuery(insertIntoCkSql)
.setBatchSize(2) //设置批次量,默认 5000 条
.setParameterTypes(Types.INT, Types.STRING, Types.INT)
.build()
//注册 ClickHouse table Sink,设置 sink 数据的字段及 Schema 信息
tableEnv.registerTableSink("ck-sink",
sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))
//将数据插入到 ClickHouse Sink 中
tableEnv.insertInto(table,"ck-sink")
//触发以上执行
env.execute("Flink Table API to ClickHouse Example")
}
}

3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API

  1. 在 Maven 中导入以下依赖包
<!-- Flink1.11 后需要 Flink-client 包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码
/**
* Flink 通过 flink-connector-jdbc 将数据写入 ClickHouse ,目前只支持 DataStream API
*/
object FlinkWriteToClickHouse2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
val ds: DataStream[String] = env.socketTextStream("node5",9999)
val result: DataStream[(Int, String, Int)] = ds.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
})
//准备向 ClickHouse 中插入数据的 sql
val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//设置 ClickHouse Sink
val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
//插入数据 SQL
insetIntoCkSql,
//设置插入 ClickHouse 数据的参数
new JdbcStatementBuilder[(Int, String, Int)] {
override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
ps.setInt(1, tp._1)
ps.setString(2, tp._2)
ps.setInt(3, tp._3)
}
},
//设置批次插入数据
new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
//设置连接 ClickHouse 的配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://node1:8123/default")
.withUsername("default")
.withUsername("")
.build()
)
//针对数据加入 sink
result.addSink(ckSink)
env.execute("Flink DataStream to ClickHouse Example")
}
}

到了这里,关于ClickHouse--11--ClickHouse API操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 统一观测|借助 Prometheus 监控 ClickHouse 数据库

    ClickHouse 作为用于联机分析(OLAP)的列式数据库管理系统(DBMS), 最核心的特点是极致压缩率和极速查询性能。同时,ClickHouse 支持 SQL 查询,在基于大宽表的聚合分析查询场景下展现出优异的性能。因此,获得了广泛的应用。本文旨在分享阿里云可观测监控 Prometheus 版对开源 Cli

    2024年02月14日
    浏览(42)
  • 分布式数据库(DorisDB、Clickhouse、TiDB)调研

    B站视频:DorisDB VS ClickHouse OLAP PK 1.1 DorisDB 场量:线上数据应用 访问官方网站 DorisDB企业版文档 单表/多表查询,DorisDB总体时间最短 单表查询:DorisDB最快次数最多,ClickHouse次之 多表查询:DorisDB所有执行均最快 DorisDB多表关联效率好 支持各种主流分布式Join,不仅支持大宽表模

    2024年02月06日
    浏览(45)
  • clickhouse数据库 使用http 方式交付查询sql

    今天使用clickhouse 的HTTP 方式进行查询语句 clickhouse  服务  搭建在192.168.0.111 上面 那么我们如何快速的去查询呢   如下 我们可以使用curl 功能 或者直接在浏览器上输入对应的查询命令  如下: 说明: 前面的IP 是我们clickhouse所在的服务器IP底子 端口      8123     默认的H

    2024年01月25日
    浏览(45)
  • Spring Boot集成JPA和ClickHouse数据库

    Spring Boot是一个用于创建独立的、基于Spring的应用程序的框架。它具有快速开发特性,可以大大减少开发人员的工作量。JPA(Java Persistence API)是Java中处理关系型数据库持久化的标准规范,而ClickHouse是一个高性能、分布式的列式数据库。 本文将介绍如何在Spring Boot项目中集成

    2024年02月09日
    浏览(53)
  • [1180]clickhouse查看数据库和表的容量大小

    在mysql中information_schema这个数据库中保存了mysql服务器所有数据库的信息, 而在clickhouse,我们可以通过system.parts查看clickhouse数据库和表的容量大小、行数、压缩率以及分区信息。 在此通过测试数据库来说明。 结果为:这种结果显示的大小size是字节,我们如何转换为常见的

    2024年02月05日
    浏览(55)
  • Python 连接clickhouse数据库以及新建表结构,csv导入数据

    目录 一、Python 连接clickhouse数据库 ◼ clickhouse对外的接口协议通常有两种形式: ◼ 代码实现部分: 二、使用客户端工具DBeaver连接clickhouse ◼ 新建clickhouse表 三、DBeaver 连接clickhouse 用csv文件导入数据 ◼ 导入方式: 方法一:使用DBeaver自带导入数据功能; 方法二:具体方式如

    2024年02月08日
    浏览(98)
  • (三十六)大数据实战——ClickHouse数据库的部署安装实现

    ClickHouse是俄罗斯的Yandex于2016年开源的列式存储数据库 DBMS ),使用C语言编写,主要用于在线分析处理查询( OLAP ),能够使用SQL查询实时生成分析数据报告。 列式存储 :数据按列进行存储,这使得 ClickHouse 能够高效地处理聚合查询和分析操作; 高性能 :ClickHouse 被设计用

    2024年02月19日
    浏览(40)
  • OLAP型数据库 ClickHouse的简介 应用场景 优势 不足

    ClickHouse 是一个开源的分布式列式数据库管理系统 (DBMS),专门用于在线分析处理 (OLAP)。它最初由 Yandex 开发,并且在处理大规模数据分析和实时查询方面表现出色。以下是关于 ClickHouse 的简介、应用场景、优势和不足的概述: ClickHouse 是一个高性能的列式数据库管理系统,专

    2024年02月02日
    浏览(52)
  • docker安装mysql、clickhouse、oracle等各种数据库汇总

    1:docker 安装mongo数据库并使用 官网:https://www.mongodb.com/docs/manual/ mongo shell教程1:http://c.biancheng.net/mongodb2/connection.html 安装1 :https://www.zhihu.com/question/54602953/answer/3047452434?utm_id=0 安装2:https://www.duidaima.com/Group/Topic/ArchitecturedDesign/9182 使用驱动进行java开发:https://mongodb.github.

    2024年02月10日
    浏览(53)
  • mysql、clickhouse查询数据库所有的表以及字段信息

    mysql查询数据库所有的表以及字段信息 SELECT     table_schema 数据库名,   table_name 表名,   COLUMN_NAME 列名,   COLUMN_TYPE 数据类型,   DATA_TYPE 字段类型,   CHARACTER_MAXIMUM_LENGTH 长度,   IS_NULLABLE 是否为空,   COLUMN_DEFAULT 默认值,   COLUMN_COMMENT 备注  FROM  INFORMATION_SCHEMA.COLUMNS where -- tab

    2024年02月08日
    浏览(70)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包