版权声明:本文为博主原创文章,遵循CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接: http://blog.shiyi.online/articles/2019/08/14/1565796937508.html 应用场景 创建订单 10 分钟之后自动支付 订单超时取消 .......等等 ..

基于 Redis 实现 DelayQueue 延迟队列设计方案

<font face="黑体" color=green size=2> 版权声明:本文为博主原创文章,遵循CC 4.0 by-sa
版权协议,转载请附上原文出处链接和本声明。
本文链接: http://blog.shiyi.online/articles/2019/08/14/1565796937508.html

应用场景


实现方式


基于 Redis 自研延迟队列


既然上面没有很好的解决方案,因为 Redis 的 zset、list 的特性,我们可以利用 Redis 来实现一个延迟队列 RedisDelayQueue

设计目标

数据结构

设计图

Redis 延迟队列设计图 2.jpg

任务的生命周期

  1. 新增一个Job,会在Redis_Delay_Table中插入一条数据,记录了业务消费方的 数据结构; RD_ZSET_BUCKET 也会插入一条数据,记录了执行时间戳;
  2. 搬运线程会去RD_ZSET_BUCKET中查找哪些执行时间戳runTimeMillis比现在的时间小;将这些记录全部删除;同时会解析出来每个任务的Topic是什么,然后将这些任务pushTopic对应的列表RD_LIST_TOPIC中;
  3. 每个 Topic 的 List 都会有一个监听线程去批量获取 List 中的待消费数据;获取到的数据全部扔给这个Topic的消费线程池
  4. 消息线程池执行会去 Redis_Delay_Table 查找数据结构,返回给回调接口,执行回调方法;

以上所有操作,都是基于 Lua 脚本做的操作,Lua 脚本执行的优点在于,批量命令执行具有原子性,事务性, 并且降低了网络开销,毕竟只有一次网络开销;


搬运线程操作流程图

搬运操作流程图.jpg

设计细节


搬运操作

1.搬运操作的时机

为了避免频繁的执行搬运操作, 我们基于 wait(time)/notify 的方式来通知执行搬运操作;

搬运线程执行流程图.jpg

我们用一个AtomicLong nextTime 来保存下一次将要搬运的时间;服务启动的时候nextTime=0;所以肯定比当前时间小,那么就会先去执行一次搬运操作,然后返回搬运操作之后的ZSET的表头时间戳,这个时间戳就是下一次将要执行的时间戳, 把这个时间戳赋值给 nextTime; 如果表中没有元素了则将nextTime=Long.MaxValue ;因为 while 循环,下一次又会跟当前时间对比;如果nextTime比当前时间大,则说明需要等待; 那么我们wait(nextTime-System.currentTimeMills()); 等到时间到了之后,再次去判断一下,就会比当前时间小,就会执行一次搬运操作;

那么当有新增延迟任务 Job 的时间怎么办,这个时候又会将当前新增 Job 的执行时间戳跟nextTime做个对比;如果小的话就重新赋值;
重新赋值之后,还是调用一下 notifyAll() 通知一下搬运线程;让他重新去判断一下 新的时间是否比当前时间小;如果还是大的话,那么就继续wait(nextTime-System.currentTimeMills()); 但是这个时候wait的时间又会变小;更精准;

2.一次搬运操作的最大数量
Redis 的执行速度非常快,在一个 Lua 里面循环遍历 1000 个 10000 个根本没差; 而且是在 Lua 里面操作,就只有一次网络开销;一次操作多少个元素根本就不会是问题;


搬运操作的防护机制

1.每分钟唤醒定时线程

在消费方多实例部署的情况下, 如果某一台机器挂掉了,但是这台机器的 nextTime 是最小的,就在一分钟之后( 新增 job 的时候落到这台机器,刚好时间戳很小), 其他机器可能是 1 个小时之后执行搬运操作; 如果这台机器立马重启,那么还会立马执行一次搬运操作;万一他没有重启;那可能就会很久之后才会搬运;
所以我们需要一种防护手段来应对这种极端情况;
比如每分钟将 nextTime=0;并且唤醒 wait;
那么就会至少每分钟会执行一次搬运操作! 这是可以接受的


LrangeAndLTrim 批量获取且删除待消费任务

1.执行时机以及如何防止频繁请求 Redis
这是一个守护线程,循环去做这样的操作,把拿到的数据给线程池去消费;
但是也不能一直不停的去执行操作,如果 list 已经没有数据了去操作也没有任何意义,不然就太浪费资源了,幸好 List 中有一个BLPOP阻塞原语,如果 list 中有数据就会立马返回,如果没有数据就会一直阻塞在那里,直到有数据返回,可以设置阻塞的超时时间,超时会返回 NULL;
第一次去获取 N 个待消费的任务扔进到消费线程池中;如果获取到了 0 个,那么我们就立马用BLPOP来阻塞,等有元素的时候 BLPOP 就返回数据了,下次就可以尝试去LrangeAndLTrim一次了. 通过BLPOP阻塞,我们避免了频繁的去请求 redis,并且更重要的是提高了实时性;

2.批量获取的数量和消费线程池的阻塞队列

执行上面的一次获取 N 个元素是不定的,这个要看线程池的 maxPoolSize 最大线程数量; 因为避免消费的任务过多而放入线程池的阻塞队列, 放入阻塞队列有宕机丢失任务的风险,关机重启的时候还要讲阻塞队列中的任务重新放入 List 中增加了复杂性;

所以我们每次LrangeAndLTrim获取的元素不能大于当前线程池可用的线程数; 这样的一个控制可用用信号量Semaphore来做


Codis 集群对 BLPOP 的影响

如果 Redis 集群用了 codis 方案或者 Twemproxy 方案; 他们不支持 BLPOP 的命令;
codis不支持的命令集合
那么就不能利用 BLPOP 来防止频繁请求 redis;那么退而求其次改成每秒执行一次 LrangeAndLTrim 操作;


集群对 Lua 的影响

Lua 脚本的执行只能在单机器上, 集群的环境下如果想要执行 Lua 脚本不出错,那么 Lua 脚本中的所有 key 必须落在同一台机器;
为了支持集群操作 Lua,我们利用 hashtag; 用{}把三个 jey 的关键词包起来;
{projectName}:Redis_Delay_Table
{projectName}:Redis_Delay_Table
{projectName}:RD_LIST_TOPIC
那么所有的数据就会在同一台机器上了


重试机制

消费者回调接口如果抛出异常了,或者执行超时了,那么会将这个 Job 重新放入到 RD_LIST_TOPIC 中等待被下一次消费;默认重试 2 次;可以设置不重试;

超时机制

超时机制的主要思路都一样,就是监听一个线程的执行时间超过设定值之后抛出异常打断方法的执行;

这是使用的方式是 利用 Callable 接口实现异步超时处理

public class TimeoutUtil {

    /**执行用户回调接口的 线程池;    计算回调接口的超时时间           **/
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * 有超时时间的方法
     * @param timeout 时间秒
     * @return
     */
    public static void timeoutMethod(long timeout, Function function) throws InterruptedException, ExecutionException, TimeoutException {
        FutureTask futureTask = new FutureTask(()->(function.apply("")));
        executorService.execute(futureTask);
        //new Thread(futureTask).start();
        try {
            futureTask.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            //e.printStackTrace();
            futureTask.cancel(true);
            throw e;
        }

    }
}

这种方式有一点不好就是太费线程了,相当于线程使用翻了一倍;但是相比其他的方式,这种算是更好一点的

优雅停机

在 Jvm 那里注册一个 Runtime.getRuntime().addShutdownHook(Runnable)停机回调接口;在这里面做好善后工作;

优雅停止线程一般是用下面的方式
①、 while(!stop)的形式 用标识位来停止线程
②.先 调用 executor.shutdown(); 阻止接受新的任务;然后等待当前正在执行的任务执行完; 如果有阻塞则需要调用 executor.shutdownNow()强制结束;所以要给一个等待时间;

  /**
     * shutdownNow 终止线程的方法是通过调用Thread.interrupt()方法来实现的
     * 如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。
     * 上面的情况中断之后还是可以再执行finally里面的方法的;
     * 但是如果是其他的情况 finally是不会被执行的
     * @param executor
     */
    public static void closeExecutor(ExecutorService executor, String executorName) {
        try {
            //新的任务不进队列
            executor.shutdown();
            //给10秒钟没有停止完强行停止;
            if(!executor.awaitTermination(20, TimeUnit.SECONDS)) {
                logger.warn("线程池: {},{}没有在20秒内关闭,则进行强制关闭",executorName,executor);
                List<Runnable> droppedTasks = executor.shutdownNow();
                logger.warn("线程池: {},{} 被强行关闭,阻塞队列中将有{}个将不会被执行.", executorName,executor,droppedTasks.size() );
            }
            logger.info("线程池:{},{} 已经关闭...",executorName,executor);
        }  catch (InterruptedException e) {
            logger.info("线程池:{},{} 打断...",executorName,executor);
        }
    }

BLPOP 阻塞的情况如何优雅停止监听 Redis 的线程

如果不是在codis集群的环境下,BLPOP 是可以很方便的阻塞线程的;但是停机的时候可能会有点问题;

假如正在关机,当前线程正在BLPOP阻塞, 那关机线程等我们 20 秒执行, 刚好在倒数 1 秒的时候BLPOP获取到了数据,丢给消费线程去消费;如果消费线程 1 秒执行不完,那么 20 秒倒计时到了,强制关机,那么这个任务就会被丢失了; 怎么解决这个问题呢?

①. 不用BLPOP, 每次都 sleep 一秒去调用LrangeAndLTrim操作;
②.关机的时候杀掉 Redis 的 blpop 客户端; 杀掉之后 BLPOP 会立马返回 null; 进入下一个循环体;


不足

总结

  1. 实时性
    正常情况下 消费的时间误差不超过 1 秒钟; 极端情况下,一台实例宕机,另外的实例 nextTime 很迟; 那么最大误差是 1 分钟; 真正的误差来自于业务方的接口的消费速度

  2. QPS
    完全视业务方的消费速度而定; 延迟队列不是瓶颈

项目已经开源 ,源码地址: RedisDelayQueue

接入方法:

RedisDelayQueue接入

  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    143 引用 • 209 回帖 • 803 关注
  • 延时队列
    2 引用 • 7 回帖
7 回帖   
请输入回帖内容...
  • InkDP  

    图全被劫持了

  • shirenchuang  

    哎哟 我去 还真是
    我想想图片传哪儿去

  • shirenchuang  

    重新上传了 ❤️

  • clhey  

    不错不错,很有参考价值

  • shirenchuang  

    谢谢

  • clhey  

    GitHub 上的项目删了吗,方便发一份源码不

  • shirenchuang  

    删了, 可以关注下我公众号获取

请输入回帖内容 ...