一文带你理解Kafka的Header

这篇具有很好参考价值的文章主要介绍了一文带你理解Kafka的Header。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Header简介

kafka消息头,Kafka,Flink,kafka,header,flink读取header

Kafka从 0.11.0.0 版本开始提供了一种在生产者和消费者之间传递元数据的机制,叫做 Kafka header。使用这个机制,你可以在消息中添加一些与数据内容无关的附加信息,如消息的来源、类型、版本、生产时间、过期时间、分区数、用户 ID 等等。
Kafka header 是由一个或多个键值对组成的列表,每个键值对都称为 header。 消息可以包含零个或多个 header
下面是一些简单的理解 Kafka header 的方式:

  • Kafka header 可以看作是消息的元数据,因为它们不包含实际可用的消息负载。
  • Kafka header 的作用类似于 HTTP 或者 TCP/IP 协议中的 header 头部,在消息中添加一些描述性信息,方便消费者解析和处理消息
  • Kafka header 的使用并不是强制性的。你完全可以不使用它们,只发送负载数据。
  • Kafka header 不同于消息的 key 和 value,因为它们与数据的生命周期无关

Header使用场景

  1. 消息追踪
    通过在Header中添加一个全局唯一的ID,可以跟踪消息在整个应用系统中的传递轨迹。

    ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "myvalue");
    record.headers().add("messageId", UUID.randomUUID().toString().getBytes());
    producer.send(record);
    

    在消费消息的时候,可以获取到消息的KafkaHeader,并从中提取出消息ID进行追踪:

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        Headers headers = record.headers();
        String messageId = new String(headers.lastHeader("messageId").value());
        System.out.println("Received message with ID: " + messageId);
    }
    
  2. 消息路由
    Header可以为消息添加一个路由键,在消息传递的过程中,可以根据这个路由键进行消息的路由。

    ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "myvalue");
    record.headers().add("routeKey", "myroutekey".getBytes());
    producer.send(record);
    

    在消费消息的时候,消费者可以根据路由键过滤出需要消费的消息:

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        Headers headers = record.headers();
        String routeKey = new String(headers.lastHeader("routeKey").value());
        if (routeKey.equals("myroutekey")) {
            //process the message
        }
    }
    
  3. 传递消息元数据
    Header可以在消息传递过程中携带一些重要的元数据,这些元数据可以用于解释消息的内容或者处理方式。

    ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "myvalue");
    record.headers().add("content-type", "text/plain".getBytes());
    producer.send(record);
    

    在消费消息的时候,可以从KafkaHeader中提取出content-type元数据,来解释消息的内容格式:

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        Headers headers = record.headers();
        String contentType = new String(headers.lastHeader("content-type").value());
        if (contentType.equals("text/plain")) {
            //process the message
        }
    }
    

Java使用Header案例

  1. 生产者

    package com.byd.dev.kfk;
    
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.io.IOException;
    import java.security.PrivilegedAction;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.Scanner;
    
    public class HeaderProducer {
        public static void main(String[] args) {
            // kerberos认证
            System.setProperty("java.security.krb5.conf", "D:/demo/krb5.conf");
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            System.setProperty("java.security.auth.login.config", "D:/demo/jaas.conf");
            try {
                UserGroupInformation.loginUserFromKeytab("c.dev.hdfs", "D:/demo/demo.keytab");
                UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction<Object>() {
                    @Override
                    public Object run() {
                        try {
                            runProducer();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                        return null;
                    }
                });
    
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    
        /**
         * 启动生产者
         */
        public static void runProducer() throws InterruptedException {
            KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties());
            sendHeaderMessage(producer);
        }
    
        /**
         * 生产带有Header的记录
         *
         * @param producer KafkaProducer
         */
        public static void sendHeaderMessage(KafkaProducer<String, String> producer) throws InterruptedException {
            List<Header> headers = new ArrayList<>();
            headers.add(new RecordHeader("website", "www.xc.com".getBytes()));
            Scanner sc = new Scanner(System.in);
            for (int i = 0; i < 100; i++) {
                System.out.println("生产第" + i + "条数据");
                String s = sc.next();
                ProducerRecord<String, String> record = new ProducerRecord<>("xc", null,
                        "key" + i, "生产的数据为:" + s, headers);
                producer.send(record);
                Thread.sleep(1000);
            }
    
        }
    
        /**
         * 发送带时间戳的Header记录
         * @param producer KafkaProducer
         * @param ts 时间戳
         */
        public static void sendHeaderMessage(KafkaProducer<String, String> producer, long ts) {
            ts = System.currentTimeMillis();
            List<Header> headers = new ArrayList<>();
            headers.add(new RecordHeader("website", "www.xc.com".getBytes()));
            ProducerRecord<String, String> record = new ProducerRecord<>("xc",
                    null, ts, "message", "Hello World", headers);
            producer.send(record);
        }
    
        /**
         * 获取连接Kafka的配置
         * @return Properties
         */
        public static Properties getProducerProperties() {
            Properties producerProperties = new Properties();
            producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092,xxx:9092,xxx:9092");
            producerProperties.put("sasl.kerberos.service.name", "kafka");
            producerProperties.put("sasl.mechanism", "GSSAPI");
            producerProperties.put("security.protocol", "SASL_PLAINTEXT");
            producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            return producerProperties;
        }
    }
    
    
  2. 消费者

    package com.byd.dev.kfk;
    
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.io.IOException;
    import java.security.PrivilegedAction;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class HeaderConsumer {
        public static void main(String[] args) {
            // kerberos认证
            System.setProperty("java.security.krb5.conf", "D:/demo/krb5.conf");
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            System.setProperty("java.security.auth.login.config", "D:/demo/jaas.conf");
            try {
                UserGroupInformation.loginUserFromKeytab("c.dev.hdfs",
                        "D:/project/lab-project/src/main/resources/kerberos/dev/ic.dev.hdfs.keytab");
                UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction<Object>() {
                    @Override
                    public Object run() {
                        try {
                            runConsumer();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                        return null;
                    }
                });
    
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
    
        }
    
        /**
         * 启动消费者
         */
        public static void runConsumer(){
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
            consumerMessageWithHeader(consumer);
        }
    
        /**
         * 获取连接Kafka的配置
         * @return Properties
         */
        public static Properties getConsumerProperties() {
            Properties consumerProperties = new Properties();
            consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "xxx:9092,xxx:9092,xxx:9092");
            consumerProperties.put("sasl.kerberos.service.name","kafka");
            consumerProperties.put("sasl.mechanism","GSSAPI");
            consumerProperties.put("security.protocol","SASL_PLAINTEXT");
            consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");
            return consumerProperties;
        }
    
        /**
         * 消费带有Header的记录
         * @param consumer KafkaConsumer
         */
        public static void consumerMessageWithHeader(KafkaConsumer<String, String> consumer) {
            consumer.subscribe(Arrays.asList("xc"));
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(10));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("消费记录的key:"+record.key());
                    System.out.println("消费记录的value:"+record.value());
    
                    Headers consumedHeaders = record.headers();
                    for (Header header : consumedHeaders) {
                        System.out.println("header的key:"+header.key());
                        System.out.println("header的value:"+new String(header.value()));
                    }
                }
            }
    
        }
    }
    
    

结果:
生产者生产消息:

hello
生产第1条数据
world
生产第2条数据

消费者消费消息:

消费记录的key:key0
消费记录的value:生产的数据为:hello
header的key:website
header的value:www.xc.com
消费记录的key:key1
消费记录的value:生产的数据为:world
header的key:website
header的value:www.xc.com

Flink消费Header案例

package com.byd.dev;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.Properties;

public class FlinkKafkaHeader {
    public static String topic = "xc";
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // kerberos认证
        System.setProperty("java.security.krb5.conf", "D:/demo/krb5.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("java.security.auth.login.config", "D:/demo/jaas.conf");
        try {
            UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        runApp(env);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                }
            });

        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    
    public static void runApp(StreamExecutionEnvironment env) throws Exception {
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new MyDeserializationSchema(),
                        getKafkaProperties());
        consumer.setStartFromEarliest();
        env.addSource(consumer).print();
        env.execute(FlinkKafkaHeader.class.getSimpleName());
    }

    /**
     * 获取连接Kafka的配置
     * @return Properties
     */
    public static Properties getKafkaProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");
        props.setProperty("group.id", "group1");
        props.setProperty("flink.partition-discovery.interval-millis", "10000");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }
}

package com.byd.dev;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String s) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        Iterable<Header> headers = consumerRecord.headers();
        for (Header header : headers) {
            String key = header.key();
            String value = new String(header.value(), "UTF-8");
            // 处理 header 数据
            System.out.println("kafka的header数据:" + key + ":" + value);
        }
        return new String(consumerRecord.value(), "UTF-8");
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

Header设计思路

KafkaHeader的设计思路是基于Kafka的消息体系结构设计的,消息体系结构包括了消息的消息体、消息头和消息尾。KafkaHeader被设计成一个可扩展的消息头,可以为消息添加一些有用的元数据。KafkaHeader的键使用字符串表示,并且值可以是任何字节序列。

KafkaHeader还可以使用基于优先级的机制,覆盖或者添加KafkaHeader。当发送相同主题和分区的消息时,新的KafkaHeader将覆盖旧的KafkaHeader。

KafkaProducer和KafkaConsumer都提供了API,用于访问和修改KafkaHeader。其中KafkaProducer提供的API可以为将要发送到Kafka服务器的消息添加或者删除KafkaHeader。而KafkaConsumer提供的API可以用于从接收到的消息中提取KafkaHeader,并对消息进行分析和处理。

Header的性能影响和注意事项

由于KafkaHeader是在消息中添加的元数据,因此在为消息添加KafkaHeader时需要注意以下几点:

  1. 注意KafkaHeader的大小:KafkaHeader的大小会影响网络传输的性能,因此在添加KafkaHeader时需要权衡添加元数据的重要程度和通信性能的需要。

  2. 注意KafkaHeader支持的数据类型:KafkaHeader支持的数据类型包括了任何字节序列,因此在添加KafkaHeader时需要确保添加的元数据可以正确地解析和处理。

  3. 注意KafkaHeader的覆盖机制:当发送相同主题和分区的消息时,新的KafkaHeader会覆盖旧的KafkaHeader,因此需要注意不要在消息发送过程中出现意外丢失KafkaHeader的情况。

  4. 注意KafkaHeader的存在:KafkaHeader虽然可以为消息提供有用的元数据,但是当使用者处理消息时需要确保消息本身是唯一的标识,而不是KafkaHeader。

本文总结

本文介绍了 Kafka 中的 header,它可以用来传递一些与数据内容无关的附加信息,方便消费者解析和处理消息。与数据的 key、value 不同,header 不包含实际的消息内容,它们只是元数据,不影响消息的生命周期。在生产者和消费者中都可以使用 Kafka header,可以通过 API 来添加、读取和操作 header。文章来源地址https://www.toymoban.com/news/detail-598660.html

参考文献

  1. https://www.baeldung.com/java-kafka-custom-headers
  2. https://www.python100.com/html/55R0B31WXT2C.html

到了这里,关于一文带你理解Kafka的Header的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 一篇文章带你从入门都入土 Kafka 消息中间件(原理+代码)

    目录 一、Kafka定义 二、消息队列 三、Kafka基础架构图 四、安装Kafka 4.1 为每台服务器下载Kafka并解压 4.2 查看目录结构 4.3 为每台服务器修改配置文件server.properties 4.4 为每台服务器配置Kafka环境变量 4.5 启动zookeeper集群 4.6 启动Kafka集群 4.7 关闭Kafka集群的注意事项 五、Topic命令

    2024年02月04日
    浏览(41)
  • 从零到Kafka:万字带你体验Spring Boot整合消息驱动的奇妙之旅

    主页传送门:📀 传送 Spring boot : | 基于Spring的开源框架,用于简化新Spring应用的初始搭建以及开发过程 特性: | 快速开发、轻量级、无代码生成和独立运行等特性 优势: | 简化配置,提供自动配置,减少开发时间 应用场景: | 适用于微服务架构、云原生应用等场景 环境

    2024年02月05日
    浏览(43)
  • 大数据系统常用组件理解(Hadoop/hive/kafka/Flink/Spark/Hbase/ES)

    一.Hadoop Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 Hadoop 以一种可靠、高效、可伸缩的方式进行数据处理。 Hadoop的核心是yarn、HDFS和Mapreduce。yarn是资源管理系统,实现资源调度,yarn是Hadoop2.0中的资源管理系统,总体上是master/slave结构。对于yarn可以粗浅将其理解

    2024年02月20日
    浏览(46)
  • 全网最详细地理解Kafka中的Topic和Partition以及关于kafka的消息分发、服务端如何消费指定分区、kafka的分区分配策略(range策略和RoundRobin策略)

    最近在学习kafka相关的知识,特将学习成功记录成文章,以供大家共同学习。 首先要注意的是, Kafka 中的 Topic 和 ActiveMQ 中的 Topic 是不一样的。 在 Kafka 中, Topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 Kafka 集群的消息都有一个类别。 物理上

    2024年01月25日
    浏览(43)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(48)
  • 一文带你深入理解probe函数

    驱动注册的probe函数 probe函数在设备驱动注册最后收尾工作,当设备的device 和其对应的driver 在总线上完成配对之后,系统就调用 platform设备的probe函数完成驱动注册最后工作。资源、 中断调用函数以及其他相关工作。下面是probe被调用的一些程序流程。 从driver_register看起:

    2024年02月13日
    浏览(71)
  • 【Kafka】消息队列Kafka进阶

    生产者分区写入策略 生产者写入消息到 topic,Kafka 将依据不同的策略将数据分配到不同的分区中。 轮询分区策略 随机分区策略 按key分区分配策略 自定义分区策略 轮询策略   默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区。如果在生产

    2024年02月15日
    浏览(43)
  • 【Kafka】消息队列Kafka基础

      消息队列,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。例如Java中的队列:   上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。我们可以简单理解消息队列就是

    2024年02月16日
    浏览(51)
  • [kafka]kafka如何保证消息有序

    严格的说,kafka只能保证同一个分区内的消息存储的有序性。 这个问题并没有标准答案,面试官只是想看看你如何思考的。 kafka只能保证单partition有序,如果kafka要保证多个partition有序,不仅broker保存的数据要保持顺序,消费时也要按序消费。假设partition1堵了,为了有序,那

    2024年02月16日
    浏览(37)
  • 一文带你如何用SpringBoot+RabbitMQ方式来收发消息

    预告了本篇的内容:利用RabbitTemplate和注解进行收发消息,还有一个我临时加上的内容:消息的序列化转换。 本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管

    2024年02月09日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包