基础概念
- 进程(process):进程是计算机中的一个任务,比如打开浏览器、IntelliJ IDEA。
- 线程(thread):进程内部有多个子任务,叫线程。比如IDEA在敲代码的同时还能自动保存、自动导包,都是子线程做的。
线程是操作系统调度的最小任务单位。线程自己不能决定什么时候执行,由操作系统决定什么时候调度。因此多线程编程中,代码的先后顺序不代表代码的执行顺序。
- 提高应用程序的性能。异步编程让程序更快的响应。
- 提高CPU利用率。一个线程阻塞,另一个线程继续执行,充分利用CPU。
同时多线程也会带来安全问题,比如多个线程读写一个共享变量,会出现数据不一致的问题。
- 高并发。系统在同一时间要处理多个任务时,需要用多线程。
- 很耗时的操作。如文件读写,异步执行不让进程阻塞。
- 不影响方法主流程逻辑,但又影响接口性能的操作,如数据同步,使用异步方式能提高接口性能。
创建线程的方式
多线程的创建方法基本有四种:
- 继承
- 实现
Runnalble
接口 - 实现
Callable
接口 - 线程池
Thread
类
1.继承Thread类
public class ThreadTest extends Thread {
@Override
public void run( {
System.out.println("新线程开始...";
}
public static void main(String[] args {
ThreadTest t = new ThreadTest(;
t.start(;
System.out.println("main线程结束...";
}
}
main线程结束...
新线程开始...
启动一个新线程总是调用它的start(
方法,而不是run(
方法;ThreadTest
子线程启动后,它跟main
就开始同时运行了,谁先执行谁后执行由操作系统调度。所以多线程代码的执行顺序跟代码顺序无关。
2.实现Runnable接口
Runnable接口,重写run(
方法,作为构造器参数传给Thread
,调用start(
方法启动线程。
public class Test {
public static void main(String[] args {
RunnableThread r = new RunnableThread(;
new Thread(r.start(;
new Thread(r.start(;
}
}
class RunnableThread implements Runnable {
@Override
public void run( {
System.out.println("新线程开始...";
}
}
一般推荐使用实现Runnable
的方式来创建新线程,它的优点有:
- Java中只有单继承,接口则可以多实现。如果一个类已经有父类,它就不能再继承
- 实现
Runnable
接口的类具有共享数据的特性,它可以同时作为多个线程的执行单位(target
),此时多个线程操作的是同一个对象的run
方法,这个对象所有变量在这几个线程间是共享的。而继承Thread的方式做不到,比如A extends Thread
,每次启动线程都是new A(.start(
,每次的A对象都不同。
Thread
类了,继承了Thread
类就不能再继承其他类,有局限性。实现Runnable
接口则没有局限性。
3. 实现Callable接口
Callable区别于Runnable
接口的点在于,Callable
的方法有返回值,还能抛出异常。
public interface Callable<V> {
V call( throws Exception;
}
Callable
的用法:
- 配合
- 使用线程池时,调用
ExecutorService#submit
方法,返回一个Future
对象。 -
Future
对象的get(
方法能返回异步执行的结果。调用get(
方法时,如果异步任务已经完成,就直接返回结果。如果异步任务还没完成,那么get(
方法会阻塞,一直等待任务完成才返回结果,这一点也是FutureTask
的缺点。
FutureTask
一起使用,FutureTask
是RunnableFuture
接口的典型实现,RunnableFuture
接口从名字来看,它同时具有Runnable
和Future
接口的的能力。FutureTask
提供2个构造器,同时支持Callable
方式和Runnable
方式的任务。FutureTask
可作为任务传给Thread
的构造器。
Callable和FutureTask
一起使用的例子:
public class CallableTest {
public static void main(String[] args {
// 创建Callable接口实现类的对象
CallableThread sumThread = new CallableThread(;
// 创建FutureTask对象
FutureTask<Integer> futureTask = new FutureTask<>(sumThread;
// 将FutureTask的对象作为参数传递到Thread类的构造器中,创建Thread对象,并调用start(
new Thread(futureTask.start(;
try {
// 获取Callable中call方法的返回值
Integer sum = futureTask.get(;
System.out.println("总和为" + sum;
} catch (InterruptedException | ExecutionException e {
e.printStackTrace(;
}
System.out.println("main线程结束";
}
}
class CallableThread implements Callable<Integer> {
@Override
public Integer call( throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++ {
sum += i;
}
Thread.sleep(2000; // 等待2s验证futureTask.get(是否等待
return sum;
}
}
总和为5050
main线程结束
在JDK源码中可看到get(方法执行时,会判断线程状态如果是未完成,会进入一个无限循环,直到任务完成才返回执行结果。
public V get( throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING // 如果未完成,则等待完成
s = awaitDone(false, 0L;
return report(s;
}
private int awaitDone(boolean timed, long nanos throws InterruptedException {
// ...
for (; ; { // 无线循环,直到任务完成
// ...
int s = state;
if (s > COMPLETING {
if (q != null
q.thread = null;
return s;
}
// ...
}
}
Future和FutureTask
使用Callable
接口前,需要了解Future
和FutureTask
。
Future接口代表着异步计算结果。它定义的方法有:
-
get(long timeout, TimeUnit unit
:获取结果,但只等待指定的时间;添加超时时间可以让调用线程及时释放,不会死等; -
cancel(boolean mayInterruptIfRunning
:取消当前任务;mayInterruptIfRunning
的作用是,如果任务在执行中,这时取消任务,如果mayInterruptIfRunning
为true
就中断,否则不中断。 -
isCancelled(
:任务在执行完成前被取消,返回true
,否则返回false
; -
isDone(
:判断任务是否已完成。任务完成包括:正常完成、抛出异常而完成、任务被取消。
get(
:获取结果,任务未完成前会一直等待,直到完成;
FutureTask作为Future
的实现类,也有局限性。比如get(
方法会阻塞调用线程;不能将多个异步计算结果合并到一起等等,针对这些局限,Java8提供了CompletableFuture
。
4.线程池
- 线程池是什么?
- 为什么使用线程池,或者说使用线程池的好处是什么?
- 线程池怎么使用?
- 线程池的原理是什么,它怎么做到重复利用线程的?
线程池是什么
线程池(Thread Pool)是一种基于池化思想的管理线程的工具,它内部维护了多个线程,目的是能重复利用线程,控制并发量,降低线程创建及销毁的资源消耗,提升程序稳定性。
为什么使用线程池
-
降低资源消耗:重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
线程池解决的核心问题就是资源管理问题,在并发场景下,系统不能够确定在任意时刻,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能非常巨大。
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
- 系统无法合理管理内部的资源分布,会降低系统的稳定性。
线程池怎么使用
线程池的的核心实现类是ThreadPoolExecutor
,调用execute
或者submit
方法即可开启一个子任务。
public class ThreadPoolTest {
private static ThreadPoolExecutor poolExecutor =
new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1;
public static void main(String[] args throws ExecutionException, InterruptedException {
Runnable runnableTask = ( -> System.out.println("runnable task end";
poolExecutor.execute(runnableTask;
Callable<String> callableTask = ( -> "callable task end";
Future<String> future = poolExecutor.submit(callableTask;
System.out.println(future.get(;
}
}
ThreadPoolExecutor
的核心构造器有7个参数,我们来分析一下每个参数的含义:public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler { // 省略... }
-
workQueue
:阻塞队列。当线程池中的线程数超过corePoolSize
,新任务会被放到队列中,等待执行。 -
maximumPoolSize
:线程池的最大线程数量。 -
keepAliveTime
:非核心线程空闲时的存活时间。非核心线程即workQueue
满了之后,再提交任务时创建的线程。非核心线程如果空闲了,超过keepAliveTime
后会被回收。 -
unit
:keepAliveTime
的时间单位。 -
threadFactory
:创建线程的工厂。默认的线程工厂会把提交的任务包装成一个新的任务。 -
handler
:拒绝策略。当线程池的workQueue
已满且线程数达到最大线程数时,新提交的任务执行对应的拒绝策略。
corePoolSize
:线程池的核心线程数。线程池中的线程数小于corePoolSize
时,直接创建新的线程来执行任务。
JDK也提供了一个快速创建线程池的工具类Executors
,它提供了多种创建线程池的方法,但通常不建议使用Executors
来创建线程池,因为它提供的很多工具方法,要么使用的阻塞队列没有设置边界,要么是没有设置最大线程的上限。任务一多容易发生OOM。实际开发应该根据业务自定义线程池。
线程池的原理
execute
execute方法,所有的任务调度都是通过execute
方法完成的。
public void execute(Runnable command {
// ...
int c = ctl.get(;
if (workerCountOf(c < corePoolSize { // (1
if (addWorker(command, true
return;
c = ctl.get(;
}
if (isRunning(c && workQueue.offer(command { // (2
int recheck = ctl.get(;
// 重新检查状态,如果是非运行状态,接着执行队列删除操作,然后执行拒绝策略
if (! isRunning(recheck && remove(command
reject(command;
// 如果是因为remove(command删除队列元素失败,再判断池中线程数量
// 如果池中线程数为0则新增一个任务为null的非核心线程
else if (workerCountOf(recheck == 0
addWorker(null, false;
}
else if (!addWorker(command, false // (3
reject(command;
}
透过execute
方法的3个if
判断,可以把它的逻辑梳理为3个部分:
- 第一个
- 第二个
if
:如果线程数量大于等于核心线程数,则将任务添加到该阻塞队列中。 -
else if
:线程池状态不对,或者添加到队列失败即队列满了,则创建一个非核心线程执行新提交的任务。如果非核心线程创建失败就执行拒绝策略。
if
:如果线程数量小于核心线程数,则创建一个线程来执行新提交的任务。
addWorker
execute中的核心逻辑要看addWoker
方法,它承担了核心线程和非核心线程的创建。addWorker
方法前半部分代码用一个双重for循环确保线程池状态正确,后半部分的逻辑是创建一个线程对象Worker
,开启新线程执行任务的过程。
Worker是对提交进来的线程的封装,创建的worker
会被添加到一个HashSet
,线程池中的线程都维护在这个名为workers
的HashSet
中并被线程池所管理。
Worker本身也是一个线程对象,它实现了Runnable
接口,在addWorker
中会启动一个新的任务,所以我们要看它的run
方法,而run
方法的核心逻辑是runWorker
方法。
final void runWorker(Worker w {
// ...
try {
while (task != null || (task = getTask( != null {
// ...
try {
try {
task.run(; // 执行普通的run方法
} finally {
task = null; // task置空
}
}
}
} finally {
processWorkerExit(w, completedAbruptly; // 回收空闲线程
}
}
可以看到runWorker
方法中有一个while
循环,循环执行task的run方法,这里的task就是提交到线程池的任务,它对当成了普通的对象,执行完task.run(
,最后会把task
设置为null
。
(task = getTask( != null这个条件,如果getTask( == null
则跳出循环执行processWorkerExit
方法,processWorkerExit
方法的作用是回收空闲线程。
getTask
getTask(方法中。
private Runnable getTask( {
boolean timedOut = false; // Did the last poll( time out?
for (; ; { // (1
// 校验线城池状态的代码,先省略...
int wc = workerCountOf(c;
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // (2
if ((wc > maximumPoolSize || (timed && timedOut
&& (wc > 1 || workQueue.isEmpty( {
if (compareAndDecrementWorkerCount(c // 线程数减1
return null; // 这里时终端外层while循环的时机
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS :
workQueue.take(; // (3
if (r != null
return r; // 取到值了就在外层的while循环中执行任务
timedOut = true; // 否则就标记为获取队列任务超时
} catch (InterruptedException retry {
timedOut = false;
}
}
}
结合(1、(3)这两个地方可以看出,getTask(
方法是一个无限循环,不断从阻塞队列中取任务,取到了任务就返回,到外层runWorker
方法中,执行这个任务的run
方法,即线程池通过启动一个Worker子线程来执行提交进来的任务,并且一个Worker线程会执行多个任务!
getTask(何时返回null
,因为返回null
才可以看下一步的processWorkerExit
方法。
getTask(返回null
主要看timed && timedOut
这个条件。变量值timed
为true
的条件是:允许核心线程超时或者线程数大于核心线程数。timedOut
变量为true
的条件是从workQueue
为空了,取不到任务了,但是这个前提是timed == true
,执行workQueue.poll
的时候,因为workQueue.poll
方法获取任务最多等待keepAliveTime
的时间,超过这个时间获取不到就返回null
,而workQueue.take(
方法获取不到任务会一直等待!
因此,在核心线程不会超时的情况下,如果池中的线程数小于核心线程数,这个getTask(会一直循环下去,这就是在这种情况下线程池不会自动关闭的原因!
allowCoreThreadTimeOut == true,即核心线程也能超时,当阻塞队列为空,所有Worker
线程都会被回收。
ThreadPoolExecutor的注释说,当池中没有剩余线程,线程池会自动关闭。
但我也没找到证据,没看到哪里显式调用shutdown(
,但确实会自动关闭。
processWorkerExit
getTask(获取不到任务后,会执行processWorkerExit
方法回收线程。在这里,Worker
线程集合随机删除一个线程对象,然后再随机中断一个workers
中的线程。可见线程销毁线程的方式时删除线程引用,让JVM自动回收。
private void processWorkerExit(Worker w, boolean completedAbruptly {
// ...
try {
workers.remove(w;
}
// 调用interrupt(方法中断线程,一次中断一个
tryTerminate(;
// ...
}
线程池原理总结
最后我们回到最初的问题,线程池的原理是什么,线程池怎么做到重复利用线程的?
Worker的线程对象来处理任务。在线程数不超过核心线程数的情况下,一个任务对应一个Worker
线程,超过核心线程数,新的任务会提交到阻塞队列。一个Worker
线程在启动后,除了执行第一次任务之外,还会不断向阻塞队列中消费任务。如果队列里没任务了,Worker
线程会一直轮询,不会退出;只有在池中线程数超过核心线程数时才退出轮询,然后回收多余的空闲线程。即一个Worker
线程会处理多个任务,且Worker
线程受线程池管理,不会随意回收。
线程池的拒绝策略
RejectedExecutionHandler接口代表拒绝策略,并提供了4个实现类。线程池的默认拒绝策略是AbortPolicy
,丢弃任务并抛出异常。实际开发中用户可以通过实现这个接口去定制拒绝策略。
线程的状态
-
Runnable
:运行中的线程,正在执行run(
方法的Java代码; -
Blocked
:运行中的线程,因为某些操作被阻塞而挂起; -
Waiting
:运行中的线程,因为某些操作在等待中; -
Timed Waiting
:运行中的线程,因为执行sleep(
方法正在计时等待; -
Terminated
:线程已终止,因为run(
方法执行完毕。
New
:新创建的线程,尚未执行;
Runnable、Blocked
、Waiting
和Timed Waiting
这几个状态之间切换,直到最后变成Terminated
状态,线程终止。
- 线程正常终止:
- 线程意外终止:
run(
方法因为未捕获的异常导致线程终止; - 对某个线程的
Thread
实例调用stop(
方法强制终止(过时方法,不推荐使用)。
run(
方法执行到return
语句返回;
Thread类的常用方法
-
currentThread(
:返回当前代码执行的线程 -
yield(
: 释放当前CPU的执行权 -
join(
:join(
方法可以让其他线程等待,直到自己执行完了,其他线程才继续执行。 -
setDaemon(boolean on
:设置守护线程,也叫后台线程。JVM退出时,不必关心守护线程是否已结束。 -
interrupt(
:中断线程。 -
sleep(long millis
:让线程睡眠指定的毫秒数,在指定时间内,线程是阻塞状态 -
isAlive(
:判断当前线程是否存活。
start(
:启动当前线程
public class ThreadJoinTest {
public static void main(String[] args throws InterruptedException {
Thread t = new Thread(( -> {
System.out.println("hello";
};
System.out.println("start";
t.start(;
t.join(;
System.out.println("end";
}
}
start
hello
end
volatile
线程间共享变量需要使用volatile
关键字标记,确保每个线程都能读取到更新后的变量值。
volatile声明?这涉及到Java的内存模型(JMM)。
在Java虚拟机中,共享变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!
a = true,线程1执行a = false
时,它在此刻仅仅是把变量a
的副本变成了false
,主内存的变量a
还是true
,在JVM把修改后的a
回写到主内存之前,其他线程读取到的a
的值仍然是true
,这就造成了多线程之间共享的变量不一致。
volatile关键字的目的是告诉虚拟机:
- 每次访问变量时,总是获取主内存的最新值;
- 每次修改变量后,立刻回写到主内存。
volatile关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。
volatile不能保证原子性,原子性问题需要根据实际情况做同步处理。
线程同步
在Java中,最常见的方法是用synchronized
关键字实现同步效果。
synchronized
synchronized可以修饰实例方法、静态方法、代码块。
synchronized的底层是使用操作系统的互斥锁(mutex lock)实现的,它的特点是保证内存可见性、操作原子性。
- 内存可见性:可见性的原理还要回到Java内存模型(上面JMM的那张图)。
- 操作原子性:持有同一个锁的两个同步块只能串行地执行。
synchronized
上锁时,会清空工作内存中变量的值,去主内存中获取该变量的值;解锁时,会把工作内存中变量的值同步回主内存中。
synchronized解决了多线程同步访问共享变量的正确性问题。但是,它的缺点是带来了性能下降。因为synchronized
代码块无法并发执行。此外,加锁和解锁需要消耗一定的时间,所以,synchronized
会降低程序的执行效率。
不需要synchronized的操作
- 基本类型(
- 引用类型赋值,例如:
List list = anotherList
。
long
和double
除外)赋值,例如:int n = 1
;
long
和double
是64位(8字节)数据,在32位和64位操作系统上是不一样的。JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把long
和double
的赋值作为原子操作实现的。