kafka复习:(3)自定义序列化器和反序列化器

这篇具有很好参考价值的文章主要介绍了kafka复习:(3)自定义序列化器和反序列化器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、实体类定义:


public class Company {
    private String name;
    private String address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "Company{" +
                "name='" + name + '\'' +
                ", address='" + address + '\'' +
                '}';
    }

    public Company(String name, String address) {
        this.name = name;
        this.address = address;
    }

    public Company() {
    }
}

二、自定义序列化器和反序列化器


import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanySerializer implements Serializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    //进行字节数组序列化
    @Override
    public byte[] serialize(String topic, Company data) {
        if(data == null){
            return null;
        }
        byte[] name, address;
        try{
            if(data.getName() != null){
                name = data.getName().getBytes("UTF-8");
            }else {
                name = new byte[0];
            }
            if(data.getAddress() != null){
                address = data.getAddress().getBytes("UTF-8");
            }else{
                address = new byte[0];
            }
            ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);

            byteBuffer.putInt(name.length);
            byteBuffer.put(name);
            byteBuffer.putInt(address.length);
            byteBuffer.put(address);
            return byteBuffer.array();
        }catch (UnsupportedEncodingException e){
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public void close() {

    }
}

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen, addressLen;
        String name, address;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);
        try {
            name = new String(nameBytes, "UTF-8");
            address = new String(addressBytes, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new SerializationException("Error:"+ex.getMessage());
        }
        return new Company(name,address);

    }

    @Override
    public void close() {

    }
}

三、定义生产者和消费者文章来源地址https://www.toymoban.com/news/detail-667727.html

package com.cisdi.dsp.modules.metaAnalysis.rest;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CompanyProducer {
    public static void main(String[] args) throws Exception{
        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //设置value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
        properties.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");
        KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);
        Company company = new Company();
        company.setAddress("Beijing");
        company.setName("Connection");
        ProducerRecord<String, Company> record = new ProducerRecord<>("companyTopic", company);
        producer.send(record).get();
    }
}

package com.cisdi.dsp.modules.metaAnalysis.rest;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CompanyConsumer {
    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my");
        KafkaConsumer<String,Company> kafkaConsumer=new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));
        while(true){
            ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){
                System.out.println(consumerRecord.value());
            }
        }
    }
}

到了这里,关于kafka复习:(3)自定义序列化器和反序列化器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java序列化和反序列化

    目录 一、序列化和反序列化 二、Java序列化演示 三、反序列化漏洞 1、含义 ​序列化就是内存中的对象写入到IO流中,保存的格式可以是二进制或者文本内容。反序列化就是IO流还原成对象。 2、用途 (1)传输网络对象 (2)保存Session 1、序列化 java.io.ObjectOutputStream代表对象

    2023年04月25日
    浏览(39)
  • 什么是序列化和反序列化?

    JSON(JavaScript Object Notation)和XML(eXtensible Markup Language)是两种常用的数据交换格式,用于在不同系统之间传输和存储数据。 JSON是一种轻量级的数据交换格式,它使用易于理解的键值对的形式表示数据。JSON数据结构简单明了,易于读写和解析,是基于JavaScript的一种常用数据

    2024年02月09日
    浏览(57)
  • Java序列化和反序列化机制

    在阅读 ArrayList 源码的时候,注意到,其内部的成员变量动态数组 elementData 被Java中的 transient 修饰 transient 意味着Java在序列化时会跳过该字段(不序列化该字段) 而Java在默认情况下会序列化类(实现了 Java.io.Serializable 接口的类)的所有非瞬态(未被 transient 修饰

    2024年03月15日
    浏览(50)
  • TCP定制协议,序列化和反序列化

    目录 前言 1.理解协议 2.网络版本计算器 2.1设计思路 2.2接口设计 2.3代码实现: 2.4编译测试 总结         在之前的文章中,我们说TCP是面向字节流的,但是可能对于面向字节流这个概念,其实并不理解的,今天我们要介绍的是如何理解TCP是面向字节流的,通过编码的方式,自

    2024年02月12日
    浏览(34)
  • [计算机网络]---序列化和反序列化

    前言 作者 :小蜗牛向前冲 名言 :我可以接受失败,但我不能接受放弃    如果觉的博主的文章还不错的话,还请 点赞,收藏,关注👀支持博主。如果发现有问题的地方欢迎❀大家在评论区指正  目录  一、再谈协议 二、序列化和反序化 1、网络版本计算器的场景搭建 2、

    2024年02月20日
    浏览(43)
  • java中的序列化和反序列化

    objectOutputStream 对象的序列化,以流的形式将对象写入文件 构造方法: objectOutputStream(OutputStream out) 传入一个字节输入流创建objectOutputStream对象 成员方法: void writeObject(object obj) 将指定的对象写入objectOutputStream 使用步骤: 创建一个类,这个类实现Serializable接口,Serializable是一

    2024年02月14日
    浏览(35)
  • Java中序列化和反序列化解释

    在Java中,序列化(Serialization)是指将对象的状态转换为字节流的过程,以便将其保存到文件、在网络中传输或持久化到数据库中。而反序列化(Deserialization)则是将字节流转换回对象的过程,恢复对象的状态。 序列化和反序列化主要用于以下场景: 1. 对象持久化:通过序列

    2024年02月07日
    浏览(56)
  • 从浅入深理解序列化和反序列化

    什么是java序列化 序列化:把对象转换为字节序列的过程 反序列:把字节序列恢复为对象的过程 对象序列化机制(object serialization)是java语言内建的一种对象持久化方式,通过对象序列化,可以将对象的状态信息保存为字节数组,并且可以在有需要的时候将这个字节数组通过

    2024年02月06日
    浏览(41)
  • iOS处理json,序列化和反序列化

    Mantle 是一个开源的 Objective-C 框架,用于在 iOS 和 macOS 应用程序中实现模型层的序列化和反序列化。它提供了一种简单而强大的方式来将 JSON数据格式转换为自定义的数据模型对象,以及将数据模型对象转换为字典或 JSON 格式。 Mantle具有如下特点 自动映射 Mantle自动将 JSON 数据

    2024年02月11日
    浏览(63)
  • 【Linux】应用层协议序列化和反序列化

    欢迎来到Cefler的博客😁 🕌博客主页:折纸花满衣 🏠个人专栏:题目解析 🌎推荐文章:C++【智能指针】 前言 在正式代码开始前,会有一些前提知识引入 在网络应用层中,序列化(Serialization)和反序列化(Deserialization)是将数据转换为可在网络上传输的格式,并从网络接

    2024年04月23日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包