Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。 -
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 -
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 -
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 -
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文简单的介绍了Flink 以clickhouse为数据源或者将数据sink到clickhouse的实现示例。
本文依赖clickhouse的环境好用
本文分为2个部分,即sink到clickhouse和以clickhouse作为数据源。
一、Flink sink Clickhouse
关于clickhouse的基本知识详见该系列文章,关于该部分不再赘述。
ClickHouse系列文章
1、ClickHouse介绍
2、clickhouse安装与简单验证(centos)
3、ClickHouse表引擎-MergeTree引擎
4、clickhouse的Log系列表引擎、外部集成表引擎和其他特殊的表引擎介绍及使用
5、ClickHouse查看数据库容量、表的指标、表分区、数据大小等
1、maven依赖
<dependency>
<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.3.1</version>
</dependency>
2、创建clickhouse表
-- 1、创建数据库 tutorial
略
-- 2、创建表
CREATE TABLE t_flink_sink_clickhouse (
id UInt16 COMMENT '员工id',
name String COMMENT '员工姓名',
age UInt8 COMMENT '员工年龄' )
ENGINE = MergeTree
ORDER BY id;
3、验证clickhouse web页面是否正常
http://192.168.10.42:8123/
4、实现
1)、user bean
import lombok.Data;
@Data
public class User {
private int id;
private String name;
private int age;
public User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
// Java Bean 必须实现的方法,信息通过字符串进行拼接
public static String convertToCsv(User user) {
StringBuilder builder = new StringBuilder();
builder.append("(");
// add user.id
builder.append(user.id);
builder.append(", ");
// add user.name
builder.append("'");
builder.append(String.valueOf(user.name));
builder.append("', ");
// add user.age
builder.append(user.age);
builder.append(" )");
return builder.toString();
}
}
2)、sink实现
package org.clickhouse.test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.clickhouse.ClickHouseSink;
import org.clickhouse.model.ClickHouseClusterSettings;
import org.clickhouse.model.ClickHouseSinkConst;
/**
* @author alanchan
*
*/
public class TestFinkSinkClickhouse {
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// source
// nc
DataStream<String> inputStream = env.socketTextStream("192.168.10.42", 9999);
// Transform
SingleOutputStreamOperator<String> dataStream = inputStream.map(new MapFunction<String, String>() {
@Override
public String map(String data) throws Exception {
String[] split = data.split(",");
User user = new User(Integer.parseInt(split[0]), split[1], Integer.parseInt(split[2]));
return User.convertToCsv(user);
}
});
// create props for sink
Map<String, String> globalParameters = new HashMap<>();
// clickhouse 的服务地址,该链接访问返回ok
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://192.168.10.42:8123/");
// common
globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "/usr/local/bigdata/testdata/clickhouse_failpath");
globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "10");
globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
// set global paramaters
ParameterTool parameters = ParameterTool.fromMap(globalParameters);
env.getConfig().setGlobalJobParameters(parameters);
// env.setParallelism(1);
Properties props = new Properties();
// 数据库tutorial和表名称t_flink_sink_clickhouse
// 需要先创建数据库和表
// CREATE TABLE t_flink_sink_clickhouse (id UInt16 COMMENT '员工id',name String
// COMMENT '员工姓名',age UInt8 COMMENT '员工年龄' ) ENGINE = MergeTree ORDER BY id;
props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "tutorial.t_flink_sink_clickhouse");
props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
ClickHouseSink sink = new ClickHouseSink(props);
dataStream.addSink(sink);
dataStream.print();
env.execute();
}
}
5、验证
1)、nc 输入
[root@server2 etc]# nc -lk 9999
1,alanchan,19
2,alan,20
3,chan,21
2)、启动应用程序
3)、观察应用程序控制台输出
4)、查看clickhouse表中的数据
server2 :) select * from t_flink_sink_clickhouse;
SELECT *
FROM t_flink_sink_clickhouse
Query id: aea358e8-8d9d-4caa-98b1-54903356a7d0
┌─id─┬─name─┬─age─┐
│ 2 │ alan │ 20 │
└────┴──────┴─────┘
┌─id─┬─name─┬─age─┐
│ 3 │ chan │ 21 │
└────┴──────┴─────┘
┌─id─┬─name─────┬─age─┐
│ 1 │ alanchan │ 19 │
└────┴──────────┴─────┘
3 rows in set. Elapsed: 0.003 sec.
以上,与预期一致。
二、Flink source Clickhouse
1、maven依赖
本处依赖与本文上一个示例中的依赖不一样,另外,如果该示例中的依赖如果不够,则需要自己去maven上找到相关的即可。
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.40</version>
</dependency>
2、实现
1)、user bean
import lombok.Data;
/**
* @author alanchan
*
*/
@Data
public class UserSource {
private int id;
private String name;
private int age;
public UserSource(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
public UserSource() {
}
}
2)、source实现
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;
/**
* @author alanchan
*
*/
public class Source_Clickhouse {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// source
DataStream<UserSource> users = env.addSource(new ClickhouseSource());
// transformation
// sink
users.print();
// execute
env.execute();
}
private static class ClickhouseSource extends RichParallelSourceFunction<UserSource> {
private boolean flag = true;
private ClickHouseConnection conn = null;
private ClickHouseStatement stmt = null;
private ResultSet rs = null;
private Map<ClickHouseQueryParam, String> additionalDBParams = new HashMap<>();
UserSource user = null;
private String sql = "select id,name,age from t_flink_sink_clickhouse";
// open只执行一次,适合开启资源
@Override
public void open(Configuration parameters) throws Exception {
ClickHouseProperties properties = new ClickHouseProperties();
String url = "jdbc:clickhouse://192.168.10.42:8123/tutorial";
properties.setSessionId(UUID.randomUUID().toString());
// properties.setDatabase("tutorial");
// properties.setHost("192.168.10.42");
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
// ClickHouseProperties
additionalDBParams.put(ClickHouseQueryParam.SESSION_ID, UUID.randomUUID().toString());
conn = dataSource.getConnection();
stmt = conn.createStatement();
}
@Override
public void run(SourceContext<UserSource> ctx) throws Exception {
while (flag) {
rs = stmt.executeQuery(sql, additionalDBParams);
while (rs.next()) {
user = new UserSource(rs.getInt(1), rs.getString(2), rs.getInt(3));
ctx.collect(user);
}
}
}
// 接收到cancel命令时取消数据生成
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
if (conn != null)
conn.close();
if (stmt != null)
stmt.close();
if (rs != null)
rs.close();
}
}
}
3、验证
启动应用程序,查看应用程序控制台输出是不是与期望的一致即可。文章来源:https://www.toymoban.com/news/detail-635574.html
以上,本文简单的介绍了Flink 以clickhouse为数据源或者将数据sink到clickhouse的实现示例。文章来源地址https://www.toymoban.com/news/detail-635574.html
到了这里,关于12、Flink source和sink 的 clickhouse 详细示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!