Using the default thread pool
Approach 1: invoke through the @Async annotation
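import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

// Registered as a bean so that it can be looked up from the application context.
@Component
public class AsyncTest {
    @Async
    public void async(String name) throws InterruptedException {
        System.out.println("async" + name + " " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }
}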
The application class must carry the @EnableAsync annotation; otherwise @Async has no effect.
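import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class Test1Application {
    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
        AsyncTest bean = run.getBean(AsyncTest.class);
        for (int index = 0; index <= 10; ++index) {
            bean.async(String.valueOf(index));
        }
    }
}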
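Approach 2: inject ThreadPoolTaskExecutor directly
Keep @EnableAsync on the application class if @Async methods are used as well; the auto-configured ThreadPoolTaskExecutor bean itself can be injected without it.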
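import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@SpringBootTest
class Test1ApplicationTests {
    @Resource
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Test
    void contextLoads() {
        // Each submitted task prints the name of the pool thread that runs it.
        Runnable runnable = () -> {
            System.out.println(Thread.currentThread().getName());
        };
        for (int index = 0; index <= 10; ++index) {
            threadPoolTaskExecutor.submit(runnable);
        }
    }
}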
Default thread pool configuration
Common Spring Boot thread pool settings:
spring:
  task:
    execution:
      pool:
        core-size: 8
        max-size: 16                     # default: Integer.MAX_VALUE
        keep-alive: 60s                  # when the pool holds more than corePoolSize threads, a thread idle for longer than keepAliveTime is terminated
        allow-core-thread-timeout: true  # whether core threads may time out; default: true
        queue-capacity: 100              # size of the task queue; default: Integer.MAX_VALUE
      shutdown:
        await-termination: false         # whether to wait for tasks to finish on shutdown
      thread-name-prefix: task-          # prefix for thread names
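To make the meaning of these settings concrete, the following is a minimal plain-JDK sketch of a pool configured with the same values. It is not Spring Boot's actual wiring; the class name PoolSettingsSketch and the build() helper are made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class: mirrors the YAML values on a raw ThreadPoolExecutor.
class PoolSettingsSketch {
    static ThreadPoolExecutor build() {
        AtomicInteger counter = new AtomicInteger(1);
        // thread-name-prefix: task-
        ThreadFactory factory = r -> new Thread(r, "task-" + counter.getAndIncrement());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                8,                                // core-size
                16,                               // max-size
                60, TimeUnit.SECONDS,             // keep-alive
                new LinkedBlockingQueue<>(100),   // queue-capacity
                factory);
        executor.allowCoreThreadTimeOut(true);    // allow-core-thread-timeout
        return executor;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = build();
        executor.execute(() -> System.out.println(Thread.currentThread().getName())); // prints task-1
        executor.shutdown();
    }
}
```

Threads beyond the 8 core threads are only created once the 100-slot queue is full, which is why max-size has little effect with the default unbounded queue.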
How the Spring Boot thread pool is implemented
TaskExecutionAutoConfiguration defines a ThreadPoolTaskExecutor bean, which is itself built on the JDK's ThreadPoolExecutor. initializeExecutor() is invoked from the parent class ExecutorConfigurationSupport, where the rejection handler is declared as private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); and handed to initializeExecutor() by initialize(), so AbortPolicy is the default. In TaskExecutionAutoConfiguration, the ThreadPoolTaskExecutor bean is registered under two names: applicationTaskExecutor and taskExecutor.

// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
        AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
    return builder.build();
}

// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

    ThreadPoolExecutor executor;
    if (this.taskDecorator != null) {
        executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
            @Override
            public void execute(Runnable command) {
                Runnable decorated = taskDecorator.decorate(command);
                if (decorated != command) {
                    decoratedTaskMap.put(decorated, command);
                }
                super.execute(decorated);
            }
        };
    }
    else {
        executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
    }

    if (this.allowCoreThreadTimeOut) {
        executor.allowCoreThreadTimeOut(true);
    }

    this.threadPoolExecutor = executor;
    return executor;
}

// ExecutorConfigurationSupport#initialize()
public void initialize() {
    if (logger.isInfoEnabled()) {
        logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }
    if (!this.threadNamePrefixSet && this.beanName != null) {
        setThreadNamePrefix(this.beanName + "-");
    }
    this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
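The decorator branch of initializeExecutor() can be illustrated with a small plain-JDK sketch. It assumes UnaryOperator<Runnable> as a stand-in for Spring's TaskDecorator, and the class DecoratingPoolSketch and its methods are hypothetical names, not part of Spring.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

// Hypothetical class: overrides execute() the same way the anonymous subclass above does.
class DecoratingPoolSketch {
    /** Builds a one-thread pool whose execute() wraps every task with the given decorator. */
    static ThreadPoolExecutor build(UnaryOperator<Runnable> decorator) {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            public void execute(Runnable command) {
                super.execute(decorator.apply(command));
            }
        };
    }

    /** Runs one task through a recording decorator and returns the recorded events. */
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = build(task -> () -> {
            events.add("before");
            task.run();
            events.add("after");
        });
        pool.execute(() -> events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [before, task, after]
    }
}
```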
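Overriding the default thread pool
To override the default taskExecutor bean, declare your own bean with that name. Its return type may be either ThreadPoolTaskExecutor or Executor.

import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ThreadPoolConfiguration {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Set the thread pool parameters
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        // Change the rejection policy so that rejected tasks run on the calling thread
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the thread pool
        taskExecutor.initialize();
        return taskExecutor;
    }
}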
Managing multiple thread pools
If more than one thread pool is defined, for example a second pool named taskExecutor2, running the earlier code unchanged fails, because injecting a ThreadPoolTaskExecutor by type now matches more than one bean. The fix is to specify the bean name.

@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // Set the thread pool parameters
    taskExecutor.setCorePoolSize(10);
    taskExecutor.setMaxPoolSize(50);
    taskExecutor.setQueueCapacity(200);
    taskExecutor.setKeepAliveSeconds(60);
    taskExecutor.setThreadNamePrefix("myExecutor2--");
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setAwaitTerminationSeconds(60);
    // Change the rejection policy so that rejected tasks run on the calling thread
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    // Initialize the thread pool
    taskExecutor.initialize();
    return taskExecutor;
}
When referencing a pool, name the field after the bean so that @Resource resolves it by name.

@Resource
ThreadPoolTaskExecutor taskExecutor2;
For methods that run asynchronously through @Async, specify the bean name in the annotation.

@Async("taskExecutor2")
public void async(String name) throws InterruptedException {
    System.out.println("async" + name + " " + Thread.currentThread().getName());
    Thread.sleep(1000);
}
The four thread pool rejection policies
The four commonly used Java thread pools
The ThreadPoolExecutor constructor looks like this:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
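newCachedThreadPool
Places no limit on the number of threads (maximumPoolSize=Integer.MAX_VALUE). An idle thread is reused for new tasks; a thread that stays idle beyond the keep-alive time (60 seconds) is reclaimed.

new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());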
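As a rough illustration of the "no upper limit" behavior, the sketch below submits several tasks that all block at the same time and then reads the pool size. The class name CachedPoolDemo and its helper are invented for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class.
class CachedPoolDemo {
    /** Submits taskCount tasks that all block, and returns how many threads the pool created. */
    static int threadsCreatedFor(int taskCount) {
        // Executors.newCachedThreadPool() returns a ThreadPoolExecutor, so the cast works here.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep this thread busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int created = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        return created;
    }

    public static void main(String[] args) {
        System.out.println(threadsCreatedFor(5)); // prints 5
    }
}
```

Because the SynchronousQueue holds no tasks, every submission that finds no idle thread starts a new one, so a burst of slow tasks can create a very large number of threads.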
newFixedThreadPool
A fixed-size thread pool; tasks beyond the number of threads wait in the queue.

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
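The queuing behavior can be observed with a short sketch: with more blocking tasks than threads, the surplus sits in the queue. FixedPoolDemo and snapshot() are hypothetical names used only here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class.
class FixedPoolDemo {
    /** Returns {pool size, queued task count} after submitting blocking tasks. */
    static int[] snapshot(int threads, int tasks) {
        // Executors.newFixedThreadPool() returns a ThreadPoolExecutor, so the cast works here.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // hold the thread so later tasks must queue
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size() };
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] s = snapshot(2, 5);
        System.out.println("threads=" + s[0] + " queued=" + s[1]); // prints threads=2 queued=3
    }
}
```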
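newScheduledThreadPool
Like newCachedThreadPool, it sets maximumPoolSize to Integer.MAX_VALUE, but corePoolSize can be specified. Because the delayed work queue is unbounded, the pool in practice runs with at most corePoolSize threads. It supports delayed and periodic execution.

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}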
Periodic execution:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(() -> {
    System.out.println("rate");
}, 1, 1, TimeUnit.SECONDS);

Delayed execution:

scheduledThreadPool.schedule(() -> {
    System.out.println("delay 3 seconds");
}, 3, TimeUnit.SECONDS);
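A periodic task keeps firing until it is cancelled or the pool is shut down. The sketch below, with the invented class ScheduledPoolDemo, lets a fixed-rate task run a given number of times and then cancels it through the returned ScheduledFuture.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
class ScheduledPoolDemo {
    /** Lets a periodic task fire `runs` times; returns true if that happened within 5 seconds. */
    static boolean runPeriodically(int runs) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        CountDownLatch remaining = new CountDownLatch(runs);
        ScheduledFuture<?> handle =
                pool.scheduleAtFixedRate(remaining::countDown, 10, 10, TimeUnit.MILLISECONDS);
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            handle.cancel(false); // stop future runs without interrupting a running one
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runPeriodically(3)); // prints true
    }
}
```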
newSingleThreadExecutor
A single-thread pool, which makes tasks run sequentially in submission order.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
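The ordering guarantee can be checked with a small sketch that records the order in which tasks run. SingleThreadOrderDemo is a name made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
class SingleThreadOrderDemo {
    /** Submits tasks 0..taskCount-1 and returns the order in which they actually ran. */
    static List<Integer> runInOrder(int taskCount) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            pool.execute(() -> order.add(id));
        }
        pool.shutdown(); // already queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```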
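The four rejection policies of the Java thread pool
CallerRunsPolicy: the rejected task is run by the thread that submitted it.
AbortPolicy: if the pool rejects a task, an exception is thrown.
DiscardPolicy: if the pool rejects a task, the task is silently dropped.
DiscardOldestPolicy: if the pool rejects a task, the oldest task still waiting in the queue is dropped and the new task is submitted again.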
CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
The effect is similar to:

Runnable thread = () -> {
    System.out.println(Thread.currentThread().getName());
    try {
        Thread.sleep(0);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
};
// run() executes on the calling thread; no new thread is started
thread.run();
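To see the policy in action, the sketch below saturates a pool with one thread and one queue slot, then submits a third task and records which thread runs it. The class CallerRunsDemo and the pool sizes are assumptions chosen for the demonstration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class.
class CallerRunsDemo {
    /** Returns the name of the thread that ran the task rejected by a saturated pool. */
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        pool.execute(() -> {              // occupies the only worker thread
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });          // fills the only queue slot
        // Pool and queue are full: this task is rejected and run by the caller.
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(threadOfRejectedTask()); // prints main
    }
}
```

Since the submitting thread is busy running the task, it cannot submit more work in the meantime, which acts as a simple form of back-pressure.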
AbortPolicy
Throws a RejectedExecutionException directly, with a message that identifies the task and the thread pool.

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
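A minimal sketch of how this surfaces to the caller, assuming a pool with one thread and one queue slot. No handler is passed, so the JDK default (AbortPolicy) applies; AbortPolicyDemo is an invented name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
class AbortPolicyDemo {
    /** Returns true if the third submission to a saturated pool is rejected with an exception. */
    static boolean thirdTaskRejected() {
        // No handler given: ThreadPoolExecutor falls back to AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // runs on the only worker thread
            pool.execute(blocker); // waits in the only queue slot
            pool.execute(blocker); // no room left: AbortPolicy throws
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(thirdTaskRejected()); // prints true
    }
}
```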
DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // intentionally empty: the rejected task is dropped without notice
    }
}
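Because nothing is thrown or logged, a dropped task is easy to miss. The sketch below counts how many of three submitted tasks actually run on a saturated pool; DiscardPolicyDemo and the pool sizes are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class.
class DiscardPolicyDemo {
    /** Submits three tasks to a pool with room for two and returns how many ran. */
    static int executedCount() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        Runnable task = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            executed.incrementAndGet();
        };
        for (int i = 0; i < 3; i++) {
            pool.execute(task); // the third call returns normally, but the task is dropped
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println(executedCount()); // prints 2
    }
}
```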
DiscardOldestPolicy
e.getQueue().poll(): removes the oldest task from the queue.
e.execute(r): submits the current task again so that it is queued.

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
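The sketch below shows which task is lost: with task A running and task B queued, submitting task C evicts B. DiscardOldestDemo and the task labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
class DiscardOldestDemo {
    /** Returns the labels of the tasks that actually ran, in execution order. */
    static List<String> executedTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<String> ran = Collections.synchronizedList(new ArrayList<>());
        pool.execute(() -> {             // A: occupies the only worker thread
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ran.add("A");
        });
        pool.execute(() -> ran.add("B")); // B: waits in the only queue slot
        pool.execute(() -> ran.add("C")); // C: rejected, so B is polled out and C is queued
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran;
    }

    public static void main(String[] args) {
        System.out.println(executedTasks()); // prints [A, C]
    }
}
```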
How Java reuses threads
A Java thread pool holds java.util.concurrent.ThreadPoolExecutor.Worker objects, which are kept in private final HashSet<Worker> workers = new HashSet<Worker>();. workQueue is the queue of tasks waiting to run. A task submitted while fewer than corePoolSize workers exist becomes the firstTask of a new Worker; otherwise it is added to workQueue.

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
A Worker's execution is driven by runWorker(). Unlike a thread we would normally write by hand, this thread sits in a loop and keeps fetching new tasks from the queue. That loop is why threads in a pool can be reused instead of ending once a single task has finished, as ordinary threads do.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
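Stripped of locking, interrupt handling and hooks, the reuse idea reduces to one thread looping over a queue. The following is a deliberately simplified sketch of that loop, not the JDK implementation; WorkerLoopSketch and the thread name mini-worker are invented for the example.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class: a bare-bones stand-in for Worker plus runWorker().
class WorkerLoopSketch {
    /** Runs taskCount queued tasks on a single thread and returns the thread names that ran them. */
    static Set<String> runTasks(int taskCount) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            workQueue.add(() -> threadNames.add(Thread.currentThread().getName()));
        }
        Thread worker = new Thread(() -> {
            Runnable task;
            // Like runWorker(): keep taking tasks instead of exiting after the first one.
            // Unlike the real getTask(), this sketch stops as soon as the queue is empty.
            while ((task = workQueue.poll()) != null) {
                task.run();
            }
        }, "mini-worker");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(runTasks(5)); // prints [mini-worker]
    }
}
```

All five tasks report the same thread name, which is the reuse effect described above: the thread is created once, and only the tasks change.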