问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

基于RedisTemplate和线程池实现Redis分布式锁

创作时间:
作者:
@小白创作中心

基于RedisTemplate和线程池实现Redis分布式锁

引用
1
来源
1.
https://www.cnblogs.com/1399z3blog/p/18290473

在分布式系统中,多个服务实例可能同时访问共享资源,因此需要分布式锁来保证数据的一致性和完整性。本文将详细介绍如何使用Redis和线程池实现一个可靠的分布式锁方案。

分布式锁需求

随着系统规模的扩大,后台服务往往不再只是单机部署,而是通过集群的方式运行在多个服务器上。在这种架构下,用户请求会被负载均衡器分发到不同的服务器。如果需要对集群中的某个代码片段进行加锁,传统的synchronized关键字就无法满足需求了,因为synchronized只能控制同一JVM进程内的锁资源。

分布式锁的实现方式有多种,如Redis分布式锁、Zookeeper分布式锁等。本文将重点介绍基于Redis的分布式锁实现方案。

Redis为什么能实现分布式锁?

Redis是一个基于键值对存储的NoSQL数据库,非常适合用来实现分布式锁。作为“锁”,需要保证在某一时刻只有一个线程能够执行特定代码片段。而Redis的主线程模型是单线程的,这意味着在同一时刻只有一个线程在执行Redis数据相关操作。

这种特性使得在Redis中存入锁数据后,其他服务器的线程能够立即获取到锁的状态,从而实现对集群中指定代码片段的加锁。

如何实现Redis分布式锁?

前置知识

  • Redis的setget命令
  • setnx命令:如果键不存在,则设置键值对
  • RedisTemplate的setIfPresent方法对应setnx命令

实现步骤

  1. 在第一个线程访问时,在Redis中添加一项缓存数据作为锁资源
  2. 每个线程在执行该片段开始时,执行setnx命令进行缓存锁资源更新
  3. 如果更新失败(返回false),说明已有线程正在执行该片段,可以选择阻塞线程或给用户反馈提示
  4. 在线程结束时,需要主动删除该锁资源
try{
    // 获取分布式锁
    Boolean lock = redisTemplate.opsForValue().setIfPresent("lock", "resource");
    // 如果锁资源未正常更新,则返回提示
    if(!lock){
        return "系统繁忙";
    }
    // 如果正常更新,则进行业务逻辑代码
    // todo 业务逻辑
    
}finally {
    // 执行完成后,删除锁
    redisTemplate.delete("lock");
}

处理服务挂掉的情况

如果在执行业务逻辑时服务挂掉,锁资源将无法被删除,导致所有后续请求都被阻塞。解决方案是在设置锁时添加过期时间:

try{
    // 获取分布式锁
    Boolean lock = redisTemplate.opsForValue().setIfPresent("lock", "resource", 10, TimeUnit.SECONDS);
    // 如果锁资源未正常更新,则返回提示
    if(!lock){
        return "系统繁忙";
    }
    // 如果正常更新,则进行业务逻辑代码
    // todo 业务逻辑
    
}finally {
    // 执行完成后,删除锁
    redisTemplate.delete("lock");
}

处理运行时间超过过期时间的情况

如果业务逻辑执行时间超过锁的过期时间,会导致并发执行和锁资源意外释放的问题。解决方案包括:

  1. 创建子线程对过期时间进行续命
  2. 为每个线程创建唯一标识,在删除锁时进行校验
String clientID = UUID.randomUUID().toString();
try{
  // 获取分布式锁
  Boolean lock = redisTemplate.opsForValue().setIfPresent("lock", clientID, 10, TimeUnit.SECONDS);
  // 如果锁资源未正常更新,则返回提示
  if(!lock){
      return "系统繁忙";
  }
  // 创建线程续命
  new Thread(new Runnable() {
      @Override
      public void run() {
          // 对 redis的锁过期时间进行续命
      }
  }).start();
  // 如果正常更新,则进行业务逻辑代码
  // todo 业务逻辑
}finally {
  // 执行完成后,判断为自己创建的锁,则删除锁
  if(clientID.equals(redisTemplate.opsForValue().get("lock"))){
      redisTemplate.delete("lock");
  }
}

优化方案

在实际开发中,建议使用ScheduledThreadPoolExecutor来管理线程:

@Resource
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

ScheduledFuture<?> addLockLifeThread = null;
try{
    String clientId = UUID.randomUUID().toString();
    Boolean lock = redisTemplate.opsForValue().setIfPresent(LOCK_KEY, clientId, LOCK_TTL, TimeUnit.SECONDS);
    if (lock == null || !lock) {
        return false;
    }
    addLockLifeThread = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
        lengthenLockLife(clientId);
    }, ADD_LOCK_TTL, ADD_LOCK_TTL, TimeUnit.SECONDS); 

    // todo 完成需要进行加锁的业务逻辑

} catch (Exception e){
    log.info("执行出错:{}", e.getMessage());
}finally{
    if(addLockLifeThread != null){
        addLockLifeThread.cancel(true);
    }
    redisTemplate.delete(LOCK_KEY);
}

public void lengthenLockLife(String clientId) {
    String redisLock = redisTemplate.opsForValue().get(LOCK_KEY);
    if (clientId.equals(redisLock)) {
        redisTemplate.expire(LOCK_KEY, LOCK_TTL, TimeUnit.SECONDS);
        log.info("线程id {},进行续命", clientId);
    }
}

Redis分布式锁工具类

为了方便使用,可以封装一个Redis分布式锁工具类:

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(factory);
        RedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setHashValueSerializer(stringRedisSerializer);
        return redisTemplate;
    }
}

@Component
public class RedisUtil {
    public static final String LOCK_PREFIX = "redis_lock_";
    private static final Long SUCCESS = 1L;
    public static final int LOCK_EXPIRE = 60 * 10;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public boolean lock(String key) {
        String lock = LOCK_PREFIX + key;
        return (Boolean) redisTemplate.execute((RedisCallback) connection -> {
            long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1;
            Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(expireAt).getBytes());
            if (acquire) {
                return true;
            } else {
                byte[] value = connection.get(lock.getBytes());
                if (Objects.nonNull(value) && value.length > 0) {
                    long expireTime = Long.parseLong(new String(value));
                    if (expireTime < System.currentTimeMillis()) {
                        byte[] oldValue = connection.getSet(lock.getBytes(), String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes());
                        return Long.parseLong(new String(oldValue)) < System.currentTimeMillis();
                    }
                }
            }
            return false;
        });
    }

    public boolean getLock(String lockKey, String value, Integer expireTime){
        if(StringUtils.isTrimBlank(expireTime)){
            expireTime = LOCK_EXPIRE;
        }
        try{
            String script = "if redis.call('setNx',KEYS[1],ARGV[1]) then if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end end";
            RedisScript<String> redisScript = new DefaultRedisScript<>(script, String.class);
            Object result = redisTemplate.execute(redisScript, Collections.singletonList(LOCK_PREFIX+lockKey),value,String.valueOf(expireTime));
            if(SUCCESS.equals(result)){
                return true;
            }
        }catch(Exception e){
            return false;
        }
        return false;
    }

    public boolean releaseLock(String lockKey, String value) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        RedisScript<String> redisScript = new DefaultRedisScript<>(script, String.class);
        Object result = redisTemplate.execute(redisScript, Collections.singletonList(LOCK_PREFIX+lockKey), value);
        if (SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }

    public void delete(String key) {
        redisTemplate.delete(LOCK_PREFIX+key);
    }
}

通过以上方案,可以实现一个健壮且灵活的Redis分布式锁机制,适用于各种需要保证数据一致性的场景。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号