《Redis 深度历险》读书笔记 -- 之分布式锁与延时队列 1、分布式锁 --Redis 参考博文:https://juejin.im/post/5bbb0d8df265da0abd3533a5#heading-1 参考书籍:[链接] 首先 1、原子操作是什么? 原子操作是指不会被线程调度打断的操作。这种操作一旦开始 ..

Redis 分布式锁与延时队列

《Redis 深度历险》读书笔记 -- 之分布式锁与延时队列

1、分布式锁 --Redis

参考博文:https://juejin.im/post/5bbb0d8df265da0abd3533a5#heading-1
参考书籍:https://book.douban.com/subject/30386804/

首先
1、原子操作是什么?

原子操作是指不会被线程调度打断的操作。这种操作一旦开始,就会一直运行到结束。中间不会有任何线程切换。

2、CAP 原则是什么?

CAP 原则又称 CAP 定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

3、什么是分布式锁?

当在分布式模型下,数据只有一份(或有限制),此时需要利用锁技术来控制某一时刻修改数据的进程数。这种锁即为分布式锁。

4、为什么需要分布式锁?

A : 上一条所提到的正确性:加分布式锁可以避免破坏正确性的发生,如果俩个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。
B : 效率问题:使用分布式锁可以避免不同节点重复的工作,避免浪费资源。

5、 分布式锁有那些特点?

6、 常见的实现分布式锁的方式有哪些?

Redis 分布式锁的实现

分布式锁本质上要实现的目标就是在 Redis 里面占一个”坑“,当别的进程也要进来占坑时,发现那里已经有一根“大萝卜”了,就只好放弃或者稍后再试。

setnx(set if not exists)
127.0.0.1:6379> setnx lock:codehole true
(integer) 1
127.0.0.1:6379> setnx lock:codehole aaa
(integer) 0
127.0.0.1:6379> del lock:codehole
(integer) 1
127.0.0.1:6379> setnx lock:codehole aaa
(integer) 1
setnx + expire
127.0.0.1:6379> setnx lock:codehole true
(integer) 1
127.0.0.1:6379> expire lock:codehole 5
(integer) 1
...等待5秒
127.0.0.1:6379> get lock:codehole
(nil)
set key value ex 5 nx
超时问题
可重入性

关于ThreadLocal类我也不怎么懂。。
可参考博文:

Java 版本的可重入锁

import redis.clients.jedis.Jedis;
import java.util.HashMap;
import java.util.Map;

//redis可重入锁,java实现
public class RedisWithReentrantLock {
	/*
	*ThreadLocal是一个关于创建线程局部变量的类。
	*通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。
	* 而使用ThreadLocal创建的变量只能被当前线程访问,其他线程则无法访问和修改。
	*/
	private ThreadLocal<Map<String , Integer>> lockers = new ThreadLocal<>();
	
	private Jedis jedis;
	
	public RedisWithReentrantLock(Jedis jedis) {
		this.jedis = jedis;
	}
	
	private boolean _lock(String key){
		//使用 set ket value ex number nx 指令上锁(“给萝卜占个坑”)
		return jedis.set(key , "","nx" , "ex" ,5L) != null;
	}
	
	private void _unlock(String key){
		jedis.del(key);
	}
	
	private Map<String , Integer> currentLockers(){
		Map<String , Integer> refs = lockers.get();
		if (refs != null){
			return refs;
		}
		lockers.set(new HashMap<>());
		return lockers.get();
	}
	
	public boolean lock(String key ){
		Map<String , Integer> refs = currentLockers();
		Integer refCnt = refs.get(key);
		if (refCnt != null){ //如果加过锁
			refs.put(key , refCnt + 1); //那就在threadlocal再加一把锁
			return true;
		}
		boolean ok = this._lock(key); //这里是没加过锁,那就进行加锁
		if (!ok){
			return false; //加锁不成功,返回false
		}
		refs.put(key , 1 ); //加锁成功,则在threalocal里面加上加锁信息
		return true;
	}
	
	public boolean unlock(String key){
		Map<String , Integer> refs = currentLockers();
		Integer refCnt = refs.get(key);
		if (refCnt == null){ //如果没有加锁的对象,那就不需要解锁
			return false;
		}
		refCnt -= 1; //如果有加锁的对象,那就去掉一层锁
		if (refCnt > 0 ){
			refs.put(key , refCnt); //更新threadlocal对象信息
		}else { //如果刚才减锁的对象只有一层锁,减一之后就没有锁了,那么就把threalocal里面的对象移除
			refs.remove(key);
			this._unlock(key);
		}
		return true;
	}

	public static void main(String[] args) {
		Jedis jedis = JedisTest.getJedis();
		RedisWithReentrantLock redis = new RedisWithReentrantLock(jedis);
		System.out.println(redis.lock("codehole"));
		System.out.println(redis.lock("codehole"));
		System.out.println(redis.unlock("codehole"));
		System.out.println(redis.unlock("codehole"));
	}
}

2、延时队列

异步消息队列
队列空了怎么办 ---> 阻塞读(blocking)
  1. 让线程睡一会(1s) ,即 Thread.sleep(1000)
  2. 阻塞读:blpop/brpop , 阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦有数据了就会立刻醒过来,几乎没有消息延迟。但是如果阻塞的时间过长,服务器就会主动断开连接,所以编写代码时一定要捕获代码后接着重试。

#####(分布式锁)加锁没成功怎么办---> 延时队列
解决方案

  1. 直接抛出异常,通知用户稍后重试。
  2. sleep 一会,然后重试
  3. 将请求转移至延时队列,过一会重试

延时队列的实现

延时队列可以通过 Redis 的 zset(有序列表来实现)。我们将消息序列化成一个字符串作为 zset 的 value,这个消息的到期处理时间作为 score,然后用多个线程轮询 zset 获取到期的任务进行处理。多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。因为有多个线程,所以需要考虑并发争抢任务,确保任务不会被多次执行。

Java 版本的延时队列

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.kk.redis.JedisFactory;
import redis.clients.jedis.Jedis;

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

public class RedisDelayingQueue<T> {

    static class TaskItem<T>{
        public String id;
        public T msg;
    }

    private Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();

    private Jedis jedis;
    private String queueKey;

    public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    public void delay(T msg){//往zset里放值
        TaskItem<T> task = new TaskItem<T>();
        task.id = UUID.randomUUID().toString();//分配唯一的UUID
        task.msg = msg;
        String s = JSON.toJSONString(task);//将对象序列化
        jedis.zadd(queueKey , System.currentTimeMillis() + 5000, s);//塞入延时队列 , score为当前毫秒值+5000
    }

    public void loop(){
        while(!Thread.interrupted()){
            //根据分值区间,从头往后取一个值
            Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis() , 0 , 1);
            if (values.isEmpty()){//如果值为空,就是队列里面没有东西的时候
                try{
                    Thread.sleep(500);//睡五秒
                }catch (InterruptedException e){
                    break;
                }
                continue;//然后继续循环,继续取值
            }
            String s = values.iterator().next();//如果有值得话,就取出一个来
            if (jedis.zrem(queueKey, s) > 0){//如果能够成功删除则证明该线程抢到了使用权
                TaskItem<T> task = JSON.parseObject(s , TaskType);
                this.handleMsg(task.msg);
            }
        }
    }

    public void handleMsg(T msg){
        System.out.println(msg);
    }

    public static void main(String[] args) {
        Jedis jedis = JedisFactory.getJedis();
        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
        Thread producer = new Thread(){//生产者线程
            public void run(){
                for (int i = 0 ; i < 10 ; i++){
                    queue.delay("codehole" + i);
                }
            }
        };
        Thread consumer = new Thread(){//消费者线程
            public void run(){
                queue.loop();
            }
        };
        producer.start();
        consumer.start();
        try{
            producer.join();
            Thread.sleep(6000);
            consumer.interrupt();
            consumer.join();
        }catch (InterruptedException e){
            System.out.println("出错了");
        }
    }
}

可再优化

  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    149 引用 • 216 回帖 • 776 关注
  • 中间件
    3 引用 • 2 回帖
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    2355 引用 • 7823 回帖 • 890 关注
  • NoSQL
    10 引用 • 3 回帖
回帖
请输入回帖内容...