- 搭建Kafka
参考:centos7下kafka2.12-2.1.0的安装及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客
- 启动zookeeper
[root@localhost kafka_2.12-2.8.1]# pwd
/usr/local/wyh/kafka/kafka_2.12-2.8.1
[root@localhost kafka_2.12-2.8.1]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- 启动kafka
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-server-start.sh config/server.properties &
- 查看进程
- 创建topic
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-flink-topic
- 查看topic列表
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181 test-flink-topic
- 导入pom依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
- 新建类
package test01;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TestReadKafka {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
//从kafka读
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()//这里的泛型需要指定从kafka读数据的数据类型
.setBootstrapServers("192.168.126.128:9092")//设置kafka server
.setGroupId("test-consumer-group")//设置consumer groupid
.setTopics("test-flink-topic")//设置要读取数据的topic
.setValueOnlyDeserializer(new SimpleStringSchema())//从Kafka读数据时需要进行反序列化,由于kafka的数据一般是存在value中的,不是key中,所以这里我们使用的序列化器是只对Value进行反序列化。这里的参数是用的String类型的反序列化器,因为在前面build时我们设置了要读取的数据类型是String类型。
.setStartingOffsets(OffsetsInitializer.latest())//设置flink读取kafka数据的读取策略,这里设置的是从最新数据消费
.build();
streamExecutionEnvironment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
.print();
streamExecutionEnvironment.execute();
}
}
- 启动程序
- 在终端向kafka生产数据,同时观察程序控制台flink的读取情况
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-console-producer.sh --broker-list 192.168.126.128:9092 --topic test-flink-topic
文章来源:https://www.toymoban.com/news/detail-536761.html
如图说明flink从kafka成功读取数据。文章来源地址https://www.toymoban.com/news/detail-536761.html
到了这里,关于Flink DataStream之从Kafka读数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!