zookeeper的介绍和用docker搭建zookeeper集群,以及Go语言使用zookeeper

这篇具有很好参考价值的文章主要介绍了zookeeper的介绍和用docker搭建zookeeper集群,以及Go语言使用zookeeper。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


typora-copy-images-to: imgs


Zookeeper的使用

1、Zookeeper简介

docker zookeeper,docker,golang,zookeeper

 Apache ZooKeeper 是 Apache 软件基金会的一个软件项目,为大型分布式系统提供开源分布式配置服务、同步服务和命名注册。ZooKeeper原本是Hadoop的一个子项目,但现在它本身已经是一个顶级项目了。

 zookeeper是经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务。 

2、使用Docker快速部署zookeeper

2.1、Docker官方镜像

Docker Zookeeper

2.2、Docker安装zookeeper

下载zookeeper最新版的镜像

docker search zookeeper 
docker pull zookeeper
docker images
docker inspect zookeeper 

docker inspect zookeeper用来查看zookeeper的详细信息

在/root/docker/目录下新建一个zookeeper挂载点文件夹

mkdir /root/docker/zookeeper

挂载本地文件夹并启动服务

docker run -e TZ="Asia/Shanghai" -d -p 2181:2181 -v /root/docker/zookeeper:/data --name zookeeper --restart always zookeeper

参数解释

-e TZ="Asia/Shanghai" # 指定上海时区
-d # 指示后台运行容器
-p 2181:2181 # 端口映射,前面的端口为本地的2181端口,后者为容器内的端口
--name # 设置创建的容器的名称
-v # 挂在文件,将本地目录或文件挂在到容器指定目录
--restart always # 始终重新启动zookeeper

2.3、进入zookeeper容器客户端

方式一
docker run -it --rm --link zookeeper:zookeeper zookeeper zkCli.sh -server zookeeper       

运行上诉命令后会进入到zkCli

docker zookeeper,docker,golang,zookeeper

方式二

前台进入zookeeper容器执行脚本新建一个Client

 docker exec -it zookeeper bash   //进入zookeeper容器,退出时不会关闭容器
 ./bin/zkCli.sh		//执行脚本新建一个Client    

3、docker构建zookeeper集群

3.1、创建docker-compose.yml文件

cd /root/docker/docker-compose/zookeeper # 进入你想要存放docker-compose.yml的文件
vim docker-compose.yml		# 把下面的代码复制到docker-compose.yml文件中


version: 'latest'
services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    container_name: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - /root/docker/docker-compose/zoo1/data:/data
      - /root/docker/docker-compose/zoo1/datalog:/datalog

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - /root/docker/docker-compose/zoo2/data:/data
      - /root/docker/docker-compose/zoo2/datalog:/datalog

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - /root/docker/docker-compose/zoo3/data:/data
      - /root/docker/docker-compose/zoo3/datalog:/datalog

volumes表示的是文件映射要根据自己的情况进行修改

3.2、执行构建集群命令

3.2.1、安装Docker Compose(Linux下)

我们先要搭建以下docker-compose的环境,执行以下命令下载Docker Compose,要更改版本的话替换v2.2.2就好

sudo curl -L "https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

赋予可执行权限

sudo chmod +x /usr/local/bin/docker-compose

创建软链接

sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
3.2.2、启动docker-compose.yml文件

在执行命令之前要保证2181-2183端口没有被占用

cd /root/docker/docker-compose/zookeeper  # 到docker-compose.yml所在的目录
docker-compose up -d
3.2.3、验证集群是否搭建成功
docker exec -it zoo1 bash
zkServer.sh status

看到如下信息表示集群已经搭建成功,可以看出zoo1的角色是follower,查看其他的会发现zoo2是follower,zoo3是leader。

docker zookeeper,docker,golang,zookeeper

4、Zookeeper的简单使用

3.1、Go语言与Zookeeper服务端建立连接

3.1.1、建立连接的代码
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)
//打印节点状态信息的函数
func StatePrintf(state *zk.Stat) {
	fmt.Println("State->")
	fmt.Printf("Czxid = %v,\nMzxid = %v,\nCtime = %v,\nMtime = %v,\nVersion = %v,\nCversion = %v,\nAversion = %v,\nEphemeralOwner = %v,\nDataLength = %v,\nNumChildren = %v,\nPzxid = %v,\n",
		state.Czxid,          //创建该节点的zxid
		state.Mzxid,          //最后一个修改该节点的zxid
		state.Ctime,          //创建该节点的时间
		state.Mtime,          //最后一次修改该节点的时间
		state.Version,        //修改该节点数据的次数
		state.Cversion,       //修改该节点儿子节点的次数
		state.Aversion,       //修改ACL的次数
		state.EphemeralOwner, //创建该临时节点的会话id
		state.DataLength,     //节点数据的长度
		state.NumChildren,    //该节点的儿子节点的数量
		state.Pzxid,          //最后一个修改的儿子节点的zxid(当创建或者删除子节点时才会改变)
	)
}
//输出错误信息的函数
func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	CreateNode(conn)
	SetTest(conn)
	GetTest(conn)
	DeleteTest(conn)
    //休眠5分钟再关闭,以便看到创建的临时节点
	time.Sleep(5 * time.Minute)
	defer conn.Close()
}
3.1.2、关于zxid的一些解释
Zookeeper中的**zxid**是一个长为64位的数字。高32位用来表示当前Leader的周期,低32位用来表示当前请求产生的事务在当前Leader周期内的顺序。每产生一个新的事务,zxid的低32位就会自动加1。当zxid达到最大值,即zxid的低32位达到`0xffffffff`,就会触发集群强制选主,Leader变更后高32位都会自增1,并重置zxid低32位的计数值(zxid高32位变为新Leader的周期,低32位变为0)。

如果一个zookeeper集群每秒能操作10000次,即10k/s ops,那么
	2^32/(86400*10000)≈4.97天

也就是说4.97天之后就会进行自动切主的操作,对于一些服务来说平均五天切一次主是难以容许的,我们可以重新设计zxid,增加低位技术的位数到自己需要的值,假设64位全部用做低位计数。

	2^64/(86400*10000)≈21350398233.46天,即58494241.73年

一般来说集群不可能可靠的运行这么多年,所以重新设计zxid还是要根据业务需求来进行。

3.2、创建节点

3.2.1、zkCli操作
create /节点路径 value  # 可以在创建节点的同时设置节点的值,创建的节点是持久化的节点
create -e /节点路径 value  # 创建临时节点,在客户端断开后会自动删除的节点
create -s /节点路径 value  # 创建顺序节点,zookeeper会自动在节点路径后面加顺序递增的编号

直接创建节点

docker zookeeper,docker,golang,zookeeper

临时节点,顺序节点,quit退出之后再进入刚创建的temp节点会消失。

docker zookeeper,docker,golang,zookeeper

3.2.2、Go语言API操作
// 创建节点
func CreateNode(conn *zk.Conn) {
	//创建永久节点
	path, err := conn.Create("/app3", []byte("zhangsan"), 0, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create node", err)
	fmt.Printf("Created node path[%v]\n", path)

	//创建临时节点,在会话结束时会自动删除临时节点
	ephemeral, err := conn.Create("/ephemeral", nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create ephemeral node", err)
	fmt.Printf("Created ephemeral node path[%v]\n", ephemeral)

	//创建顺序节点
	sequence, err := conn.Create("/sequence", nil, zk.FlagSequence, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create sequence node", err)
	fmt.Printf("Created sequence node path[%v]\n", sequence)

	//创建临时顺序节点 create -es /ephemeralsequece
	ephemeralsequece, err := conn.Create("/ephemeralsequece", nil, zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create ephemeralsequece node", err)
	fmt.Printf("Created ephemeralsequece node path[%v]\n", ephemeralsequece)
}

3.3、修改节点

3.3.1、zkCli操作
set /节点路径 value

示例如下

docker zookeeper,docker,golang,zookeeper

3.3.2、Go语言API操作
// Set操作
func SetTest(conn *zk.Conn) {
	//获取节点的version信息
	_, state, _ := conn.Get("/app3")

	//set /app3 lisi
	state, err := conn.Set("/app3", []byte("lisi"), state.Version)
	FailOnError("Failed to Set Value", err)
	StatePrintf(state)

	//获取修改后的值
	value, _, err := conn.Get("/app3")
	FailOnError("Failed to Get New value", err)
	fmt.Println("New Value = ", value)
}

3.4、查询节点

3.4.1、zkCli操作
ls 目录   # 查看目录下的所有子节点 
get /节点路径
ls -s 目录   # 查看目录的所有详细信息

示例如下

docker zookeeper,docker,golang,zookeeper

docker zookeeper,docker,golang,zookeeper

3.4.2、Go语言API操作
// 查询节点
func GetTest(conn *zk.Conn) {
	result, state, err := conn.Get("/app3")
	//获取子节点
	//children,state,err:=conn.Children("/app3")
	FailOnError("Failed to Get Node Info", err)
	fmt.Printf("result:[%v]\n", string(result))
	StatePrintf(state)
}

3.5、删除节点

3.5.1、zkCli操作
delete /节点路径   # 删除单个节点
deleteall /节点路径 	# 删除带有子节点的节点

示例如下

docker zookeeper,docker,golang,zookeeper

3.5.2、Go语言API操作
// 删除节点
func DeleteTest(conn *zk.Conn) {
	path := "/app3"
	//先判断节点存不存在
	exists, state, _ := conn.Exists(path)
	fmt.Printf("path[%s] exists:%v\n", path, exists)
	//删除节点
	err := conn.Delete("/app3", state.Version)
	FailOnError("Failed to Delete node", err)
	fmt.Printf("path[%s] is deleted.", path)

	exists, _, _ = conn.Exists(path)
	fmt.Printf("path[%s] exists: %v\n", path, exists)
}

5、go-zookeeper权限(ACL)

zookeeper的节点有五种权限:Create、Read、Write、Delete、Admin。

ACL权限由schema:id:permissions组成

schema有四种方式

  • world
  • auth
  • digest
  • ip

下面对这四种方式都测试一遍

4.1、world

默认方式,相当于全世界都能访问。

/app3节点的权限修改为 crwa 后尝试删除其子节点 /p1

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//获取/app3节点的acl信息
	acl, state, err := conn.GetACL("/app3")
	FailOnError("Failed to GetACL", err)
	fmt.Println("\nget acl:")
	fmt.Println("scheme =", acl[0].Scheme)
	fmt.Println("id =", acl[0].ID)
	fmt.Println("permissions =", acl[0].Perms)

	//修改/app3节点的权限修改为crwa
	perms := zk.PermCreate | zk.PermRead | zk.PermWrite | zk.PermAdmin
	_, err = conn.SetACL("/app3", zk.WorldACL(int32(perms)), state.Aversion)
	FailOnError("Failed to SetACL", err)
	fmt.Println("SetAcl successful.")

	//create child node
	_, err = conn.Create("/app3/p1", nil, 0, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create node", err)

	//get state of child node
	_, state, err = conn.Get("/app3/p1")
	FailOnError("Failed to Get node info", err)

	//delete /app3/p1
	err = conn.Delete("/app3/p1", state.Version)
	FailOnError("Failed to Delete Node", err)
}

测试结果如下:因为我们没有赋予/app3节点Delete权限,即使子结点/p1赋予了全部权限也不能删除该子节点。

docker zookeeper,docker,golang,zookeeper

4.2、auth

auth 用来授予用户权限,所以需要先创建用户。

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//获取/auth节点的状态信息
	_, state, err := conn.Get("/auth")
	FailOnError("Failed to Get node info", err)

	//用户授权,用户不存在的话会新建
	err = conn.AddAuth("digest", []byte("user1:123456"))
	FailOnError("Failed to AddAuth", err)

	acl := zk.ACL{
		Scheme: "auth",
		Perms:  zk.PermAll,
		ID:     "user1:123456",
	}

	//为用户授权
	_, err = conn.SetACL("/auth", []zk.ACL{acl}, state.Version)
	FailOnError("Failed to SetACL", err)
	fmt.Println("AddAuthSuccess")
}

docker zookeeper,docker,golang,zookeeper

授权成功之后如果在其他连接中要查询节点信息要先验证用户信息才能进入下一步操作,也就是把conn.AddAuth操作提前,如果使用不正确的用户名和密码,得到的会是同样的用户认证失败的结果。

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//先进行访问/auth节点的话会报错
	_, _, err = conn.Get("/auth")
	if err != nil {
		fmt.Println("Get Node info error:", err)
	}

	//要先进行AddAuth操作
	err = conn.AddAuth("digest", []byte("user1:123456"))
	FailOnError("Failed to Add Auth", err)

	//再获取/auth节点的信息就不会报错了
	acl, _, err := conn.GetACL("/auth")
	FailOnError("Failed to Get node info", err)
	fmt.Println("acl 信息:")
	for i := 0; i < len(acl); i++ {
		fmt.Println("scheme =", acl[0].Scheme)
		fmt.Println("id =", acl[0].ID)
		fmt.Println("permissions =", acl[0].Perms)
	}
}

从测试结果来看在未进行AddAuth操作时我们是获取不到/auth节点信息的,节点的密码返回的是加密后的密码。

docker zookeeper,docker,golang,zookeeper

4.3、digest

digestauth基本相同,唯一的区别在于设置权限时,密码需要使用密文。

zk golang 库中有专为digest构造的方法:

zk.DigestACL(perms int32, user, password string)

此方法传入的密码需要是明文,其内部逻辑会将明文转为密文再向 zookeeper 传递。

使用示例:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//获取/digest
	_, state, err := conn.Get("/digest")
	FailOnError("Failed to Get node info", err)

	//用户授权,用户不存在的话会新建
	err = conn.AddAuth("digest", []byte("user1:123456"))
	FailOnError("Failed to AddAuth", err)

	//zk.DigestACL会将传入的明文转换成密文acl
	acl := zk.DigestACL(zk.PermAll, "user1", "123456")

	//为用户授权
	_, err = conn.SetACL("/digest", acl, state.Version)
	FailOnError("Failed to SetACL", err)
	fmt.Println("节点[/digest]已对用户 user1 授权")
}

4.4、ip

ip 权限顾名思义,就是限制 ip 地址的访问权限。

把节点的权限设置给指定的 ip 地址后,其他 ip 将无法访问该节点。

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()
	//获取/ip节点的状态信息
	_, state, err := conn.Get("/ip")
	FailOnError("Failed to Get node info", err)

	acl := zk.ACL{
		Scheme: "ip",
		Perms:  zk.PermAll,
		ID:     "192.168.17.1",
	}

	//为用户授权
	_, err = conn.SetACL("/ip", []zk.ACL{acl}, state.Aversion)
	FailOnError("Failed to SetACL", err)
	fmt.Println("节点[/ip]已对用户 192.168.17.1 授权")

	//获取以下节点的acl权限
	acls, _, err := conn.GetACL("/ip")
	FailOnError("Failed to Get node info", err)
	fmt.Println("acl 信息:")
	for i := 0; i < len(acls); i++ {
		fmt.Println("scheme =", acls[0].Scheme)
		fmt.Println("id =", acls[0].ID)
		fmt.Println("permissions =", acls[0].Perms)
	}
}

这里我用的是VMware的虚拟机的zookeeper然后用本地Windows去连接zookeeper发送的消息这个过程会经过虚拟路由转发,所以这里授权的ip地址是VMware虚拟网卡的地址,不然的话会报没有权限的错误。

docker zookeeper,docker,golang,zookeeper

docker zookeeper,docker,golang,zookeeper

6、watch机制

5.1、watch的事件类型

watch 用来实现发布/订阅功能,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。

每个 watch 仅有一次触发的机会,一旦触发会立即失效,想要持续监听,就需要一直注册。

go-zookeeper监听的事件类型分为五种:

  • zk.EventNodeCreated: 节点创建事件,需要watch一个不存在的节点,当节点被创建时触发,此watch通过conn.ExistsW(path string)设置
  • zk. EventNodeDeleted :节点删除事件,需要watch一个已存在的节点,当节点被移除时触发,此watch通过conn.ExistsW(path string)设置
  • zk. EventNodeDataChanged: 节点数据变化事件,此watch通过conn.GetW(path string) 以及 conn.ExistsW(path string) 设置
  • zk. EventNodeChildrenChanged : 子节点改变事件(数量改变),此watch通过conn.ChildrenW(path string)设置, 当path 下面增删子节点时触发(修改path下的子节点的内容时,不会触发通知)。
  • zk.EventNoWatching: watch移除事件,服务端出于某些原因不再为客户端watch节点时触发。

5.2、监听的方式

方式一、全局监听

全局监听的方式会在有监听事件发生时会执行监听器的回调函数

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	callbackOption := zk.WithEventCallback(callback)
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5, callbackOption)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	//注册一个监听事件
	exists, state, _, err := conn.ExistsW("/global")
	if err != nil {
		log.Println(err)
	}
	//如果节点不存在则创建
	if !exists {
		//创建一个临时的global节点
		_, err = conn.Create("/global", []byte("globaltest"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
		if err != nil {
			log.Println(err)
		}
		//在注册一个监听事件监听/global节点的删除
		_, state, _, err = conn.ExistsW("/global")
		if err != nil {
			log.Println(err)
		}
	}
	err = conn.Delete("/global", state.Version)
	if err != nil {
		log.Println(err)
	}
	defer conn.Close()
}

// 监听事件的回调函数
func callback(event zk.Event) {
	fmt.Println("###########################")
	fmt.Println("path: ", event.Path)
	fmt.Println("type: ", event.Type.String())
	fmt.Println("state: ", event.State.String())
	fmt.Println("---------------------------")
}

docker zookeeper,docker,golang,zookeeper

测试结果

docker zookeeper,docker,golang,zookeeper

方式二、局部监听
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

// 输出错误信息的函数
func FailOnError(msg string, err error) {
	if err != nil {
		log.Printf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	//先等待连接上再启动监听携程
	time.Sleep(5 * time.Second)
	go watchNodeCreated("/partial", conn)
	go watchNodeDataChanged("/partial", conn)
	go watchNodeChildrenChanged("/partial", conn)
	go watchNodeDeleted("/partial", conn)
	defer conn.Close()
	//等待操作结束
	time.Sleep(1 * time.Hour)
}

// 监听节点的创建事件
func watchNodeCreated(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Create\n", path)
	for {
		//利用channel通信机制将Event数据传递给ch
		//ch:=make(chan Event)
		_, _, ch, err := conn.ExistsW(path)
		//当err为nil时ch才会有数据否者会阻塞协程
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeCreated {
				log.Printf("Node[%v] Created\n", path)
			}
		} else {
			FailOnError("Failed to watchNodeCreated", err)
		}
	}
}

// 监听节点数据修改事件
func watchNodeDataChanged(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Data Change\n", path)
	for {
		_, _, ch, err := conn.GetW(path)
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeDataChanged {
				log.Printf("Node[%v] Data Changed", path)
			}
		}
	}
}

// 监听节点子节点的修改事件
func watchNodeChildrenChanged(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Children Change", path)
	for {
		_, _, ch, err := conn.ChildrenW(path)
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeChildrenChanged {
				log.Printf("Node[%v] Children Changed", path)
			}
		}
	}
}

// 监听节点的删除事件
func watchNodeDeleted(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Delete", path)
	for {
		_, _, ch, err := conn.ExistsW(path)
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeDeleted {
				log.Printf("Node[%v] Deleted", path)
			}
		} else {
			FailOnError("Failed to watchNodeDeleted", err)
		}
	}
}

启动程序之后在虚拟机上运行客户端程序执行以下命令

docker zookeeper,docker,golang,zookeeper

程序会输出以下结果

docker zookeeper,docker,golang,zookeeper

7、go-zookeeper实现分布式锁

zookeeper的分布式锁可以利用每个节点的唯一性来完成,但所有服务监听一个节点对于分布式系统来说完全是资源浪费。而zookeeper可以利用临时顺序节点来创建一个有序的临时节点列表来完成分布式锁:
  1. 客户端获取锁时,在lock节点下创建临时顺序节点。
  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁之后,将该节点删除。
  3. 如果发现自己创建的子节点并非所有子节点中最小的,说明自己没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
  4. 如果发现比自己小的那个节点删除,则客户端的Watcher会收到相应的同支,此时再次判断自己创建的节点是否时lock子节点中序号最小的,如果是则获取到锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

假如服务 A 创建了节点 a,此时节点 a 的前面没有节点,所以服务 A 可以执行。此时服务 B 创建了节点 b,节点 b 是节点 a 的下一个节点,那么服务 B 只需要监听节点 a 即可。

也就是说,因为临时有序节点列表是有序的,所以每个服务只需要监听自己创建的节点的前一个节点即可。

我们利用golang的goroutine来模拟客户端实现分布式锁的过程,以下是50个goroutine进行抢锁的示例:
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	var wg sync.WaitGroup

	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			//新建一个锁
			lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll))
			//加锁
			err = lock.LockWithData([]byte("it is a lock"))
			if err != nil {
				panic(err)
			}
			fmt.Println("第", n, "个 goroutine 获取到了锁")
			time.Sleep(time.Second) // 1 秒后释放锁
			//解锁
			lock.Unlock()
		}(i)
	}
	//等待协程运行结束
	wg.Wait()
}

运行结果如下(只截取了一部分)

docker zookeeper,docker,golang,zookeeper文章来源地址https://www.toymoban.com/news/detail-690759.html

到了这里,关于zookeeper的介绍和用docker搭建zookeeper集群,以及Go语言使用zookeeper的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 云计算集群搭建记录[Hadoop|Zookeeper|Hbase|Spark | Docker |OpenStack]更新索引 |动态更新

    为了能够更好的查看所更新的文章,讲该博文设为索引 为了解决在编辑文件等操作的过程中的权限问题,博主一律 默认采用 root 账户登录 对于初次安装的用户可以采用如下命令行: 另外推荐一款终端工具:Tabby,既能够连接自己的服务器,也能够连接自己本地的虚拟机,还

    2023年04月13日
    浏览(56)
  • Go语言网络编程介绍以及案例运用

    1. 基本概念 TCP 和 UDP : Go语言支持TCP(传输控制协议)和UDP(用户数据报协议)。TCP提供可靠的、面向连接的通信,而UDP提供无连接的快速数据传输。 并发 : Go语言的并发模型是通过goroutines实现的。每个网络请求都可以在自己的goroutine中处理,实现高效的并发。 Channels : 用于

    2024年01月25日
    浏览(71)
  • docker集群的详解以及超详细搭建

    在我们日常学习或开发过程中,如果我们的服务均采用docker容器的方式运行,比如提供后端接口服务的容器 containerA 和提供数据存取服务的容器 containerB ,如下图所示,不同的docker容器拥有各自的ip地址和端口号。 本文假定一台主机只有一个docker进程。在这种情况下,容器

    2024年01月16日
    浏览(38)
  • 【RabbitMQ】RabbitMQ 集群的搭建 —— 基于 Docker 搭建 RabbitMQ 的普通集群,镜像集群以及仲裁队列

    在RabbitMQ中,有不同的集群模式,包括普通模式、镜像模式和仲裁队列。每种模式具有不同的特点和应用场景。 普通集群,也称为标准集群(classic cluster),具备以下特征: 在集群的各个节点之间共享部分数据,包括交换机和队列的元信息,但不包括队列中的消息。 当访问

    2024年02月04日
    浏览(58)
  • Go语言中入门Hello World以及IDE介绍

    您可以阅读Golang教程第1部分:Go语言介绍与安装 来了解什么是golang以及如何安装golang。 Go语言已经安装好了,当你开始学习Go语言时,编写一个\\\"Hello, World!\\\"程序是一个很好的入门点。 下面将会提供了一些有关IDE和在线编辑器的信息,和如何使用Go语言编写并运行一个简单的

    2024年02月07日
    浏览(54)
  • 分布式集群——搭建Hadoop环境以及相关的Hadoop介绍

    分布式集群——jdk配置与zookeeper环境搭建 分布式集群——搭建Hadoop环境以及相关的Hadoop介绍 文章目录 前言 一 hadoop的相关概念 1.1 Hadoop概念 补充:块的存储 1.2 HDFS是什么 1.3 三种节点的功能 I、NameNode节点 II、fsimage与edits文件存放的内容介绍 III、DataNode节点 IV、SecondaryNameNod

    2024年02月10日
    浏览(55)
  • Linux-一篇文章,速通Hadoop集群之伪分布式,完全分布式,高可用搭建(附zookeeper,jdk介绍与安装)。

    文章较长,附目录,此次安装是在VM虚拟环境下进行。文章第一节主要是介绍Hadoop与Hadoop生态圈并了解Hadoop三种集群的区别,第二节和大家一起下载,配置Linux三种集群以及大数据相关所需的jdk,zookeeper,只需安装配置的朋友可以直接跳到文章第二节。同时,希望我的文章能帮

    2024年03月19日
    浏览(53)
  • 【go语言开发】go项目打包成Docker镜像,包括Dockerfile命令介绍、goctl工具生成

    本文主要介绍如何将go项目打包成镜像,首先介绍Dockerfile常用命令介绍,然后介绍使用工具goctl用于生成Dockerfile,还可以根据需求自定义指令内容,最后讲解如何将go-blog项目打包成镜像,以及如何运行等 参考文档: docker日常使用,编写dockerfile等 dockerfile编写 开发完项目之后

    2024年01月20日
    浏览(51)
  • Zookeeper集群搭建记录 | 云计算[CentOS7] | Zookeeper集群搭建

    本系列文章索引以及一些默认好的条件在 传送门 在配置Zookeeper之前,建议先配置Hadoop集群,具体的操作流程博主已更新完成,链接 Zookeeper的安装包版本不太相同,大致分为有编译过的和没有编译过的(如有错请留言指正 一般情况下对于我们在解压配置就能使用的情况下,我

    2024年02月01日
    浏览(67)
  • Zookeeper 和 Kafka 工作原理及如何搭建 Zookeeper集群 + Kafka集群

    目录 1 Zookeeper 1.1 Zookeeper 定义 1.2 Zookeeper 工作机制 1.3 Zookeeper 特点 1.4 Zookeeper 数据结构 1.5 Zookeeper 应用场景 1.6 Zookeeper 选举机制 2 部署 Zookeeper 集群 2.1 安装前准备 2.2 安装 Zookeeper 3 Kafka 3.1 为什么需要消息队列(MQ) 3.2 使用消息队列的好处 3.3 消息队列的两种模式 3.4 Kafka 定义

    2024年02月08日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包