type
status
date
slug
summary
tags
category
icon
password
1、线程介绍
1.1 线程介绍
在进入 Java 线程主题之前,有必要讲解一下线程库(
Thread library
) 的概念。线程库就是为开发人员提供创建和管理线程的一套 API。三个主要的线程库如下:- POSIX Pthreads:可以作为用户或内核库提供,作为 POSIX 标准的扩展
- Win32 线程:用于 Window 操作系统的内核级线程库
- Java 线程:Java 线程 API 通常采用宿主系统的线程库来实现,也就是说在 Win 系统上,Java 线程 API 通常采用 Win API 来实现,在 UNIX 类系统上,采用 Pthread 来实现。
在 JDK 1.2 之前,Java 线程是基于称为 "绿色线程"(Green Threads)的用户级线程实现的,也就是说程序员大佬们为 JVM 开发了自己的一套线程库或者说线程管理机制。
而在 JDK 1.2 及以后,JVM 选择了更加稳定且方便使用的操作系统原生的内核级线程,通过系统调用,将线程的调度交给了操作系统内核。而对于不同的操作系统来说,它们本身的设计思路基本上是完全不一样的,因此它们各自对于线程的设计也存在种种差异,所以 JVM 中明确声明了:虚拟机中的线程状态,不反应任何操作系统中的线程状态。
也就是说,在 JDK 1.2 及之后的版本中,Java 的线程很大程度上依赖于操作系统采用什么样的线程模型,这点在不同的平台上没有办法达成一致,JVM 规范中也并未限定 Java 线程需要使用哪种线程模型来实现,可能是一对一,也可能是多对多或多对一。
Java 中线程的本质,其实就是操作系统中的线程,其线程库和线程模型很大程度上依赖于操作系统(宿主系统)的具体实现,比如在 Windows 中 Java 就是基于 Wind32 线程库来管理线程,且 Windows 采用的是一对一的线程模型。
1.2 创建线程
有三种创建线程的方法:
- 实现
Runnable
接口,这种方式没有返回结果。
- 实现
Callable
接口,这种方式有返回结果,一般需要配合FutureTask
使用。
- 继承
Thread
类。Thread
类实现了Runnable
接口,所以直接重写run()
方法即可。这种方式的优点是:如果需要访问当前线程的 API 方法,则无需使用Thread.currentThread()
方法,直接使用this
即可获得当前线程。
1.3 线程的状态切换
Java 中线程的状态分为六种:
初始(NEW)
:新创建了一个线程对象,但还没有调用start()
方法。
运行(RUNNABLE)
:Java线程中将就绪(ready
)和运行中(running
)两种状态笼统的称为运行
。- 线程对象创建后,其他线程(比如main线程)调用了该对象的
start()
方法。该状态的线程位于可运行线程池中,等待被线程调度选中获取CPU的使用权,此时处于就绪状态(ready
)。 - 就绪状态的线程在获得CPU时间片后变为运行中状态(
running
)。
阻塞(BLOCKED)
:表示线程阻塞于锁。一般是线程阻塞在进入synchronized
关键字修饰的方法或代码块(获取锁)时的状态。此时线程进入同步队列(SynchronizedQueue)。
等待(WAITING)
:进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。该状态的线程必须被显式地唤醒,否则会处于无限期等待的状态。此时线程进入等待队列(WatiQueue)。
超时等待(TIMED_WAITING)
:状态类似于WAITING
,但不同于WAITING
,它可以在指定的时间后自行返回。此时线程进入等待队列(WatiQueue)。
终止(TERMINATED)
:表示该线程已经执行完毕。
1.4 线程的优先级
每一个 Java 线程都有一个优先级,这样有助于操作系统确定线程的调度顺序。
Java 线程的优先级是一个整数,数字越高优先级越高,其取值范围是
1
(Thread.MIN_PRIORITY
) - 10
(Thread.MAX_PRIORITY
)。默认情况下,每一个线程都会分配一个优先级
NORM_PRIORITY
(5
)。具有较高优先级的线程对程序更重要,并且应该在低优先级的线程之前分配处理器资源。但是,线程优先级不能保证线程执行的顺序,而且非常依赖于平台。
1.5 线程API介绍
1.5.1 Thread类
Thread 类本身提供了一些线程相关的方法。
start()
:线程被创建后就进入了NEW
状态。调用线程的start()
方法,线程就进入RUNNABLE
状态。RUNNABLE
状态的线程不一定在运行,有可能处于READY
状态,也有可能处于RUNNING
状态,这主要看是否被调度器调度运行。
yield()
:yield()
是静态方法。当前线程调用后主动放弃 CPU 时间片,由RUNNING
状态变为READY
状态,不释放锁资源,线程调度器会再次选择线程。yield()
的作用是让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()
达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。我们日常编码中其实很少会用到yield()
方法,在并发工具类中倒是有被大量使用。
sleep(long millis)
:sleep(long millis)
是静态方法,当前线程调用后进入TIMED_WAITING
状态,不释放锁资源,millis
后线程自动苏醒进入READY
状态。- 调用后线程状态不一致:调用
sleep(long millis)
后线程变成TIMED_WAITING
状态,调用yield()
后线程变成READY
状态。 sleep(long millis)
可以设置超时时间,yield()
不能设置超时时间。
sleep(long millis)
和yield()
的作用都是当前线程让出 CPU 资源让其他线程执行,都是静态方法,而且都不释放锁资源,区别是
join()/join(long millis)
:作用是让主线程阻塞等待子线程的终止,使用方法是当前线程里调用其他线程的join()/join(long millis)
- 如果是调用
join()
,被调用线程进入WAITING
状态,等待其他线程执行完毕或者 millis 时间到,当前线程一般情况下进入RUNNABLE
状态,也有可能进入BLOCKED
状态。 - 如果是调用
join(long millis)
,被调用线程进入TIMED_WAITING
状态,等待其他线程执行完毕或者 millis 时间到,当前线程一般情况下进入RUNNABLE
状态,也有可能进入BLOCKED
状态。 - 线程之所以有可能进入
BLOCKED
状态,是因为join()
是基于wait()
实现的,Thread 类线程执行完run()
方法后,会自动执行notifyAll()
方法。 - 注意
join()
会释放被调用的 Thread 实例的对象锁,但不会释放其它对象锁(包括 main 线程)。
interrupt()
:调用一个线程的interrupt()
可以中断该线程,如果该线程处于BLOCKED
、WAITING
或者TIME_WAITING
状态,那么就会抛出InterruptedException
,从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞。
1.5.2 Object类
Object 是所有类的父类,在 Object 中提供了一些线程相关的方法,所有对象都可以调用这些方法。
wait()
:当前线程调用某个对象的wait()
后,线程状态由RUNNING
变为WAITING
,并释放该对象的锁资源,并将当前线程放置到对象的等待队列。
notify()
:调用某个对象的notify()
,随机唤醒在该对象上等待的线程。等待线程不一定会立刻从wait()
返回,需要调用notify()
的线程释放了锁,等待线程获得了锁之后才会从wait()
返回。
notifyAll()
:调用某个对象的notifyAll()
,唤醒在该对象上等待的所有线程。其他基本同notify()
。
这三个方法都必须在 synchronized 同步的方法或代码块中调用,否则会被抱错。两个线程通过中间对象的
wait()
和 notify/notifyAll()
方法完成等待方和通知方之间的交互工作,可以实现等待/通知的经典范式。等待方遵循如下原则:
- 获取对象的锁。
- 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
- 条件满足则执行对应的逻辑。
例如下面例子
wait()/notify()/notifAll()
的原理是:任意一个 Object 对象都会关联一个 Monitor
对象,如果使用 synchronized
给对象上锁(重量级锁),Mark Word
位置就会指向 Monitor
对象的引用地址。当 Thread-2 执行下面代码时
- Thread-2 线程执行上述代码时,对象 obj 会被上一把锁,这是一把重量级锁,obj 对象头的
Mark Word
字段指向了操作系统创建的Monitor
对象引用地址。
Monitor
对象只能有一个Owner
,此时如果有其它线程如 Thread-3 或 Thread-4 线程要获取这把锁就要进入Monitor
对象的同步队列EntryList
中,等待 Thread2 释放锁。
- Thread-2 拿到 obj 对象锁之后,如果主动释放锁(通过调用
obj.wait()
),Thread-2 变成 WAITING 状态,进入Monitor
对象的等待队列WaitSet
中,等待其他线程调用obj.notify()
来唤醒它。
Monitor
对象和 wait()/notify()/notifAll()
的状态转换过程如下图所示:1.5.3 LockSupport类
LockSupport
是rt.jar
提供的工具类,用于阻塞或唤醒一个线程,是构建同步组件(AQS、Condition等)的基础工具。LockSupport
所有的方法都是静态方法,可以让线程在任意位置阻塞,以 park
开头的方法用来阻塞当前线程,以及 unpark(Thread thread)
方法来唤醒一个被阻塞的线程。常用的 API 有:park()
: 阻塞当前线程,如果调用unpark(Thread thread)
方法或者当前线程被中断,才能从park()
方法返回。
parkNanos(long nanos)
: 阻塞当前线程,最长不超过 nanos 纳秒,返回条件在park()
的基础上增加了超时返回。
parkUntil(long deadline)
: 阻塞当前线程,知道 deadline 时间 (从 1970 年开始到 deadline 时间的毫秒数)。
unpark(Thread thread)
: 唤醒处于阻塞状态的线程 thread。
park(Object blocker)
: 阻塞当前线程,blocker 用来标识当前线程在等待的对象,该对象主要用于问题排查和系统监控
。
parkNanos(Object blocker, long nanos)
: 比park(Object blocker)
增加一个超时时间。
parkUntil(Object blocker, long deadline)
: 比parkUntil(long deadline)
多一个阻塞当前对象。
例如下面这个例子,thread1 通过调用
LockSupport.park()
进入阻塞状态,thread2 通过调用 LockSupport.unpark(thread1)
唤醒 thread1,整个过程都没有加锁操作。LockSupport
的底层原理:LockSupport
调用的 Unsafe
中的 native 代码。- LockSupport类使用了一种名为 Permit(许可) 的概念来做到阻塞和唤醒线程的功能。
- 每个线程都有一个 permit,permit 只有两个值
1
和0
,默认是0
。可以把 permit 看成是一种(0-1
)信号量(Semaphore),但与 Semaphore 不同的是,permit 的累加上限是1
。
- 因为 permit 默认是
0
,当前线程 thread 调用park()
方法,thread 线程就会阻塞。
- 直到其他线程调用
unpark(thread)
方法后,就会将 thread 线程的 permit 设置成1
(注意多次调用unpark
方法,不会累加,permit 值还是1
),会自动唤醒 thread 线程。
注意:唤醒两次后阻塞两次,最终结果还会阻塞线程。因为 permit 的数量最多为1
,连续调用两次unpark
和调用一次unpark
效果一样,只会增加一个 permit。而调用两次park
却需要消费两个permit
。permit
数量不够,所以线程还是会被阻塞。
1.5.4 Condition类
Condition
需要配合 Lock
使用(调用 Lock
对象的 newCondition()
方法创建出来),提供了类似 Object 的监视器方法,await()/signal()/signalAll()
方法分别对应 Object 类的 wait()/notify()/notifyAll()
方法。Condition
的使用方式基本和 Object 类相同,但底层是使用 LockSupport
工具来实现的。常用的 API 有:
await()
:当前线程进入等待状态直到被通知(signal)或中断。如果当前等待线程从 await() 方法返回,那么表明该线程已经获取了Condition 对象所对应的锁。当前线程从 await() 方法返回的情况包括:1、其他线程调用该 Condition 的 signal() 或 signalAll() 方法 ,而当前线程被选中唤醒;2、其他线程调用 Interrupt()方法中断当前线程
awaitUninterruptibly()
:当前线程进入等待状态被通知,从方法返回名称上可以看出该方法对中断不敏感
awaitNanos(long nanosTimeout)
:当前线程进入等待状态直到被通知,中断或者超时。返回值表示剩余的时间,如果在 nanosTimeout 纳秒之前被唤醒,那么返回值就是 (nanosTimeout-实际耗时)。如果返回 0 或者负数,那么可以认定已经超时了
awaitUntil(Date deadline)
:当前线程进入等待状态知道被通知、中断或者到某一个时间。如果没有到指定时间就被通知,方法返回 true ,否则,表示到了指定时间,方法返回 false。
signal()
:唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关的锁
signalAll()
:唤醒所有等待在 Condition 上的线程,能够从等待方法返回的线程必须获得与 Condition相关的锁
Object.wait()
、LockSupport.park()
、Condition.await()
的区别:
Object.wait()
和Condition.await()
需要获取锁,LockSupport.park()
不需要获得锁就可以让线程进入WAITING/TIMED_WAITING
状态,可以在任何地方执行,比前者更灵活,性能更好。
Object.wait()
和Condition.await()
需要捕获InterruptedException
,LockSupport.park()
不需要捕获异常。
Object.wait()
本身就是 native 方法,LockSupport.park()
底层调用了 Unsafe 的 native 方法,Condition.await()
底层使用LockSupport
来实现。
1.5.5 线程状态转换过程
结合上面介绍的线程六种状态和线程 API,整体的转换过程如下所示:
2、线程池介绍
线程池就是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。每个线程池还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》书中的部分内容来总结一下使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。
3、Executor框架
Executor
框架是 Java5 用于统一创建与运行的接口,通过 Executor
来启动线程替代以前的 Thread.start()
启动线程。Executor
框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor
框架让并发编程变得更加简单。严格意义上讲Executor
并不是一个线程池,只是一个执行工具,真正的线程池接口是ExecutorService
。
Executor
框架整体的继承关系如下图所示:Executor
框架结构主要由三大部分组成:- 任务(
Runnable
/Callable
)。也就是工作单元,包括被执行任务需要实现的接口:Runnable
接口或者Callable
接口;
- 任务的执行(
Executor
)。包括任务执行机制的核心接口Executor
,以及继承自Executor
的ExecutorService
接口。ExecutorService
接口有两个关键实现类:ThreadPoolExecutor
和ScheduledThreadPoolExecutor
。
- 异步计算的结果(
Future
)。包括Future
接口及实现了Future
接口的FutureTask
类。
Executor 框架的使用过程如下图所示:
- 主线程首先要创建实现
Runnable
或者Callable
接口的任务对象。
- 然后可以把创建完成的
Runnable
/Callable
对象交给ExecutorService
执行,有两种方式: - 使用
ExecutorService.execute(Runnable command)
方法,这种方式没有返回结果。 - 使用
ExecutorService.submit(Callable task)
方法,这种方式会返回一个FutureTask 对象
。(由于FutureTask
实现了Runnable
,我们也可以创建FutureTask
,然后直接交给ExecutorService
执行。)主线程可以执行FutureTask.get()
方法来等待任务执行完成,可以执行FutureTask.cancel()
来取消此任务的执行。
execute
和submit
的区别:
submit
可以获取异常信息,execute
不能获取异常信息。
submit
可以提交 Runnable 任务和 Callable 任务,如果提交的是 Callable 任务可以获取到任务返回值,execute
只能提交 Runnable 任务,不能获取任务返回值。
3.1 Runnable和Callable
java.lang.Runnable 接口只声明了一个 run() 方法,run() 的返回值为 void,执行完任务之后无法返回任何结果。
Callable 接口位于 java.util.concurrent 下,是为了解决 Runnable 没有返回值的问题后面才加上的,也只声明了一个 call() 方法,返回的类型就是传递进来的V类型。
Runnable 和 Callable 接口都可以配合 ExecutorService 来使用的,在 ExecutorService 接口中声明了若干个 submit 方法的重载版本。
一般情况下我们使用第一个 submit 方法和第三个 submit 方法,第二个 submit 方法很少使用。
3.2 Executor
从上面的类继承结构列出
Executor
接口的关键接口和类:Executor
接口:Executor 框架的基础,只是一个执行器,只定义了execute()
一个方法,将任务和任务的执行分离开来。
ExecutorService
接口:真正的线程池接口,定义了submit()
、shutdown()
等线程池功能。ThreadPoolExecutor
:A
bstractExecutorService
实现了Executor
接口,而ThreadPoolExecutor
继承了AbstractExecutorService
接口。ThreadPoolExecutor
是ExecutorService
接口的默认实现类,是线程池的最核心实现类。
ScheduledExecutorService
接口:定期任务线程池的接口,继承了ExecutorService
。ScheduledThreadPoolExecutor
:ScheduledExecutorService
接口的实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor
比 Timer 更灵活,功能更强大。
Executors
工厂类:提供了常见配置线程池的方法,因为ThreadPoolExecutor
的参数众多且意义重大,为了避免配置出错,才有了Executors
工厂类。利用Executors
可以基于ThreadPoolExecutor
实现类快速创建线程池:FixedThreadPool
:固定线程数的线程池。SingleThreadExecutor
:单个线程的线程池。CachedThreadPool
:根据需要创建新线程,大小无界的线程
下面介绍下最核心的
ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
实现类。3.2.1 ThreadPoolExecutor
ThreadPoolExecutor 的构造函数如下:
ThreadPoolExecutor 构造方法有七个参数,这也是面试常考点,应该重点掌握:
corePoolSize
:核心线程池大小。当线程池中的线程数目达到 corePoolSize 后,就会把暂时无法处理的任务放到缓存队列当中。如果调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,会直接预先创建corePoolSize的线程;否则会等有任务来之后,才创建一个线程去执行任务。
maximumPoolSize
:最大线程池大小。表示在线程池中最多能创建多少个线程,如果运行中的线程超过了这个数字,那么相当于线程池已满,新来的任务会使用 RejectedExecutionHandler 进行处理。
keepAliveTime
:线程最大空闲时间。如果线程数超过 corePoolSize 且线程空闲时长超过 keepAliveTime,就会减少线程数维持在 corePoolSize 大小。
unit
:时间单位。参数 keepAliveTime 的时间单位。
workQueue
:线程等待队列。一个阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了 corePoolSize 大小,才会放在这里。可供使用的阻塞队列有:LinkedBlockingQueue
:链表实现的有界阻塞队列,最常用的阻塞队列。在初始化时指定一个大小,如果不指定就会使用默认大小为 Integer.MAX_VALUE 的容量。注意要配置一下队列大小,设置成有界队列。否则可能会把内存撑爆。用于 FixedThreadPool 和 SingleThreadExecutor。ArrayBlockingQueue
:数组实现的有界队列。DelayQueue
:延时队列。用于周期任务线程池中。PriorityBlockingQueue
:优先级队列。SynchronousQueue
:同步队列,用于 CachedThreadPool。
threadFactory
:线程创建工厂,主要用来创建线程,比如可以指定线程的名字。默认可以使用 Executors.defaultThreadFactory()。
handler
:拒绝策略。当线程池满了的时候,用于处理新任务的策略。可以通过实现 RejectedExecutionHandler 接口自定义拒绝策略,ThreadPoolExecutor 内部也提供了四种拒绝策略:AbortPolicy
:这是默认的策略,直接抛出异常。CallerRunsPolicy
:由调用者所在线程来运行任务。DiscardOldestPolicy
:丢弃队列中最老的任务,并执行当前任务。DiscardPolicy
:不处理,直接把当前任务丢弃。
我们可以通过下面这种方式来新建一个线程池。
线程池执行任务的过程如下所示:
- 当任务量小于 corePoolSize 的时候,它会创建一个线程来执行此任务。
- 当任务量大于 corePoolSize,并且没有空闲线程的时候,且线程池的线程数小于 maximumPoolSize,此时会将任务存到 workQueue 里面。
- 当前任务量继续增大,并且 workQueue 已经满了,就创建线程来执行任务,直到线程数等于 maximumPoolSize。
- 当 workQueue 已经满了,且线程数等于 maximumPoolSize,这时候就会对处理不过来的任务执行拒绝策略。
线程池的关闭过程:可以通过
shutdown()
或者 shutDownNow()
来关闭线程池。shutDown()
:把线程池的状态设置为SHUTDOWN
,然后中断所有没有正在执行任务的线程,而已经在执行任务的线程继续执行直到任务执行完毕。
shutDownNow()
:把当前线程池状态设为STOP
,尝试停止所有的正在执行或者暂停的线程,并返回等待执行的任务的列表。
3.2.2 Executors工具类
有时候我们不想使用这么复杂的参数创建线程池,比如我们只是快速创建一个包含 5 个线程的线程池用于本地测试,这时候可以使用
Executors
工具类来创建线程池。Executors
创建的线程池有三种:FixedThreadPool
:固定线程数的线程池。
SingleThreadExecutor
:单个线程的线程池。
CachedThreadPool
:根据需要创建新线程,大小无界的线程。
阿里巴巴开发手册不允许使用Executors
去创建线程池,而是通过ThreadPoolExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。还有一种原因是FixedThreadPool
和SingleThreadExecutor
的任务队列最大队列长度是Integer.MAX_VALUE
,约等于无限队列,可能会因为任务数量过多出现 OOM 问题;CachedThreadPool
允许创建的线程数量为Integer.MAX_VALUE
,约等于可以无限创建线程,可能会因为创建大量线程出现 OOM 问题。
FixedThreadPool:线程数量固定的线程池,适用于为了满足资源管理的需求,而需要适当限制当前线程数量的情景。
- 把线程池最大线程数量
maxmumPoolSize
和核心线程池的数量corePoolSize
设置为相等,值为 FixedThreadPool 时指定的参数 nThreads,所以线程数量是固定的。
keepAliveTime
设置为0L,意味着多余的空闲线程会被立即终止。因为maxmumPoolSize
和corePoolSize
相等,所以这是个无效参数。
- 使用
LinkedBlockingQueue
作为阻塞队列,但是没有设置队列大小,默认为 Integer.MAX_VALUE,约等于无限队列,所以不会执行拒绝策略。所以 handler 也是个无效参数。由于使用无界队列,SingleThreadExecutor 不会拒绝任务,任务会一直添加到任务队列中,可能会因为任务数量过多出现 OOM 问题。
SingleThreadExecutor:单线程的线程池。适用于需要让线程顺序执行,并且在任意时间,只能有一个任务被执行,而不能有多个线程同时执行的场景。
- corePoolSize 和 maximumPoolSize 被设置为1。其他参数与 FixedThreadPool 相同。
CachedThreadPool:可根据需要创建新线程的线程池(已经创建的线程在空闲时会被重用),适用于执行很多的短期异步任务的场景。
- corePoolSize 被设置为 0,maximumPoolSize 被设置为 Integer.MAX_VALUE,相当于无限,所以最大线程数不受限制。
- keepAliveTime 设置为 60L,意味着 CachedThreadPool 中的空闲线程等待新任务的最长时间为60秒。
- 使用没有容量的 SynchronousQueue 作为线程池的工作队列,但是因为 maximumPool 是无界的,如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新线程。可能会因为线程数量过多出现 OOM 问题。
3.2.3 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
主要用来在给定的延迟之后运行任务,或者定期执行任务。 ScheduledThreadPoolExecutor
的功能与 Timer
类似,但功能更强大、更灵活。Timer
对应的是单个后台线程,而 ScheduledThreadPoolExecutor
可以在构造函数中指定多个线程。ScheduledThreadPoolExecutor
通常使用 Executors
创建,可以创建 2 种类型:ScheduledThreadPoolExecutor
:包含若干个线程的ScheduledThreadPoolExecutor
。
SingleThreadScheduledExecutor
:只包含一个线程的ScheduledThreadPoolExecutor
。
- corePoolSize 由使用者自行设置,maximumPoolSize 虽然被设置为 Integer.MAX_VALUE ,但是由于 DelayedWorkQueue 是无界队列,所以 maximumPoolSize 的大小没有意义,相当于无效参数。
- 待调度的任务(ScheduledFutureTask)放到一个DelayQueue,DelayQueue 封装了一个 PriorityQueue,这个 PriorityQueue 会对队列中的 ScheduledFutureTask 进行排序。排序时,time 小的排在前面(时间早的任务将被先执行)。
3.3 FutureTask
3.3.1 FutureTask使用
Future
是一个接口,用来获取异步计算的结果,具体功能是对 Runnable
或者 Callable
对象任务执行的结果进行获取、取消、判断是否完成等操作。Future
接口的源码如下:通过方法分析我们也知道实际上 Future 提供了3种功能:
- 能够中断执行中的任务。
- 判断任务是否执行完成。
- 获取任务执行完成后的结果。
FutureTask
是 Future
接口的实现类,除此之外还实现了 Runable
接口。FutureTask
的执行过程如下所示:get()
方法:- 当 FutureTask 处于未启动或者已启动的状态时,调用
get()
方法会将导致调用线程阻塞。 - 当 FutureTask 处于已完成的状态时,调用
get()
方法会立即放回调用结果或者抛出异常。
cancel(...)
方法:- 当 FutureTask 处于未启动状态时,调用
cancel(...)
方法将导致线程永远不会被执行。 - 当 FutureTask 处于已启动状态时,调用
cancel(true)
方法将以中断执行此任务的线程的方式来试图停止此任务;调用cancel(false)
方法将不会对正在进行的任务产生任何影响。 - 当 FutureTask 处于已完成状态时,调用
cancel(...)
方法将返回 false。
FutureTask
常见的有两种用法:第一种,也是最常见的,通过
ExecutorService.submit(new Callable())
获取 FutureTask
对象。FutureTask
对象可以通过轮询(循环调用isDone()
)或者阻塞(调用 get()
)的方式获取结果,也可以通过 cancel()
取消任务执行。第二种,因为
FutureTask
实现了 Runable
接口,所以我们可以使用 FutureTask
对 Callable
或者 Runnable
对象进行进一步封装,然后把 FutureTask
对象通过 ExecutorService.submit(new FutureTask())
交给线程池执行。3.3.2 FutureTask原理
在 jdk1.8 以前 FutureTask 是通过内部类 Sync 继承 AQS 来实现的,在 jdk1.8 中移除了 Sync 组件,直接使用 state 变量和对 state 变量进行 CAS 操作,以及一个简单的 Treiber 堆栈来保存等待的线程。
FutureTask 使用
volatile int state
来保存 FutureTask 的状态,所有操作都围绕 state 变量展开,state 有以下状态:NEW
:表示是个新的任务或者还没被执行完的任务。这是初始状态。
COMPLETING
:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。
NORMAL
:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。
EXCEPTIONAL
:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。
CANCELLED
:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。
INTERRUPTING
:任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。
INTERRUPTED
:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。
3.4 Executor实现原理
在向线程池提交任务时有两个比较中要的参数会决定任务的去向,这两个参数分别是线程池的状态和线程池中的线程数。因为涉及多线程的操作,这里为了保证原子性,在 ThreadPoolExecutor 内部使用了一个 AtomicInteger 类型的整数
ctl
来表示这两个参数,以及一系列修改 ctl
变量的方法。最核心的是
execute()
方法,虽然通过 submit()
也可以提交任务,但是实际上 submit()
方法里面最终调用的还是 execute()
方法。整个流程上面其实已经介绍过,这里再补充一下
- 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
- 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
- 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
- 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用
RejectedExecutionHandler.rejectedExecution()
方法。
在
execute
方法中,多次调用 addWorker
方法。addWorker
这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。这里涉及到两个重要概念:
- Task:任务,实现了 Runnable接口,但是并没有通过 start 方法执行,而是被 Worker 调用了 run 方法来执行。
- Worker:线程,线程池会把每个线程封装成一个 Worker 对象。Worker 类本身既实现了 Runnable,又继承了 AQS,所以其既是一个可执行的线程,又可以达到锁的效果。
4、Fork/Join框架
Fork/Join
框架是一种在 JDK 7 引入的线程池,用于并行执行把一个大任务拆成多个小任务并行执行,最终汇总每个小任务结果得到大任务结果的特殊任务。通过其命名也很容易看出框架主要分为 Fork 和 Join 两个阶段,第一阶段 Fork 是把一个大任务拆分为多个子任务并行的执行,第二阶段 Join 是合并这些子任务的所有执行结果,最后得到大任务的结果。Java 8 Stream 的并行操作底层就是用到了
Fork/Join
框架。但 Fork/Join 是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。不是所有的场景都适合使用 Fork/Join 框架。4.1 Fork/Join使用
4.2 Fork/Join原理
Fork/Join 框架的核心思想是
分而治之
,并且使用了工作窃取算法(work-stealing)
。工作窃取算法是指某个线程从其他队列里窃取任务来执行。我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。
但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
Fork/Join
框架包括三部分:ForkJoinPool
:线程池。
ForkJoinTask
:任务,但其本身是抽象类,通过会使用它的实现类:RecursiveAction
:一个递归无结果的ForkJoinTask(没有返回值)。RecursiveTask
:一个递归有结果的ForkJoinTask(有返回值)
ForkJoinWorkerThread
:负责执行任务。
整个工作过程是:
ForkJoinPool
的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask
)。
- 每个工作线程在运行中产生新的任务(通常是因为调用了
fork()
)时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
- 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
- 在遇到
join()
时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
- 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
4.2.1 ForkJoinPool
ForkJoinPool
是 Fork/Join
框架的核心类,是用于执行 ForkJoinTask
任务的执行(线程)池。ForkJoinPool
同时维护着执行池中的线程和任务队列。ForkJoinPool
有一个核心内部类 WorkQueue
,这是一个双端队列,用于存储有执行任务的线程( ForkJoinWorkerThread owner
),还有这个线程需要处理的任务( ForkJoinTask<?>[] array
)。基于
WorkQueue
,ForkJoinPool
维护了一个核心变量工作队列数组( WorkQueue[] workQueues
),所以每个工作线程都维护着一个工作队列。当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。
4.2.2 ForkJoinTask
ForkJoinTask
表示 ForkJoin 任务,在使用框架时首先必须先定义任务,通常只需要继承自 ForkJoinTask
类的子类 RecursiveAction
(无返回结果) 或者 RecursiveTask
(有返回结果)即可。ForkJoinTask
也是 Future 的子类,所以也需要等待返回结果。ForkJoinTask 有一个 int 类型的 status 字段,高16位存储任务执行状态,低16位预留用于用户自定义的标记。
任务未完成之前
status
大于等于0,完成之后就是 NORMAL
、CANCELLED
或 EXCEPTIONAL
这几个小于 0 的值NORMAL
已完成
CANCELLED
被取消
SIGNAL
信号
EXCEPTIONAL
发生异常
这几个值也是按大小顺序的:
0
(初始状态) > NORMAL
> CANCELLED
> EXCEPTIONAL
ForkJoinTask 常用方法:
fork()
在当前线程运行的线程池中安排一个异步执行,简单的理解就是再创建一个子任,将任务通过push
方法加入到当前工作线程的工作队列或者提交队列(外部非ForkJoinWorkerThread
线程通过submit
、execute
方法提交的任务),等待被线程池调度执行,这是一个非阻塞的立即返回方法。
join()
当任务完成的时候返回计算结果。- 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
- 查看任务的完成状态,如果已经完成,直接返回结果。
- 如果任务尚未完成,但处于自己的工作队列内,则完成它。
- 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
- 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
- 递归地执行第5步。
invoke()
开始执行任务,如果必要,等待计算完成。
4.2.3 ForkJoinWorkerThread
ForkJoinWorkThread
比较简单,继承了 Thread 类,持有 ForkJoinPool
和 ForkJoinPool.WorkQueue
的引用,以表明该线程属于哪个线程池,它的工作队列是哪个。其他的和普通线程差不多。- Author:mcbilla
- URL:http://mcbilla.com/article/ea1270eb-85bb-4216-91e8-01547dd0661d
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!
Relate Posts