Flink RocketMQ Connector实现

这篇具有很好参考价值的文章主要介绍了Flink RocketMQ Connector实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink内置了很多Connector,可以满足大部分场景。但是还是有一些场景无法满足,比如RocketMQ。需要消费RocketMQ的消息,需要自定时Source。

一、自定义FlinkRocketMQConsumer

参考FlinkKafkaConsumer:

public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T>{}

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {}

public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction implements ParallelSourceFunction<OUT> {}

public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {}

public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceFunction.SourceContext<T> var1) throws Exception;

    void cancel();

    @Public
    public interface SourceContext<T> {
        void collect(T var1);

        @PublicEvolving
        void collectWithTimestamp(T var1, long var2);

        @PublicEvolving
        void emitWatermark(Watermark var1);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}

可以看到,自定义的Source,只需要实现SourceFunction。

创建FlinkRocketMQConsumer,实现SourceFunction,重写run()和cancel()方法

public class FlinkRocketMQConsumer implements SourceFunction<String> {
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
         
    }

    @Override
    public void cancel() {
         
    }
}

需要准备一个RocketMQ的消费者客户端,在pom中添加如下依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
    <scope>provided</scope>
</dependency>

对于FlinkRocketMQConsumer来说,需要初始化一个consumer,代码如下:

public class FlinkRocketMQConsumer implements SourceFunction<String> {

    private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");

}

这样,在FlinkRocketMQConsumer类加载的时候,就会初始化一个consumer。

另外,还需要对consumer进行初始化,需要知道nameSrvAddr和topic,所以添加一个构造方法,对consumer进行初始化

public class FlinkRocketMQConsumer implements SourceFunction<String> {
  private String nameSrvAddr;
  private String topic;  
  public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
    this.nameSrvAddr = nameSrvAddr;
    this.topic = topic;
  }
  ...
}

重写run方法和cancel方法

@Override
public void run(SourceContext<String> ctx) throws Exception {
    consumer.setNamesrvAddr(nameSrvAddr);
    consumer.subscribe(topic, "*");

    consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
        msgs.forEach(msg -> {
            ctx.collect(new String(msg.getBody(), Charset.forName("UTF-8")));
        });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();

    // 需要先走到 consumer.start() 后,才会走 consumer.registerMessageListener 方法,但是这个时候,意味着 run 方法已经走完,ctx已经关闭
    // 这个时候在 consumer.registerMessageListener 方法中,调用 ctx 会显示已关闭
    // 所以,不能让程序走完
    while (true) {
        Thread.sleep(10);
    }
}

@Override
public void cancel() {
    consumer.shutdown();
}

完整代码如下:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import java.nio.charset.Charset;

/**
 * @author Johnson
 * @version 1.0
 * @description
 * @create 2023-03-20 10:02
 */
public class FlinkRocketMQConsumer implements SourceFunction<String> {

    private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");

    private String nameSrvAddr;
    private String topic;

    public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
        this.nameSrvAddr = nameSrvAddr;
        this.topic = topic;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        consumer.setNamesrvAddr(nameSrvAddr);
        consumer.subscribe(topic, "*");

        consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
            msgs.forEach(msg -> {
                ctx.collect(new String(msg.getBody(), Charset.forName("UTF-8")));
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();

        // 需要先走到 consumer.start() 后,才会走 consumer.registerMessageListener 方法,但是这个时候,意味着 run 方法已经走完,ctx已经关闭
        // 这个时候在 consumer.registerMessageListener 方法中,调用 ctx 会显示已关闭
        // 所以,不能让程序走完
        while (true) {
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        consumer.shutdown();
    }
}

二、方法调用

package rocketmq;

import com.source.FlinkRocketMQConsumer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author Johnson
 * @version 1.0
 * @description
 * @create 2023-03-21 10:30
 */
public class FlinkRocketMQConsumerDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> rmqDS = env.addSource(new FlinkRocketMQConsumer("***:9876", "test_rmq"));
        rmqDS .print("**********");
        env.execute("FlinkRocketMQConsumerDemo");
    }
}

到这来,就可以正常消费RocketMQ中的数据,控制台输出如下。

三、隐患

在FlinkRocketMQConsumer中,为了正常调用SourceContext(ctx),使用可一个线程一直占用,不让run方法结束,目前是可以正常运行,但是能不能经受得起时间检验,会不会给以后埋下隐患,还有待观察。

关于这一点,是否有更好的实现方法,欢迎各位技术大佬留言发表见解。。。文章来源地址https://www.toymoban.com/news/detail-668503.html

到了这里,关于Flink RocketMQ Connector实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink Connector 开发

    Flink 是新一代流 批统一的计算引擎 ,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。 Connector 的作用就相当于一个连接器 ,连接 Flink 计算引擎跟外界存储系统。 Flink 里有以下几种方式,当然也不限于这几种方式可以跟外界进行

    2024年02月03日
    浏览(32)
  • 生态扩展:Flink Doris Connector

    官网地址: https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector flink的安装: flink环境配置:vim /etc/profile 复制到flink的lib目录 doris官网:https://doris.apache.org/docs/ecosystem/flink-doris-connector

    2024年02月06日
    浏览(32)
  • Flink Kafka[输入/输出] Connector

    本章重点介绍生产环境中最常用到的 Flink kafka connector 。使用 Flink 的同学,一定会很熟悉 kafka ,它是一个分布式的、分区的、多副本的、 支持高吞吐的、发布订阅消息系统。生产环境环境中也经常会跟 kafka 进行一些数据的交换,比如利用 kafka consumer 读取数据,然后进行一系

    2024年02月04日
    浏览(31)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(43)
  • Flink Oracle CDC Connector源码解读

    flink cdc是在flink的基础上对oracle的数据进行实时采集,底层使用的是debezium框架来实现,debezium使用oracle自带的logminer技术来实现。logminer的采集需要对数据库和采集表添加补充日志,由于oracle18c不支持对数据添加补充日志,所以目前支持的oracle11、12、19三个版本。 flink oracle

    2024年02月02日
    浏览(35)
  • Flink SQL Hive Connector使用场景

    目录 1.介绍 2.使用 2.1注册HiveCatalog 2.2Hive Read 2.2.1流读关键配置 2.2.2示例

    2024年02月06日
    浏览(32)
  • Flink Upsert Kafka SQL Connector 介绍

    在某些场景中,比方GROUP BY聚合之后的后果,须要去更新之前的结果值。这个时候,须要将 Kafka 记录的 key 当成主键解决,用来确定一条数据是应该作为插入、删除还是更新记录来解决。在 Flink1.11 中,能够通过 flink-cdc-connectors 项目提供的 changelog-json format 来实现该性能。 在

    2024年02月20日
    浏览(37)
  • Flink CDC系列之:Oracle CDC Connector

    2023年08月23日
    浏览(43)
  • 关于flink-sql-connector-phoenix的重写逻辑

    目录 重写意义 代码结构  调用链路 POM文件配置 代码解析 一、PhoenixJdbcD

    2024年02月12日
    浏览(29)
  • Apache Doris (六十四): Flink Doris Connector - (1)-源码编译

     🏡 个人主页:IT贫道-CSDN博客   🚩 私聊博主:私聊博主加WX好友,获取更多资料哦~  🔔 博主个人B栈地址:豹哥教你学编程的个人空间-豹哥教你学编程个人主页-哔哩哔哩视频 目录 1. Flink与Doris版本兼容

    2024年01月18日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包