7.1 Chapter Overview
This chapter uses Elasticsearch and Kafka to introduce Flink's sinkTo / addSink APIs. In addition, we implement several generic helpers so that, in real-world scenarios, different entity classes can all be handled by the same generic code instead of each one needing its own one-off implementation.
7.2 Demonstration
First, Flink writes data to ES.
In addition, we will write a generic utility class for Kafka serialization and deserialization: for any entity class, we consume messages from a Kafka topic and deserialize them into the desired entity, then serialize the Flink results back into JSON strings and write them to Kafka.
7.3 Writing the Code
Add the relevant dependencies as needed. The Flink and Kafka dependencies were already covered in earlier chapters, so here we only list the extra ones, where flink.version is 1.14.6 (the scala.binary.version property and the Lombok dependency used by the later examples are assumed to be set up in the earlier chapters):
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.45</version>
</dependency>
</dependencies>
7.3.1 sinkTo ES
In this example we simply write three strings to ES.
package cn.smileyan.demo;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Creates the index hello-world and writes data into it
* @author Smileyan
*/
public class FlinkElasticsearchDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final MultipleParameterTool parameters = MultipleParameterTool.fromArgs(args);
// Connection info for the Elasticsearch cluster, e.g. http://localhost:9200, or a comma-separated list such as http://localhost:9200,http://localhost:9201
String urls = parameters.get("es.hosts", "http://localhost:9200");
final String regex = ",";
List<HttpHost> httpHosts = Arrays.stream(urls.split(regex)).map(HttpHost::create).collect(Collectors.toList());
// Create the ElasticsearchSinkFunction that writes data to Elasticsearch
final String index = parameters.get("index", "hello-world");
ElasticsearchSink.Builder<String> esSinkBuilder = buildSinkEs(httpHosts, index);
// Create the data stream
DataStream<String> resultStream = env.fromElements("data1", "data2", "data3");
// Sink the results to Elasticsearch
resultStream.addSink(esSinkBuilder.build());
// Execute the Flink job
env.execute("Flink Elasticsearch Example");
}
/**
* Builds an ElasticsearchSink.Builder for sinking string data to Elasticsearch.
*
* @param httpHosts connection info for the Elasticsearch cluster
* @param index the Elasticsearch index to write to
* @return the ElasticsearchSink.Builder object
*/
private static ElasticsearchSink.Builder<String> buildSinkEs(List<HttpHost> httpHosts, String index) {
ElasticsearchSinkFunction<String> elasticsearchSinkFunction = (element, ctx, indexer) -> {
// Wrap the element into a single-field document and hand it to the bulk indexer
Map<String, String> json = new HashMap<>(1);
json.put("data", element);
IndexRequest source = Requests.indexRequest().index(index).source(json);
indexer.add(source);
};
// Create the ElasticsearchSink builder
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
// Size of the bulk write buffer; flushing after every action is for demo purposes and should be tuned for real workloads
esSinkBuilder.setBulkFlushMaxActions(1);
return esSinkBuilder;
}
}
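Note that the demo calls setBulkFlushMaxActions(1), which flushes to ES after every single record; that is convenient for demos but inefficient for real workloads. As a rough sketch (the numbers below are illustrative assumptions, not values from this chapter), the builder returned by buildSinkEs could instead be tuned like this before build() is called:

// Sketch: more production-leaning bulk settings on the same ElasticsearchSink.Builder.
// The concrete values are illustrative and should be tuned against the actual cluster.
esSinkBuilder.setBulkFlushMaxActions(1000); // flush once 1000 actions are buffered ...
esSinkBuilder.setBulkFlushMaxSizeMb(5);     // ... or once the buffered requests reach 5 MB ...
esSinkBuilder.setBulkFlushInterval(1000L);  // ... or at least once per second, whichever comes first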
7.3.2 Serializing Entity Classes and Writing Them to ES
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* @author Smileyan
*/
public class GenericFlinkElasticsearchDemo {
public static void main(String[] args) throws Exception {
// Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final MultipleParameterTool parameters = MultipleParameterTool.fromArgs(args);
// Connection info for the Elasticsearch cluster, e.g. http://localhost:9200, or a comma-separated list such as http://localhost:9200,http://localhost:9201
String urls = parameters.get("es.hosts", "http://localhost:9200");
final String regex = ",";
List<HttpHost> httpHosts = Arrays.stream(urls.split(regex)).map(HttpHost::create).collect(Collectors.toList());
// Create the ElasticsearchSinkFunction that writes data to Elasticsearch
final String index = parameters.get("es.index", "hello-world-2");
GenericElasticsearchSinkFunction<Student> elasticsearchSinkFunction = new GenericElasticsearchSinkFunction<>(index);
// Create the ElasticsearchSink builder
ElasticsearchSink.Builder<Student> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
DataStream<Student> inputDataStream = env.fromElements(
new Student(1, "张三", 18),
new Student(2, "李四", 20),
new Student(3, "王五", 22)
);
// Sink the results to Elasticsearch
inputDataStream.addSink(esSinkBuilder.build());
// Execute the Flink job
env.execute("Flink Elasticsearch Example");
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Student {
private Integer id;
private String name;
private Integer age;
}
}
The demo above relies on a reusable class, whose implementation is shown here:
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
/**
* Writes entity objects of any type into ES
* @author Smileyan
*/
public class GenericElasticsearchSinkFunction<T> implements ElasticsearchSinkFunction<T> {
private final String indexName;
public GenericElasticsearchSinkFunction(String indexName) {
this.indexName = indexName;
}
@Override
public void process(T element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
// Logic for writing the element to Elasticsearch
if (element != null) {
JSONObject jsonMap = JSONObject.from(element);
IndexRequest indexRequest = Requests.indexRequest()
.index(indexName)
.source(jsonMap);
// Add the IndexRequest to the RequestIndexer
requestIndexer.add(indexRequest);
}
}
}
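One detail worth pointing out: since no document id is set, every processed record becomes a new document in ES, so replaying the same data produces duplicates. The variant below is a sketch of mine (not code from the original project; the IdExtractor interface is introduced here purely for illustration) that derives the document id from the entity, so that reprocessing simply overwrites the same document:

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.io.Serializable;
import java.util.function.Function;
/**
 * Sketch: like GenericElasticsearchSinkFunction, but a caller-supplied id extractor
 * pins each entity to a fixed document id, making the writes idempotent.
 */
public class IdAwareElasticsearchSinkFunction<T> implements ElasticsearchSinkFunction<T> {
    /** A Serializable Function so Flink can ship the extractor to the task managers. */
    public interface IdExtractor<E> extends Function<E, String>, Serializable { }
    private final String indexName;
    private final IdExtractor<T> idExtractor;
    public IdAwareElasticsearchSinkFunction(String indexName, IdExtractor<T> idExtractor) {
        this.indexName = indexName;
        this.idExtractor = idExtractor;
    }
    @Override
    public void process(T element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        if (element == null) {
            return;
        }
        IndexRequest request = Requests.indexRequest()
                .index(indexName)
                // fixed id derived from the entity -> re-runs overwrite instead of duplicating
                .id(idExtractor.apply(element))
                .source(JSONObject.from(element));
        requestIndexer.add(request);
    }
}

For the Student demo above it would be constructed as new IdAwareElasticsearchSinkFunction<Student>(index, s -> String.valueOf(s.getId())).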
7.3.3 A Generic Kafka Serialization / Deserialization Schema
In this example we implement two key interfaces, DeserializationSchema and SerializationSchema, which makes working with Kafka sources and sinks much more convenient. A type parameter generalizes the class being serialized and deserialized, so the schema is highly reusable.
package cn.smileyan.demos;
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.nio.charset.StandardCharsets;
/**
* Deserializes raw Kafka bytes into an entity, and serializes an entity back into JSON bytes
* @author smileyan
* @param <O> the entity class
*/
@Slf4j
public class CommonEntitySchema<O> implements DeserializationSchema<O>, SerializationSchema<O> {
private final Class<O> clazz;
public CommonEntitySchema(Class<O> clazz) {
this.clazz = clazz;
}
@Override
public O deserialize(byte[] message) {
try {
String str = new String(message, StandardCharsets.UTF_8);
log.info("kafka received message: {}", str);
return JSON.parseObject(str, clazz);
} catch (Exception e) {
log.error(e.getMessage());
}
return null;
}
@Override
public boolean isEndOfStream(O nextElement) {
return false;
}
@Override
public TypeInformation<O> getProducedType() {
return TypeInformation.of(clazz);
}
@Override
public byte[] serialize(O element) {
return JSON.toJSONBytes(element);
}
}
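Before wiring this schema into Flink, it can be sanity-checked locally with a plain main method. The class below is only a quick sketch of mine (assumed to sit in the same package as CommonEntitySchema; the Student POJO is a stand-in that mirrors the entity used in the next example):

import java.nio.charset.StandardCharsets;
/**
 * Sketch: round-trips an entity through CommonEntitySchema outside of Flink.
 */
public class CommonEntitySchemaSmokeTest {
    public static void main(String[] args) {
        CommonEntitySchema<Student> schema = new CommonEntitySchema<>(Student.class);
        Student in = new Student();
        in.setName("Tom");
        in.setAge(8);
        // entity -> JSON bytes, which is what the Kafka sink side sends
        byte[] bytes = schema.serialize(in);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        // JSON bytes -> entity, which is what the Kafka source side does
        Student out = schema.deserialize(bytes);
        System.out.println(out);
    }
    @lombok.Data
    static class Student {
        private String name;
        private Integer age;
    }
}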
Next, building on this class, we put together a Kafka source and sink example.
package cn.smileyan.demos;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Deserializes Kafka messages into entities and writes the filtered results back to Kafka
* @author smileyan
*/
@Slf4j
public class FlinkKafkaEntitySinkToExample {
/**
* Parameter reference:
* -bs Kafka broker address
* -kcg Kafka consumer group
* -it Kafka input topic
* -ot Kafka output topic
* -ct whether to auto-create the input topic
* -pt number of partitions for the created topic
* -rf replication factor for the created topic
* -tt Kafka producer transaction timeout in milliseconds
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final MultipleParameterTool cmd = MultipleParameterTool.fromArgs(args);
final String bootstrapServer = cmd.get("bs", "localhost:9092");
final String kafkaConsumerGroup = cmd.get("kcg", "flink-consumer");
final String inputTopic = cmd.get("it", "quickstart-events");
final String outputTopic = cmd.get("ot", "quickstart-results");
final boolean createTopic = cmd.getBoolean("ct", false);
final Long transactionTimeout = cmd.getLong("tt", 300000L);
log.info("broker is {} and topic is {}", bootstrapServer, inputTopic);
// Create the topic only if it does not exist and topic creation by the Flink job is enabled. Disabled by default: normally whoever deploys the job should set each topic's partition count and replication factor according to the actual environment
if (createTopic) {
final int partitions = cmd.getInt("pt", 1);
final short replicationFactor = cmd.getShort("rf", (short) 1);
createTopic(bootstrapServer, inputTopic, partitions, replicationFactor);
}
final KafkaSource<Student> kafkaSource = KafkaSource.<Student>builder()
.setGroupId(kafkaConsumerGroup)
.setStartingOffsets(OffsetsInitializer.latest())
.setBootstrapServers(bootstrapServer)
.setTopics(inputTopic)
.setValueOnlyDeserializer(new CommonEntitySchema<>(Student.class))
.build();
Properties properties = new Properties();
properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(transactionTimeout));
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
final KafkaSink<Student> kafkaSink = KafkaSink.<Student>builder()
.setKafkaProducerConfig(properties)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(outputTopic)
.setValueSerializationSchema(new CommonEntitySchema<>(Student.class))
.build())
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
final DataStreamSource<Student> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Filter out objects that failed to deserialize, keeping only valid ones
SingleOutputStreamOperator<Student> out1 = kafkaStream.filter(Objects::nonNull)
.map(student -> {
log.info("filter none objects is {}", student);
return student;
});
// Keep only students younger than 10
SingleOutputStreamOperator<Student> out2 = out1.filter(student -> student.getAge() != null && student.getAge() < 10)
.map(student -> {
log.info("filter age < 10: {}", student);
return student;
});
out2.sinkTo(kafkaSink);
env.execute("Flink Kafka Example");
}
/**
* Creates the topic if it does not already exist
* @param bootstrapServer Kafka broker address
* @param topic the topic to create
* @param partitions number of partitions
* @param replicationFactor replication factor
*/
public static void createTopic(String bootstrapServer,
String topic,
int partitions,
int replicationFactor) throws ExecutionException, InterruptedException {
Properties adminProperties = new Properties();
adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
try (AdminClient adminClient = AdminClient.create(adminProperties)) {
if (!adminClient.listTopics().names().get().contains(topic)) {
NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
log.info("created topic: {}", topic);
}
}
}
@Data
static class Student {
private String name;
private Integer age;
}
}
In this code we also use filter to drop entities that failed to deserialize, as well as deserialized entities that do not meet the requirements; only the entities that satisfy both conditions are sunk to Kafka.
Moreover, this KafkaSource / KafkaSink style is what the Flink project now recommends, rather than the older FlinkKafkaConsumer / FlinkKafkaProducer classes, which have since been deprecated.
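For comparison, the legacy approach looked roughly like the sketch below (FlinkKafkaConsumer comes from the same flink-connector-kafka module; env, Student, bootstrapServer, kafkaConsumerGroup and inputTopic are the ones from the example above). It is shown only for orientation and is not recommended for new code:

// Legacy, now-deprecated source API; requires
// import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", bootstrapServer);
consumerProps.setProperty("group.id", kafkaConsumerGroup);
FlinkKafkaConsumer<Student> legacySource =
        new FlinkKafkaConsumer<>(inputTopic, new CommonEntitySchema<>(Student.class), consumerProps);
DataStreamSource<Student> legacyStream = env.addSource(legacySource);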
7.3.4 Implementing a Generic KafkaSource Builder and KafkaSink Builder
Suppose we need to develop several Flink jobs, i.e. several classes each with its own main method, performing different computations on different kinds of Kafka messages. The entity classes consumed from the Kafka source and written to the Kafka sink may differ from job to job, but the wiring and the parameters stay the same.
So it is worth implementing a generic source method and a generic sink method that derive the Kafka source and sink configuration from the String[] args passed to main.
The implementation looks like this:
package cn.smileyan.demos;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Generic Kafka source/sink builder configured from command-line args
* @author smileyan
*/
@Slf4j
public class KafkaArgsBuilder {
/**
* Parsed command-line parameters
*/
private final MultipleParameterTool parameterTool;
public KafkaArgsBuilder(String[] args) {
parameterTool = MultipleParameterTool.fromArgs(args);
}
/**
* Builds a Kafka sink
* @param clazz the entity class
* @param <E> the entity type
* @return the KafkaSink object
*/
public <E> KafkaSink<E> buildSink(Class<E> clazz) {
final String bs = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);
final String ot = parameterTool.getRequired(KafkaArgs.OUTPUT_TOPIC.key);
return KafkaSink.<E>builder()
.setBootstrapServers(bs)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(ot)
.setValueSerializationSchema(new CommonEntitySchema<>(clazz))
.build())
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
}
/**
* Builds a Kafka source
* @param clazz the entity class
* @param <E> the entity type
* @return the KafkaSource object
*/
public <E> KafkaSource<E> buildSource(Class<E> clazz) throws ExecutionException, InterruptedException {
final String kafkaConsumerGroup = parameterTool.getRequired(KafkaArgs.KAFKA_CONSUMER_GROUP.key);
final String bootstrapServer = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);
final String inputTopic = parameterTool.getRequired(KafkaArgs.INPUT_TOPIC.key);
final boolean createTopic = parameterTool.has(KafkaArgs.CREATE_TOPIC.key);
if (createTopic) {
final int partition = parameterTool.getInt(KafkaArgs.CREATE_TOPIC_PARTITION.key, 1);
final short replicationFactor = parameterTool.getShort(KafkaArgs.REPLICATION_FACTOR.key, (short) 1);
createTopic(bootstrapServer, inputTopic, partition, replicationFactor);
}
return KafkaSource.<E>builder()
.setGroupId(kafkaConsumerGroup)
.setStartingOffsets(OffsetsInitializer.latest())
.setBootstrapServers(bootstrapServer)
.setTopics(inputTopic)
.setValueOnlyDeserializer(new CommonEntitySchema<>(clazz))
.build();
}
public enum KafkaArgs {
/*
* Kafka broker address
*/
BOOTSTRAP_SERVER("bs"),
/*
* Kafka consumer group
*/
KAFKA_CONSUMER_GROUP("kcg"),
/*
* Kafka input topic
*/
INPUT_TOPIC("it"),
/*
* Kafka output topic
*/
OUTPUT_TOPIC("ot"),
/*
* whether to auto-create the input topic
*/
CREATE_TOPIC("ct"),
/*
* number of partitions for the created topic
*/
CREATE_TOPIC_PARTITION("pt"),
/*
* replication factor for the created topic
*/
REPLICATION_FACTOR("rf");
private final String key;
KafkaArgs(String key) {
this.key = key;
}
}
/**
* Creates the topic if it does not already exist
* @param bootstrapServer Kafka broker address
* @param topic the topic to create
* @param partitions number of partitions
* @param replicationFactor replication factor
*/
public static void createTopic(String bootstrapServer,
String topic,
int partitions,
int replicationFactor) throws ExecutionException, InterruptedException {
Properties adminProperties = new Properties();
adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
try (AdminClient adminClient = AdminClient.create(adminProperties)) {
if (!adminClient.listTopics().names().get().contains(topic)) {
NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
log.info("created topic: {}", topic);
}
}
}
}
Correspondingly, here is an example that uses the builder to create a sink:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Serializes entity objects and writes them to Kafka
* @author smileyan
*/
public class KafkaArgsSinkDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final KafkaArgsBuilder kafkaArgsBuilder = new KafkaArgsBuilder(args);
DataStream<Student> dataStream = env.fromElements(
new Student(1, "张三", 18),
new Student(2, "李四", 20),
new Student(3, "王五", 22)
);
KafkaSink<Student> sinker = kafkaArgsBuilder.buildSink(Student.class);
dataStream.sinkTo(sinker);
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Student {
private Integer id;
private String name;
private Integer age;
}
}
And here is an example that builds a Kafka source.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Kafka source example built with KafkaArgsBuilder
* @author smileyan
*/
public class KafkaArgsSourceDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final KafkaArgsBuilder kafkaArgsBuilder = new KafkaArgsBuilder(args);
final KafkaSource<String> source = kafkaArgsBuilder.buildSource(String.class);
final DataStreamSource<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
kafkaStream.print();
env.execute("Flink Kafka Example");
}
}
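To close the loop, the two demos can be combined into a single job that reads entities from the input topic and writes them straight back to the output topic, with every Kafka setting coming from the command line. The class below is a sketch of mine (the Student POJO is assumed to match the one in KafkaArgsSinkDemo); it would be launched with arguments along the lines of --bs localhost:9092 --kcg flink-consumer --it quickstart-events --ot quickstart-results:

import lombok.Data;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Objects;
/**
 * Sketch: end-to-end Kafka pipeline configured entirely from command-line args.
 */
public class KafkaArgsPipelineDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final KafkaArgsBuilder builder = new KafkaArgsBuilder(args);
        env.fromSource(builder.buildSource(Student.class), WatermarkStrategy.noWatermarks(), "Kafka Source")
                // drop records that CommonEntitySchema failed to deserialize (they arrive as null)
                .filter(Objects::nonNull)
                .sinkTo(builder.buildSink(Student.class));
        env.execute("Kafka Args Pipeline");
    }
    @Data
    static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}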
7.4 Chapter Summary
The earlier chapters already described the project's base environment and the points to watch out for in some detail. This chapter acts as a bridge: having walked through the most basic building blocks, the following chapters will develop processing pipelines for our assumed business scenarios, including aggregating multiple Kafka messages, sliding-window operations, custom map and flatMap operations, and the synchronous and asynchronous HTTP requests that real business scenarios may call for.
I hope this helps, and many thanks to everyone for your support!
Thanks for reading ~
Thanks for the likes ~