用Go写一个缓存工具

这篇具有很好参考价值的文章主要介绍了用Go写一个缓存工具。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

在项目发开过程中,一直有用到本地缓存和分布式本地缓存,最近从Java转到Go,也需要在Go里面重新实现下这个缓存工具。

本地缓存:堆内缓存,访问速度快,系统重启后缓存清除。

分布式本地缓存:分布式本地缓存本质上还是本地缓存,只有在分布式环境下,本地缓存数据不同步,但是有时候为了快速访问做了这样的一个缓存工具,就是在某个节点缓存变化之后,立即通知其他节点清除其他节点的缓存key,重新从数据库同步缓存数据。

代码实现

本地缓存

在java里本地缓存的实现是通过Guava Cache来实现的,查看Guava Cache可以参照之前写的一篇文章:Guava常用工具# Cache本地缓存_guava 缓存-CSDN博客,那么在Go里面这里我们选择常用的BigCache来替代Guava Cache,有关BigCache的知识可以线下去了解,实现也比较简单。

  1. 定义缓存接口模型
    package cache
    
    //这里使用泛型,可以放入任何类型的数据
    type Cache[T any] interface {
    
        //添加缓存
    	Put(key string, t T)
    
        //获取缓存
    	Get(key string) T
    
        //删除缓存
    	EvictKey(key string)
    
        //批量删除缓存
    	EvictKeys(keys []string)
    
        //获取所有缓存key
    	GetAllKeys() []string
    }
    

    缓存接口模型是缓存工具对外使用的基础接口,具体实现的缓存类型需要实现这些接口。

  2. 创建本地缓存
    package cache
    
    import (
    	"bytes"
    	"encoding/gob"
    	"reflect"
    
    	"git.qingteng.cn/ms-public/qtmf/logx"
    	"github.com/allegro/bigcache"
    )
    
    //localcache里有个Bigcache类型,缓存的操作也是对Bigcache的操作
    type LocalCache[T any] struct {
    	Cache *bigcache.BigCache
    }
    
    
    //加入缓存,注意这里有个序列化的问题,如果我们出入的对象不能进行序列化,这里会出现报错
    func (lc *LocalCache[T]) Put(key string, t T) {
    	var buf bytes.Buffer
    	enc := gob.NewEncoder(&buf)
    	err := enc.Encode(t)
    	if err != nil {
    		logx.WithoutContext().Error("local cache encode cache value error", key, err)
    		return
    	}
    	data := buf.Bytes()
    	lc.Cache.Set(key, data)
    }
    
    //获取缓存
    func (lc *LocalCache[T]) Get(key string) T {
    	var value T
    	data, err := lc.Cache.Get(key)
    	if err != nil {
    		logx.WithoutContext().Error("local cache get cache value error", key, err)
    		return reflect.New(reflect.TypeOf(value)).Elem().Interface().(T)
    	}
    
    	if data == nil {
    		return reflect.New(reflect.TypeOf(value)).Elem().Interface().(T)
    	}
    
    	buf := bytes.NewBuffer(data)
    	dec := gob.NewDecoder(buf)
    	err = dec.Decode(&value)
    	if err != nil {
    		logx.WithoutContext().Error("local cache decode cache value error", key, err)
    		return value
    	}
    	return value
    }
    
    //删除缓存
    func (lc *LocalCache[T]) EvictKey(key string) {
    	lc.Cache.Delete(key)
    }
    
    
    //批量删除缓存
    func (lc *LocalCache[T]) EvictKeys(keys []string) {
    	for _, key := range keys {
    		lc.Cache.Delete(key)
    	}
    }
    
    //获取所有缓存key
    func (lc *LocalCache[T]) GetAllKeys() []string {
    	var keys []string
    
    	// 遍历 bigcache
    	iterator := lc.Cache.Iterator()
    	for iterator.SetNext() {
    		current, _ := iterator.Value()
    		keys = append(keys, current.Key())
    	}
    	return keys
    }

到这里本地缓存的工具类就已经实现完了,我们只需要创建一个LocaclCache结构体即可开始使用,使用方式后续会介绍。 

分布式本地缓存

分布式本地缓存还是一个本地缓存,只不过多了一个分布式环境节点通知的步骤,这个通知我们采用的是redis的pud/sub机制,所以可以想象下分布式本地缓存的一个构造,肯定会有个bigcache属性用来做实际的存储,另外还需要有一个消息广播器来往其他节点广播消息。

  1. 定义分布式缓存结构体
    package cache
    
    type HeapDistributedCache[T any] struct {
    	LocalCache  *LocalCache[T]    //本地缓存
    	Config      *DistributedCacheConfig  //缓存的配置,广播的时候需要用到
    	Broadcaster *Broadcaster[T]  //消息广播器
    }
    
    //添加缓存并且进行消息广播
    func (hc *HeapDistributedCache[T]) Put(key string, t T) {
    	hc.LocalCache.Put(key, t)
    	if hc.Config.Broadcast {
    		hc.Broadcaster.broadcastEvict(hc.Config.Channel, []string{key})
    	}
    }
    
    //获取缓存
    func (hc *HeapDistributedCache[T]) Get(key string) T {
    	return hc.LocalCache.Get(key)
    }
    
    //删除缓存并且进行消息广播
    func (hc *HeapDistributedCache[T]) EvictKey(key string) {
    	hc.LocalCache.EvictKey(key)
    	if hc.Config.Broadcast {
    		hc.Broadcaster.broadcastEvict(hc.Config.Channel, []string{key})
    	}
    }
    
    //删除缓存并且进行消息广播
    func (hc *HeapDistributedCache[T]) EvictKeys(keys []string) {
    	hc.LocalCache.EvictKeys(keys)
    	if hc.Config.Broadcast {
    		hc.Broadcaster.broadcastEvict(hc.Config.Channel, keys)
    	}
    }
    
    //获取所有缓存key
    func (hc *HeapDistributedCache[T]) GetAllKeys() []string {
    	return hc.LocalCache.GetAllKeys()
    }
  2. 消息广播器
    消息广播器代码示例:

    package cache
    
    import (
    	"context"
    
    	"git.qingteng.cn/ms-app-ids/service-ids/internal/common/util"
    
    	"git.qingteng.cn/ms-app-ids/service-ids/internal/broadcast"
    	"git.qingteng.cn/ms-app-ids/service-ids/internal/infra/logw"
    	"github.com/go-redis/redis/v8"
    )
    
    type Broadcaster[T any] struct {
    	RedisClient redis.UniversalClient  //redis客户端
    	Node        *broadcast.Node   //节点信息
    }
    
    //注册消息通道
    func (b *Broadcaster[T]) doSubscribe(channel string, localCache *LocalCache[T]) {
    	pubsub := b.RedisClient.Subscribe(context.Background(), channel)
    	ch := pubsub.Channel()
        //注册消息通道成功后会有一个channel返回,后续这个通道有消息都会发过来,这里用select- 
        //channel的方式进行监听
    	go func() {
    		for {
    			select {
    			case msg := <-ch:
    				{
    					b.handleMessage(msg, b.Node.Id, localCache)
    				}
    			}
    		}
    	}()
    }
    
    //收到消息之后进行的处理
    func (b *Broadcaster[T]) handleMessage(msg *redis.Message, currentNodeId string, localCache *LocalCache[T]) {
    	logger := logw.WithoutContext()
    	var broadCastCacheMsg BroadcastCacheMessage
    	err := util.Unmarshal([]byte(msg.Payload), &broadCastCacheMsg)
    	if err != nil {
    		logger.Error("receiver cache broadcast cache msg failed", broadCastCacheMsg.Action, broadCastCacheMsg.NodeId)
    	}
    
        //如果是当前节点发出的消息不需要处理,因为当前节点的缓存已经更新过
    	if currentNodeId == broadCastCacheMsg.NodeId {
    		logger.Info("receiver save node msg")
    		return
    	}
    
        //不是当前节点的,说明缓存有更新,那就直接把缓存清除,后续会重新加载最新数据
    	if broadCastCacheMsg.Action == ActionEvict {
    		localCache.EvictKeys(broadCastCacheMsg.Keys)
    	}
    }
    
    //广播缓存key清除消息
    func (b *Broadcaster[T]) broadcastEvict(channel string, keys []string) {
    	message := &BroadcastCacheMessage{
    		NodeId: b.Node.Id,
    		Action: ActionEvict,
    		Keys:   keys,
    	}
    	b.publishCacheMsg(channel, message)
    }
    
    //广播缓存清空消息
    func (b *Broadcaster[T]) broadcastClear(channel string) {
    	message := &BroadcastCacheMessage{
    		NodeId: b.Node.Id,
    		Action: ActionEvict,
    	}
    	b.publishCacheMsg(channel, message)
    }
    
    //最终发布消息
    func (b *Broadcaster[T]) publishCacheMsg(channel string, message *BroadcastCacheMessage) {
    	logger := logw.WithoutContext()
    	sendMsgBytes, err := util.Marshal(message)
    	if err != nil {
    		logger.Error("send cache broadcast msg failed", "channel", channel, err)
    	}
    
    	err = b.RedisClient.Publish(context.Background(), channel, string(sendMsgBytes)).Err()
    	if err != nil {
    		logger.Error("send cache broadcast msg error", "channel", channel, "msg", message)
    		return
    	}
    }

    广播的时候用到了本地的一个节点,所以我们需要定义一个当前节点,可以想到这个节点是个单例,有个唯一的ID。
    节点示例:

    package broadcast
    
    import (
    	"sync"
    
    	uuidX "github.com/google/uuid"
    )
    
    var (
    	nodeInstance *Node
    	mutex        sync.Mutex
    )
    
    type Message struct {
    	NodeId  string
    	MsgType messageType
    	Body    string
    }
    
    type Node struct {
    	Id string
    }
    
    // GetNodeInstance 获取node单例对象
    func GetNodeInstance() *Node {
    	mutex.Lock()
    	defer mutex.Unlock()
    	if nodeInstance != nil {
    		return nodeInstance
    	}
    	nodeInstance = &Node{
    		Id: uuidX.New().String(),
    	}
    	return nodeInstance
    }

    缓存配置结构体定义:

    package cache
    
    import "time"
    
    const (
    	ActionEvict = "evict"
    )
    
    type BroadcastCacheMessage struct {
    	NodeId string
    	Action string
    	Keys   []string
    }
    
    type DistributedCacheConfig struct {
    	Ttl       time.Duration
    	Channel   string
    	Broadcast bool
    }



    消息广播器的实现就是一个发布订阅的机制,Redis的发布订阅是基于channel来的,就跟topic一样,每个缓存可以指定channel,然后监听对应的channel即可。

缓存构造器

我们创建好了两种缓存的底层实现,那么还需要定义一个缓存构造器,方便使用。

缓存构造器cache_builder示例:文章来源地址https://www.toymoban.com/news/detail-759389.html

package cache

import (
	"time"

	"git.qingteng.cn/ms-app-ids/service-ids/internal/broadcast"
	"git.qingteng.cn/ms-app-ids/service-ids/internal/infra/logw"
	"git.qingteng.cn/ms-public/qtmf/providers/redisx"
	"github.com/allegro/bigcache"
)

var (
	DefaultSecond       = 300
	DefaultCleanCWindow = 3
)

// ConfigLocalCache ttl传值需要跟上跟上单位,直接填入数字默认的单位是纳秒
func ConfigLocalCache[T any](ttl time.Duration) (*LocalCache[T], error) {
	if ttl <= 0 {
		ttl = time.Duration(DefaultSecond) * time.Second
	}
	config := bigcache.DefaultConfig(ttl)
	config.CleanWindow = time.Duration(DefaultCleanCWindow) * time.Second
	bigCache, err := bigcache.NewBigCache(config)
	if err != nil {
		logw.WithoutContext().Error("create local cache error")
		return nil, err
	}

	return &LocalCache[T]{
		Cache: bigCache,
	}, nil
}

func DefaultLocalCache[T any]() (*LocalCache[T], error) {
	config := bigcache.DefaultConfig(time.Duration(DefaultSecond) * time.Second)
	config.CleanWindow = time.Duration(DefaultCleanCWindow) * time.Second
	bigCache, err := bigcache.NewBigCache(config)
	if err != nil {
		logw.WithoutContext().Error("create local cache error")
		return nil, err
	}

	return &LocalCache[T]{
		Cache: bigCache,
	}, nil
}

//构建分布式本地缓存
func ConfigHeapDistributedCache[T any](config *DistributedCacheConfig) (*HeapDistributedCache[T], error) {
	if config.Ttl <= 0 {
		config.Ttl = time.Duration(DefaultSecond) * time.Second
	}
	localCache, err := ConfigLocalCache[T](config.Ttl)
	if err != nil {
		return nil, err
	}
	redisClient, err := redisx.Client()
	if err != nil {
		return nil, err
	}
	broadcaster := &Broadcaster[T]{
		RedisClient: redisClient,
		Node:        broadcast.GetNodeInstance(),
	}
	if config.Broadcast {
		broadcaster.doSubscribe(config.Channel, localCache)
	}
	return &HeapDistributedCache[T]{
		LocalCache:  localCache,
		Config:      config,
		Broadcaster: broadcaster,
	}, nil
}

缓存测试  

func TestLoaclCache(t *testing.T) {
	intcache, _ := cache.ConfigLocalCache[int](10 * time.Second)
	intcache.Put("test", 111)
	intValue := intcache.Get("test")
	println(intValue)

	stringCache, _ := cache.DefaultLocalCache[string]()
	stringCache.Put("test2", "hello world")
	stringValue := stringCache.Get("test2")
	println(stringValue)

	personCache, _ := cache.DefaultLocalCache[*Person]()
	person := &Person{
		Name: "张三",
		Age:  20,
	}
	personCache.Put("person", person)
	personCache.Put("person2", &Person{})
	presult := personCache.Get("person")
	println(presult.Name, presult.Age)
	fmt.Println(personCache.GetAllKeys())
}


localCache测试输出:
111
hello world
张三 20
[person person2]


func TestHeapDistributeCache(t *testing.T) {
	config := &cache.DistributedCacheConfig{
		Ttl:       5,
		Channel:   "test_heap_distribute",
		Broadcast: true,
	}
	stringcache, _ := cache.ConfigHeapDistributedCache[string](config)
	stringcache.Put("test_heap", "test_heap")
	println(stringcache.Get("test_heap"))

	pconfig := &cache.DistributedCacheConfig{
		Ttl:       5,
		Channel:   "test_person_distribute",
		Broadcast: true,
	}
	personCache, _ := cache.ConfigHeapDistributedCache[*Person](pconfig)
	person := &Person{
		Name: "张三",
		Age:  20,
	}
	personCache.Put("test_heap_person", person)
	presult := personCache.Get("test_heap_person")
	println(presult.Name, presult.Age)

	time.Sleep(time.Second * 5)
}

分布式缓存测试输出:
test_heap
张三 20

到了这里,关于用Go写一个缓存工具的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【go项目-geecache】动手写分布式缓存 day2 - 单机并发缓存

    [ Github- geecache ](luckly-0/geecache (github.com)) 收获总结: 了解接口的使用场景,它和函数之间的差别和优略势 测试文件要以_test结尾 系统设计要严谨,要考虑后期的拓展性和维护 ,比如load函数考虑到了分布式场景 数据结构之间的封装 sync.Mutex 互斥锁 如果我们要是实现并发缓存

    2023年04月18日
    浏览(65)
  • 使用go_concurrent_map 管理 并发更新缓存

    在后台服务中,为了提速,我在内存还做了一个告诉缓存来管理用户信息,根据更新通知,或者定时去redis中同步信息,那么在加载或者更新某个用户元素时,要防止并发, 当: 1)如果内存缓存没有; 2)去数据库或者redis加载; 3)添加到内存缓存; 这里就有个并发重复的

    2024年04月25日
    浏览(28)
  • Golang 通过开源库 go-redis 操作 NoSQL 缓存服务器

    前置条件: 1、导入库: import ( \\\"github.com/go-redis/redis/v8\\\" ) 2、搭建哨兵模式集群 具体可以百度、谷歌搜索,网上现成配置教程太多了,不行还可以搜教程视频,跟着视频博主一步一个慢动作,慢慢整。 本文只介绍通过 “主从架构 / 哨兵模式” 访问的形式,这是因为,单个

    2024年01月23日
    浏览(38)
  • 初识Go语言25-数据结构与算法【堆、Trie树、用go中的list与map实现LRU算法、用go语言中的map和堆实现超时缓存】

      堆是一棵二叉树。大根堆即任意节点的值都大于等于其子节点。反之为小根堆。   用数组来表示堆,下标为 i 的结点的父结点下标为(i-1)/2,其左右子结点分别为 (2i + 1)、(2i + 2)。 构建堆   每当有元素调整下来时,要对以它为父节点的三角形区域进行调整。 插入元素

    2024年02月12日
    浏览(39)
  • 【Redis】封装Redis缓存工具解决缓存穿透与缓存击穿问题

    基于StringRedisTemplate封装一个缓存工具,主要有一下几个方法 方法1:将任意Java对象序列化为json并存储在String的指定key中且设置TTL 方法2:将任意Java对象序列化为json并存储在String的指定key中,并可以设置逻辑过期时间,用户处理缓存击穿问题 方法3:根据指定的key进行查询缓

    2024年02月06日
    浏览(42)
  • 设计一个LRU(最近最少使用)缓存

    约束和假设 我们正在缓存什么? 我们正在缓存Web Query的结果 我们可以假设输入是有效的,还是需要对其验证? 假设输入是有效的 我们可以假设它适应内存吗? 对 编码实现

    2024年01月24日
    浏览(45)
  • 设计一个支持并发的前端接口缓存

    目录 ​​​​​​​ 缓存池 并发缓存 问题 思考 优化🤔 总结 最后         缓存池不过就是一个 map ,存储接口数据的地方,将接口的路径和参数拼到一块作为 key ,数据作为 value 存起来罢了,这个咱谁都会。 封装一下调用接口的方法,调用时先走咱们缓存数据。 然后

    2024年02月07日
    浏览(33)
  • Redis缓存设计与性能优化【并发创建同一个缓存,解决方案:互斥锁】

    开发人员使用“缓存+过期时间”的策略既可以加速数据读写, 又保证数据的定期更新, 这种模式基本能够满足绝大部分需求。 但是有两个问题如果同时出现, 可能就会对应用造成致命的危害: 当前key是一个热点key(例如一个热门的娱乐新闻),并发量非常大。 重建缓存不

    2024年04月09日
    浏览(39)
  • [Python] 缓存实用工具

    cachetools 是一个 Python 库,提供了用于缓存的实用工具,包括各种缓存算法和数据结构,如 LRU (最近最少使用)缓存、 TTL (时间到期)缓存等。使用 cachetools 可以轻松地在 Python 应用程序中实现缓存功能,提高性能并减少对重复计算的需求。 以下是一个简单示例代码,演示

    2024年03月09日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包