轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

这篇具有很好参考价值的文章主要介绍了轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为高吞吐低延迟的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费 Kafka 中的数据方式和源码实现。

Flink 如何消费 Kafka

Flink 在和 Kafka 对接的过程中,跟 Kafka 的版本是强相关的。上一课时也提到了,我们在使用 Kafka 连接器时需要引用相对应的 Jar 包依赖,对于某些连接器比如 Kafka 是有版本要求的,一定要去官方网站找到对应的依赖版本。

我们本地的 Kafka 版本是 2.1.0,所以需要对应的类是 FlinkKafkaConsumer。首先需要在 pom.xml 中引入 jar 包依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

下面将对 Flink 消费 Kafka 数据的方式进行分类讲解。

消费单个 Topic

上一课时我们在本地搭建了 Kafka 环境,并且手动创建了名为 test 的 Topic,然后向名为 test 的 Topic 中写入了数据。

那么现在我们要消费这个 Topic 中的数据,该怎么做呢?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.enableCheckpointing(5000);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
    // 如果你是0.8版本的Kafka,需要配置
    //properties.setProperty("zookeeper.connect", "localhost:2181");
    //设置消费组
    properties.setProperty("group.id", "group_test");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
    //设置从最早的ffset消费
    consumer.setStartFromEarliest();
    //还可以手动指定相应的 topic, partition,offset,然后从指定好的位置开始消费
    //HashMap<KafkaTopicPartition, Long> map = new HashMap<>();
    //map.put(new KafkaTopicPartition("test", 1), 10240L);
    //假如partition有多个,可以指定每个partition的消费位置
    //map.put(new KafkaTopicPartition("test", 2), 10560L);
    //然后各个partition从指定位置消费
    //consumer.setStartFromSpecificOffsets(map);
    env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            System.out.println(value);
        }
    });
    env.execute("start consumer...");
}

在设置消费 Kafka 中的数据时,可以显示地指定从某个 Topic 的每一个 Partition 中进行消费。

消费多个 Topic

我们的业务中会有这样的情况,同样的数据根据类型不同发送到了不同的 Topic 中,比如线上的订单数据根据来源不同分别发往移动端和 PC 端两个 Topic 中。但是我们不想把同样的代码复制一份,需重新指定一个 Topic 进行消费,这时候应该怎么办呢?

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// 如果你是0.8版本的Kafka,需要配置
//properties.setProperty("zookeeper.connect", "localhost:2181");
//设置消费组
properties.setProperty("group.id", "group_test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
ArrayList<String> topics = new ArrayList<>();
        topics.add("test_A");
        topics.add("test_B");
       // 传入一个 list,完美解决了这个问题
        FlinkKafkaConsumer<Tuple2<String, String>> consumer = new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties);
...

我们可以传入一个 list 来解决消费多个 Topic 的问题,如果用户需要区分两个 Topic 中的数据,那么需要在发往 Kafka 中数据新增一个字段,用来区分来源。

消息序列化

我们在上述消费 Kafka 消息时,都默认指定了消息的序列化方式,即 SimpleStringSchema。这里需要注意的是,在我们使用 SimpleStringSchema 的时候,返回的结果中只有原数据,没有 topic、parition 等信息,这时候可以自定义序列化的方式来实现自定义返回数据的结构。

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    //是否表示流的最后一条元素,设置为false,表示数据会源源不断地到来
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }
    //这里返回一个ConsumerRecord<String,String>类型的数据,除了原数据还包括topic,offset,partition等信息
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                new String(record.key()),
                new String(record.value())
        );
    }
    //指定数据的输入类型
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>(){});
    }
}

这里自定义了 CustomDeSerializationSchema 信息,就可以直接使用了。

Parition 和 Topic 动态发现

在很多场景下,随着业务的扩展,我们需要对 Kafka 的分区进行扩展,为了防止新增的分区没有被及时发现导致数据丢失,消费者必须要感知 Partition 的动态变化,可以使用 FlinkKafkaConsumer 的动态分区发现实现。

我们只需要指定下面的配置,即可打开动态分区发现功能:每隔 10ms 会动态获取 Topic 的元数据,对于新增的 Partition 会自动从最早的位点开始消费数据。

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");

如果业务场景需要我们动态地发现 Topic,可以指定 Topic 的正则表达式:

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(Pattern.compile("^test_([A-Za-z0-9]*)$"), new SimpleStringSchema(), properties);
Flink 消费 Kafka 设置 offset 的方法

Flink 消费 Kafka 需要指定消费的 offset,也就是偏移量。Flink 读取 Kafka 的消息有五种消费方式:

  • 指定 Topic 和 Partition

  • 从最早位点开始消费

  • 从指定时间点开始消费

  • 从最新的数据开始消费

  • 从上次消费位点开始消费

/**
* Flink从指定的topic和parition中指定的offset开始
*/
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("test", 0), 10000L);
offsets.put(new KafkaTopicPartition("test", 1), 20000L);
offsets.put(new KafkaTopicPartition("test", 2), 30000L);
consumer.setStartFromSpecificOffsets(offsets);
/**
* Flink从topic中最早的offset消费
*/
consumer.setStartFromEarliest();
/**
* Flink从topic中指定的时间点开始消费
*/
consumer.setStartFromTimestamp(1559801580000l);
/**
* Flink从topic中最新的数据开始消费
*/
consumer.setStartFromLatest();
/**
* Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
*/
consumer.setStartFromGroupOffsets();

源码解析

从上面的类图可以看出,FlinkKafkaConsumer 继承了 FlinkKafkaConsumerBase,而 FlinkKafkaConsumerBase 最终是对 SourceFunction 进行了实现。

整体的流程:FlinkKafkaConsumer 首先创建了 KafkaFetcher 对象,然后 KafkaFetcher 创建了 KafkaConsumerThread 和 Handover,KafkaConsumerThread 负责直接从 Kafka 中读取 msg,并交给 Handover,然后 Handover 将 msg 传递给 KafkaFetcher.emitRecord 将消息发出。

因为 FlinkKafkaConsumerBase 实现了 RichFunction 接口,所以当程序启动的时候,会首先调用 FlinkKafkaConsumerBase.open 方法:

public void open(Configuration configuration) throws Exception {
   // 指定offset的提交方式
   this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
         getIsAutoCommitEnabled(),
         enableCommitOnCheckpoints,
         ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
   // 创建分区发现器
   this.partitionDiscoverer = createPartitionDiscoverer(
         topicsDescriptor,
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getNumberOfParallelSubtasks());
   this.partitionDiscoverer.open();
   subscribedPartitionsToStartOffsets = new HashMap<>();
   final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
   if (restoredState != null) {
      for (KafkaTopicPartition partition : allPartitions) {
         if (!restoredState.containsKey(partition)) {
            restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
         }
      }
      for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
         if (!restoredFromOldState) {
        <span class="hljs-keyword">if</span> (KafkaTopicPartitionAssigner.assign(
           restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
              == getRuntimeContext().getIndexOfThisSubtask()){
           subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
        }
     } <span class="hljs-keyword">else</span> {
       subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
     }
  }
  <span class="hljs-keyword">if</span> (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
     subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -&gt; {
        <span class="hljs-keyword">if</span> (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
           LOG.warn(
              <span class="hljs-string">"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution."</span>,
              entry.getKey());
           <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;
        }
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;
     });
  }
  LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading {} partitions with offsets in restored state: {}"</span>,
     getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);

} else {

  <span class="hljs-keyword">switch</span> (startupMode) {
     <span class="hljs-keyword">case</span> SPECIFIC_OFFSETS:
        <span class="hljs-keyword">if</span> (specificStartupOffsets == <span class="hljs-keyword">null</span>) {
           <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalStateException(
              <span class="hljs-string">"Startup mode for the consumer set to "</span> + StartupMode.SPECIFIC_OFFSETS +
                 <span class="hljs-string">", but no specific offsets were specified."</span>);
        }
        <span class="hljs-keyword">for</span> (KafkaTopicPartition seedPartition : allPartitions) {
           Long specificOffset = specificStartupOffsets.get(seedPartition);
           <span class="hljs-keyword">if</span> (specificOffset != <span class="hljs-keyword">null</span>) {
                             subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - <span class="hljs-number">1</span>);
           } <span class="hljs-keyword">else</span> {
           subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
           }
        }
        <span class="hljs-keyword">break</span>;
     <span class="hljs-keyword">case</span> TIMESTAMP:
        <span class="hljs-keyword">if</span> (startupOffsetsTimestamp == <span class="hljs-keyword">null</span>) {
           <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalStateException(
              <span class="hljs-string">"Startup mode for the consumer set to "</span> + StartupMode.TIMESTAMP +
                 <span class="hljs-string">", but no startup timestamp was specified."</span>);
        }
        <span class="hljs-keyword">for</span> (Map.Entry&lt;KafkaTopicPartition, Long&gt; partitionToOffset
              : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
           subscribedPartitionsToStartOffsets.put(
              partitionToOffset.getKey(),
              (partitionToOffset.getValue() == <span class="hljs-keyword">null</span>)
                  KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                    : partitionToOffset.getValue() - <span class="hljs-number">1</span>);
        }
        <span class="hljs-keyword">break</span>;
     <span class="hljs-keyword">default</span>:
        <span class="hljs-keyword">for</span> (KafkaTopicPartition seedPartition : allPartitions) {
           subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
        }
  }
  <span class="hljs-keyword">if</span> (!subscribedPartitionsToStartOffsets.isEmpty()) {
     <span class="hljs-keyword">switch</span> (startupMode) {
        <span class="hljs-keyword">case</span> EARLIEST:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> LATEST:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> TIMESTAMP:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              startupOffsetsTimestamp,
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> SPECIFIC_OFFSETS:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              specificStartupOffsets,
              subscribedPartitionsToStartOffsets.keySet());
           List&lt;KafkaTopicPartition&gt; partitionsDefaultedToGroupOffsets = <span class="hljs-keyword">new</span> ArrayList&lt;&gt;(subscribedPartitionsToStartOffsets.size());
           <span class="hljs-keyword">for</span> (Map.Entry&lt;KafkaTopicPartition, Long&gt; subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
              <span class="hljs-keyword">if</span> (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                 partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
              }
           }
           <span class="hljs-keyword">if</span> (partitionsDefaultedToGroupOffsets.size() &gt; <span class="hljs-number">0</span>) {
              LOG.warn(<span class="hljs-string">"Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}"</span> +
                    <span class="hljs-string">"; their startup offsets will be defaulted to their committed group offsets in Kafka."</span>,
                 getRuntimeContext().getIndexOfThisSubtask(),
                 partitionsDefaultedToGroupOffsets.size(),
                 partitionsDefaultedToGroupOffsets);
           }
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> GROUP_OFFSETS:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
     }
  } <span class="hljs-keyword">else</span> {
     LOG.info(<span class="hljs-string">"Consumer subtask {} initially has no partitions to read from."</span>,
        getRuntimeContext().getIndexOfThisSubtask());
  }

}
}

对 Kafka 中的 Topic 和 Partition 的数据进行读取的核心逻辑都在 run 方法中:

public void run(SourceContext<T> sourceContext) throws Exception {
   if (subscribedPartitionsToStartOffsets == null) {
      throw new Exception("The partitions were not set for the consumer");
   }
   this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
   this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
   final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
   this.offsetCommitCallback = new KafkaCommitCallback() {
      @Override
      public void onSuccess() {
         successfulCommits.inc();
      }
      @Override
      public void onException(Throwable cause) {
         LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
         failedCommits.inc();
      }
   };

if (subscribedPartitionsToStartOffsets.isEmpty()) {
sourceContext.markAsTemporarilyIdle();
}
LOG.info(“Consumer subtask {} creating fetcher with offsets {}.”,
getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);

this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
if (!running) {
return;
}
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
kafkaFetcher.runFetchLoop();
} else {
runWithPartitionDiscovery();
}
}

Flink 消费 Kafka 数据代码

上面介绍了 Flink 消费 Kafka 的方式,以及消息序列化的方式,同时介绍了分区和 Topic 的动态发现方法,那么回到我们的项目中来,消费 Kafka 数据的完整代码如下:

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(5000);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        //设置消费组
        properties.setProperty("group.id", "group_test");
        properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        //设置从最早的ffset消费
        consumer.setStartFromEarliest();
        env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                System.out.println(value);
            }
        });
        env.execute("start consumer...");
    }
}

我们可以直接右键运行代码,在控制台中可以看到数据的正常打印,如下图所示:

通过代码可知,我们之前发往 Kafka 的消息被完整地打印出来了。

总结

这一课时介绍了 Flink 消费 Kafka 的方式,比如从常用的指定单个或者多个 Topic、消息的序列化、分区的动态发现等,还从源码上介绍了 Flink 消费 Kafka 的原理。通过本课时的学习,相信你可以对 Flink 消费 Kafka 有一个较为全面地了解,根据业务场景可以正确选择消费的方式和配置。


精选评论

**6513:

项目中也是用main方法作为入口吗?有集成springboot的案例吗

    讲师回复:

    在编写Flink代码时,尽量避免使用spring类的框架,因为没有必要。只要依赖Flink必要的包和一些工具类即可。

**冰:

老师,这里可以从指定offset消费,怎么在程序终止或停止时保存offset,以便启时使用了?

    讲师回复:

    如果实际情况需要保存位点,那么一般是自己管理位点,每次停止重启后从自己管理的位点消费,比如你可以存储在mysql中,自己去读取

**良:

我是用flink消费Kafka从指定的topic和partition中指定的offset处开始消费,实际结果与预期不一致,消费的分区对不上是啥原因呢?示例代码:Mapoffsets.put(new KafkaTopicPartition(topic, 0), 1L);offsets.put(new KafkaTopicPartition(topic, 1), 2L);offsets.put(new KafkaTopicPartition(topic, 2), 3L);consumer.setStartFromSpecificOffsets(offsets);返回结果:ConsumerRecord(topic=new-topic-config-test, partition=0, offset=10105849, key=, value=Python从入门到放弃!ConsumerRecord(topic=new-topic-config-test, partition=3, offset=10107121, key=, value=Python从入门到放弃!ConsumerRecord(topic=new-topic-config-test, partition=2, offset=10102806, key=, value=Java从入门到放弃!

    讲师回复:

    从两个原因查询,第一看下并行度的设置要和kafka分区设置保持一致。第二,要保证kafka4个分区都有数据。

*轩:

请问哪里可以下载项目用的数据,不是源码

    讲师回复:

    在项目中有数据,也可以自己造一些数据

**7324:

flink如何将key相同的数据写入到kafka的同一个partition呢?

    讲师回复:

    你可以自定义自己的kafka分区器,可以查一下FlinkKafkaPartitioner的用法,但是一般我们不会这么用,如果你需要自定义写入kafka的分区器,要保证数据尽量均匀,不要引起kafka端的数据倾斜文章来源地址https://www.toymoban.com/news/detail-712510.html

到了这里,关于轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(64)
  • 轻松通关Flink第34讲:Flink 和 Redis 整合以及 Redis Sink 实现

    上一课时我们使用了 3 种方法进行了 PV 和 UV 的计算,分别是全窗口内存统计、使用分组和过期数据剔除、使用 BitMap / 布隆过滤器。到此为止我们已经讲了从数据清洗到水印、窗口设计,PV 和 UV 的计算,接下来需要把结果写入不同的目标库供前端查询使用。 下面我们分别讲

    2024年02月08日
    浏览(30)
  • 流批一体计算引擎-4-[Flink]消费kafka实时数据

    Python3.6.9 Flink 1.15.2消费Kafaka Topic PyFlink基础应用之kafka 通过PyFlink作业处理Kafka数据 PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系统中安装了多个版本的python3 。 二、环境变量path作用顺序 三、安装Pyflink 1.3.2 配置Flink Kafka连接 (1)在https://mvnr

    2024年02月06日
    浏览(29)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(41)
  • flink如何初始化kafka数据源的消费偏移

    我们知道在日常非flink场景中消费kafka主题时,我们只要指定了消费者组,下次程序重新消费时是可以从上次消费停止时的消费偏移开始继续消费的,这得益于kafka的_offset_主题保存的关于消费者组和topic偏移位置的具体偏移信息,那么flink应用中重启flink应用时,flink是从topic的什

    2024年02月16日
    浏览(36)
  • Idea本地跑flink任务时,总是重复消费kafka的数据(kafka->mysql)

    1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 Idea中执行任务时,没法看到JobManager的错误,以至于我以为是什么特殊的原因导致任务总是反复消费。在close方法中,增加日志,发现jdbc连接被关闭了。 重新消费,jdbc连接又启动了。 注意,在Flink的函数中,open和close方法

    2024年02月07日
    浏览(27)
  • 实战Flink Java api消费kafka实时数据落盘HDFS

    在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件: 确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。 确保 Kafka Serve

    2024年01月24日
    浏览(38)
  • Flink1.17.1消费kafka3.5中的数据出现问题Failed to get metadata for topics [flink].

    问题呈现 Failed to get metadata for topics [flink]. at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) at org.apache.flink.connecto

    2024年02月11日
    浏览(41)
  • 大数据之使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层

    前言 题目: 一、读题分析 二、处理过程 三、重难点分析 总结  本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理 注:由于设备问题,代码执行结果以及数据的展示无法给出,可参照我以往的博客其中有相同数据源展示     提示:以下是本

    2024年02月04日
    浏览(32)
  • 大数据之使用Flink消费Kafka中topic为ods_mall_log的数据,根据不同的表前缀区分在存入Kafka的topic当中

    前言 题目: 一、读题分析 二、处理过程   1.数据处理部分: 2.HBaseSink(未经测试,不能证明其正确性,仅供参考!) 三、重难点分析 总结  什么是HBase? 本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理 注:由于设备问题,代码执行结果

    2024年02月03日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包