Flink生产数据到kafka

这篇具有很好参考价值的文章主要介绍了Flink生产数据到kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

文章目录

前言

一、版本

二、使用步骤

1.maven引入库

2.上代码


前言

近期开始学习Flink程序开发,使用java语言,此文以生产数据至kafka为例记录下遇到的问题以及代码实现,若有错误请提出。

一、版本

Flink版本:1.15.4
kafka版本:3.0.0

二、使用步骤

1.maven引入库

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
    <flink.version>1.15.4</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j2.version>2.14.1</log4j2.version>
</properties>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>

2.上代码

以下代码将Flink环境初始化、配置、生产数据至kafka代码放在一个独立类的原因,主要是在我近期做的业务中需要有多个实例往kafka生产数据,后面两个启动类调用此类。


import com.alibaba.fastjson.JSON;
import com.dogame.data.report.comsume.bean.KafkaEnvParameter;
import com.dogame.data.report.comsume.bean.Order;
import com.dogame.data.report.comsume.utils.EnvironmentUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;
import java.util.UUID;


public class WorkerService {


    public void run (String[] args, String bizType) throws Exception {

        StreamExecutionEnvironment ENV = EnvironmentUtils.initEnvironmentConfig(args, bizType);
        DataStream<String> orderDataStream = ENV.addSource(new ParallelSourceFunction<String>() {
            private boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                String [] channels = {"www.xxx.com","www.sdfs.com","www.qqq.com","www.dfff.com"};
                while(running){
                    Order order = new Order();
                    order.setOrderId(String.valueOf(System.currentTimeMillis()+new Random().nextInt()));
                    order.setTime(System.currentTimeMillis());
                    order.setFee(1000* new Random().nextDouble());
                    order.setChannel(channels[new Random().nextInt(channels.length)]);
                    ctx.collect(JSON.toJSONString(order)); //发送出去
                    Thread.sleep(10);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        orderDataStream.print();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setKafkaProducerConfig(KafkaEnvParameter.productProperties)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(KafkaEnvParameter.responseTopics)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                //.setTransactionalIdPrefix(UUID.randomUUID().toString())
                .build();

        orderDataStream.sinkTo(sink);
        orderDataStream.print();
        ENV.execute("SourceFunctionTest");
    }
}
@Slf4j
public class SourceFunctionTest {

    public static void main(String[] args)  {
        try {
            String bizType = "t0";
            WorkerService workerService = new WorkerService();
            workerService.run(args, bizType);
        } catch (Exception e) {
            log.error(" fail ", e);
        }
    }
}


@Slf4j
public class SourceFunctionTest1 {
    
    public static void main(String[] args) throws Exception {
        try {
            String bizType = "t1";
            WorkerService workerService = new WorkerService();
            workerService.run(args, bizType);
        } catch (Exception e) {
            log.error(" fail ", e);
        }
    }
}

需要强调一点的是:经过测试,发现以下问题:若只启动一个实例,代码能稳定运行,若启动两个以上实例,运行几分钟后会有程序报错,最终只有一个实例能持续正常运行,经过2天对这个问题的排查(问chatGPT结合相关代码走读),发现居然是KafkaSinkBuilder这个方法未调用(以上workerService类已注释,打开即可):文章来源地址https://www.toymoban.com/news/detail-425390.html


    /**
     * Sets the prefix for all created transactionalIds if {@link DeliveryGuarantee#EXACTLY_ONCE} is
     * configured.
     *
     * <p>It is mandatory to always set this value with {@link DeliveryGuarantee#EXACTLY_ONCE} to
     * prevent corrupted transactions if multiple jobs using the KafkaSink run against the same
     * Kafka Cluster. The default prefix is {@link #transactionalIdPrefix}.
     *
     * <p>The size of the prefix is capped by {@link #MAXIMUM_PREFIX_BYTES} formatted with UTF-8.
     *
     * <p>It is important to keep the prefix stable across application restarts. If the prefix
     * changes it might happen that lingering transactions are not correctly aborted and newly
     * written messages are not immediately consumable until the transactions timeout.
     *
     * @param transactionalIdPrefix
     * @return {@link KafkaSinkBuilder}
     */
    public KafkaSinkBuilder<IN> setTransactionalIdPrefix(String transactionalIdPrefix) {
        this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
        checkState(
                transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length
                        <= MAXIMUM_PREFIX_BYTES,
                "The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size.");
        return this;
    }

到了这里,关于Flink生产数据到kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka3.0.0版本——生产者如何提高吞吐量

    设置批次大小,batch.size 默认 16K。 设置等待时间,linger.ms 默认 0。 设置缓冲区大小,buffer.memory 默认 32M 设置压缩, compression.type 默认 none,可配置值 gzip、snappy、lz4 和 zstd。 代码 在kafka集群上开启 Kafka 消费者 在 IDEA 中执行代码,观察kafka集群控制台中是否接收到消息。 测试

    2023年04月10日
    浏览(26)
  • flink开发常见问题 —— flink-kafka 依赖版本冲突问题

    由于 flink / kafka 的版本不断更新,创建项目的时候就应当考虑清楚这几个依赖库的版本问题,尽可能地与实际场景保持一致,比如服务器上部署的 kafka 是哪个版本,flink 是哪个版本,从而确定我们需要开发的是哪个版本,并且在真正的开发工作开始之前,应当先测试一下保证

    2024年02月07日
    浏览(38)
  • 《Git入门实践教程》前言+目录

    版本控制系统(VCS)在项目开发中异常重要,但和在校大学生的交流中知道,这个重要方向并未受到重视。具备这一技能,既是项目开发能力的体现,也可为各种面试加码。在学习体验后知道,Git多样化平台、多种操作方式、丰富的资源为业内人士提供了方便的同时,也造成

    2024年02月10日
    浏览(47)
  • FPGA学习实践之旅——前言及目录

    很早就有在博客中记录技术细节,分享一些自己体会的想法,拖着拖着也就到了现在。毕业至今已经半年有余,随着项目越来越深入,感觉可以慢慢进行总结工作了。趁着2024伊始,就先开个头吧,这篇博客暂时作为汇总篇,记录在这几个月以及之后从FPGA初学者到也算有一定

    2024年02月03日
    浏览(31)
  • 【数据结构】【王道】【数据结构实现】文章目录

    持续更新中。。。 数据结构 链接 顺序表实现及基本操作(可直接运行) 文章链接 无头结点单链表的实现及基本操作(可直接运行) 文章链接 带头结点单链表的实现及基本操作(可直接运行) 文章链接 双链表的实现及基本操作(可直接运行) 文章链接 循环链表的实现及

    2023年04月08日
    浏览(75)
  • 大数据Flink(五十七):Yarn集群环境(生产推荐)

    文章目录 Yarn集群环境(生产推荐) 一、准备工作

    2024年02月13日
    浏览(31)
  • 【Kafka】Java实现数据的生产和消费

    Kafka 是由 LinkedIn 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的 基于发布订阅模式的消息引擎系统 。 Broker:消息中间件处理节点,一个Kafka节点就是一个Broker,一个或者多个Broker可以组成一个Kafka集群; T

    2023年04月19日
    浏览(38)
  • kafka生产者api和数据操作

    发送流程 消息发送过程中涉及到两个线程—— main线程和Sender线程 main线程 使用serializer(并非java默认)序列化数据,使用partitioner确认发送分区 在main线程中创建了一个双端队列RecordAccumulator,main线程将批次数据发送给RecordAccumulator。 创建批次数据是从内存池中分配内存,在

    2024年02月13日
    浏览(21)
  • Kafka - 获取 Topic 生产者发布数据命令

    从头开始获取 20 条数据(等价于时间升序) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --from-beginning --max-messages 20 获取最新 20 条数据(等价于时间降序)去掉 --from-beginning 即可(默认) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --max-me

    2024年02月14日
    浏览(26)
  • 第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据

    重要的特性: 消息通过 队列来进行交换 每条消息仅会传递给一个消费者 消息传递有先后顺序,消息被消费后从队列删除(除非使用了消息优先级) 生产者或者消费者可以动态加入 传送模型: 异步即发即弃:生产者发送一条消息,不会等待收到一个响应 异步请求、应答:

    2024年02月20日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包