IM服务集群与跨服务器消息路由策略

这篇具有很好参考价值的文章主要介绍了IM服务集群与跨服务器消息路由策略。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

在单机模式下,相对简单,认为所有的客户都在同一台服务器上,姑且认为1台服务器可以同时支持1万用户在线,在更多用户同时在线时,则需要集群来实现负载均衡。

集群的算法需要解决故障处理以及动态添加的问题,同时需要考虑如何在集群节点间路由数据。

1. 负载均衡算法

这里使用一致性哈希环来实现。

添加节点

每个设备都有唯一的名字(key),同时按照重复因子的个数添加到环中;每个名字以及重复的序号做hash算法得到一个哈希数,所以环上每个节点都有一个数值;

查询

在查询时候,提供了某个字符串,计算哈希,如果哈希比某个环上节点的数值大,就选这个节点,如果都找不到,就选第一个,所以称之为环而不是列表;

故障移除

当某个服务器故障退出时候,需要从环上删除节点,那么此服务节点上的用户再次登录时,会选到相邻的服务节点上;而正常服务节点上的用户不会受到影响;

1.1 redis服务发现

一般GO服务发现使用etcd比较多,但是为了少安装几个服务,可以考虑使用redis实现;

注册服务

每个服务上线时候,都在“IMSERVER"键值中设置节点名字的field和value,value中包含更新时间;

每个服务定期更新field中的活动时间戳,证明工作正常;

轮询可以根据更新时间确定哪些节点存活,如果某些节点失效了,需要重新计算哈希值;

如果为了使用超时机制来实现自动检测,那么可以再额外添加了一个键值,名字就是服务的节点名最后后缀,这样当某个服务节点的键值超时就可以即时发现;

1.2 etcd服务发现

在一个分布式系统中,服务注册与发现是一个关键的组件,用于在集群中动态地注册服务和发现其他服务。Etcd 提供了一种方便的方式来实现服务注册与发现,通常的做法包括以下几个步骤:

  1. 服务注册
    • 当一个服务启动时,它会将自己的地址信息(如 IP 地址和端口号)以及其他相关信息注册到 Etcd 中。
    • 通常情况下,服务会将自己注册为一个可识别的名称,比如服务的名称、版本号等,以便其他服务能够识别和访问。
  2. 服务发现
    • 当一个服务需要与其他服务通信时,它会向 Etcd 发送一个服务发现的请求,查询特定服务名称对应的地址信息。
    • Etcd 将返回一个或多个注册了该名称的服务的地址信息,然后服务可以使用这些信息来进行通信。

在 Go 中,可以使用 Etcd 的 Go 客户端库(如 etcd/clientv3)来实现服务注册与发现。以下是一个简单的示例,演示了如何使用 Etcd 实现服务注册与发现:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// 连接 Etcd 客户端
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://localhost:2379"}, // Etcd 服务地址
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// 注册服务
	if err := registerService(cli, "service1", "127.0.0.1:8080"); err != nil {
		log.Fatal(err)
	}

	// 发现服务
	addrs, err := discoverService(cli, "service1")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Service1 addresses:", addrs)
}

// 注册服务
func registerService(cli *clientv3.Client, serviceName, serviceAddr string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 将服务地址信息注册到 Etcd 中
	_, err := cli.Put(ctx, "/services/"+serviceName+"/"+serviceAddr, serviceAddr)
	return err
}

// 发现服务
func discoverService(cli *clientv3.Client, serviceName string) ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 查询特定服务名称对应的所有地址信息
	resp, err := cli.Get(ctx, "/services/"+serviceName+"/", clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	// 解析地址信息
	var addrs []string
	for _, kv := range resp.Kvs {
		addrs = append(addrs, string(kv.Value))
	}
	return addrs, nil
}

1.2 重复因子

添加重复因子是为了增加哈希环上节点的数量,从而更好地实现负载均衡。在哈希环中,服务器节点的数量越多,分布越均匀,可以提高负载均衡的效果。如果只有少量的服务器节点,哈希环上的节点分布可能不够均匀,导致负载分配不平衡。

通过添加重复因子,可以让每个服务器节点在哈希环上出现多次,增加了节点的数量,从而更好地平衡了负载。例如,如果只有3台服务器,但设置了重复因子为10,则在哈希环上每个服务器节点会出现10次,相当于虚拟出了30个节点,增加了负载均衡的可能性。

在服务器数量为3到10台之间,可以考虑将重复因子设置为3或4。这样可以在哈希环上为每个服务器节点创建3到4个虚拟节点,以增加哈希环上节点的数量,提高负载均衡的效果。

例如,如果有5台服务器,设置重复因子为3,则在哈希环上会创建15个虚拟节点,增加了节点的数量,有助于更均匀地分配负载。而如果服务器数量较少,设置更大的重复因子可能会导致哈希环上节点过多,增加计算开销;而设置更小的重复因子可能会降低负载均衡的效果。

综上所述,设置重复因子为3或4可能是一个合适的选择,可以在保证负载均衡的同时,避免哈希环节点数量过多带来的性能损耗。然而,具体的选择还应根据实际情况和性能要求进行调整和评估。

2. redis检测故障实现

可以设计一个基于定时任务的机制来实现:

  1. 服务节点定时更新超时时间:每个服务节点定期(比如每隔 5 秒)更新自己的超时时间,将当前时间加上超时阈值,并将新的超时时间写入 Redis 中。这样可以确保每个服务节点的超时时间保持最新。

  2. 其他节点定时轮询超时情况:其他节点定期(比如每隔 10 秒)轮询所有节点的超时情况。对于每个节点,获取其超时时间,如果发现某个节点超时了,就触发重新计算哈希环的操作。

  3. 重新计算哈希环:一旦发现某个节点超时,触发重新计算哈希环的操作。这意味着需要重新收集所有服务节点的信息,并更新哈希环,确保哈希环中的每个节点都是当前活动的节点。

  4. 定时任务实现:使用 Go 中的定时任务库(如 time.Timertime.Tick)来实现定时任务。每个节点维护一个定时器,在定时器触发时执行相应的操作(更新超时时间或者轮询超时情况)。

2.1 轮询查询

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

func main() {
	// 创建 Redis 客户端
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // 设置 Redis 密码
		DB:       0,  // 使用默认数据库
	})

	// 模拟每个服务节点更新超时时间的定时任务
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// 更新超时时间
				updateTimeout(rdb)
			}
		}
	}()

	// 模拟其他节点轮询超时情况的定时任务
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// 轮询超时情况并重新计算哈希环
				checkTimeoutAndRebalance(rdb)
			}
		}
	}()

	// 主程序保持运行
	select {}
}

func updateTimeout(rdb *redis.Client) {
	// 获取当前时间
	now := time.Now()
	// 更新超时时间
	rdb.Set(context.Background(), "timeout", now.Add(5*time.Second).Format(time.RFC3339), 0)
	fmt.Println("Updated timeout to", now.Add(5*time.Second))
}

func checkTimeoutAndRebalance(rdb *redis.Client) {
	// 获取当前时间
	now := time.Now()
	// 获取超时时间
	timeoutStr, err := rdb.Get(context.Background(), "timeout").Result()
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	timeout, err := time.Parse(time.RFC3339, timeoutStr)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	// 检查超时情况
	if now.After(timeout) {
		fmt.Println("Node timeout detected. Rebalancing...")
		// 重新计算哈希环
		rebalanceHashRing()
	}
}

func rebalanceHashRing() {
	// 实现哈希环的重新计算逻辑
	fmt.Println("Rebalancing hash ring...")
}
   

2.2 订阅超时事件

当订阅了 __keyevent@0__:expired 模式后,每当有键值过期时,Redis 就会发送一个消息给订阅者。这个消息会包含过期的键名(key name),因此可以在接收到消息时获取到哪个键值超时了。

检测超时键值时候,查看是否是已知的服务器名字,如果某个服务器下线了,则需要重新调整;

以下是如何使用 Go 语言监听键值超时事件:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

func main() {
	// 创建 Redis 客户端
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // 设置 Redis 密码
		DB:       0,  // 使用默认数据库
	})

	// 订阅键值过期事件
	pubsub := rdb.Subscribe(context.Background(), "__keyevent@0__:expired")
	defer pubsub.Close()

	// 在单独的协程中处理订阅消息
	go func() {
		for {
			// 接收消息
			msg, err := pubsub.Receive(context.Background())
			if err != nil {
				fmt.Println("Error:", err)
				return
			}

			// 处理消息
			switch msg := msg.(type) {
			case *redis.Message:
				// 获取超时的键名
				key := msg.Payload
				fmt.Println("Key expired:", key)
				// 进行相应的处理逻辑
			default:
				fmt.Println("Received unknown message type")
			}
		}
	}()

	// 主程序保持运行
	select {}
}

3. etcd故障检测实现

要检测某个服务节点是否下线,可以通过以下几种方式实现:

  1. 心跳检测:服务节点定期发送心跳信号到 Etcd,表明自己仍然活跃。如果某个节点长时间没有发送心跳信号,可以认为该节点已经下线。
  2. 定时检测:定时从 Etcd 中查询特定服务的节点信息,如果发现某个节点在最新的查询中不存在,说明该节点已下线。
  3. Watch 监听:通过 Etcd 的 Watch 功能,监视特定服务的节点信息。当有节点下线时,Watch 将会收到相应的通知,从而实现及时检测。

下面是一个简单的示例代码,演示了如何使用 Etcd 的 Watch 功能来监视特定服务的节点信息,并实时检测节点的上线和下线:

package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// 连接 Etcd 客户端
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://localhost:2379"}, // Etcd 服务地址
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Watch 特定服务节点的变化
	watchServiceNodes(cli, "service1")
}

// Watch 特定服务节点的变化
func watchServiceNodes(cli *clientv3.Client, serviceName string) {
	watchChan := cli.Watch(context.Background(), "/services/"+serviceName+"/", clientv3.WithPrefix())

	for {
		select {
		case <-watchChan:
			// 节点发生变化,处理节点上线和下线的情况
			checkServiceNodes(cli, serviceName)
		}
	}
}

// 检查服务节点状态
func checkServiceNodes(cli *clientv3.Client, serviceName string) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 查询特定服务名称对应的所有地址信息
	resp, err := cli.Get(ctx, "/services/"+serviceName+"/", clientv3.WithPrefix())
	if err != nil {
		log.Println("Error checking service nodes:", err)
		return
	}

	// 检查节点是否下线
	// 遍历之前保存的节点信息,与当前 Etcd 中的节点信息进行对比
	// 如果某个节点之前存在,但在当前查询中不存在,则认为该节点下线
	for _, kv := range resp.Kvs {
		nodeAddr := string(kv.Value)
		log.Println("Service node:", nodeAddr)
	}

	log.Println("Service node check complete.")
}

4. 消息跨服务器路由

4.1 一对一消息路由

消息目的地用户如果不再本地服务器,则使用算法计算当前应该在哪个服务器上,使用kafka将数据转发到目标服务器,等待目标服务器去处理;

屏蔽检测

在转发消息前,入库前,首先需要检测对方是否屏蔽了自己;在集群模式下,用户登录后,在redis中更新自己的状态,同时pulish 自己的状态消息。

如果有多个订阅者同时订阅了同一个频道,那么每个订阅者都能收到发布到该频道的消息。

Redis 的 Pub/Sub(发布/订阅)模式允许多个订阅者同时订阅同一个频道,并且每个订阅者都能接收到频道中发布的消息。

当消息发布到频道时,Redis 会将该消息发送给所有订阅了该频道的客户端,即使有多个订阅者也是如此。每个订阅者都会收到相同的消息,并且消息的顺序与发布的顺序一致。

这种设计使得 Pub/Sub 模式非常适合实现广播、实时通知等场景,因为每个订阅者都能及时地收到发布的消息,无需额外的请求或轮询。

但是:Redis 的 Pub/Sub 模式下,消息不会被缓存。当消息发布到频道时,它会立即发送给所有订阅了该频道的客户端,而不会保存在 Redis 中。这意味着如果有订阅者在消息发布之后才订阅频道,它们将无法接收到之前发布的消息。

补充:用户的状态信息同时需要保存到redis中,便于错过消息的服务器加载;

三级缓存: 服务器也不是缓存所有用户状态信息,只有通信中需要路由的用户才会将状态加载到内存;可以设置30分钟超时,如果不使用则清理掉缓存;这样就是一个数据库,redis,内存的三级缓存模式;

相关发布消息的示例:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v8"
)

func newRedisClient() *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // Redis 服务地址
		Password: "",                // Redis 访问密码
		DB:       0,                 // 选择的数据库
	})
}

func publishMessage(channel, message string) error {
	ctx := context.Background()
	client := newRedisClient()

	// 发布消息到指定频道
	if err := client.Publish(ctx, channel, message).Err(); err != nil {
		return err
	}

	// 关闭 Redis 客户端连接
	if err := client.Close(); err != nil {
		log.Println("Error closing Redis client:", err)
	}

	return nil
}

func main() {
	channel := "user_actions" // 频道名称
	message := "User login"   // 消息内容

	if err := publishMessage(channel, message); err != nil {
		log.Fatal("Error publishing message:", err)
	} else {
		fmt.Println("Message published successfully!")
	}
}

这样就可以通过调用 publishMessage 函数来发布消息到指定的频道了。在 main 函数中,指定了要发布的频道名称和消息内容,然后调用 publishMessage 函数将消息发布到指定的频道中。

订阅部分示例代码:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
)

func main() {
	// 创建 Redis 客户端
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // Redis 服务器地址
		Password: "",               // 连接密码,若无密码则留空
		DB:       0,                // 选择数据库,默认为 0
	})

	// 创建一个新的上下文,用于控制订阅的生命周期
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 创建一个 WaitGroup,用于等待订阅结束
	var wg sync.WaitGroup

	// 订阅频道
	sub := rdb.Subscribe(ctx, "channel1")

	// 使用 goroutine 处理订阅消息
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			default:
				msg, err := sub.ReceiveMessage(ctx)
				if err != nil {
					log.Println("Error receiving message:", err)
					continue
				}
				fmt.Println("Received message:", msg.Payload)
			}
		}
	}()

	fmt.Println("Subscribed to channel1. Waiting for messages...")

	// 等待程序退出信号
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt)

	// 等待订阅结束或接收到退出信号
	wg.Add(1)
	select {
	case <-stop:
		fmt.Println("Received interrupt signal. Exiting...")
		cancel() // 取消订阅
	case <-ctx.Done():
	}

	// 等待订阅处理结束
	wg.Wait()
}

故障情况

如果目标服务器出现故障,暂时没有检测到,消息执行了错误的路由,但当用户再次登录,更换服务器后,若干秒后也能正确路由;

用户重新登录时候,会加载离线数据;

4.2 群聊消息路由

我们需要三个表:

群内成员表:用于查看群状态信息以及群成员信息;

群用户在各个服务器上分布状态:转发前需要查表,查看需要转发到哪些服务器;

群内当前服务器在线成表:接收到转发的消息后,服务器根据此表来确定需要转发到哪些用户;理想情况下,所有的用户都是一直在线的;

tinode那种模式是直接用tcp连接来做集群,这样的模式在实际用的时候会比较复杂,n个节点的服务器每个节点都需要维护n-1个连接,结构复杂;如果有BUG,难以调试发现问题;我更喜欢松耦合的结构,使用kafka给每个服务节点一个队列,从里面收取自己需要转发的消息即可;文章来源地址https://www.toymoban.com/news/detail-842029.html

到了这里,关于IM服务集群与跨服务器消息路由策略的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 服务器弱口令漏洞修复策略

    vi /etc/pam.d/system-auth password requisite pam_cracklib.so try_first_pass retry=3 minlen=8 ocredit=-1 ucredit=-1 lcredit=-1 dcredit=-1 try_first_pass retry=3:在密码设置交互界面,用户有 3 次机会重设密码。 minlen=:此选项用来设置新密码的最小长度ucredit= :此选项用来设定新密码中可以包含的大写字母的最

    2024年02月07日
    浏览(48)
  • MySQL 服务器的调优策略

    点击上方 ↑ “追梦 Java”关注,一起追梦! 在工作中,我们发现慢查询一般有2个途径,一个是被动的,一个是主动的。被动的是当业务人员反馈某个查询界面响应的时间特别长,你才去处理。主动的是通过通过分析慢查询日志来主动发现执行效率缓慢的 sql 语句,或者通过

    2024年02月15日
    浏览(43)
  • 物理寻址和功能寻址,服务器不同的应答策略和NRC回复策略

    详细策略上,又分为服务有子功能,和不存在子功能。 存在子功能的情况下,又分为supress postive response (即子功能字节的bit7)位=1,和=0两种情况 iso 14229-1也给我们列出了表格 1.1.1先讲功能寻址,supress postive response =0的情况! 看图之前,对图中描述作出必要的解释 *1)Yes代

    2024年04月28日
    浏览(40)
  • 阿里云国外服务器价格购买与使用策略

    阿里云国外服务器优惠活动「全球云服务器精选特惠」,国外服务器租用价格24元一个月起,免备案适合搭建网站,部署独立站等业务场景,阿里云服务器网aliyunfuwuqi.com分享阿里云国外服务器优惠活动: 官方活动 https://t.aliyun.com/U/bLuDyL 精选性能更强,性价比更高的计算产品

    2024年01月18日
    浏览(53)
  • 服务器主机:复杂理论的视角与SEO策略

    本文分享自天翼云开发者社区《服务器主机:复杂理论的视角与SEO策略》,作者:不知不觉 在数字世界的演变中,服务器主机在信息存储和数据处理方面发挥着核心作用。本文将带你重新认识服务器主机的价值,并通过复杂理论解释其重要性和必要性,同时结合SEO布局来

    2024年02月08日
    浏览(42)
  • 服务器主机安全的重要性及防护策略

    在数字化时代,服务器主机安全是任何组织都必须高度重视的问题。无论是大型企业还是小型企业,无论是政府机构还是个人用户,都需要确保其服务器主机的安全,以防止数据泄露、网络攻击和系统瘫痪等严重后果。 一、服务器主机安全的重要性 1.数据保护:服务器主机存

    2024年02月05日
    浏览(54)
  • 配置Nginx作为静态资源服务器及安全策略

    上一篇文章写了Nginx负载均衡实现方案详解,有同学私信我说能不能写一篇关于nginx代理静态资源的文章。当然没问题,这篇文章就分享一下如何配置Nginx作为静态资源服务器同时也分享一些常用的安全策略配置。 静态资源指的是在服务器端存储的不会变化的文件,这些文件的

    2024年02月21日
    浏览(50)
  • 企业微信消息推送(一)接收消息服务器URL

    1.点击左上角头像,打开微信管理平台 2.创建应用 3.获取五个参数 3.1获取应用的AgentId、Secret 3.2获取企业ID 配置接收消息服务器URL 企业微信限制过多,公司域名所有权检验不通过。采用接收消息服务器URL的方式。 3.3 获取token、EncodingAESKey 4.1 内网穿透、本地开发 先将请求打到

    2024年02月04日
    浏览(45)
  • Kafka查看消息是否堆积(服务器)

    查询消费组 先进入kafka中的bin目录 ./kafka-consumer-groups.sh --bootstrap-server ip:9092 --list  查询消费组的消费情况 ./kafka-consumer-groups.sh --bootstrap-server ip1:9092 --describe --group 组名称   解析 TOPIC :topic消息队列id CURRENT-OFFSET :当前消费的偏移量 LAG:消息堆积数量 查看分区中消息偏移量

    2024年02月06日
    浏览(46)
  • 等保三级安全加固,服务器三权分立设置,mysql密码策略登录策略

    1、安全计算环境 1)数据库、服务器未配置口令复杂度策略。 建议强制配置口令的复杂度策略(复杂度包含字母大小写,数字,特殊字符,密码长度八位以上),防止口令被轻易破解。 2)数据库、服务器未配置口令有效期策略。 建议配置数据库口令有效期策略,最短更改时间

    2024年02月08日
    浏览(70)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包