redis — redis cluster集群模式下如何实现批量可重入锁?

这篇具有很好参考价值的文章主要介绍了redis — redis cluster集群模式下如何实现批量可重入锁?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、redis cluster 集群版

在Redis 3.0版本以后,Redis发布了Redis Cluster。该集群主要支持搞并发和海量数据处理等优势,当 Redis 在集群模式下运行时,它处理数据存储的方式与作为单个实例运行时不同。这是因为它应该准备好跨多个节点分发数据,从而实现水平可扩展性。具体能力表现为:

  • 自动分割数据到不同的节点上
  • 整个集群的部分节点失败或者不可达的情况下能够继续处理命令

Redis没有使用一致性hash,而是引入哈希槽的概念,也就是 Hash Slot。Redis集群由16384个哈希槽slot,每个key通过CRC16校验后对16384取模来决定放置那个槽,集群的每个节点负责一部分hash槽,也就是说数据存放在hash槽里,而每个节点只负责部分hash槽(这样数据就存放在不同的节点)。

例如:node1、node2、node3三个节点,node1节点负责0到5500号hash槽,node2节点负责5501到11000号hash槽,node3节点负责11001到16384号hash槽。这种结构很容易添加或者删除节点,比如如果我想新添加个节点node4, 我需要从节点 node1, node2, node3中得部分槽到node4上. 如果我想移除节点node1,需要将node1中的槽移到node2和node3节点上,然后将没有任何槽的node1节点从集群中移除即可. 由于从一个节点将哈希槽移动到另一个节点并不会停止服务,所以无论添加删除或者改变某个节点的哈希槽的数量都不会造成集群不可用的状态。
redis — redis cluster集群模式下如何实现批量可重入锁?,redis,redis,哈希算法,java,后端,缓存,spring boot

在某些集群方案中,涉及多个key的操作会被限制在一个slot中,如Redis Cluster中的mget/mset操作。这里就会涉及到 哈希标签 Hash Tag 的概念。

Hash Tag是用于计算哈希槽时的一个特殊场景,是一种确保多个键分配到同一个哈希槽中的方法。这是为了在Redis集群中实现多键操作而使用的。为了实现Hash Tag,在某些情况下,会以稍微不同的方式计算key的哈希槽。如果key包含"{…}"模式,则仅对{和}之间的子字符串进行散列以获取哈希槽。但由于可能存在多个{或}出现,因此该算法遵循以下规则:

  • 如果key包含字符 {
  • 并且如果 } 字符位于 { 的右侧
  • 并且在第一个 { 和第一个 } 之间存在一个或多个字符

对于符合上述规则的key,则不会对整个key进行散列处理,而只会对第一次出现 { 和随后第一次出现 } 之间的内容进行散列。否则,对整个key进行散列处理。
redis — redis cluster集群模式下如何实现批量可重入锁?,redis,redis,哈希算法,java,后端,缓存,spring boot

不使用hash tag批量设置不同名称的key:

127.0.0.1:6379> mset name name1 name2 name3
(error) CROSSSLOT Keys in request don't hash to the same slot

显示错误信息:CROSSSLOT 请求中的key没有哈希到同一个插槽。这个问题是因为多键操作的时候每个键对应的slot可能不是一个,客户端没法做move操作。

解决思路就是采用redis cluster的hashTag,当redis的key加上hashTag时,集群算key的slot是按照hashTag进行计算,即可保证hashTag一致的key能分配到相同的stlot中。:

127.0.0.1:6379> mset name {name} {name}1 {name}2 {name}3

二、redis 分布式锁

Redis锁使用起来比较简单,既可以锁定单个键,也可以批量锁定多个键,以实现更大规模的操作。它也是分布式应用中使用最广泛的分布式锁实施方式,可以有效解决单点故障、死锁和负载失衡等问题。

大规模锁定Redis,实现批量操作,一般通过以下实现:

  • 使用Redis的消息订阅机制,创建消息频道,用于锁定指定键之间的多个键。消息频道的名字称为锁名,它代表锁定的范围和跨度。
  • 然后,通过Redis的SUBSCRIBE命令订阅消息频道名字,比如“ lock_key”,并调用Redis BLPOP,将锁定的键占据,以实现批量锁定。
  • 此外,也可以使用Redis的Lua脚本实现批量锁定。获取带锁的Key数组,这里以数组形式表示。同时,以原子的形式执行多个SETNX命令,一旦全部执行成功,则实现批量锁定:
local locks = red:lrange("lock_keys", 1, -1)
for i, v in iprs(locks) do
    if redis.call("setnx", v, field) == 1 then
        red.lpush("locked_keys", v)
    end
end

释放锁定的键,实现批量解锁,语句如下:

local unlocked_locks = red:lrange("locked_keys",1, -1)
for i, v in iprs(unlocked_locks) do
    red.del(v)
end
red.del("locked_keys")

使用Redis的WATCH功能,防止多个客户端同时更新同一键,即如果更新发生乐观锁的冲突的情况下,返回失败给客户端,从而保证了锁定的原子性:

-- 使用Redis watch,开始监听
red.watch("lock_keys")
-- 进行具体操作
---- 解锁操作
red.unwatch()

Redis锁使用起来非常简单,可以用于单个键锁定和大规模锁定,从而实现批量操作,有效解决分布式应用中的死锁、负载失衡、单点故障等问题。

三、如何使用 redis 实现批量可重入锁?

1、方案一:Lua脚本批量加锁

Lua加锁脚本处理:

	/**
	 * 加锁脚本
	 * KEYS[1] key
	 * ARGV[1] value
	 * ARGV[2] expire
	 * 判断key是否存在,不存在则加锁,并记录加锁次数+1;若存在,则判断value是否相等,相等则记录加锁次数+1,不相等则返回0
	 */
	private static final String REENTRANT_LOCK_SCRIPT = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
        "    redis.call('SET', KEYS[1], ARGV[1]) " +
        "    redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "    redis.call('INCR', 'lockCount:' .. KEYS[1]) " +
        "    return 1 " +
        "else " +
        "    if redis.call('GET', KEYS[1]) == ARGV[1] then " +
        "        redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "        redis.call('INCR', 'lockCount:' .. KEYS[1]) " +
        "        return 1 " +
        "    else " +
        "        return 0 " +
        "    end " +
        "end";
        
    @Bean
    public DefaultRedisScript<Long> reentrantLockRedisScript(){
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(REENTRANT_LOCK_SCRIPT, Long.class);
        try {
            List<Boolean> results= redisTemplate.getConnectionFactory().getConnection().scriptExists(redisScript.getSha1());
            if(Boolean.FALSE.equals(results.get(0)))
            {
                redisTemplate.getConnectionFactory().getConnection().scriptLoad(redisScript.getScriptAsString().getBytes());
                log.info("redis reentrantLockRedisScript load success");
            }

        }catch (Exception ex){
            log.error("redis reentrantLockRedisScript load error",ex);
        }

        return redisScript;
    }

Lua解锁脚本处理:

    private static final String RELEASE_REENTRANT_LOCK_SCRIPT = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
            "    return 0 " +
            "else " +
            "    if redis.call('GET', KEYS[1]) == ARGV[1] then " +
            "        redis.call('DECR', 'lockCount:' .. KEYS[1]) " +
            "        if redis.call('GET', 'lockCount:' .. KEYS[1]) == '0' then " +
            "            redis.call('DEL', KEYS[1]) " +
            "        end " +
            "        return 1 " +
            "    else " +
            "        return 0 " +
            "    end " +
            "end";
    @Bean
    public DefaultRedisScript<Long> releaseReentrantLockRedisScript(){
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_REENTRANT_LOCK_SCRIPT, Long.class);
        try {
            List<Boolean> results= redisTemplate.getConnectionFactory().getConnection().scriptExists(redisScript.getSha1());
            if(Boolean.FALSE.equals(results.get(0)))
            {
                redisTemplate.getConnectionFactory().getConnection().scriptLoad(redisScript.getScriptAsString().getBytes());
                log.info("redis releaseReentrantLockRedisScript load success");
            }

        }catch (Exception ex){
            log.error("redis releaseReentrantLockRedisScript load error",ex);
        }

        return redisScript;
    }

redis set 多个key场景需要hash tag处理:

private static final String KEY_FORMAT = "%s{%s}";
String slotKey = String.format(KEY_FORMAT, key, key);

结合以上封装,批量可重入锁的方法处理如下:

/**
     * 可重入锁,value相同的情况下,可重复加锁,当所有加锁方都解锁后才会释放锁
     *
     * @param key           锁的key
     * @param value         锁的value
     * @param expireSeconds 锁过期时间
     * @param waitSeconds   等待时间
     * @param process
     * @return
     */
    public boolean reentrantLock(String key, String value, long expireSeconds, long waitSeconds, Runnable process) {
        boolean lock = false;
        try {
            //特殊处理key,为了保证原key和计数key落在同一个slot,将原key拼装成: key{key}
            String slotKey = String.format(KEY_FORMAT, key, key);
            long start = System.currentTimeMillis();
            while (!(lock = redisUtil.execute(reentrantLockRedisScript, Collections.singletonList(slotKey), value, expireSeconds) == 1)) {
                Thread.sleep(100);
                if ((System.currentTimeMillis() - start) / 1000 > waitSeconds) {
                    break;
                }
            }
            //加锁成功,执行传入的方法,最后用lua脚本判断锁的value是否还是当前的value,是则执行解锁
            if (lock) {
                try {
                    process.run();
                } catch (Exception ex) {
                    throw ex instanceof BusinessException ? ex : new BusinessException(ex.getMessage());
                } finally {
                    redisUtil.execute(releaseReentrantLockRedisScript, Collections.singletonList(slotKey), value);
                }
            }

        } catch (BusinessException businessException) {
            throw businessException;
        } catch (Exception e) {
            log.error("redis lockAndRun error!lockKey=" + key, e);
        }
        return lock;
    }

    /**
     * 先获取可重入锁,获取成功后批量加锁,执行传入的方法
     *
     * @param keys
     * @param reentrantKey
     * @param value
     * @param expireSeconds
     * @param waitSeconds
     * @param process
     * @return
     */
    public boolean batchReentrantLock(Set<String> keys, String reentrantKey, String value, long expireSeconds, long waitSeconds, Runnable process) {
        List<Boolean> result = new ArrayList<>(1);
        boolean reentrantLock = reentrantLock(reentrantKey, value, expireSeconds, waitSeconds, () -> {
            result.add(batchLockAndRun(keys, expireSeconds, waitSeconds, process));
        });
        return reentrantLock && result.get(0);
    }
2、方案二:pipeline批量加锁
  • 不用lua以避免cross slot error
  • 批量加锁失败后立即全部解锁,防止死锁
	/**
	 * 使用redisTemplate
	 * @param script
	 * @param keys
	 * @param args
	 * @param <T>
	 */
	public List<Object> executePipelined(String[] keys, String[] values, long time, TimeUnit timeUnit) {
	      return redisTemplate.executePipelined(new SessionCallback<Object>() {
	           @Override
	           public Object execute(RedisOperations operations) throws DataAccessException {
	               for (int i = 0; i < keys.length; i++) {
	                   operations.opsForValue().setIfAbsent(keys[i], values[i], time, timeUnit);
	               }
	               return null;
	           }
	       });
	}
	
	/**
	 * pipeline批量加锁
	 * @param keys          需要加锁的key
	 * @param values        锁的value,用于解锁时判断是否是当前线程加的锁
	 * @param expireSeconds 锁过期时间
	 * @return boolean 是否加锁成功
	 */
	private boolean tryBatchLock(String[] keys, String[] values, long expireSeconds) {
	    List<Object> results = redisUtil.executePipelined(keys, values, expireSeconds, TimeUnit.SECONDS);
	    if (results == null || results.size() != keys.length || results.contains(false)) {
	        //加锁失败,立即解锁
	        redisUtil.executePipelined(releaseReentrantLockRedisScript, keys, values);
	        return false;
	    }
	    return true;
	}

四、总结

以上是使用redis cluster集群版所遇到的问题以及解决方案,主要在业务实现过程中,需要注意redis cluster key会被划分到不同的槽中的问题,以及redis可重入锁是否会有死锁的问题等。在redis集群环境下还有什么样的问题欢迎补充!文章来源地址https://www.toymoban.com/news/detail-813253.html

到了这里,关于redis — redis cluster集群模式下如何实现批量可重入锁?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • redis 集群模式(redis cluster)介绍

    目录 一    redis cluster 相关定义 1, redis cluster 是什么 2,redis 集群的组成 3,集群的作用 4,集群架构图 二     Redis集群的数据分片 1,哈希槽是什么 2,哈希槽如何排布 3,Redis集群的主从复制模型 4,哈希槽架构图 三     实验模拟redis 集群 1,实验环境 2,清理实验环

    2024年04月13日
    浏览(42)
  • Redis集群(cluster模式)搭建

    目录 1、什么是集群 2、为什么使用 3、集群连接 4、redis cluster 如何分配这六个节点? 5、集群搭建: Redis 集群(包括很多小集群)实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N,即一个小集群存储1/N的数据,每

    2024年02月15日
    浏览(49)
  • 【2023】Redis cluster集群模式搭建

    Redis Cluster是Redis提供的一种分布式方案,可以将数据分散到多个节点上进行存储和处理,提高数据的可用性和性能。Redis Cluster采用哈希槽的方式来分片数据,将整个数据集分为 16384 个哈希槽,每个节点负责一部分哈希槽的数据存储和处理,节点之间通过gossip协议进行信息交

    2024年02月08日
    浏览(54)
  • Redis 主从复制 + 哨兵模式 + Cluster 集群

    redis群集有三种模式: 分别是主从同步/复制、哨兵模式、Cluster,下面会讲解一下三种模式的工作方式,以及如何搭建cluster群集 主从复制是高可用Redis的基础,哨兵和集群都是在主从复制基础上实现高可用的。主从复制主要实现了数据的多机备份,以及对于读操作的负载均衡

    2024年02月09日
    浏览(41)
  • 【Redis】三种集群模式(主从复制、哨兵模式、Cluster)

    redis有三种集群模式,其中主从是最常见的模式。Sentinel 哨兵模式是为了弥补主从复制集群中主机宕机后,主备切换的复杂性而演变出来的。哨兵顾名思义,就是用来监控的,主要作用就是监控主从集群,自动切换主备,完成集群故障转移。cluster 模式是redis官方提供的集群模

    2024年01月21日
    浏览(65)
  • redis7部署集群:包含主从模式、哨兵模式、Cluster集群模式等三种模式

    前言: redis部署集群常见的一般有三种模式:主从模式,Sentinel(哨兵模式),Redis Cluster(高可用Cluster集群),根据不同的需求可自定义选择部署方式。 Redis 主从模式(Replication) 优点: 数据备份:主节点的数据会复制到从节点,提供了数据冗余和一定程度的故障恢复能力

    2024年01月20日
    浏览(106)
  • Redis追本溯源(四)集群:主从模式、哨兵模式、cluster模式

    Redis 有多种集群搭建方式,比如,主从模式、哨兵模式、Cluster 模式。 Redis 主从模式还解决了单点的问题。Redis 主库在进行修改操作的时候,会把相应的写入命令近乎实时地同步给从库,从库回放这些命令,就可以保证自己的数据与主库保持一致。那么,当主库发生宕机的时

    2024年02月14日
    浏览(44)
  • redis高可用——主从复制、哨兵模式、cluster集群

    目录 1、redis群集有三种模式 2、主从复制 2.1、概述: 2.2、Redis主从复制有以下几个重要作用: 2.3、主从复制流程: 2.4、redis主从复制实验 3、哨兵模式. 3.1、概述: 3.2、 哨兵的核心功能: 3.3、哨兵模式原理: 3.6、 哨兵模式的作用; 3.7、故障转移机制 3.8、主节点的选举: 3.9、主

    2024年02月09日
    浏览(43)
  • Redis 7 第八讲 集群模式(cluster)架构篇

    Redis 集群架构图         Redis 集群是一个提供在多个Redis节点间共享数据的程序集;Redis集群可以支持多个master  Redis集群支持多个master,每个master又可以挂载多个slave 读写分离 支持数据的高可用 支持海量数据的读写存储操作 集群自带Sentinel的故障转移机制,内置支持高可用,

    2024年02月10日
    浏览(35)
  • curator实现的zookeeper可重入锁

    Curator是一个Apache开源的ZooKeeper客户端库,它提供了许多高级特性和工具类,用于简化在分布式环境中使用ZooKeeper的开发。其中之一就是可重入锁。 Curator提供了 InterProcessMutex 类来实现可重入锁。以下是使用Curator实现ZooKeeper可重入锁的示例: import org.apache.curator.framework.Curato

    2024年02月15日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包