背景
使用 kafka-clients.jar 中的 原生 API 消费 Kafka 数据时,consumer.poll
操作遇到了一个异常:
Consume data error Error deserializing key/value for partition xx-topic-0 at offset 55920. If needed, please seek past the record to continue consumption.
自定义的消费线程遇到这个异常后,一直刷该异常。网上给的解决办法都是使用 spring-kafka 设置异常处理器实现的,但是对于原生 API 怎么办呢?
解决办法
搜到一篇文章 《如何最好地处理 kafkaconsumer poll方法的 SerializationException》,试了一下这个方法,能解决问题。
基本思路如下:
- 针对
SerializationException
异常,获取异常信息。 - 解析异常信息中 at offset 之前的消息名称和分区值,之后的有问题的 offset 值。
- 重新 seek 到下一个 offset 。
详细代码如下:
if (e instanceof SerializationException){
// 拆解出:xx-topic-0 at offset 55920
String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
// 拆解 at offset ,之前是 「topic名称-分区编号」;后面是「错误偏移值」
String [] ss = s.split("at offset");
// 消费的 topic 值
String topics = "xx-topic";
// 解析分区编号和下一个 offset
int nextOffset = Integer.valueOf(ss[1].trim()) + 1;
// topic 分区编号解析
int partition = Integer.valueOf(ss[0].trim().replace(topics+"-", ""));
// 重新定位到下一个消息,否则这里会一直死循环,即传说中的 Kafka 毒丸
TopicPartition topicPartition = new TopicPartition(topics, partition);
consumer.seek(topicPartition, nextOffset);
} else {
// TODO 其他异常处理,不影响程序运行
}
启示录
这其实就是 Kafka 消费的 Poison Pill 问题,主要解决办法都是针对 spring-kafka 的,对于原生 API 的程序,可以解析序列化异常消息以获取所需的数据,包括主题名称、分区和偏移量,按提示重新 seek 到下一个偏移值。文章来源:https://www.toymoban.com/news/detail-620852.html
这个异常过去很少碰到,但是一旦碰到,Kafka 消费程序就陷入无限循环中废掉了。参考文档中解析 partition 值的部分没有考虑到 topic 名称含有 “-” 的情况,所以会出错,简单的处理方法是直接替换掉 topicName-
这部分字符串就得到了分区值了。文章来源地址https://www.toymoban.com/news/detail-620852.html
到了这里,关于原生 Kafka 消费时无限报 Error deserializing key/value for partition 问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!