【kafka】记一次kafka基于linux的原生命令的使用

这篇具有很好参考价值的文章主要介绍了【kafka】记一次kafka基于linux的原生命令的使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

环境是linux,4台机器,版本3.6,kafka安装在node 1 2 3 上,zookeeper安装在node2 3 4上。

安装好kafka,进入bin目录,可以看到有很多sh文件,是我们执行命令的基础。
【kafka】记一次kafka基于linux的原生命令的使用,kafka,kafka,linq,分布式
启动kafka,下面的命令的后面带的配置文件的相对路径

kafka-server-start.sh ./server.properties

遇到不熟悉的sh文件,直接输入名字并回车,就会提示你可用的命令参数。如果参数用错了,kafka也会提示你相应的错误。

[root@localhost bin]# kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions and     
                                           replica assignment. Update the       
                                           configuration of an existing topic   
                                           via --alter is no longer supported   
                                           here (the kafka-configs CLI supports 
                                           altering topic configs with a --     
                                           bootstrap-server option).            
--at-min-isr-partitions                  if set when describing topics, only    
                                           show partitions whose isr count is   
                                           equal to the configured minimum.     
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect  
  connect to>                              to.                                  
--command-config <String: command        Property file containing configs to be 
  config property file>                    passed to Admin Client. This is used 
                                           only with --bootstrap-server option  
                                           for describing and altering broker   
                                           configs.                             
--config <String: name=value>            A topic configuration override for the 
                                           topic being created or altered. The  
                                           following is a list of valid         
                                           configurations:                      
                                         	cleanup.policy                        
                                         	compression.type                      
                                         	delete.retention.ms                   
                                         	file.delete.delay.ms                  
                                         	flush.messages                        
                                         	flush.ms                              
                                         	follower.replication.throttled.       
                                           replicas                             
                                         	index.interval.bytes                  
                                         	leader.replication.throttled.replicas 
                                         	local.retention.bytes                 
                                         	local.retention.ms                    
                                         	max.compaction.lag.ms                 
                                         	max.message.bytes                     
                                         	message.downconversion.enable         
                                         	message.format.version                
                                         	message.timestamp.after.max.ms        
                                         	message.timestamp.before.max.ms       
                                         	message.timestamp.difference.max.ms   
                                         	message.timestamp.type                
                                         	min.cleanable.dirty.ratio             
                                         	min.compaction.lag.ms                 
                                         	min.insync.replicas                   
                                         	preallocate                           
                                         	remote.storage.enable                 
                                         	retention.bytes                       
                                         	retention.ms                          
                                         	segment.bytes                         
                                         	segment.index.bytes                   
                                         	segment.jitter.ms                     
                                         	segment.ms                            
                                         	unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs. It is  
                                           supported only in combination with --
                                           create if --bootstrap-server option  
                                           is used (the kafka-configs CLI       
                                           supports altering topic configs with 
                                           a --bootstrap-server option).        
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be   
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option). Not supported with 
                                           the --bootstrap-server option.       
--describe                               List details for the given topics.     
--exclude-internal                       exclude internal topics when running   
                                           list or describe command. The        
                                           internal topics will be listed by    
                                           default                              
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting or    
                                           describing topics, the action will   
                                           only execute if the topic exists.    
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist.        
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic 
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected). If not supplied   
                                           for create, defaults to the cluster  
                                           default.                             
--replica-assignment <String:            A list of manual partition-to-broker   
  broker_id_for_part1_replica1 :           assignments for the topic being      
  broker_id_for_part1_replica2 ,           created or altered.                  
  broker_id_for_part2_replica1 :                                                
  broker_id_for_part2_replica2 , ...>                                           
--replication-factor <Integer:           The replication factor for each        
  replication factor>                      partition in the topic being         
                                           created. If not supplied, defaults   
                                           to the cluster default.              
--topic <String: topic>                  The topic to create, alter, describe   
                                           or delete. It also accepts a regular 
                                           expression, except for --create      
                                           option. Put topic name in double     
                                           quotes and use the '\' prefix to     
                                           escape regular expression symbols; e.
                                           g. "test\.topic".                    
--topic-id <String: topic-id>            The topic-id to describe.This is used  
                                           only with --bootstrap-server option  
                                           for describing topics.               
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-min-isr-partitions               if set when describing topics, only    
                                           show partitions whose isr count is   
                                           less than the configured minimum.    
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--version                                Display Kafka version.                

如这里,我们创建一个topic名为test。

kafka-topics.sh --create --topic test  --bootstrap-server node1:9092 --partitions 2 --replication-factor 2
Created topic test.

连接其中node1上的kafka获得metedata里的topic列表

[root@localhost bin]# kafka-topics.sh --list --bootstrap-server node1:9092
test

查看某个topic的细节

[root@localhost bin]# kafka-topics.sh --describe --topic test --bootstrap-server node1:9092
Topic: test	TopicId: WgjG4Ou_Q7iQvzgipRgzjg	PartitionCount: 2	ReplicationFactor: 2	Configs: 
	Topic: test	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: test	Partition: 1	Leader: 3	Replicas: 3,2	Isr: 3,2

在其中的一台机器上起一个生产者,在其他两台机器上起2个消费者,都在同一个组里。

[root@localhost bin]# kafka-console-producer.sh --broker-list node1:9092 --topic test
>hello 03
>1
>2
>3
>4
>5
>6
>7
>8

可以看到同一个组内,如果组内消费者注册情况不变化有且只有同一个consumer能够消费数据。可以满足对于消息要求顺序性,不能并发消费的情况。

[root@localhost bin]# kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test --group msb
hello 03
1
2
3
4
5
6
7
8

查看某个组内的情况

[root@localhost bin]# kafka-consumer-groups.sh --bootstrap-server node2:9092 --group msb --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST             CLIENT-ID
msb             test            1          24              24              0               console-consumer-4987804d-6e59-4f4d-9952-9afb9aff6cbe /192.168.184.130 console-consumer
msb             test            0          0               0               0               console-consumer-242992e4-7801-4a38-a8f3-8b44056ed4b6 /192.168.184.130 console-consumer

最后看一下zk中的情况吧。
zk根目录下多了一个kafka节点

[zk: localhost:2181(CONNECTED) 1] ls /
[kafka, node1, node6, node7, testLock, zookeeper]

kafka下面有很多metedata信息,包含在这些节点中,如,,

[zk: localhost:2181(CONNECTED) 2] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
#集群id
[zk: localhost:2181(CONNECTED) 3] ls /kafka/cluster 
[id]
[zk: localhost:2181(CONNECTED) 5] get /kafka/cluster/id 
{"version":"1","id":"8t14lxoAS1SdXapY6ysw_A"}
#controller的id
[zk: localhost:2181(CONNECTED) 6] get /kafka/controller
{"version":2,"brokerid":3,"timestamp":"1698841142070","kraftControllerEpoch":-1}

可以看到topics中有一个__consumer_offsets,是kafka用来存储offset的topic。文章来源地址https://www.toymoban.com/news/detail-743729.html

[zk: localhost:2181(CONNECTED) 10] ls /kafka/brokers/topics 
[__consumer_offsets, test]
[zk: localhost:2181(CONNECTED) 12] get /kafka/brokers/topics/__consumer_offsets 
{"partitions":{"44":[1],"45":[2],"46":[3],"47":[1],"48":[2],"49":[3],"10":[3],"11":[1],"12":[2],"13":[3],"14":[1],"15":[2],"16":[3],"17":[1],"18":[2],"19":[3],"0":[2],"1":[3],"2":[1],"3":[2],"4":[3],"5":[1],"6":[2],"7":[3],"8":[1],"9":[2],"20":[1],"21":[2],"22":[3],"23":[1],"24":[2],"25":[3],"26":[1],"27":[2],"28":[3],"29":[1],"30":[2],"31":[3],"32":[1],"33":[2],"34":[3],"35":[1],"36":[2],"37":[3],"38":[1],"39":[2],"40":[3],"41":[1],"42":[2],"43":[3]},"topic_id":"RGxJyefAQlKrmY3LTVbKGw","adding_replicas":{},"removing_replicas":{},"version":3}

到了这里,关于【kafka】记一次kafka基于linux的原生命令的使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 记一次线上kafka造成的事故

    背景:所有的原始数据均存储在mysql,mysql会通过binlog将数据同步至kafka消息队列,但是有人将mysql中的数据进行删除(大概有2、3年的数据),被删除的数据也通过binlog被同步至消息队列里导致大量消息积压,且该消息队列只有3个分区,最多3个线程消费,消费方即使过滤也远

    2024年02月13日
    浏览(44)
  • 记一次linux系统使用dockerfile编写容器支持中文字体环境

    今天研发开发一个网上受理需要容器支持中文环境否则服务启动起来会出现中文乱码。 解决思路: 1、先把jar把在本地环境直接加参数运行如果运行起来无乱码说明本地的字体文件就可以直接COPY到容器中使用,如果还是乱码就说明本地的字体文件不支持中文环境,就需要下载

    2024年02月09日
    浏览(55)
  • 记一次Flink通过Kafka写入MySQL的过程

    一、前言 总体思路:source --transform --sink ,即从source获取相应的数据来源,然后进行数据转换,将数据从比较乱的格式,转换成我们需要的格式,转换处理后,然后进行sink功能,也就是将数据写入的相应的数据库DB中或者写入Hive的HDFS文件存储。 思路: pom部分放到最后面。 二

    2024年01月24日
    浏览(47)
  • 记一次docker安装kafka,zookeeper拒绝连接的问题

    第一次玩kafka,在使用docker安装kafka时,出现了如下问题 kafka的启动参数KAFKA_ZOOKEEPER_CONNECT不能是localhost:2181,因为不是在一个容器中,localhost改为ip地址就可以了 nc 命令连接到 localhost:2181,确保ZooKeeper 服务器正在监听该地址,并且可以通过网络进行访问 定位的过程中还遇到了

    2024年02月10日
    浏览(59)
  • 记一次线上kafka重复消费的问题解决及思考

    线上ELK日志发现kafka消费者消费到重复消息 由于生产方本身就发送了重复的消息,导致消费到重复消息 消费方采用的是循环poll的模式,具体是在多线程分租户去批量处理的消息

    2024年02月10日
    浏览(50)
  • 记一次Kafka 故障Too many open files问题

    查看日志发现,有kafka日志报错提示: 查看limit.conf文件,发现文件打开数设置的值也没问题,尝试增大值后,但就是提示Too many open files,报错就是文件打开数过多。 经不懈的百度百度,发现如下图两条关键信息; systemd 服务 模块 最大打开文件数默认为1024,查看其他没改动

    2024年02月13日
    浏览(42)
  • 记一次 springboot集成kafka-本地连接服务器的kafka,连接不上的问题

    yml中配置了bootstrap-servers: 服务器地址:9092 ,但是连接时却报了 Connection to node -1 ( localhost/127.0.0.1:9092 ) could not be established. chat给我的回复如下,通过一些列检查我确定了在服务器上,kafka没有问题 最后还是从一篇博客中的第一句话得到了答案,博客链接放在最下方 我是docker安

    2024年01月17日
    浏览(54)
  • 记一次linux复制病毒处理过程

    某天我的阿里云突然发信息告诉我服务器有自变异木马,我用远程工具连接服务器异常卡顿甚至掉线,reboot也不好使.用阿里云的网页控制台会好些,但还是卡,我又用阿里云控制台重启服务器,重启之后发现服务器完全连不上了,ping也ping不通了,我问了客服说可以用救援连接试试,果

    2024年01月24日
    浏览(60)
  • 记一次Linux启动Mysql异常解决

    并没有发现3306数据库端口 service mysqld start systemctl start mysqld.service 都无效,报错 发现是磁盘空间不足。。。 (下图是已经清理过的结果截图) 然后把磁盘的不重要文件直接删除即可 rm -rf * 总结: 第一步看全局端口占用情况 第二步看日志/根据提示命令看信息 其实,在启动My

    2024年02月14日
    浏览(49)
  • 记一次mysql8 在linux上安装全过程

    参照MYSQL官网官方文档安装 1、mysql官网 mysql官网 2、直接进入文档页 找到安装文档 3、找到自己系统对应的安装文档,选合适的安装方式,我这里使用的是YUM方式 a、开始安装之前需要替换yum仓库 具体步骤如下 b、将下载的文件上传至自己的服务器 如下 c c、执行yum仓库安装命

    2024年02月12日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包