How Spring Boot Thread Pools and Java Thread Pools Work


Using the default thread pool

Option 1: call through the @Async annotation

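import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component // must be a Spring bean so that getBean() finds it and the @Async proxy applies
public class AsyncTest {
    @Async
    public void async(String name) throws InterruptedException {
        System.out.println("async" + name + " " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }
}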

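The startup class must carry the @EnableAsync annotation; otherwise @Async has no effect and the method runs synchronously on the calling thread.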

@SpringBootApplication
@EnableAsync
public class Test1Application {
   public static void main(String[] args) throws InterruptedException {
      ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
      AsyncTest bean = run.getBean(AsyncTest.class);
      for (int index = 0; index <= 10; ++index) {
         bean.async(String.valueOf(index));
      }
   }
}

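Option 2: inject ThreadPoolTaskExecutor directly

The original setup keeps the @EnableAsync annotation on the startup class for this approach as well. Note that the executor bean itself is auto-configured by Spring Boot, so @EnableAsync only matters for @Async methods.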

@SpringBootTest
class Test1ApplicationTests {

   @Resource
   ThreadPoolTaskExecutor threadPoolTaskExecutor;

   @Test
   void contextLoads() {
      Runnable runnable = () -> {
         System.out.println(Thread.currentThread().getName());
      };

      for (int index = 0; index <= 10; ++index) {
         threadPoolTaskExecutor.submit(runnable);
      }
   }

}

Default thread pool configuration

Common Spring Boot thread pool settings:

spring:
  task:
    execution:
      pool:
        core-size: 8
        max-size: 16                          # default: Integer.MAX_VALUE
        keep-alive: 60s                       # when the pool holds more than corePoolSize threads, a thread idle for longer than keepAliveTime is terminated
        allow-core-thread-timeout: true       # whether core threads may time out; default true
        queue-capacity: 100                   # capacity of the task queue; default Integer.MAX_VALUE
      shutdown:
        await-termination: false              # whether shutdown waits for tasks to complete
      thread-name-prefix: task-               # prefix for thread names
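
For orientation, the settings above map roughly onto the constructor arguments of the JDK's ThreadPoolExecutor. The following is only a sketch under that assumption, not Spring Boot's actual wiring; the class name YamlEquivalentPool and the hand-written thread factory are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: a plain JDK pool using the same numbers as the YAML above.
class YamlEquivalentPool {

    static ThreadPoolExecutor build() {
        AtomicInteger counter = new AtomicInteger(1);
        // Mimics thread-name-prefix: task-
        ThreadFactory factory = r -> new Thread(r, "task-" + counter.getAndIncrement());

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                8,                               // core-size
                16,                              // max-size
                60, TimeUnit.SECONDS,            // keep-alive
                new LinkedBlockingQueue<>(100),  // queue-capacity
                factory);
        pool.allowCoreThreadTimeOut(true);       // allow-core-thread-timeout
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = build();
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

With a bounded queue of 100, threads beyond the 8 core threads are only created once the queue is full, which is why max-size has no visible effect while queue-capacity stays at its unbounded default.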

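How the Spring Boot thread pool is implemented

TaskExecutionAutoConfiguration defines a ThreadPoolTaskExecutor bean, and that class is itself built on the JDK's native ThreadPoolExecutor. Its initializeExecutor() method is called from the parent class, ExecutorConfigurationSupport. The parent declares the handler as private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); and its initialize() method passes this AbortPolicy into initializeExecutor(). The default rejection policy is therefore AbortPolicy.

In TaskExecutionAutoConfiguration, the ThreadPoolTaskExecutor bean is registered under two names: applicationTaskExecutor and taskExecutor.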

// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
      AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
   return builder.build();
}
// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(
      ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

   BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

   ThreadPoolExecutor executor;
   if (this.taskDecorator != null) {
      executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler) {
         @Override
         public void execute(Runnable command) {
            Runnable decorated = taskDecorator.decorate(command);
            if (decorated != command) {
               decoratedTaskMap.put(decorated, command);
            }
            super.execute(decorated);
         }
      };
   }
   else {
      executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler);

   }

   if (this.allowCoreThreadTimeOut) {
      executor.allowCoreThreadTimeOut(true);
   }

   this.threadPoolExecutor = executor;
   return executor;
}
// ExecutorConfigurationSupport#initialize()
public void initialize() {
   if (logger.isInfoEnabled()) {
      logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
   }
   if (!this.threadNamePrefixSet && this.beanName != null) {
      setThreadNamePrefix(this.beanName + "-");
   }
   this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
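
The taskDecorator branch above wraps each task by overriding execute() on an anonymous ThreadPoolExecutor subclass. The same idea can be tried with the JDK alone; the sketch below assumes a trivial "decorator" that merely counts runs, and the class name DecoratingPoolDemo is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: every submitted task is wrapped before it reaches the pool.
class DecoratingPoolDemo {

    static int demo() {
        AtomicInteger decoratedRuns = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            public void execute(Runnable command) {
                // The "decorator": count the run, then delegate to the original task.
                super.execute(() -> {
                    decoratedRuns.incrementAndGet();
                    command.run();
                });
            }
        };
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return decoratedRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("decorated runs: " + demo());
    }
}
```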

Overriding the default thread pool

To override the default taskExecutor bean, declare your own. The bean method may return either ThreadPoolTaskExecutor or Executor.

@Configuration
public class ThreadPoolConfiguration {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Set the pool parameters
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        // Change the rejection policy so that the calling thread runs the task
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the pool
        taskExecutor.initialize();
        return taskExecutor;
    }
}

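Managing multiple thread pools

If more than one thread pool is defined, for example a second pool named taskExecutor2, running the earlier code as-is fails because the pool can no longer be resolved by type alone. The fix is to specify the bean name.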

@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // Set the pool parameters
    taskExecutor.setCorePoolSize(10);
    taskExecutor.setMaxPoolSize(50);
    taskExecutor.setQueueCapacity(200);
    taskExecutor.setKeepAliveSeconds(60);
    taskExecutor.setThreadNamePrefix("myExecutor2--");
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setAwaitTerminationSeconds(60);
    // Change the rejection policy so that the calling thread runs the task
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    // Initialize the pool
    taskExecutor.initialize();
    return taskExecutor;
}

When injecting the pool, rename the field to match the bean name so that @Resource looks the bean up by name.

@Resource
ThreadPoolTaskExecutor taskExecutor2;

For methods that use @Async, specify the bean name in the annotation.

    @Async("taskExecutor2")
    public void async(String name) throws InterruptedException {
        System.out.println("async" + name + " " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

The four rejection policies of a thread pool

Four commonly used Java thread pools

The ThreadPoolExecutor constructor looks like this:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

newCachedThreadPool

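The maximum number of threads is not limited (maximumPoolSize = Integer.MAX_VALUE). Threads that stay idle for 60 seconds are reclaimed; otherwise existing idle threads are reused before new ones are created.

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>());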
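
To see the "no upper limit" behaviour, the sketch below builds a pool with the same constructor arguments and submits five tasks that all block. Because a SynchronousQueue holds no elements and no worker is idle, each submission starts a new thread. The class name CachedPoolDemo is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a cached-style pool grows by one thread per concurrently blocked task.
class CachedPoolDemo {

    static int demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the thread busy so it cannot be reused
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int threads = pool.getPoolSize(); // one thread per blocked task
        release.countDown();
        pool.shutdown();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("threads created: " + demo());
    }
}
```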

newFixedThreadPool

A fixed-size thread pool. Tasks beyond the number of threads wait in the queue.

return new ThreadPoolExecutor(nThreads, nThreads,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>());
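
As a quick check of the queueing behaviour, the sketch below uses the same constructor arguments with nThreads = 2 and submits five blocking tasks: two occupy the threads and the other three wait in the queue. The class name FixedPoolDemo is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a fixed pool of 2 threads receiving 5 blocking tasks.
class FixedPoolDemo {

    // Returns { number of threads, number of queued tasks } while all tasks are still blocked.
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size() };
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = demo();
        System.out.println("threads=" + s[0] + ", queued=" + s[1]);
    }
}
```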

newScheduledThreadPool

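Like newCachedThreadPool, its constructor passes Integer.MAX_VALUE as the maximum pool size, and it lets you specify corePoolSize. Because the DelayedWorkQueue is unbounded, however, the pool does not actually grow beyond corePoolSize threads. It supports delayed and periodic execution.

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}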

Periodic execution:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(() -> {
   System.out.println("rate");
}, 1, 1, TimeUnit.SECONDS);

Delayed execution:

scheduledThreadPool.schedule(() -> {
   System.out.println("delay 3 seconds");
}, 3, TimeUnit.SECONDS);
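
The two snippets above never stop the scheduler. A self-contained variant might look like the sketch below, which measures the observed delay of a one-shot task and cancels a periodic task after three ticks. The class and method names are hypothetical, and the 200 ms and 50 ms values are arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledDemo {

    // Runs one task after a 200 ms delay and returns the observed delay in milliseconds.
    static long delayedMillis() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        Callable<Long> elapsed = () -> System.nanoTime() - start;
        try {
            ScheduledFuture<Long> future = scheduler.schedule(elapsed, 200, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(future.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    // Lets a periodic task tick three times, then cancels it so it stops rescheduling.
    static boolean tickThreeTimesThenCancel() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch ticks = new CountDownLatch(3);
        ScheduledFuture<?> handle =
                scheduler.scheduleAtFixedRate(ticks::countDown, 0, 50, TimeUnit.MILLISECONDS);
        try {
            boolean ticked = ticks.await(5, TimeUnit.SECONDS);
            handle.cancel(false);
            return ticked && handle.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("delay observed (ms): " + delayedMillis());
        System.out.println("periodic task cancelled: " + tickThreeTimesThenCancel());
    }
}
```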

newSingleThreadExecutor

A single-threaded pool. Tasks run one after another in the order they were submitted.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
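
The ordering guarantee can be observed with a small sketch: five tasks record their id, and the recorded order matches the submission order. The class name SingleThreadDemo is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: tasks on a single-thread executor run in submission order.
class SingleThreadDemo {

    static List<Integer> demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 5; i++) {
            int id = i;
            pool.execute(() -> order.add(id));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```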

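The four rejection policies in Java thread pools

  • CallerRunsPolicy: the rejected task is run by the thread that submitted it.

  • AbortPolicy: the rejected task causes an exception to be thrown.

  • DiscardPolicy: the rejected task is silently dropped.

  • DiscardOldestPolicy: the oldest task still waiting in the queue is dropped, and the new task is submitted again.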

CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

The effect is similar to:

Runnable thread = () -> {
   System.out.println(Thread.currentThread().getName());
   try {
      Thread.sleep(0);
   } catch (InterruptedException e) {
      throw new RuntimeException(e);
   }
};

thread.run();
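
To watch CallerRunsPolicy in action, the sketch below saturates a pool that has one thread and a queue of capacity one, then submits a third task and records which thread ran it. The class name CallerRunsDemo and the pool sizes are assumptions made for the demonstration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: the rejected task runs on the submitting thread.
class CallerRunsDemo {

    static boolean demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the queue

        Thread[] ranOn = new Thread[1];
        pool.execute(() -> ranOn[0] = Thread.currentThread()); // rejected, so it runs right here

        boolean ranOnCaller = ranOn[0] == Thread.currentThread();
        release.countDown();
        pool.shutdown();
        return ranOnCaller;
    }

    public static void main(String[] args) {
        System.out.println("ran on caller thread: " + demo());
    }
}
```

One consequence worth keeping in mind: while the caller is busy running the rejected task it cannot submit more work, so this policy acts as a simple form of back-pressure.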

AbortPolicy

Throws a RejectedExecutionException whose message identifies both the task and the thread pool.

public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
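
A minimal way to trigger AbortPolicy is to saturate a small pool and catch the exception, as sketched below. The class name AbortDemo and the one-thread, one-slot pool are assumptions chosen to make the rejection deterministic.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: the third submission is rejected with an exception.
class AbortDemo {

    static boolean demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the queue

        boolean rejected = false;
        try {
            pool.execute(() -> { }); // no thread and no queue slot left
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + demo());
    }
}
```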

DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
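
Since the handler body is empty, the only observable effect is that the rejected task never runs. The sketch below submits three tasks to a saturated pool and counts how many were executed; DiscardDemo is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: the third task is dropped without any error.
class DiscardDemo {

    static int demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        Runnable task = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ran.incrementAndGet();
        };
        pool.execute(task); // runs on the only worker and blocks
        pool.execute(task); // waits in the queue
        pool.execute(task); // rejected and silently dropped
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks executed: " + demo());
    }
}
```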

DiscardOldestPolicy

  • e.getQueue().poll(): removes the oldest task from the queue.

  • e.execute(r): submits the current task again.

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
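
The two steps can be traced with three labelled tasks: A occupies the worker, B waits in the queue, and submitting C evicts B and takes its place. The sketch below records which tasks actually ran; the class name DiscardOldestDemo and the labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: the queued task B is dropped in favour of the newer task C.
class DiscardOldestDemo {

    static List<String> demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<String> ran = Collections.synchronizedList(new ArrayList<>());

        pool.execute(() -> {          // A: runs on the only worker and blocks
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ran.add("A");
        });
        pool.execute(() -> ran.add("B")); // B: waits in the queue
        pool.execute(() -> ran.add("C")); // C: rejected, so B is polled away and C is queued

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran;
    }

    public static void main(String[] args) {
        System.out.println("tasks that ran: " + demo());
    }
}
```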

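How Java reuses threads

A Java thread pool holds java.util.concurrent.ThreadPoolExecutor.Worker objects, which are kept in the set private final HashSet<Worker> workers = new HashSet<Worker>();. workQueue is the queue of tasks waiting to run: once the core threads are all busy, newly submitted tasks are placed in workQueue.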

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

A Worker executes through runWorker(). Unlike a thread we write by hand, the worker's thread sits in a loop and keeps fetching new tasks from the queue. This is why threads in a pool can be reused, rather than ending as soon as a single task finishes the way ordinary threads do.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
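
Stripped of locking and state checks, the reuse idea comes down to one thread looping over a blocking queue. The sketch below is a deliberately simplified stand-in, not the JDK implementation: it uses a hypothetical STOP sentinel to end the loop, whereas ThreadPoolExecutor ends it when getTask() returns null.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo: one hand-rolled worker thread runs every queued task.
class MiniWorkerDemo {

    // Sentinel task that tells the worker loop to exit (an assumption of this sketch).
    private static final Runnable STOP = () -> { };

    // Returns the number of distinct threads that executed the five tasks.
    static int demo() {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();

        Thread worker = new Thread(() -> {
            try {
                Runnable task;
                // Same shape as runWorker(): keep taking tasks until told to stop.
                while ((task = workQueue.take()) != STOP) {
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "mini-worker");
        worker.start();

        for (int i = 0; i < 5; i++) {
            workQueue.add(() -> threadNames.add(Thread.currentThread().getName()));
        }
        workQueue.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads used: " + demo());
    }
}
```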
