消息队列kafka基础,基于go代码举例

这篇具有很好参考价值的文章主要介绍了消息队列kafka基础,基于go代码举例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基础概念

broker

broker就是单个kafka实例。Kafka集群中,一个kafka服务器就是一个broker。

topic

一类消息的集合。在kafka中,消息以主题为单位进行归类,producer负责将消息发送到指定的主题,而consumer负责订阅主题并进行消费。在生产者向kafka发送数据以及消费者订阅数据都要指定具体的topic来消费。

partition

topic是一个逻辑上的概念,而partition是物理层的概念。一个topic可以分为多个分区,并且可以分配在多个kafka服务器上。一个分区只属于单个topic,很多时候也会把分区称为主题分区(Topic-Partition)。分区可以提高kafka的并发效率。如多个消费者可以同时消费一个topic下的不同分区,但不保证topic消费的顺序性。(同一个分区下可以保证消息消费的顺序性,而针对一个主topic消费是无序的)
 

副本 partition分区的备份

partition的副本,从leader replica同步数据,当kafka服务挂了之后,防止数据丢失。副本的数量不能超过broker的数量,否则创建主题时会失败。

所有副本的统称(Assigned Repllicas),AR = ISR + OSR。ISR:表示和Leader保持同步(默认30s)的follower集合。OSR:表示Follower与Leader副本同步时,延迟过多的副本。

producer生产者

生产者,生产并发送消息的一方。可以同步发送和异步发送。在创建生产者示例的时候需要传入配置项和kafka实例地址等消息。比如生产者的ACK机制,生产者写入分区策略(向分区发送数据的规则等)

消息队列kafka基础,基于go代码举例,kafka,分布式,go

consumer 消费者和消费者组

消费者,接收订阅消费消息的一方。

kafka消费者组(Consumer Group)是kafka提供的可扩展且具有容错性的消费者机制。
  它是一个组,所以内部有可以有多个消费者,这些消费者共用一个ID(Group ID),一个组内的所有消费者共同协作,完成对订阅的主题的所有分区进行消费。其中一个主题中的一个分区只能由一个消费者组中的一个消费者消费。

消费者组的特性

作用:可以同时消费多个分区,kafka自动提交offset基于消费者组id。
一个消费者组可以有多个消费者。
Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个消费者组。
每个消费者组订阅的所有主题中,每个主题的每个分区只能由一个消费者消费。消费者组之间不影响。

消费者组中的消费者最好等于topic下面的分区个数,可以实现效率最高的消费。分区数可以大于消费者的数量,但是消费者数量不能多于分区数,会造成多余的消费者没有分区可以消费的情况(一个消费者组的消费者只能消费一个topic下的一个分区)。消费者和分区的分配策略可以通过设置消费者策略来实现。主要包括三种情况,详情可以下面的消费者策略

Range范围分配策略:

Range范围分配策略是kafka默认的分配策略, 它可以确保每个消费者消费的分区数量是均衡的;

RoundRobin轮询策略:

RoundRobinAssignor轮询策略是将消费者组所有消费者以及消费者所订阅的所有的topic的partition按照字典顺序排序(topic和partition的hashcode进行排序), 然后通过轮询方式逐个将分区一次分配给每个消费者;

stricky粘性分配策略:

从kafak 0.11.x开始, 引入此类分配策略; 主要目的:

  • 分区分配尽可能均匀;
  • 在发生rebalance的时候, 分区的分配尽可能与上一次分配保持相同;
    • 没有发生rebalance时, stricky粘性分配策略和RoundRobin分配策略类似;

offset

  消息在日志中的位置,可以理解是消息在partition上的偏移量,也是代表该消息的唯一序号;重点:分区上的偏移量。在生产者发送消息的时候,可以通过kafka返回来查看offset的值。

offset的作用是记录数据的位置,并且kafka提供主动提交offset机制(会保存消费者组上次消费的offset和分区以及topic),可以避免消费者宕机以后再重新消费数据。出了kafka自动提交offset,kafka也提供了API可以让消费者主动提交offset。

  消费过程中可以通过创建单个消费者(不指定消费者组id),这种情况下如果配置消费的offset设置为为最早的offset(最新的就是从offset最大的地方开始消费,这里讨论没有意义),消费者挂了以后,每次消费会重新消费最早的数据。而不是上次的数据

  如果用消费者组消费的话并且设置为最早的offset,kafka会自动帮我们保存上次消费的位置,当消费者宕机了以后,重启消费是消费上次的位置。(当然也可能会造成数据的丢失或者重复,即kafka还没来得及保存offset挂了,重启后从没保存的offset开始消费,或者消费者还没来及的保存,kafka已经保存了offset:丢失)

进阶

生产者写入策略

轮询分区

设置方法 :

消息队列kafka基础,基于go代码举例,kafka,分布式,go

原理

消息队列kafka基础,基于go代码举例,kafka,分布式,go

轮询方法就是按照分区0,1,2,0,1,2的方式轮询写入数据

随机策略

设置方法

消息队列kafka基础,基于go代码举例,kafka,分布式,go

字面意思,随机写入,一般不用,容易造成数据倾斜(某个分区写入了大量数据)

按照key值来分

设置方法

配置文件配置

消息队列kafka基础,基于go代码举例,kafka,分布式,go

对想发送到同一个分区的消息设置相同的key值

消息队列kafka基础,基于go代码举例,kafka,分布式,go

自定义策略

消息队列kafka基础,基于go代码举例,kafka,分布式,go

消息队列kafka基础,基于go代码举例,kafka,分布式,go

消费者消费策略

消费者策略主要是定义对于多个消费者,分区如何分配的问题。以及解决发生reblance再平衡时,如何进行分区和消费者之间的分配。主要包括range范围分配,Round轮询策略(类似于生产者策略中的Round,Sticky粘性分配策略)

什么时候会发生reblance

消费者个数发生变化

订阅主题发生变化

消费分区的个数发生变化

以上都会造成分区数和消费者数之间的重新分配,所以要设置分配策略

range范围分配

配置方法

消息队列kafka基础,基于go代码举例,kafka,分布式,go

具体原理:

消息队列kafka基础,基于go代码举例,kafka,分布式,go

Round轮询分配

消息队列kafka基础,基于go代码举例,kafka,分布式,go

配置方法

消息队列kafka基础,基于go代码举例,kafka,分布式,go

Sticky粘性分配

粘性分配和Round轮询分配在没有发生reblance的时候都是一样的。

不同点是在发生reblance的时候,sticky策略会尽可能保存发生变化之前的架构,例如将属于挂到的consumer的分区均匀分配给剩下的consumer。而前面的2个策略是根据现有的分区数和消费者数量再次执行一遍它们的策略。显而易见,sticky策略会减少资源的消耗。

配置方法

消息队列kafka基础,基于go代码举例,kafka,分布式,go

kafka消费提交机制

提交就是Offset提交,分为kafka自动提交和消费者主动提交

自动提交是默认的,在代码中默认是true

消息队列kafka基础,基于go代码举例,kafka,分布式,go

而消费者主动提交需要我们调用kafka提供的api进行提交

消息队列kafka基础,基于go代码举例,kafka,分布式,go

kafka生产端ACK机制

ack机制是生产者向kafka发送数据避免消息丢失的一种解决办法,可以通过设置0,-1,1代表不同的确认程度。

ack为0

生产端不等kafka发送确认消息,直接发送下一条(leader和副本数据都可能丢失)

ack为1

生产端只等leader副本写入,不管其他副本是否写入,发送下一条。(副本数据可能丢失)

ack为-1

生产端等所有副本写入再发送下一条。

kafka生产端的幂等性和事务

幂等性

幂等性开启方法

消息队列kafka基础,基于go代码举例,kafka,分布式,go

大体概念:

PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。但是如果重新连接会产生一个新的pid(宕机后重启变化)

Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number

分区号:写入对应的分区

Broker端在缓存中保存了这seq number,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。

缺点:幂等性只能保证一个Producer会话的消息不重复,宕机重启还会造成重复,所以kafka提供了事务。

事务

消息队列kafka基础,基于go代码举例,kafka,分布式,go

生产者创建时,设置全局唯一的事务ID --- TransactionID;事务ID与PID绑定,当producer重启后,会根据事务ID查找PID,因此能够保证全局at-exactly-once语义

配置方法:transactional.id(注:使用事务的前提是必须开启幂等性)

解决全局精确唯一语义是kafka事务引入的初衷,但是后期又引入的新的功能:producer生产消息的batch在一个原子单元内完成;文章来源地址https://www.toymoban.com/news/detail-785970.html

代码示例

生产者同步发送

package main

import (
   "encoding/json"
   "fmt"
   "github.com/IBM/sarama"
   "time"
)

type Tem struct {
   Temperature int    `json:"temperature"`
   Humidity    int    `json:"humidity"`
   Name        string `json:"name"`
}

func main() {
   config := sarama.NewConfig()
   // 生产者配置
   //确认ACK all代表所有副本全部写入
   config.Producer.RequiredAcks = sarama.WaitForAll

   //random 写入策略 随机写入一般不用 容易引起数据倾斜
   // config.Producer.Partitioner = sarama.NewRandomPartitioner

   //返回结果  同步发送必须设置为true  不然SendMessage无法返回结果
   config.Producer.Return.Successes = true
   //轮询写入
   config.Producer.Partitioner = sarama.NewRoundRobinPartitioner

   //按照key写入 需要在kafka发送结构体中接入key参数 如下
   // config.Producer.Partitioner = sarama.NewHashPartitioner

   // 封装消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "devices123"
   // 设置key key写入的时候用到的参数 想写入同一个分区就用同一个key
   msg.Key = sarama.StringEncoder("my-key")
   //time_str := time.Now().Format("2006-01-02 15:04:05")
   tem1 := Tem{
      Temperature: 180,
      Humidity:    20,
      Name:        "kafka",
   }
   marshal, _ := json.Marshal(tem1)
   msg.Value = sarama.StringEncoder(string(marshal))
   // 连接kafka
   client, err := sarama.NewSyncProducer([]string{"192.168.174.128:9092"}, config)
   if err != nil {
      fmt.Println("producer closed", err)
      return
   }
   defer client.Close()
   for {
      select {
      case <-time.Tick(1 * time.Second):
         partition, offset, err := client.SendMessage(msg)

         if err != nil {
            fmt.Println("send failed", err)
            return
         }
         fmt.Println("partition:", partition, "offset:%v", offset)

         //}

         // 发送消息
      }

   }

}

生产者异步发送

package main

import (
   "encoding/json"
   "fmt"
   "github.com/IBM/sarama"
   "log"
   "sync"
   "sync/atomic"
)

type Tem struct {
   Temperature int    `json:"temperature"`
   Humidity    int    `json:"humidity"`
   Name        string `json:"name"`
}

func main() {
   config := sarama.NewConfig()
   // 异步生产者不建议把 Errors 和 Successes 都开启,一般开启 Errors 就行
   // 同步生产者就必须都开启,因为会同步返回发送成功或者失败
   config.Producer.Return.Errors = false   // 设定需要返回错误信息
   config.Producer.Return.Successes = true // 设定需要返回成功信息

   //创建producer  NewAsyncProducer异步工厂
   producer, err := sarama.NewAsyncProducer([]string{"192.168.174.128:9092"}, config)
   if err != nil {
      log.Fatal("NewSyncProducer err:", err)
   }
   tem1 := Tem{
      Temperature: 180,
      Humidity:    20,
      Name:        "kafka",
   }
   marshal, err := json.Marshal(tem1)
   //创建信息
   str := sarama.StringEncoder(string(marshal))
   // 异步关闭
   defer producer.AsyncClose()

   // 创建消息发送完关闭管道
   finColese := make(chan bool)
   wg := &sync.WaitGroup{}
   wg.Add(1)
   //创建协程监听  写入返回的成功信息 或者错误信息
   go func() {
      // [!important] 异步生产者发送后必须把返回值从 Errors 或者 Successes 中读出来 不然会阻塞 sarama 内部处理逻辑 导致只能发出去一条消息
      for {
         select {
         case s := <-producer.Successes():
            log.Printf("[Producer] key:%v msg:%+v \n", s.Key, s.Value)
         case e := <-producer.Errors():
            if e != nil {
               log.Printf("[Producer] err:%v msg:%+v \n", e.Msg, e.Err)
            }
         case <-finColese:
            fmt.Println("收到关闭信号准备关闭协程")
            wg.Done()
            return

         }
      }
   }()
   var count int64
   // 异步发送
   for i := 0; i < 20; i++ {

      msg := &sarama.ProducerMessage{Topic: "devices123", Key: nil, Value: sarama.StringEncoder(str)}
      // 异步发送只是写入内存了就返回了,并没有真正发送出去
      // sarama 库中用的是一个 channel 来接收,后台 goroutine 异步从该 channel 中取出消息并真正发送
      producer.Input() <- msg
      atomic.AddInt64(&count, 1)

   }
   finColese <- true
   wg.Wait()
   log.Printf("发送完毕 总发送消息数:%v\n", count)

}

消费者组消费数据

package main

// SIGUSR1 toggle the pause/resume consumption
import (
	"context"
	"errors"
	"flag"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"github.com/IBM/sarama"
)

// Sarama configuration options
var (
	brokers  = ""
	version  = ""
	group    = ""
	topics   = ""
	assignor = ""
	oldest   = true
	verbose  = false
)

func init() {
	flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
	flag.StringVar(&group, "group", "", "Kafka consumer group definition")
	flag.StringVar(&version, "version", sarama.DefaultVersion.String(), "Kafka cluster version")
	flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma separated list")
	flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
	flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
	flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
	flag.Parse()

	if len(brokers) == 0 {
		panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
	}

	if len(topics) == 0 {
		panic("no topics given to be consumed, please set the -topics flag")
	}

	if len(group) == 0 {
		panic("no Kafka consumer group defined, please set the -group flag")
	}
}

func main() {
	keepRunning := true
	log.Println("Starting a new Sarama consumer")

	if verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	version, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	/**
	 * Construct a new Sarama configuration.
	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
	 */
	config := sarama.NewConfig()
	config.Version = version

	switch assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
	case "roundrobin":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	case "range":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
	}

	if oldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	/**
	 * Setup a new Sarama consumer group
	 */
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	consumptionIsPaused := false
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop, when a
			// server-side rebalance happens, the consumer session will need to be
			// recreated to get the new claims
			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					return
				}
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

	for keepRunning {
		select {
		case <-ctx.Done():
			log.Println("terminating: context cancelled")
			keepRunning = false
		case <-sigterm:
			log.Println("terminating: via signal")
			keepRunning = false
		case <-sigusr1:
			toggleConsumptionFlow(client, &consumptionIsPaused)
		}
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
	if *isPaused {
		client.ResumeAll()
		log.Println("Resuming consumption")
	} else {
		client.PauseAll()
		log.Println("Pausing consumption")
	}

	*isPaused = !*isPaused
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				log.Printf("message channel was closed")
				return nil
			}
			log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
			session.MarkMessage(message, "")
		// Should return when `session.Context()` is done.
		// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
		// https://github.com/IBM/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		}
	}
}

到了这里,关于消息队列kafka基础,基于go代码举例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(48)
  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(50)
  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(49)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(47)
  • 分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

    生产者发送消息流程参考图1: 先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 接下来,如果没有显式

    2024年02月13日
    浏览(50)
  • GO学习之 消息队列(Kafka)

    1、GO学习之Hello World 2、GO学习之入门语法 3、GO学习之切片操作 4、GO学习之 Map 操作 5、GO学习之 结构体 操作 6、GO学习之 通道(Channel) 7、GO学习之 多线程(goroutine) 8、GO学习之 函数(Function) 9、GO学习之 接口(Interface) 10、GO学习之 网络通信(Net/Http) 11、GO学习之 微框架(Gin) 12、GO学习

    2024年02月09日
    浏览(38)
  • 分布式应用之zookeeper集群+消息队列Kafka

           ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。为分布式框架提供协调服务的

    2024年02月06日
    浏览(66)
  • zookeeper+kafka分布式消息队列集群的部署

    目录 一、zookeeper 1.Zookeeper 定义 2.Zookeeper 工作机制 3.Zookeeper 特点 4.Zookeeper 数据结构 5.Zookeeper 应用场景 (1)统一命名服务 (2)统一配置管理 (3)统一集群管理 (4)服务器动态上下线 6.Zookeeper 选举机制 (1)第一次启动选举机制 (2)非第一次启动选举机制 7.部署zookeepe

    2024年02月14日
    浏览(52)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包