前言
在单机模式下,相对简单,认为所有的客户都在同一台服务器上,姑且认为1台服务器可以同时支持1万用户在线,在更多用户同时在线时,则需要集群来实现负载均衡。
集群的算法需要解决故障处理以及动态添加的问题,同时需要考虑如何在集群节点间路由数据。
1. 负载均衡算法
这里使用一致性哈希环来实现。
添加节点
每个设备都有唯一的名字(key),同时按照重复因子的个数添加到环中;每个名字以及重复的序号做hash算法得到一个哈希数,所以环上每个节点都有一个数值;
查询
在查询时候,提供了某个字符串,计算哈希,如果哈希比某个环上节点的数值大,就选这个节点,如果都找不到,就选第一个,所以称之为环而不是列表;
故障移除
当某个服务器故障退出时候,需要从环上删除节点,那么此服务节点上的用户再次登录时,会选到相邻的服务节点上;而正常服务节点上的用户不会受到影响;
1.1 redis服务发现
一般GO服务发现使用etcd比较多,但是为了少安装几个服务,可以考虑使用redis实现;
注册服务
每个服务上线时候,都在“IMSERVER"键值中设置节点名字的field和value,value中包含更新时间;
每个服务定期更新field中的活动时间戳,证明工作正常;
轮询可以根据更新时间确定哪些节点存活,如果某些节点失效了,需要重新计算哈希值;
如果为了使用超时机制来实现自动检测,那么可以再额外添加了一个键值,名字就是服务的节点名最后后缀,这样当某个服务节点的键值超时就可以即时发现;
1.2 etcd服务发现
在一个分布式系统中,服务注册与发现是一个关键的组件,用于在集群中动态地注册服务和发现其他服务。Etcd 提供了一种方便的方式来实现服务注册与发现,通常的做法包括以下几个步骤:
-
服务注册:
- 当一个服务启动时,它会将自己的地址信息(如 IP 地址和端口号)以及其他相关信息注册到 Etcd 中。
- 通常情况下,服务会将自己注册为一个可识别的名称,比如服务的名称、版本号等,以便其他服务能够识别和访问。
-
服务发现:
- 当一个服务需要与其他服务通信时,它会向 Etcd 发送一个服务发现的请求,查询特定服务名称对应的地址信息。
- Etcd 将返回一个或多个注册了该名称的服务的地址信息,然后服务可以使用这些信息来进行通信。
在 Go 中,可以使用 Etcd 的 Go 客户端库(如 etcd/clientv3
)来实现服务注册与发现。以下是一个简单的示例,演示了如何使用 Etcd 实现服务注册与发现:
package main
import (
"context"
"fmt"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 连接 Etcd 客户端
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://localhost:2379"}, // Etcd 服务地址
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 注册服务
if err := registerService(cli, "service1", "127.0.0.1:8080"); err != nil {
log.Fatal(err)
}
// 发现服务
addrs, err := discoverService(cli, "service1")
if err != nil {
log.Fatal(err)
}
fmt.Println("Service1 addresses:", addrs)
}
// 注册服务
func registerService(cli *clientv3.Client, serviceName, serviceAddr string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 将服务地址信息注册到 Etcd 中
_, err := cli.Put(ctx, "/services/"+serviceName+"/"+serviceAddr, serviceAddr)
return err
}
// 发现服务
func discoverService(cli *clientv3.Client, serviceName string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 查询特定服务名称对应的所有地址信息
resp, err := cli.Get(ctx, "/services/"+serviceName+"/", clientv3.WithPrefix())
if err != nil {
return nil, err
}
// 解析地址信息
var addrs []string
for _, kv := range resp.Kvs {
addrs = append(addrs, string(kv.Value))
}
return addrs, nil
}
1.2 重复因子
添加重复因子是为了增加哈希环上节点的数量,从而更好地实现负载均衡。在哈希环中,服务器节点的数量越多,分布越均匀,可以提高负载均衡的效果。如果只有少量的服务器节点,哈希环上的节点分布可能不够均匀,导致负载分配不平衡。
通过添加重复因子,可以让每个服务器节点在哈希环上出现多次,增加了节点的数量,从而更好地平衡了负载。例如,如果只有3台服务器,但设置了重复因子为10,则在哈希环上每个服务器节点会出现10次,相当于虚拟出了30个节点,增加了负载均衡的可能性。
在服务器数量为3到10台之间,可以考虑将重复因子设置为3或4。这样可以在哈希环上为每个服务器节点创建3到4个虚拟节点,以增加哈希环上节点的数量,提高负载均衡的效果。
例如,如果有5台服务器,设置重复因子为3,则在哈希环上会创建15个虚拟节点,增加了节点的数量,有助于更均匀地分配负载。而如果服务器数量较少,设置更大的重复因子可能会导致哈希环上节点过多,增加计算开销;而设置更小的重复因子可能会降低负载均衡的效果。
综上所述,设置重复因子为3或4可能是一个合适的选择,可以在保证负载均衡的同时,避免哈希环节点数量过多带来的性能损耗。然而,具体的选择还应根据实际情况和性能要求进行调整和评估。
2. redis检测故障实现
可以设计一个基于定时任务的机制来实现:
-
服务节点定时更新超时时间:每个服务节点定期(比如每隔 5 秒)更新自己的超时时间,将当前时间加上超时阈值,并将新的超时时间写入 Redis 中。这样可以确保每个服务节点的超时时间保持最新。
-
其他节点定时轮询超时情况:其他节点定期(比如每隔 10 秒)轮询所有节点的超时情况。对于每个节点,获取其超时时间,如果发现某个节点超时了,就触发重新计算哈希环的操作。
-
重新计算哈希环:一旦发现某个节点超时,触发重新计算哈希环的操作。这意味着需要重新收集所有服务节点的信息,并更新哈希环,确保哈希环中的每个节点都是当前活动的节点。
-
定时任务实现:使用 Go 中的定时任务库(如
time.Timer
或time.Tick
)来实现定时任务。每个节点维护一个定时器,在定时器触发时执行相应的操作(更新超时时间或者轮询超时情况)。
2.1 轮询查询
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
// 创建 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // 设置 Redis 密码
DB: 0, // 使用默认数据库
})
// 模拟每个服务节点更新超时时间的定时任务
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 更新超时时间
updateTimeout(rdb)
}
}
}()
// 模拟其他节点轮询超时情况的定时任务
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 轮询超时情况并重新计算哈希环
checkTimeoutAndRebalance(rdb)
}
}
}()
// 主程序保持运行
select {}
}
func updateTimeout(rdb *redis.Client) {
// 获取当前时间
now := time.Now()
// 更新超时时间
rdb.Set(context.Background(), "timeout", now.Add(5*time.Second).Format(time.RFC3339), 0)
fmt.Println("Updated timeout to", now.Add(5*time.Second))
}
func checkTimeoutAndRebalance(rdb *redis.Client) {
// 获取当前时间
now := time.Now()
// 获取超时时间
timeoutStr, err := rdb.Get(context.Background(), "timeout").Result()
if err != nil {
fmt.Println("Error:", err)
return
}
timeout, err := time.Parse(time.RFC3339, timeoutStr)
if err != nil {
fmt.Println("Error:", err)
return
}
// 检查超时情况
if now.After(timeout) {
fmt.Println("Node timeout detected. Rebalancing...")
// 重新计算哈希环
rebalanceHashRing()
}
}
func rebalanceHashRing() {
// 实现哈希环的重新计算逻辑
fmt.Println("Rebalancing hash ring...")
}
2.2 订阅超时事件
当订阅了 __keyevent@0__:expired
模式后,每当有键值过期时,Redis 就会发送一个消息给订阅者。这个消息会包含过期的键名(key name),因此可以在接收到消息时获取到哪个键值超时了。
检测超时键值时候,查看是否是已知的服务器名字,如果某个服务器下线了,则需要重新调整;
以下是如何使用 Go 语言监听键值超时事件:
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
// 创建 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // 设置 Redis 密码
DB: 0, // 使用默认数据库
})
// 订阅键值过期事件
pubsub := rdb.Subscribe(context.Background(), "__keyevent@0__:expired")
defer pubsub.Close()
// 在单独的协程中处理订阅消息
go func() {
for {
// 接收消息
msg, err := pubsub.Receive(context.Background())
if err != nil {
fmt.Println("Error:", err)
return
}
// 处理消息
switch msg := msg.(type) {
case *redis.Message:
// 获取超时的键名
key := msg.Payload
fmt.Println("Key expired:", key)
// 进行相应的处理逻辑
default:
fmt.Println("Received unknown message type")
}
}
}()
// 主程序保持运行
select {}
}
3. etcd故障检测实现
要检测某个服务节点是否下线,可以通过以下几种方式实现:
- 心跳检测:服务节点定期发送心跳信号到 Etcd,表明自己仍然活跃。如果某个节点长时间没有发送心跳信号,可以认为该节点已经下线。
- 定时检测:定时从 Etcd 中查询特定服务的节点信息,如果发现某个节点在最新的查询中不存在,说明该节点已下线。
- Watch 监听:通过 Etcd 的 Watch 功能,监视特定服务的节点信息。当有节点下线时,Watch 将会收到相应的通知,从而实现及时检测。
下面是一个简单的示例代码,演示了如何使用 Etcd 的 Watch 功能来监视特定服务的节点信息,并实时检测节点的上线和下线:
package main
import (
"context"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 连接 Etcd 客户端
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://localhost:2379"}, // Etcd 服务地址
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// Watch 特定服务节点的变化
watchServiceNodes(cli, "service1")
}
// Watch 特定服务节点的变化
func watchServiceNodes(cli *clientv3.Client, serviceName string) {
watchChan := cli.Watch(context.Background(), "/services/"+serviceName+"/", clientv3.WithPrefix())
for {
select {
case <-watchChan:
// 节点发生变化,处理节点上线和下线的情况
checkServiceNodes(cli, serviceName)
}
}
}
// 检查服务节点状态
func checkServiceNodes(cli *clientv3.Client, serviceName string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 查询特定服务名称对应的所有地址信息
resp, err := cli.Get(ctx, "/services/"+serviceName+"/", clientv3.WithPrefix())
if err != nil {
log.Println("Error checking service nodes:", err)
return
}
// 检查节点是否下线
// 遍历之前保存的节点信息,与当前 Etcd 中的节点信息进行对比
// 如果某个节点之前存在,但在当前查询中不存在,则认为该节点下线
for _, kv := range resp.Kvs {
nodeAddr := string(kv.Value)
log.Println("Service node:", nodeAddr)
}
log.Println("Service node check complete.")
}
4. 消息跨服务器路由
4.1 一对一消息路由
消息目的地用户如果不再本地服务器,则使用算法计算当前应该在哪个服务器上,使用kafka将数据转发到目标服务器,等待目标服务器去处理;
屏蔽检测
在转发消息前,入库前,首先需要检测对方是否屏蔽了自己;在集群模式下,用户登录后,在redis中更新自己的状态,同时pulish 自己的状态消息。
如果有多个订阅者同时订阅了同一个频道,那么每个订阅者都能收到发布到该频道的消息。
Redis 的 Pub/Sub(发布/订阅)模式允许多个订阅者同时订阅同一个频道,并且每个订阅者都能接收到频道中发布的消息。
当消息发布到频道时,Redis 会将该消息发送给所有订阅了该频道的客户端,即使有多个订阅者也是如此。每个订阅者都会收到相同的消息,并且消息的顺序与发布的顺序一致。
这种设计使得 Pub/Sub 模式非常适合实现广播、实时通知等场景,因为每个订阅者都能及时地收到发布的消息,无需额外的请求或轮询。
但是:Redis 的 Pub/Sub 模式下,消息不会被缓存。当消息发布到频道时,它会立即发送给所有订阅了该频道的客户端,而不会保存在 Redis 中。这意味着如果有订阅者在消息发布之后才订阅频道,它们将无法接收到之前发布的消息。
补充:用户的状态信息同时需要保存到redis中,便于错过消息的服务器加载;
三级缓存: 服务器也不是缓存所有用户状态信息,只有通信中需要路由的用户才会将状态加载到内存;可以设置30分钟超时,如果不使用则清理掉缓存;这样就是一个数据库,redis,内存的三级缓存模式;
相关发布消息的示例:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v8"
)
func newRedisClient() *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 服务地址
Password: "", // Redis 访问密码
DB: 0, // 选择的数据库
})
}
func publishMessage(channel, message string) error {
ctx := context.Background()
client := newRedisClient()
// 发布消息到指定频道
if err := client.Publish(ctx, channel, message).Err(); err != nil {
return err
}
// 关闭 Redis 客户端连接
if err := client.Close(); err != nil {
log.Println("Error closing Redis client:", err)
}
return nil
}
func main() {
channel := "user_actions" // 频道名称
message := "User login" // 消息内容
if err := publishMessage(channel, message); err != nil {
log.Fatal("Error publishing message:", err)
} else {
fmt.Println("Message published successfully!")
}
}
这样就可以通过调用 publishMessage
函数来发布消息到指定的频道了。在 main
函数中,指定了要发布的频道名称和消息内容,然后调用 publishMessage
函数将消息发布到指定的频道中。
订阅部分示例代码:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
// 创建 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 服务器地址
Password: "", // 连接密码,若无密码则留空
DB: 0, // 选择数据库,默认为 0
})
// 创建一个新的上下文,用于控制订阅的生命周期
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 创建一个 WaitGroup,用于等待订阅结束
var wg sync.WaitGroup
// 订阅频道
sub := rdb.Subscribe(ctx, "channel1")
// 使用 goroutine 处理订阅消息
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
msg, err := sub.ReceiveMessage(ctx)
if err != nil {
log.Println("Error receiving message:", err)
continue
}
fmt.Println("Received message:", msg.Payload)
}
}
}()
fmt.Println("Subscribed to channel1. Waiting for messages...")
// 等待程序退出信号
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
// 等待订阅结束或接收到退出信号
wg.Add(1)
select {
case <-stop:
fmt.Println("Received interrupt signal. Exiting...")
cancel() // 取消订阅
case <-ctx.Done():
}
// 等待订阅处理结束
wg.Wait()
}
故障情况
如果目标服务器出现故障,暂时没有检测到,消息执行了错误的路由,但当用户再次登录,更换服务器后,若干秒后也能正确路由;
用户重新登录时候,会加载离线数据;
4.2 群聊消息路由
我们需要三个表:
群内成员表:用于查看群状态信息以及群成员信息;
群用户在各个服务器上分布状态:转发前需要查表,查看需要转发到哪些服务器;
群内当前服务器在线成表:接收到转发的消息后,服务器根据此表来确定需要转发到哪些用户;理想情况下,所有的用户都是一直在线的;文章来源:https://www.toymoban.com/news/detail-842029.html
tinode那种模式是直接用tcp连接来做集群,这样的模式在实际用的时候会比较复杂,n个节点的服务器每个节点都需要维护n-1个连接,结构复杂;如果有BUG,难以调试发现问题;我更喜欢松耦合的结构,使用kafka给每个服务节点一个队列,从里面收取自己需要转发的消息即可;文章来源地址https://www.toymoban.com/news/detail-842029.html
到了这里,关于IM服务集群与跨服务器消息路由策略的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!