Java并发——线程池

线程池参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

一共七个参数:

  • corePoolSize 核心线程数量,任务队列未达到队列容量时,最大可以同时运行的线程数量
  • maximumPoolSize 最大线程数量,任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数
  • keepAliveTime:当线程数大于核心线程数时,多余的空闲线程存活的最长时间
  • unit:时间单位
  • workQueue:任务队列,新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中
  • threadFactory:线程工厂,用来创建线程,一般默认即可
  • handler:拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
线程池各个参数的关系

关于拒绝策略RejectedExecutionHandler handler:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。

线程池原理分析

  1. 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。

  2. 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。

  3. 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。

  4. 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。

image-20250228163621662

线程池使用示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {

//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 10; i++) {
//创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//执行Runnable
executor.execute(worker);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

常见内置线程池

常见阻塞队列

  1. LinkedBlockingQueue(无界队列):容量为Integer.MAX_VALUE。
  2. SynchronousQueue(同步队列):没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。
  3. DelayedWorkQueue(延迟阻塞队列):内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。队列元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE。

常见内置线程池

除了使用ThreadPoolExecutor executor = new ThreadPoolExecutor(…)的方法来创建线程池,还可以通过Excutors来创建Java内置的一些线程池,这些线程池本质上还是 new ThreadPoolExecutor(),但规定了不同的参数和使用了不同的阻塞队列。

《阿里巴巴 Java 开发手册》强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

FixedThreadPool

一个可重用固定线程数的线程池

  • corePoolSize 和 maximumPoolSize 都被设置为 nThreads
  • 使用的是无界队列LinkedBlockingQueue(由于容量永远不会满,所以即使超过核心线程数也永远不会再新建线程,因此使用这种队列的核心线程数就是最大线程数)
1
2
3
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());

具体的,一开始当前运行的线程数小于核心线程数, 来新任务时就创建新的线程来执行任务;当达到核心线程数后,新的任务会进入队列,队列永远不会满,线程池中的线程循环从队列中取出任务来执行。

运行中的 FixedThreadPool(未执行 shutdown()shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。

SingleThreadExcutor

只有一个线程的线程池,可以看作FixedThreadPool的单线程版本。

  • corePoolSize 和 maximumPoolSize 都被设置为 1
  • 其他参数与FixedThreadPool相同
1
2
3
4
5
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));

CachedThreadPool

  • corePoolSize = 0
  • maximumPoolSize = Integer.MAX_VALUE
  • 使用同步队列SynchronousQueue
  • keepAliveTime = 60 秒
1
2
3
4
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);

核心线程数为0,最大线程数为Integer.MAX_VALUE,使用无容量的SynchronousQueue,这就意味着每当有新的任务加入时,如果没有空闲线程,都会创建新的线程来执行。

这也意味着如果线程的处理速度赶不上提交任务的速度,CachedThreadPool可以不断创建出大量的线程,从而导致OOM(内存溢出)。

ScheduledThreadPool

  • corePoolSize
  • maximumPoolSize = Integer.MAX_VALUE
  • 使用延迟阻塞队列DelayedWorkQueue
  • keepAliveTime = 0
1
2
3
return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE,
0, NANOSECONDS,
new DelayedWorkQueue());

首先由于延迟阻塞队列DelayedWorkQueue可以不断扩容,永远不会满,因此最多只能创建核心线程数的线程。同时队列中按照延迟的时间长度进行排序。因此ScheduledThreadPool可以用来在给定的延迟后运行任务或者定期执行任务。

一些其他比较

Runnable vs Callable

Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。

Runnable接口不会返回结果或抛出检查异常,但是Callable接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。

工具类 Executors可以实现将 Runnable对象转换成 Callable对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}

@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}

execute() vs submit()

execute()submit()是两种提交任务到线程池的方法,有一些区别:

返回值execute() 方法用于提交不需要返回值的任务。通常用于执行 Runnable 任务,无法判断任务是否被线程池成功执行。submit() 方法用于提交需要返回值的任务。可以提交 RunnableCallable 任务。submit() 方法返回一个 Future 对象,通过这个 Future 对象可以判断任务是否执行成功,并获取任务的返回值。

示例,通过future.get()方法获取返回,该方法会阻塞当前线程直到任务完成。还有一个get(long timeout,TimeUnit unit),多了一个超时时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService executorService = Executors.newFixedThreadPool(3);

Future<String> submit = executorService.submit(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});

String s = submit.get();
System.out.println(s);
executorService.shutdown();

异常处理:在使用 submit() 方法时,可以通过 Future 对象处理任务执行过程中抛出的异常;而在使用 execute() 方法时,异常处理需要通过自定义的 ThreadFactory (在线程工厂创建线程的时候设置UncaughtExceptionHandler对象来 处理异常)或 ThreadPoolExecutorafterExecute() 方法来处理。

shutdown() vs shoutdownNow()

shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。

shutdownNow() :关闭线程池,线程池的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

isTerminated() vs isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

线程池最佳实践

  1. 线程池必须手动通过 ThreadPoolExecutor 的构造函数来声明,避免使用Executors 类创建线程池,会有 OOM 风险。也就是使用有界队列,控制线程创建数量。
  2. 检测线程池运行状态。ThreadPoolExecutor提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等。
  3. 不同的业务使用不同的线程池,配置线程池的时候根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,重心优化系统性能瓶颈相关的业务。(一个案例:父子任务使用同一个线程池,父任务使用完线程资源,子任务一直被阻塞在队列,可能导致死锁)
  4. 给线程池命名(设置线程池名称前缀),有利于定位问题。
  5. 线程池尽量不要放耗时任务。线程池本身的目的是为了提高任务执行效率,避免因频繁创建和销毁线程而带来的性能开销。如果将耗时任务提交到线程池中执行,可能会导致线程池中的线程被长时间占用,无法及时响应其他任务,甚至会导致线程池崩溃或者程序假死。
  6. 别忘记显式地关闭线程池,释放线程资源。
  7. 线程池和 ThreadLocal共用,可能会导致线程从ThreadLocal获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal 值。
  8. 合理设置线程池参数。

对于最后一点,补充:

  • 如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的,CPU 根本没有得到充分利用。

  • 如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

有一个简单并且适用面比较广的公式:

  • CPU 密集型任务 (N): 这种任务消耗的主要是 CPU 资源,线程数应设置为 N(CPU 核心数)。由于任务主要瓶颈在于 CPU 计算能力,与核心数相等的线程数能够最大化 CPU 利用率,过多线程反而会导致竞争和上下文切换开销。
  • I/O 密集型任务(M * N): 这类任务大部分时间处理 I/O 交互,线程在等待 I/O 时不占用 CPU。 为了充分利用 CPU 资源,线程数可以设置为 M * N,其中 N 是 CPU 核心数,M 是一个大于 1 的倍数,建议默认设置为 2 ,具体取值取决于 I/O 等待时间和任务特点,需要通过测试和监控找到最佳平衡点。