golang实现延迟队列(delay queue)

这篇具有很好参考价值的文章主要介绍了golang实现延迟队列(delay queue)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

golang实现延迟队列

1 延迟队列:邮件提醒、订单自动取消

延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:

  • 邮件提醒
  • 订单自动取消(超过多少时间未支付,就取消订单)
  • 对超时任务的处理等

由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。

2 实现

2.1 simple简单版:go自带的time包实现

思路:

  1. 定义Task结构体,包含
  • ExecuteTime time.Time
  • Job func()
  1. 定义DelayQueue
  • TaskQueue []Task
  • func AddTask
  • func RemoveTask
  • ExecuteTask

这种方案存在的问题:

Go程序重启时,存储在slice中的延迟处理任务将全部丢失

完整代码:

package main

import (
	"fmt"
	"time"
)

/*
基于go实现延迟队列
*/
type Task struct {
	ExecuteTime time.Time
	Job         func()
}

type DelayQueue struct {
	Tasks []*Task
}

func (d *DelayQueue) AddTask(t *Task) {
	d.Tasks = append(d.Tasks, t)
}

func (d *DelayQueue) RemoveTask() {
	//FIFO: remove the first task to enqueue
	d.Tasks = d.Tasks[1:]
}

func (d *DelayQueue) ExecuteTask() {
	for len(d.Tasks) > 0 {
		//dequeue a task
		currentTask := d.Tasks[0]
		if time.Now().Before(currentTask.ExecuteTime) {
			//if the task execution time is not up, wait
			time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
		}
		//execute the task
		currentTask.Job()
		//remove task who has been executed
		d.RemoveTask()
	}

}

func main() {
	fmt.Println("start delayQueue")
	delayQueue := &DelayQueue{}
	firstTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 1),
		Job: func() {
			fmt.Println("executed task 1 after delay")
		},
	}
	delayQueue.AddTask(firstTask)
	secondTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 7),
		Job: func() {
			fmt.Println("executed task 2 after delay")
		},
	}
	delayQueue.AddTask(secondTask)
	delayQueue.ExecuteTask()
	fmt.Println("all tasks have been done!!!")
}

效果:
golang实现延迟队列(delay queue),go,golang,开发语言,后端,数据结构,延迟队列,redis

2.2 complex持久版:go+redis

为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。

思路:

  1. 初始化redis连接
  2. 延迟队列采用redis的zset(有序集合)实现

前置准备:

# 安装docker
yum install -y yum-utils
yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker

# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis

完整代码:

package main

import (
	"fmt"
	"github.com/go-redis/redis"
	log "github.com/ziyifast/log"
	"time"
)

/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"

func initClient() (err error) {
	redisdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // not set password
		DB:       0,  //use default db
	})
	_, err = redisdb.Ping().Result()
	if err != nil {
		log.Errorf("%v", err)
		return err
	}
	return nil
}

func main() {
	err := initClient()
	if err != nil {
		log.Errorf("init redis client err: %v", err)
		return
	}
	addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())
	addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())
	//执行队列中的任务
	getAndExecuteTask()
}

// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {
	err := redisdb.ZAdd(DelayQueueKey, redis.Z{
		Score:  float64(executeTime),
		Member: task,
	}).Err()
	if err != nil {
		panic(err)
	}
}

// 从redis中取一个task并执行
func getAndExecuteTask() {
	for {
		tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{
			Min:    "-inf",
			Max:    fmt.Sprintf("%d", time.Now().Unix()),
			Offset: 0,
			Count:  1,
		}).Result()
		if err != nil {
			time.Sleep(time.Second * 1)
			continue
		}
		//处理任务
		for _, task := range tasks {
			fmt.Println("Execute task: ", task)
			//执行完任务之后用 ZREM 移除该任务
			redisdb.ZRem(DelayQueueKey, task)
		}
		time.Sleep(time.Second * 1)
	}
}

效果:

redis一直从延迟队列中取数据,如果处理完一批则睡眠1s

  • 具体根据大家的业务调整,此处主要介绍思路

golang实现延迟队列(delay queue),go,golang,开发语言,后端,数据结构,延迟队列,redis文章来源地址https://www.toymoban.com/news/detail-836027.html

到了这里,关于golang实现延迟队列(delay queue)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 从C语言到C++_20(仿函数+优先级队列priority_queue的模拟实现+反向迭代器)

    目录 1. priority_queue的模拟实现 1.1 未完全的priority_queue 1.2 迭代器区间构造和无参构造 1.3 仿函数的介绍和使用 1.4 完整priority_queue代码: 1.5 相关笔试选择题 答案: 2. 反向迭代器 2.1 反向迭代器的普通实现 reverse_iterator.h(不对称版) 2.2 反向迭代器的对称实现 reverse_iterator.

    2024年02月10日
    浏览(47)
  • [数据结构 -- C语言] 队列(Queue)

    目录 1、队列 1.1 队列的概念及结构 2、队列的实现 2.1 接口 3、接口的实现 3.1 初始化队列 3.2 队尾入队列 分析: 3.3 队头出队列 分析: 3.4 获取队列头部元素 3.5 获取队列尾部元素 3.6 获取队列中有效元素个数 3.7 检测队列是否为空 3.7.1 int 类型判空 3.7.2 bool 类型判空 3.8 销毁队

    2024年02月07日
    浏览(42)
  • 采用RibbaitMq延迟队列官方插件实现延迟队列

    RibbaitMq插件实现延迟队列主要是延迟消息到交换机的时间。 TTL+死信队列实现延时队列:正常消息过期没有被消费掉,进入死信队列后立即消息。 本章主要采用第一种方式。 1.安装RabbirMQ自行百度或者参考推荐资源: RabbitMQ部署-Windows篇 - 知乎 RabbitMQ windows下的安装与配置 - 腾

    2023年04月09日
    浏览(31)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(55)
  • STM32延迟(休眠)函数delay/sleep

    参考 MSP432(Keil5)——3.delay延时驱动_keil5delay函数_大写的小写字母的博客-CSDN博客 亲手测试过, 好用。用while太愚蠢,而且不好控制。 下载链接 https://download.csdn.net/download/quantum7/87982408 delay.h delay.c

    2024年02月12日
    浏览(35)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(52)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(74)
  • 【数据结构】 队列(Queue)与队列的模拟实现

    队列 :只允许在一端进行插入数据操作,在另一端进行删除数据操作的特殊线性表,队列具有==先进先出FIFO(FirstIn First Out) ==入队列: 进行插入操作的一端称为 队尾(Tail/Rear) 出队列: 进行删除操作的一端称为 队头(Head/Front) 在Java中, Queue是个接口,底层是通过链表实现

    2024年02月11日
    浏览(49)
  • Queue 队列的实现与应用

    只允许在一端进行插入数据操作,在另一端进行删除数据操作的特殊线性表,队列具有先进先出FIFO(FirstIn First Out) 入队列:进行插入操作的一端称为队尾(Tail/Rear) 出队列:进行删除操作的一端称为队(Head/Front), 队列可以通过数组和链表两种方式来实现。 队列的基本操作

    2024年02月07日
    浏览(34)
  • 【数据结构与算法】7、队列(Queue)的实现【用栈实现队列】

    ☘️ 队列 (Queue)是一种特殊的 线性表 , 只能在头尾两端进行操作 🎁 队尾(rear):只能从 队尾添加 元素,一般叫做 enQueue , 入队 🎁 队头(front):只能从 队头移除 元素,一般叫做 deQueue , 出队 🎁 先进先出 的原则, F irst I n F irst O ut, FIFO ☘️ 队列内部的实现可

    2024年02月12日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包