一个基于Kafka客户端封装的工具,Kafka开发效率神器

这篇具有很好参考价值的文章主要介绍了一个基于Kafka客户端封装的工具,Kafka开发效率神器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

GitHub源码https://github.com/zhangchuangiie/SimpleKafka

SimpleKafka(Kafka客户端封装工具类)

一个基于Kafka客户端封装的工具,Kafka开发效率神器

特点:

  1. 封装了常用的Kafka客户端操作,无需维护配置,无需初始化客户端,真正实现了一行代码调用
  2. 将连接池的维护封装在工具类里面,多线程使用也无需维护客户端集合

使用方式:

只需要集成1个KafkaUtil.java文件即可,修改里面的kafka服务地址即可

典型示例:

  1. 同步生产: LinkedHashMap<String, Object> recordMeta = KafkaUtil.sendToKafka("RULEa93304e6d844000","222","aaaa");
  2. 异步生产: KafkaUtil.sendToKafkaAsync("RULEa93304e6d844000", "222", "aaaa");
  3. 消费数据: ArrayList<LinkedHashMap<String, Object>> buffer = KafkaUtil.recvFromKafka("RULEa93304e6d844000", "group1");
  4. 重置偏移: KafkaUtil.resetOffsetToEarliest("RULEa93304e6d844000", "group1");

接口介绍:

  1. kafkaListTopics: topic列表
  2. createTopic: topic创建
  3. delTopic: topic删除
  4. partitionsTopic: topic的分区列表,分区和副本数
  5. delGroupId: 删除groupId
  6. descCluster: 集群的节点列表
  7. kafkaConsumerGroups: 消费者列表
  8. kafkaConsumerGroups: 指定topic的活跃消费者列表
  9. sendToKafka: 生产数据到指定的topic,同步接口{"topic":"RULEa93304e6d844000","partition":1,"offset":681}
  10. sendToKafkaAsync: 生产数据到指定的topic,异步接口,默认回调
  11. sendToKafkaAsync: 生产数据到指定的topic,异步接口,自定义回调
  12. recvFromKafka: 按groupId消费指定topic的数据[{"topic":"RULEa93304e6d844000","key":"222","value":"aaaa","partition":1,"offset":681}]
  13. recvFromKafkaByOffset: 消费指定topic指定partition对应的offset数据
  14. recvFromKafkaByTimestamp: 消费指定topic指定partition对应的timestamp以后的数据
  15. resetOffsetToTimestamp: 重置指定topic的offset到对应的timestamp
  16. resetOffsetToEarliest: 重置指定topic的offset到最早
  17. resetOffsetToLatest: 重置指定topic的offset到最晚,一般在跳过测试脏数据时候使用
  18. consumerPositions: 获取当前消费偏移量情况{"partitionNum":2,"dataNum":1,"lagNum":0,"positions":[{"partition":0,"begin":0,"end":0,"current":0,"current1":0,"size":0,"lag":0},{"partition":1,"begin":681,"end":682,"current":682,"current1":682,"size":1,"lag":0}]}
  19. topicSize: 获取指定topic数据量详情情况 [{"partition": 0,"begin": 65,"end": 65,"size": 0}]
  20. topicSizeAll: 获取所有topic数据量详情情况
  21. topicSizeStatistics: 获取指定topic数据量统计{"partitionNum":5452,"dataNum":41570647}
  22. topicSizeStatisticsAll: 获取所有topic数据量统计{"topicNum":2550,"partitionNum":5452,"dataNum":41570647}

接口列表:

  1. kafkaListTopics: List kafkaListTopics()
  2. createTopic: void createTopic(String topic)
  3. delTopic: void delTopic(String topic)
  4. partitionsTopic: List partitionsTopic(String topic)
  5. delGroupId: void delGroupId(String groupId)
  6. descCluster: List descCluster()
  7. kafkaConsumerGroups: List kafkaConsumerGroups()
  8. kafkaConsumerGroups: List kafkaConsumerGroups(String topic)
  9. sendToKafka: LinkedHashMap<String, Object> sendToKafka(String topic, String key, String value)
  10. sendToKafkaAsync: void sendToKafkaAsync(String topic, String key, String value)
  11. sendToKafkaAsync: void sendToKafkaAsync(String topic, String key, String value,Callback callback)
  12. recvFromKafka: ArrayList<LinkedHashMap<String, Object>> recvFromKafka(String topic, String groupId)
  13. recvFromKafkaByOffset: ArrayList<LinkedHashMap<String, Object>> recvFromKafkaByOffset(String topic, String groupId,int partition,long offset)
  14. recvFromKafkaByTimestamp: ArrayList<LinkedHashMap<String, Object>> recvFromKafkaByTimestamp(String topic, String groupId,int partition,long timestamp)
  15. resetOffsetToTimestamp: boolean resetOffsetToTimestamp(String topic, String groupId, long timestamp)
  16. resetOffsetToEarliest: boolean resetOffsetToEarliest(String topic, String groupId)
  17. resetOffsetToLatest: boolean resetOffsetToLatest(String topic, String groupId)
  18. consumerPositions: List<LinkedHashMap<String, Object>> consumerPositions(String topic, String groupId)
  19. topicSize: List<LinkedHashMap<String, Object>> topicSize(String topic)
  20. topicSizeAll: LinkedHashMap<String, Object> topicSizeAll()
  21. topicSizeStatistics: LinkedHashMap<String, Object> topicSizeStatistics(String topic)
  22. topicSizeStatisticsAll: LinkedHashMap<String, Object> topicSizeStatisticsALL()

示范应用:

为了说明该工具的效用,基于该工具实现了一个HTTP接口的消息队列服务,该服务只用了几十行代码,就实现了基于标签内容的发布订阅服务,服务见APIKafka.java,客户端示例见ClientKafka.java。

该服务支持生产者任意标注标签,支持消费者按表达式条件订阅数据,表达式支持与或非,支持集合查找,以及字符串子串匹配。

同时也支持消息回溯消费已经消息统计查询。

实现了流式消息检索的基本需求。 

一个基于Kafka客户端封装的工具,Kafka开发效率神器,kafka,java

APIKafka,支持生产者任意标注标签,标签是开放的,可以是任意JSON,Key无需预先定义和Value也不必是枚举值,支持消费者按表达式条件订阅数据,支持开源表达式语言OGNL,包括支持与或非,支持对象取值,支持数组和集合的访问,也支持Java表达式,常用的有contains,startsWith,endsWith,length等,也支持matches正则匹配。可以满足流式消息检索的各种匹配要求。

联系人:

有问题可以联系:zhangchuang@iie.ac.cn文章来源地址https://www.toymoban.com/news/detail-754696.html

到了这里,关于一个基于Kafka客户端封装的工具,Kafka开发效率神器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • AWS 中文入门开发教学 36- 连接MySQL - MySQL客户端工具

    在EC2安装 MySQL 客户端工具,连接到 MySQL RDS 数据库实例 拷贝数据库终端节点 安装 MySQL 客户端命令行工具 连接到 MySQL 服务器实例 建立数据表 添加数据

    2024年02月12日
    浏览(44)
  • 【libevent】http客户端3:简单封装

    LibEventHttp

    2024年02月15日
    浏览(52)
  • Kafka-客户端使用

    Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API 封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来

    2024年02月22日
    浏览(53)
  • 基于.Net开发的ChatGPT客户端,兼容Windows、IOS、安卓、MacOS、Linux

    2023年目前要说最热的点,肯定是ChatGPT了。 ChatGPT官方提供的网页版本,还有需要科*上网,很多人都会基于此进行封装。 现在是移动互联网时代,基于手机APP的需求还是很大的。 所以,今天给大家推荐一个ChatGPT客户端开源项目,兼容苹果和安卓手机、PC。 这是基于C#开发的客

    2023年04月20日
    浏览(44)
  • kafka 02——三个重要的kafka客户端

    请参考下面的文章: Kafka 01——Kafka的安装及简单入门使用. AdminClient API: 允许管理和检测Topic、Broker以及其他Kafka对象。 Producer API: 发布消息到一个或多个API。 Consumer API: 订阅一个或多个Topic,并处理产生的消息。 如下: 完整的pom 关于配置,可参考官网: https://kafka.apa

    2024年02月13日
    浏览(47)
  • kafka客户端应用参数详解

    Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可: 1、消息发送者主流程  然后可以使用Kafka提供的Producer类,快速发送消息。 ​ 整体来说,构建Producer分为三个步骤: 设置Producer核心属性  :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTST

    2024年02月07日
    浏览(54)
  • MQTT记录(概述,docker部署,基于spring-integration-mqtt实现消息订阅与发布,客户端工具测试)

    需要spring-boot集成spring-integration-mqtt代码的直接跳到第5部分 1.1 MQTT是什么呢? message queue telemetry translation 是一种基于发布与订阅的轻量级消息传输协议.适用于低带宽或网络不稳定的物联网应用.开发者可以使用极少的代码来实现物联网设备之间的消息传输.mqtt协议广泛应用于物

    2024年02月12日
    浏览(47)
  • kafka之java客户端实战

            Kafka提供了两套客户端API, HighLevel API和LowLevel API 。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,

    2024年01月17日
    浏览(61)
  • 自定义kafka客户端消费topic

    使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。 后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic 使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没

    2024年02月02日
    浏览(55)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(62)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包