etcd

这篇具有很好参考价值的文章主要介绍了etcd。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

etcd

etcd 是一个分布式键值对存储,设计用来可靠而快速的保存关键数据并提供访问。通过分布式锁,leader选举和写屏障(write barriers)来实现可靠的分布式协作。etcd集群是为高可用,持久性数据存储和检索而准备。

Etcd 是 CoreOS 基于 Raft 协议开发的分布式 key-value 存储,可用于服务发现、共享配置以及一致性保障(如数据库选主、分布式锁等)。

单机安装

下载地址 https://github.com/etcd-io/etcd/releases,解压后查看版本

etcd --version
etcdctl version

创建配置文件:conf.yaml

name: "hezebin-etcd"
data-dir: /usr/local/etcd/data
listen-client-urls: "http://127.0.0.1:2379"
advertise-client-urls: "http://127.0.0.1:2379"
initial-cluster-token: "hezebin-etcd-token"
initial-cluster: "hezebin-etcd=http://127.0.0.1:2380"
initial-advertise-peer-urls: "http://127.0.0.1:2380"
initial-cluster-state: "new"

启动 etcd 服务:

etcd --config-file=/usr/local/etcd/conf.yaml

打开终端,执行以下命令来检查 etcd 服务的健康状态:

etcdctl endpoint health

如果一切正常,您将看到类似以下的输出:

127.0.0.1:2379 is healthy: successfully committed proposal: took = 11.667µs

如果 etcd 服务正在运行且健康,您会收到健康状态的确认。

您还可以通过执行以下命令来查看 etcd 集群的成员状态:

etcdctl member list

这将列出 etcd 集群中的成员信息,包括成员的 ID、名称和状态。

设置键值对

etcdctl put name "hezebin"
etcdctl get name

# 查询Etcd所有的key
etcdctl get --prefix ""

# 只读取键 name 的值的命令
etcdctl get name --print-value-only

# 以 name 为前缀的所有键的命令,结果数量限制为2:
etcdctl get --prefix --limit=2 name

# 访问修订版本为 4 时的键的版本
etcdctl get --prefix --rev=4 name

# 假设 etcd 集群已经有下列键:
a = 123
b = 456
z = 789
# 读取大于等于键 b 的 byte 值的键的命令:
etcdctl get --from-key b
# 清空数据
etcdctl del name
# 删除所有 n 前缀的节点
etcdctl del n -- prefix
# 删除键 zoo 并返回被删除的键值对的命令:
etcdctl del --prev-kv zoo

watch操作

watch 监测一个键值的变化,一旦键值发生更新,就会输出最新的值并退出

etcdctl watch name

#更新键值
etcdctl put name "korbin"

# watch 阻塞处返回结果
PUT 
name
korbin

读取键过往版本的值

应用可能想读取键的被替代的值。例如,应用可能想通过访问键的过往版本来回滚到旧的配置。或者,应用可能想通过多个请求来得到一个覆盖多个键的统一视图,而这些请求可以通过访问键历史记录而来。因为 etcd 集群上键值存储的每个修改都会增加 etcd 集群的全局修订版本,应用可以通过提供旧有的 etcd 修改版本来读取被替代的键。

假设 etcd 集群已经有下列键:

foo = bar         # revision = 2
foo1 = bar1       # revision = 3
foo = bar_new     # revision = 4
foo1 = bar1_new   # revision = 5

这里是访问键的过往版本的例子:

$ etcdctl get --prefix foo # 访问键的最新版本
foo
bar_new
foo1
bar1_new
$ etcdctl get --prefix --rev=4 foo # 访问修订版本为 4 时的键的版本
foo
bar_new
foo1
bar1
$ etcdctl get --prefix --rev=3 foo # 访问修订版本为 3 时的键的版本
foo
bar
foo1
bar1
$ etcdctl get --prefix --rev=2 foo # 访问修订版本为 2 时的键的版本
foo
bar
$ etcdctl get --prefix --rev=1 foo # 访问修订版本为 1 时的键的版本

压缩修订版本

如我们提到的,etcd 保存修订版本以便应用可以读取键的过往版本。但是,为了避免积累无限数量的历史数据,压缩过往的修订版本就变得很重要。压缩之后,etcd 删除历史修订版本,释放资源来提供未来使用。所有修订版本在压缩修订版本之前的被替代的数据将不可访问。

这是压缩修订版本的命令:

$ etcdctl compact 5
compacted revision 5
# 在压缩修订版本之前的任何修订版本都不可访问
$ etcdctl get --rev=4 foo
Error:  rpc error: code = 11 desc = etcdserver: mvcc: required revision has been compacted

注意: etcd 服务器的当前修订版本可以在任何键(存在或者不存在)以json格式使用get命令来找到。下面展示的例子中 mykey 是在 etcd 服务器中不存在的:

$ etcdctl get mykey -w=json
{"header":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":15,"raft_term":4}}

lease租约(过期机制)

应用可以为 etcd 集群里面的键授予租约。当键被附加到租约时,它的存活时间被绑定到租约的存活时间,而租约的存活时间相应的被 time-to-live (TTL)管理。在租约授予时每个租约的最小TTL值由应用指定。租约的实际 TTL 值是不低于最小 TTL,由 etcd 集群选择。一旦租约的 TTL 到期,租约就过期并且所有附带的键都将被删除。

授予租约

# 授予租约,TTL为10秒
$ etcdctl lease grant 10
lease 32695410dcc0ca06 granted with TTL(10s)
# 附加键 foo 到租约32695410dcc0ca06
$ etcdctl put --lease=32695410dcc0ca06 foo barOK

应用通过租约 id 可以撤销租约。撤销租约将删除所有它附带的 key。

撤销租约

$ etcdctl lease revoke 32695410dcc0ca06
lease 32695410dcc0ca06 revoked
$ etcdctl get foo
# 空应答,因为租约撤销导致foo被删除

keepAlive续约

应用可以通过刷新键的 TTL 来维持租约,以便租约不过期。维持同一个租约的命令:

$ etcdctl lease keep-alive 32695410dcc0ca06
lease 32695410dcc0ca06 keepalived with TTL(10)
lease 32695410dcc0ca06 keepalived with TTL(10)
lease 32695410dcc0ca06 keepalived with TTL(10)
...

获取租约信息

应用程序可能想知道租约信息,以便可以更新或检查租约是否仍然存在或已过期。应用程序也可能想知道有那些键附加到了特定租约。

获取租约信息的命令:

$ etcdctl lease timetolive 694d5765fc71500b
lease 694d5765fc71500b granted with TTL(500s), remaining(258s)

获取租约信息和租约附带的键的命令:

$ etcdctl lease timetolive --keys 694d5765fc71500b
lease 694d5765fc71500b granted with TTL(500s), remaining(132s), attached keys([zoo2 zoo1])
# 如果租约已经过期或者不存在,它将给出下面的应答:
Error:  etcdserver: requested lease not found

事务

etcd 的事务(Transaction)机制允许你在一个原子操作中执行一系列操作,这些操作要么全部成功,要么全部失败,确保数据的一致性和完整性。

etcdctl txn 并没有对事务提供过多的支持,执行事务最好通过 go 来实现:

go get go.etcd.io/etcd/client/v3
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// 创建 etcd 客户端
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://localhost:2379"}, // 替换为你的 etcd 地址
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// 创建一个 etcd 事务
	etcdTxn := clientv3.NewKV(cli).Txn(context.Background())

	// 定义事务的比较和操作
	etcdTxn.If(
		clientv3.Compare(clientv3.Value("my_key"), "=", "10"),
	).
		Then(
			clientv3.OpPut("my_key", "20"),
		).
		Else(
			clientv3.OpPut("my_key", "30"),
		)

	// 提交事务
	txnResp, err := etcdTxn.Commit()
	if err != nil {
		log.Fatal(err)
	}

	// 检查事务是否成功
	if !txnResp.Succeeded {
		fmt.Println("Transaction failed")
	} else {
		fmt.Println("Transaction succeeded")
	}
}

注意:etcdTxn.If 中指定的条件是 clientv3.Compare(clientv3.Value("my_key"), "=", "10"),也就是检查键 “my_key” 的值是否等于 “10”。如果这个条件不满足(例如,键 “my_key” 的值为空),那么事务的 Else 分支会执行,而且整个事务会被标记为失败。

基于etcd实现分布式锁

原生实现

要在 etcd 中实现分布式锁,你可以利用 etcd 的事务特性和租约(Lease)机制。以下是一个使用 etcd 实现分布式锁的示例:

# 伪代码,etcdctl 并不支持下述命令,需要以 go 方式实现,api 更丰富
etcdctl txn -- \
  put my_lock some_value  --lease=0d7689bc575d6611  \
  if_not_exists

# if_not_exists: 这是一个条件,表示只有在键不存在时才执行上述的 put 操作。这个条件确保只有第一次创建节点的时候才会成功,从而实现锁的获取。

这个命令的目的是在一个 etcd 事务中,尝试创建一个指定键名的键值对,如果该键名在 etcd 中尚不存在(即尝试获取锁),则创建成功,否则操作失败。这个操作模式可以帮助实现分布式锁的基本机制。

Go 代码实现:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// 创建 etcd 客户端
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://localhost:2379"}, // 替换为你的 etcd 地址
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// 创建租约
	leaseResp, err := cli.Grant(context.Background(), 10) // 10 秒的租约时间
	if err != nil {
		log.Fatal(err)
	}

	// 锁的键名
	lockKey := "my_lock"

	// 尝试获取锁
	txnResp, err := cli.Txn(context.Background()).
		If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
		Then(clientv3.OpPut(lockKey, "lock_holder", clientv3.WithLease(leaseResp.ID))).
		Commit()
	if err != nil {
		log.Fatal(err)
	}

	// 检查是否成功获取锁
	if !txnResp.Succeeded {
		log.Println("Failed to acquire lock")
		return
	}

	log.Println("Lock acquired")

	// 续约循环(保持锁)
	keepAliveCh, err := cli.KeepAlive(context.Background(), leaseResp.ID)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		for range keepAliveCh {
			// 续约成功,执行你的逻辑,或者也可以不做任何处理
		}
	}()

	// 在这里执行需要保护的临界区代码

	// 释放锁(取消续约)
	_, err = cli.Revoke(context.Background(), leaseResp.ID)
	if err != nil {
		log.Fatal(err)
	}

	log.Println("Lock released")
}

官方 concurrency 包实现

cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
    log.Fatal(err)
}
defer cli.Close()

// 创建两个单独的会话用来演示锁竞争
s1, err := concurrency.NewSession(cli)
if err != nil {
    log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")

s2, err := concurrency.NewSession(cli)
if err != nil {
    log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")

// 会话s1获取锁
if err := m1.Lock(context.TODO()); err != nil {
    log.Fatal(err)
}
fmt.Println("acquired lock for s1")

m2Locked := make(chan struct{})
go func() {
    defer close(m2Locked)
    // 等待直到会话s1释放了/my-lock/的锁
    if err := m2.Lock(context.TODO()); err != nil {
        log.Fatal(err)
    }
}()

if err := m1.Unlock(context.TODO()); err != nil {
    log.Fatal(err)
}
fmt.Println("released lock for s1")

<-m2Locked
fmt.Println("acquired lock for s2")

通过源码可以发现concurrency包的实现原理为执行一个 key 的前缀,并在最终设置到 etcd key 的尾部拼接上 /lease_id,租约为 60 秒,且每隔 20 秒续租一次。

在 tryAcquire 函数中通过 put 上述的键值,判断版本号,若设置成功则拿到锁,否则阻塞等待锁:
etcd,Golang,etcd,分布式

阻塞期间先通过查询一次 key 是否存在判断是否阻塞,若不存在表示拿到锁,结束阻塞;存在则通过 watch 监听 key 的变更,仅允许DELETE类型,其他变更操作会报错,并做强制解锁。
etcd,Golang,etcd,分布式
etcd,Golang,etcd,分布式

服务注册与发现

当使用 etcd 实现服务注册与发现时,通常需要以下步骤:

  1. 引入 etcd 的 Go 客户端库
  2. 连接到 etcd 服务器
  3. 注册服务:将服务的信息写入 etcd 中
  4. 发现服务:从 etcd 中获取已注册的服务信息

以下是一个简单示例,演示如何使用 Go 语言操作 etcd 实现基本的服务注册与发现功能。请确保已经安装了 etcd 并启动了 etcd 服务器。

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// 创建 etcd 客户端
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // etcd 服务器地址
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// 注册服务
	serviceName := "my-service"
	serviceIP := "192.168.1.100"
	servicePort := "8080"

	serviceKey := fmt.Sprintf("/services/%s/%s:%s", serviceName, serviceIP, servicePort)
	serviceValue := "some-metadata-about-the-service"

	ctx := context.Background()
	_, err = cli.Put(ctx, serviceKey, serviceValue)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Service registered: %s\n", serviceKey)

	// 发现服务
	discoveryKey := fmt.Sprintf("/services/%s", serviceName)
	resp, err := cli.Get(ctx, discoveryKey, clientv3.WithPrefix())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("Discovered services:")
	for _, kv := range resp.Kvs {
		fmt.Printf("Key: %s, Value: %s\n", kv.Key, kv.Value)
	}
}

可以结合 watch 机制优化更新服务变更。

Go 操作 Etcd 参考

go get go.etcd.io/etcd/client/v3
  • 民间文档:http://www.topgoer.com/%E6%95%B0%E6%8D%AE%E5%BA%93%E6%93%8D%E4%BD%9C/go%E6%93%8D%E4%BD%9Cetcd/%E6%93%8D%E4%BD%9Cetcd.html

  • 官方文档:https://github.com/etcd-io/etcd/blob/main/client/v3/README.md文章来源地址https://www.toymoban.com/news/detail-635500.html

到了这里,关于etcd的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • k8s mysql集群 & 分布式锁 & apiserver & etcd 的关系

    在 Kubernetes (k8s) 中,MySQL 集群可以使用分布式锁来确保在多个实例之间对共享资源的互斥访问。这是通过结合 Kubernetes API Server 和 etcd 来实现的。 Kubernetes API Server 是 k8s 集群中的核心组件之一,它充当了集群的控制平面,提供了对集群资源的管理和操作接口。API Server 是一个

    2024年02月07日
    浏览(52)
  • golang分布式中间件之kafka

    Kafka是一个分布式发布-订阅消息系统,由LinkedIn公司开发。它被设计为快速、可靠且具有高吞吐量的数据流平台,旨在处理大量的实时数据。Kafka的架构是基于发布-订阅模型构建的,可以支持多个生产者和消费者。 在本文中,我们将讨论如何使用Go语言来实现Kafka分布式中间件

    2024年02月07日
    浏览(55)
  • Golang链路追踪:实现高效可靠的分布式系统监控

    在当今互联网应用的架构中,分布式系统已经成为主流。分布式系统的优势在于能够提供高可用性、高并发性和可扩展性。然而,随着系统规模和复杂性的增加,系统的监控和调试变得越来越困难。为了解决这个问题,链路追踪技术应运而生。 本文将介绍链路追踪的概念和原

    2024年02月08日
    浏览(42)
  • 在CSDN学Golang分布式中间件(ElasticSearch)

    倒排索引是一种用于快速查找文本中特定单词或短语的数据结构。它将文本中的每个单词或短语与包含该单词或短语的文档列表相关联。这使得可以轻松地查找包含给定单词或短语的所有文档。 在 Go 中,可以使用 map 和 slice 来实现倒排索引。具体来说,可以使用一个 map 将每

    2024年02月15日
    浏览(47)
  • Golang实现Redis分布式锁解决秒杀问题

    先写一个脚本sql,插入2000个用户 登录是通过2个字段,一个是mobile,一个是password,生成了mobile从1到2000,密码默认是123456 然后写一个单元测试,实现新注册的2000个用户登录,然后获取token 我们使用有缓冲的通道和sync.WaitGroup信号量,来控制协程的数量,经过测试,发现limi

    2024年02月14日
    浏览(42)
  • 在CSDN学Golang场景化解决方案(EFK分布式日志系统方案)

    在 Golang EFK 分布式日志系统方案中,ElasticSearch 是一个分布式搜索引擎和数据存储库,它可以用于存储和搜索大量的日志数据。以下是 ElasticSearch 分布式集群部署的步骤: 下载 ElasticSearch:从 ElasticSearch 官网上下载最新版本的 ElasticSearch。 解压缩并安装 ElasticSearch:将下载下来

    2024年02月14日
    浏览(44)
  • 【golang项目-GeeCache】动手写分布式缓存 day1 - 实现LRU算法

    【go项目-geecache】动手写分布式缓存 - day1 - 实现LRU算法 【go项目-geecache】动手写分布式缓存 - day2 - 单机并发缓存 【go项目-geecache】动手写分布式缓存 - day3 - HTTP 服务端 【go项目-geecache】动手写分布式缓存 - day4 - 一致性哈希(hash) 【go项目-geecache】动手写分布式缓存 - day5 - 分布

    2023年04月20日
    浏览(39)
  • 【分布式】分布式锁

    单机多线程: 在 Java 中,我们通常使用 ReetrantLock 类、synchronized 这类 本地锁 来控制一个 JVM 进程内的多个线程对本地共享资源的访问 分布式系统: 不同的服务/客户端通常运行在独立的 JVM 进程上。如果 多个 JVM 进程共享同一份资源 的话,使用本地锁就没办法实现资

    2024年02月14日
    浏览(50)
  • 分布式系统中的分布式链路追踪与分布式调用链路

    本文分享自天翼云开发者社区《分布式系统中的分布式链路追踪与分布式调用链路》,作者:c****w 在分布式系统中,由于服务间的调用关系复杂,需要实现分布式链路追踪来跟踪请求在各个服务中的调用路径和时间消耗。这对问题排查和性能监控都很重要。 常用的分布式链

    2024年01月19日
    浏览(59)
  • 【分布式】分布式ID

    分布式场景下,一张表可能分散到多个数据结点上。因此需要一些分布式ID的解决方案。 分布式ID需要有几个特点: 全局唯一(必要) :在多个库的主键放在一起也不会重复 有序(必要) :避免频繁触发索引重建 信息安全 :ID连续,可以根据订单编号计算一天的单量,造成

    2024年02月07日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包