Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async)

这篇具有很好参考价值的文章主要介绍了Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、背景

项目开发了一个类似kafka tools查询工具的kafka 查询,现在需要测试一下如果通过字节数组的形式写入,看看查询有没有问题

二、kafka查询代码

Python代码示例:

from kafka import KafkaProducer
import json

# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# 定义JSON数据
json_data = {
    'name': '测试',
    'age': 30,
    'email': 'johndoe@example.com'
}

# 将JSON数据转换为字符串,并指定ensure_ascii参数为False,以保留非ASCII字符
json_str = json.dumps(json_data, ensure_ascii=False)

# 将字符串编码为字节数组
byte_array = json_str.encode('utf-8')

# 发送字节数组消息到Kafka主题
producer.send('lqiju_test_json_trans_bytearray_20230703', value=byte_array)

# 关闭Kafka生产者连接
producer.close()

Java代码示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaByteArrayProducer {
    public static void main(String[] args) {
        // Kafka配置
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // 创建KafkaProducer实例
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);

        // 消息数据
        String topic = "your-topic-name";
        byte[] messageBytes = "Hello, Kafka!".getBytes();

        // 创建ProducerRecord对象并发送消息
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, messageBytes);
        producer.send(record);

        // 关闭KafkaProducer
        producer.close();
    }
}

三、解决报错return '<SimpleProducer batch=%s>' % self.async

把上面代码运行,报错

Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async),Python Testtools,kafka,大数据,python,java,字节数组,kafka

 

因为py3.7里面async已经变成了关键字。导致不兼容。
 
解决办法:
pycharm工具,在执行的脚本右键点击open in terminal:执行pip install kafka-python

或者在settings里面安装

Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async),Python Testtools,kafka,大数据,python,java,字节数组,kafka

 重新执行,OK

Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async),Python Testtools,kafka,大数据,python,java,字节数组,kafka

 四、小结

kafka支持存储什么格式的消息?

Kafka支持存储任意格式的消息,它本身并不关心消息的具体格式。Kafka将消息视为字节数组(bytes)的形式进行传输和存储。这意味着你可以以任何你喜欢的方式序列化你的消息,并将其转换为字节数组进行发送到Kafka。

常见的消息格式包括文本(如JSON、XML、CSV等)、二进制数据、Avro、Protobuf等。你可以根据你的需求和使用场景选择合适的消息格式。

在发送消息时,你需要将消息转换为字节数组,并使用Kafka提供的Producer API将字节数组发送到指定的Topic。在消费消息时,你可以使用相应的Consumer API从Kafka订阅的Topic中接收字节数组,并根据你事先定义的消息格式将其反序列化为可读的格式。

总之,Kafka本身并不限制消息的格式,你可以使用任何你喜欢的格式来存储和传输消息。文章来源地址https://www.toymoban.com/news/detail-518961.html

到了这里,关于Python 字节数组方式写入kafka(含报错return ‘<SimpleProducer batch=%s>‘ % self.async)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Navicat远程连接云服务器数据库的方法(含报错解决方法)

      目    录  1.服务器控制台开启SSH服务  2.navicat远程连接云服务器数据库步骤: 3.navicat 操作演示 4.navicat远程连接常见错误报警     (图:001创建MySQL的ssh通道允许远程服务)     (首先登录云服务器官方提供的后台管理控制台,此处以腾讯云服务器为演示)  1.点击防火

    2024年02月05日
    浏览(42)
  • python自带的venv创建虚拟环境报错Error: Command returned non-zero exit status 1.

    Windows 10 教育版64位 Python 3.6.3 今天使用 python 自带的 venv 使用如下的命令: 创建虚拟环境报错: Error: Command \\\'[\\\'E:\\\\Code\\\\Python\\\\Git\\\\test1\\\\aaa\\\\Scripts\\\\python.exe\\\', \\\'-Im\\\', \\\'ensurepip\\\', \\\'--upgrade\\\', \\\'--default-pip\\\']\\\' returned non-zero exit status 1.) 这个时候,回到你需要创建 venv 的路径,发现我的新环境

    2024年02月16日
    浏览(45)
  • python利用pandas和csv包两种方式向一个csv文件写入或追加数据

    或者 一行加入一个数据

    2024年02月16日
    浏览(68)
  • python数组循环的几种方式

     Python中循环数组有几种方式: for-in循环,可以遍历数组中的每一个元素。 while循环,使用索引进行循环。 列表推导式,可以快速创建新的列表。 递归,可以遍历多维数组。 enumerate() 函数,在循环中同时获取索引和元素

    2024年02月16日
    浏览(74)
  • python面试题二:数组(字符串)实现反转的方式

    a_list = [1, 4, 6, 2, 9, 4, 8, 3, 7] print(reversed(a_list)) aa_list = list(reversed(a_list)) print(a_list) print(aa_list) b_list = [1, 4, 6, 2, 9, 4, 8, 3, 7] b_list.reverse() print(b_list) c_list = [1, 4, 6, 2, 9, 4, 8, 3, 7] print(c_list[::-1]) 参考:风一样汉字–Python 实现字符串反转的9种方法

    2023年04月12日
    浏览(51)
  • QT字节数组类QByteArray

    字节数组类以 ‘\\0’结尾,索引的下标从0开始。 第一次调用fill函数,不指定size参数,按照之前的长度,跟新值 第二次调用fill函数,指定size参数,重新调整字节数组的长度,并更新值 访问QByteArray类对象的某个元素有4种方式: [] at() data[] constData[] 其中,[]和data[]可读可写,

    2024年02月16日
    浏览(60)
  • FlinkSql写入/读取Kafka

    创建写入kafka的sink表 创建catalog 插入数据 发现kafka中已有数据 创建连接Kafka的Source表 创建iceberg表 3.插入数据 问题: 报错如下:org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId=‘null’, inTransaction=false, closed=false} Caused by: org.ap

    2024年02月15日
    浏览(49)
  • 使用clickhouse kafka表引擎消费kafka写入clickhouse

    1:seatunnel 消费kafka数据写入clickhouse 文章目录 系列文章目录 文章目录 前言 1.创建kafka 引擎表  2.创建clickhouse MergeTree表 3.创建kafka物化视图写入结构表 三、问题 1、修改物化视图 总结 本文使用 seatunnel 消费kafka数据写入clickhouse 文章的kafka topic以及格式,用另一种方式写入cl

    2024年02月17日
    浏览(49)
  • 前端学习C语言 - 数组和字节序

    本篇主要介绍: 一维二维数组 、 字符数组 、 数组名和初始化注意点 以及 字节序 。 初始化 有以下几种方式对数组初始化: Tip :以上写法创建的数组都是 不可变大小的 。 练习1 题目 : int a[5] = {1} ,请问 a 的每个值是多少? 输出: 1 0 0 0 0 。 在C和C++中,当我们创建数组

    2024年02月09日
    浏览(43)
  • pylink消费kafka写入ES

    # -*- coding: utf-8 -*- from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import MapFunction, RuntimeContext, KeyedProcessFunction from abc import ABC, abstractmethod from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import MapFunction, RuntimeContext, KeyedProcessFunction fr

    2024年02月08日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包