定时任务系列之 ScheduledExecutorService

本贴最后更新于 1699 天前,其中的信息可能已经渤澥桑田

回顾

上篇文章我们讲了选择定时任务框架的一个标准。提到了 crontab & Timer 的使用和实现原理。这篇我们讲 ScheduledExecutorService & Spring Scheduler。

ScheduledExecutorService

上一篇我们提到了 Timer。Timer 的一个定时器需要一个线程去处理,如果每个任务都交给一个新的 Timer 处理,线程创建等消耗资源。如果对一个 Timer 设置了多个任务,而有一个任务没有捕获住异常,那么会导致后面的任务都无法执行了。所以推荐使用 ScheduledExecutorService 来替换 Timer。【如果你使用的 IDE 有 Alibaba Java Coding Guidelines,你会发现它会直接给你提示 Timer 需要替换成 ScheduledExecutorService。 墙裂推荐装这个 idea 插件~】

代码示例

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("schedule-thread-%d").setDaemon(false).build();
// 20是核心线程数量,后面参数是一个线程工厂,采用了建造者模式创建。
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(20, threadFactory);
Runnable task = () -> log.warn("ScheduledExecutorService 5s thread-name:{}", Thread.currentThread().getName());
// 调度的方法里面参数,1-实现runnable接口的方法。2-超时时间。3-超时时间单位。
executorService.schedule(task, 5, TimeUnit.SECONDS);

源码解读

首先我们需要搞懂 ScheduledExecutorService 实现类 ScheduledThreadPoolExecutor。

// 其构造就是一般的线程池构造函数,但是里面的queue是自己的内部类,下面会提到。
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

// 调度的方法
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // 参数校验
    if (command == null || unit == null)
        throw new NullPointerException();
    // 包装一下Runnable类,变成一个Future。
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    // 延时执行 *【重点】*
    delayedExecute(t);
    // future对象返回,可以异步继续执行其他代码逻辑。
    return t;
}

// 延迟执行
private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 检查状态
    if (isShutdown())
        reject(task);
    else {
        // 正常状态下,先把任务加入queue。【其实没有这么单纯 : )】
        super.getQueue().add(task);
	// 再次检查状态 状态不对赶紧撤销。你要是正在执行 就执行吧。。。
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
	    // 预先 生成执行任务的线程【还是比较单纯的】
            ensurePrestart();
    }
}

// 都是父类那一套,此处不是重点。
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

// 我们一层层把这个“单纯”的add解开 : )
public boolean add(Runnable e) {
    return offer(e);
}

// 一个add 整了这么多代码。我们慢慢看
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    // 处理queue的时候先给加个锁。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
	    // 容量不够 扩容
            grow();
        size = i + 1;
	// 添加第一个任务,小伙子 你就是第一个要执行的人了
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
	    // 当不是第一个任务的时候怎么处理呢。。。【重点】
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;
	    // 当一个新的任务在队列的最前面并且可移植性,或者一个新线程可能需要成为leader时,发信号
	   // 上文 private final Condition available = lock.newCondition();
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

// 如果不是queue里面的唯一任务,就需要这么搞
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    // 二话不说 先来一个queue循环,k我们上面看到是queue size。
    while (k > 0) {
	// 堆排序。compareTo方法肯定重写了!下面我们把这个方法给找出来
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    // 上面介绍过了,不提了。
    setIndex(key, k);
}

// Future对象的比较方法
public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
	// 其实不用看 也知道是用的时间 : ) 
	// 将时间最靠近现在的排在前面,方便线程从前往后拿任务执行。
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

// 下面是DelayedWorkQueue的获取任务实现
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
	    // 任务没有,那么等待
            if (first == null)
                available.await();
            else {
		// 查看任务是不是到了执行时间了
                long delay = first.getDelay(NANOSECONDS);
		// 到时间了,返回任务
                if (delay <= 0)
                    return finishPoll(first);
		// 否则,把任务置为null,继续等着吧
                first = null; // don't retain ref while waiting
		// leader就是在队列头部等待执行的线程 如果为null 那么就等待吧,等待唤醒
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
			// 还是要等待delay的时间才能执行,继续检查状态。
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

OK, 我们终于看完了代码的分析...

优势

  • 上面我们已经提到了,比起 Timer,ScheduledExecutorService 可以合理利用线程池资源。在一个任务执行失败时,不会影响其他任务的执行。
  • 可以设置周期定时执行,也可以直接设置触发时间。

劣势

  • 不支持 cron 这种灵活配置。
  • 代码实现不是很灵活,修改需要重启项目。
  • 功能单一。可视化、监控等都不支持,也不支持分布式等等功能。

最后

本来这篇文章开始前想着还写个 spring scheduler,但是,越写越长。。。后面我们慢慢学习更成熟的框架吧~


  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3168 引用 • 8207 回帖
  • 定时任务
    14 引用 • 27 回帖
  • cron
    11 引用 • 3 回帖
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    941 引用 • 1458 回帖 • 150 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...