Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async)

这篇具有很好参考价值的文章主要介绍了Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、背景

项目开发了一个类似kafka tools查询工具的kafka 查询,现在需要测试一下如果通过字节数组的形式写入,看看查询有没有问题

二、kafka查询代码

Python代码示例:

from kafka import KafkaProducer
import json

# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# 定义JSON数据
json_data = {
    'name': '测试',
    'age': 30,
    'email': 'johndoe@example.com'
}

# 将JSON数据转换为字符串,并指定ensure_ascii参数为False,以保留非ASCII字符
json_str = json.dumps(json_data, ensure_ascii=False)

# 将字符串编码为字节数组
byte_array = json_str.encode('utf-8')

# 发送字节数组消息到Kafka主题
producer.send('lqiju_test_json_trans_bytearray_20230703', value=byte_array)

# 关闭Kafka生产者连接
producer.close()

Java代码示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaByteArrayProducer {
    public static void main(String[] args) {
        // Kafka配置
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // 创建KafkaProducer实例
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);

        // 消息数据
        String topic = "your-topic-name";
        byte[] messageBytes = "Hello, Kafka!".getBytes();

        // 创建ProducerRecord对象并发送消息
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, messageBytes);
        producer.send(record);

        // 关闭KafkaProducer
        producer.close();
    }
}

三、解决报错return '<SimpleProducer batch=%s>' % self.async

把上面代码运行,报错

Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async),Python Testtools,kafka,大数据,python,java,字节数组,kafka

 

因为py3.7里面async已经变成了关键字。导致不兼容。
 
解决办法:
pycharm工具,在执行的脚本右键点击open in terminal:执行pip install kafka-python

或者在settings里面安装

Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async),Python Testtools,kafka,大数据,python,java,字节数组,kafka

 重新执行,OK

Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async),Python Testtools,kafka,大数据,python,java,字节数组,kafka

 四、小结

kafka支持存储什么格式的消息?

Kafka支持存储任意格式的消息,它本身并不关心消息的具体格式。Kafka将消息视为字节数组(bytes)的形式进行传输和存储。这意味着你可以以任何你喜欢的方式序列化你的消息,并将其转换为字节数组进行发送到Kafka。

常见的消息格式包括文本(如JSON、XML、CSV等)、二进制数据、Avro、Protobuf等。你可以根据你的需求和使用场景选择合适的消息格式。

在发送消息时,你需要将消息转换为字节数组,并使用Kafka提供的Producer API将字节数组发送到指定的Topic。在消费消息时,你可以使用相应的Consumer API从Kafka订阅的Topic中接收字节数组,并根据你事先定义的消息格式将其反序列化为可读的格式。

总之,Kafka本身并不限制消息的格式,你可以使用任何你喜欢的格式来存储和传输消息。文章来源地址https://www.toymoban.com/news/detail-518961.html

到了这里,关于Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Navicat远程连接云服务器数据库的方法(含报错解决方法)

      目    录  1.服务器控制台开启SSH服务  2.navicat远程连接云服务器数据库步骤: 3.navicat 操作演示 4.navicat远程连接常见错误报警     (图:001创建MySQL的ssh通道允许远程服务)     (首先登录云服务器官方提供的后台管理控制台,此处以腾讯云服务器为演示)  1.点击防火

    2024年02月05日
    浏览(49)
  • python自带的venv创建虚拟环境报错Error: Command returned non-zero exit status 1.

    Windows 10 教育版64位 Python 3.6.3 今天使用 python 自带的 venv 使用如下的命令: 创建虚拟环境报错: Error: Command \\\'[\\\'E:\\\\Code\\\\Python\\\\Git\\\\test1\\\\aaa\\\\Scripts\\\\python.exe\\\', \\\'-Im\\\', \\\'ensurepip\\\', \\\'--upgrade\\\', \\\'--default-pip\\\']\\\' returned non-zero exit status 1.) 这个时候,回到你需要创建 venv 的路径,发现我的新环境

    2024年02月16日
    浏览(49)
  • python利用pandas和csv包两种方式向一个csv文件写入或追加数据

    或者 一行加入一个数据

    2024年02月16日
    浏览(72)
  • python数组循环的几种方式

     Python中循环数组有几种方式: for-in循环,可以遍历数组中的每一个元素。 while循环,使用索引进行循环。 列表推导式,可以快速创建新的列表。 递归,可以遍历多维数组。 enumerate() 函数,在循环中同时获取索引和元素

    2024年02月16日
    浏览(84)
  • python面试题二:数组(字符串)实现反转的方式

    a_list = [1, 4, 6, 2, 9, 4, 8, 3, 7] print(reversed(a_list)) aa_list = list(reversed(a_list)) print(a_list) print(aa_list) b_list = [1, 4, 6, 2, 9, 4, 8, 3, 7] b_list.reverse() print(b_list) c_list = [1, 4, 6, 2, 9, 4, 8, 3, 7] print(c_list[::-1]) 参考:风一样汉字–Python 实现字符串反转的9种方法

    2023年04月12日
    浏览(64)
  • QT字节数组类QByteArray

    字节数组类以 ‘\\0’结尾,索引的下标从0开始。 第一次调用fill函数,不指定size参数,按照之前的长度,跟新值 第二次调用fill函数,指定size参数,重新调整字节数组的长度,并更新值 访问QByteArray类对象的某个元素有4种方式: [] at() data[] constData[] 其中,[]和data[]可读可写,

    2024年02月16日
    浏览(62)
  • FlinkSql写入/读取Kafka

    创建写入kafka的sink表 创建catalog 插入数据 发现kafka中已有数据 创建连接Kafka的Source表 创建iceberg表 3.插入数据 问题: 报错如下:org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId=‘null’, inTransaction=false, closed=false} Caused by: org.ap

    2024年02月15日
    浏览(51)
  • 使用clickhouse kafka表引擎消费kafka写入clickhouse

    1:seatunnel 消费kafka数据写入clickhouse 文章目录 系列文章目录 文章目录 前言 1.创建kafka 引擎表  2.创建clickhouse MergeTree表 3.创建kafka物化视图写入结构表 三、问题 1、修改物化视图 总结 本文使用 seatunnel 消费kafka数据写入clickhouse 文章的kafka topic以及格式,用另一种方式写入cl

    2024年02月17日
    浏览(50)
  • 前端学习C语言 - 数组和字节序

    本篇主要介绍: 一维二维数组 、 字符数组 、 数组名和初始化注意点 以及 字节序 。 初始化 有以下几种方式对数组初始化: Tip :以上写法创建的数组都是 不可变大小的 。 练习1 题目 : int a[5] = {1} ,请问 a 的每个值是多少? 输出: 1 0 0 0 0 。 在C和C++中,当我们创建数组

    2024年02月09日
    浏览(45)
  • flink写入到kafka 大坑解析。

    1.kafka能不能发送null消息?    能! 2 flink能不能发送null消息到kafka? 不能!     这里就报了java的最常见错误 空指针,原因就是flink要把kafka的消息getbytes。所以flink不能发送null到kafka。 这种问题会造成什么后果? flink直接挂掉。 如果我们采取了失败重试机制会怎样? 数据重

    2024年02月15日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包