Redis 分布式锁与延时队列

本贴最后更新于 1658 天前,其中的信息可能已经时移世改

《Redis 深度历险》读书笔记 -- 之分布式锁与延时队列

1、分布式锁 --Redis

参考博文:https://juejin.im/post/5bbb0d8df265da0abd3533a5#heading-1
参考书籍:https://book.douban.com/subject/30386804/

首先
1、原子操作是什么?

原子操作是指不会被线程调度打断的操作。这种操作一旦开始,就会一直运行到结束。中间不会有任何线程切换。

2、CAP 原则是什么?

CAP 原则又称 CAP 定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

3、什么是分布式锁?

当在分布式模型下,数据只有一份(或有限制),此时需要利用锁技术来控制某一时刻修改数据的进程数。这种锁即为分布式锁。

4、为什么需要分布式锁?

A : 上一条所提到的正确性:加分布式锁可以避免破坏正确性的发生,如果俩个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。
B : 效率问题:使用分布式锁可以避免不同节点重复的工作,避免浪费资源。

5、 分布式锁有那些特点?

  • 互斥性
  • 可重入性
  • 锁超时
  • 高效性
  • 高可用性
  • 支持阻塞和非阻塞

6、 常见的实现分布式锁的方式有哪些?

  • Mysql
  • Memcached
  • Redis
  • Zookeeper : 利用 Zookeeper 的顺序临时节点,来实现分布式锁和等待队列。Zookeeper 设计的初衷,就是为了实现分布式锁服务的。
  • Chubby :Google 公司实现的粗粒度分布式锁服务,底层利用了 Paxos 一致性算法。

Redis 分布式锁的实现

分布式锁本质上要实现的目标就是在 redis 里面占一个”坑“,当别的进程也要进来占坑时,发现那里已经有一根“大萝卜”了,就只好放弃或者稍后再试。

setnx(set if not exists)
  • setnx 指令只允许一被一个客户端使用(占坑)。先来先占,用完了之后再调用 del 指令释放掉(坑)。 setnx key value
127.0.0.1:6379> setnx lock:codehole true
(integer) 1
127.0.0.1:6379> setnx lock:codehole aaa
(integer) 0
127.0.0.1:6379> del lock:codehole
(integer) 1
127.0.0.1:6379> setnx lock:codehole aaa
(integer) 1
setnx + expire
  • 单纯的 setnx 指令的话,如果逻辑处理开始时上锁成功,但是逻辑执行过程中出现异常,就会导致 del 命令永远得不到执行,故而成了死锁
  • 所以我们可以在拿到锁之后,给锁设置一个过期时间,即使逻辑处理过程中出现异常了,锁也会在指定时间内自动释放
  • 但是这样做得话,还是会有问题的,首先如果程序逻辑在锁规定的时间内没有执行完,那么这个锁还是会照常释放掉,这样别的程序就会得到锁,从而造成了俩个程序共享数据。-------超时问题
  • 还有一个问题就是,如果在执行 setnx 和 expire 指令之间,客户端突然失去了服务器连接,那么 expire 就会得不到执行,因为毕竟这是俩条指令,俩条指令的输入是有时间间隔的。
127.0.0.1:6379> setnx lock:codehole true
(integer) 1
127.0.0.1:6379> expire lock:codehole 5
(integer) 1
...等待5秒
127.0.0.1:6379> get lock:codehole
(nil)
set key value ex 5 nx
  • set key value ex 5 nx 是 redis 为了解决上述 expire 可能得不到执行而推出的,setnx 和 expire 组合在一起的原子指令。
超时问题
  • 相对安全一点的解决方案:将 set 指令的 value 参数设置为一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key。
  • 但是匹配 key 和删除 value 都不是一个原子操作,可以采用 lua 脚本来处理。
可重入性
  • 线程在持有锁的情况下再次请求加锁,如果一个锁支持同一个线程的多次加锁,那么这个锁就是可重入的

关于 ThreadLocal 类我也不怎么懂。。
可参考博文:

JAVA 版本的可重入锁

import redis.clients.jedis.Jedis;
import java.util.HashMap;
import java.util.Map;

//redis可重入锁,java实现
public class RedisWithReentrantLock {
	/*
	*ThreadLocal是一个关于创建线程局部变量的类。
	*通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。
	* 而使用ThreadLocal创建的变量只能被当前线程访问,其他线程则无法访问和修改。
	*/
	private ThreadLocal<Map<String , Integer>> lockers = new ThreadLocal<>();
	
	private Jedis jedis;
	
	public RedisWithReentrantLock(Jedis jedis) {
		this.jedis = jedis;
	}
	
	private boolean _lock(String key){
		//使用 set ket value ex number nx 指令上锁(“给萝卜占个坑”)
		return jedis.set(key , "","nx" , "ex" ,5L) != null;
	}
	
	private void _unlock(String key){
		jedis.del(key);
	}
	
	private Map<String , Integer> currentLockers(){
		Map<String , Integer> refs = lockers.get();
		if (refs != null){
			return refs;
		}
		lockers.set(new HashMap<>());
		return lockers.get();
	}
	
	public boolean lock(String key ){
		Map<String , Integer> refs = currentLockers();
		Integer refCnt = refs.get(key);
		if (refCnt != null){ //如果加过锁
			refs.put(key , refCnt + 1); //那就在threadlocal再加一把锁
			return true;
		}
		boolean ok = this._lock(key); //这里是没加过锁,那就进行加锁
		if (!ok){
			return false; //加锁不成功,返回false
		}
		refs.put(key , 1 ); //加锁成功,则在threalocal里面加上加锁信息
		return true;
	}
	
	public boolean unlock(String key){
		Map<String , Integer> refs = currentLockers();
		Integer refCnt = refs.get(key);
		if (refCnt == null){ //如果没有加锁的对象,那就不需要解锁
			return false;
		}
		refCnt -= 1; //如果有加锁的对象,那就去掉一层锁
		if (refCnt > 0 ){
			refs.put(key , refCnt); //更新threadlocal对象信息
		}else { //如果刚才减锁的对象只有一层锁,减一之后就没有锁了,那么就把threalocal里面的对象移除
			refs.remove(key);
			this._unlock(key);
		}
		return true;
	}

	public static void main(String[] args) {
		Jedis jedis = JedisTest.getJedis();
		RedisWithReentrantLock redis = new RedisWithReentrantLock(jedis);
		System.out.println(redis.lock("codehole"));
		System.out.println(redis.lock("codehole"));
		System.out.println(redis.unlock("codehole"));
		System.out.println(redis.unlock("codehole"));
	}
}

2、延时队列

异步消息队列
  • Redis 中 list(列表)的 rpush 与 lpop 、 lpush 与 rpop 结合使用,常用做异步消息队列,可以支持多个生产者和多个消费者并发进出消息,每个消费者都是不同的列表元素
队列空了怎么办 ---> 阻塞读(blocking)
  • 客户端使用 pop 操作来对队列进行轮询,获取消息然后进行处理。
  • 如果队列空了,就会得不到消息,继而会一直 pop 服务器,从而陷入死循环,产生空轮询
    解决方案
  1. 让线程睡一会(1s) ,即 Thread.sleep(1000)
  2. 阻塞读:blpop/brpop , 阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦有数据了就会立刻醒过来,几乎没有消息延迟。但是如果阻塞的时间过长,服务器就会主动断开连接,所以编写代码时一定要捕获代码后接着重试。

#####(分布式锁)加锁没成功怎么办---> 延时队列
解决方案

  1. 直接抛出异常,通知用户稍后重试。
  2. sleep 一会,然后重试
  3. 将请求转移至延时队列,过一会重试

延时队列的实现

延时队列可以通过 Redis 的 zset(有序列表来实现)。我们将消息序列化成一个字符串作为 zset 的 value,这个消息的到期处理时间作为 score,然后用多个线程轮询 zset 获取到期的任务进行处理。多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。因为有多个线程,所以需要考虑并发争抢任务,确保任务不会被多次执行。

JAVA 版本的延时队列

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.kk.redis.JedisFactory;
import redis.clients.jedis.Jedis;

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

public class RedisDelayingQueue<T> {

    static class TaskItem<T>{
        public String id;
        public T msg;
    }

    private Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();

    private Jedis jedis;
    private String queueKey;

    public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    public void delay(T msg){//往zset里放值
        TaskItem<T> task = new TaskItem<T>();
        task.id = UUID.randomUUID().toString();//分配唯一的UUID
        task.msg = msg;
        String s = JSON.toJSONString(task);//将对象序列化
        jedis.zadd(queueKey , System.currentTimeMillis() + 5000, s);//塞入延时队列 , score为当前毫秒值+5000
    }

    public void loop(){
        while(!Thread.interrupted()){
            //根据分值区间,从头往后取一个值
            Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis() , 0 , 1);
            if (values.isEmpty()){//如果值为空,就是队列里面没有东西的时候
                try{
                    Thread.sleep(500);//睡五秒
                }catch (InterruptedException e){
                    break;
                }
                continue;//然后继续循环,继续取值
            }
            String s = values.iterator().next();//如果有值得话,就取出一个来
            if (jedis.zrem(queueKey, s) > 0){//如果能够成功删除则证明该线程抢到了使用权
                TaskItem<T> task = JSON.parseObject(s , TaskType);
                this.handleMsg(task.msg);
            }
        }
    }

    public void handleMsg(T msg){
        System.out.println(msg);
    }

    public static void main(String[] args) {
        Jedis jedis = JedisFactory.getJedis();
        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
        Thread producer = new Thread(){//生产者线程
            public void run(){
                for (int i = 0 ; i < 10 ; i++){
                    queue.delay("codehole" + i);
                }
            }
        };
        Thread consumer = new Thread(){//消费者线程
            public void run(){
                queue.loop();
            }
        };
        producer.start();
        consumer.start();
        try{
            producer.join();
            Thread.sleep(6000);
            consumer.interrupt();
            consumer.join();
        }catch (InterruptedException e){
            System.out.println("出错了");
        }
    }
}

可再优化

  • 可以使用 lua 脚本 ,将 zrangebyscore 和 zrem 俩条指令变成一个原子操作,这样多个进程之间抢任务时就不会造成浪费。
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    284 引用 • 247 回帖 • 175 关注
  • 中间件
    4 引用 • 2 回帖
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3168 引用 • 8207 回帖
  • NoSQL
    11 引用 • 4 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...