在当今高度并发的互联网软件开发领域,高效地管理和利用线程资源是提升程序性能的关键。Java 作为一种广泛应用于后端开发的编程语言,为我们提供了丰富的线程池实现方式。今天,就让我们深入探讨 Java 中线程池的实现方式,帮助广大互联网软件开发人员更好地优化自己的代码。
使用 Executors 工具类创建线程池
Executors 工具类为我们提供了便捷的方法来创建常见类型的线程池,大大简化了线程池的创建过程。
(一)固定大小线程池(Fixed Thread Pool)
通过
Executors.newFixedThreadPool(int nThreads)方法创建。其特点是线程池中的线程数始终保持固定的nThreads个。当有新任务提交时,如果有空闲线程,任务会立即被分配给空闲线程执行;若所有线程都在忙碌,新任务则会被放入任务队列中等待。这种线程池适用于任务量已知且相对固定的场景,比如服务器端处理固定数量并发请求的业务逻辑。它能够有效控制资源的使用,避免线程过多导致的上下文切换开销。例如:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
fixedThreadPool.submit(() -> {
// 具体任务逻辑
System.out.println("任务在固定大小线程池的线程 " + Thread.currentThread().getName() + " 中执行");
});
}
在这个例子中,我们创建了一个固定大小为 4 的线程池,并向其中提交了 10 个任务。这些任务会在 4 个固定的线程中依次或并发执行,具体取决于任务的执行时间和线程的繁忙程度。
(二)缓存线程池(Cached Thread Pool)
使用
Executors.newCachedThreadPool()创建。此线程池的线程数量会根据任务的提交情况动态调整。当有新任务提交时,如果有空闲线程则立即执行任务;若没有空闲线程,且当前线程数小于最大线程数(理论上为Integer.MAX_VALUE),则会创建新的线程来执行任务。而空闲线程如果在 60 秒内没有新任务,将会被终止并从缓存中移除。这种线程池适合执行大量短期异步任务的场景,比如处理大量的短连接请求。它能够快速响应任务,减少线程创建和销毁的开销。示例代码如下:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
cachedThreadPool.submit(() -> {
// 具体任务逻辑
System.out.println("任务在缓存线程池的线程 " + Thread.currentThread().getName() + " 中执行");
});
}
这里,我们创建了一个缓存线程池并提交 10 个任务。由于任务可能是短时间内大量提交,缓存线程池会根据需要灵活创建和回收线程,以高效处理这些任务。
(三)单线程池(Single Thread Executor)
通过
Executors.newSingleThreadExecutor()创建。该线程池只有一个线程,所有任务会按照提交的顺序依次执行,保证了任务的顺序性和互斥性。适用于需要按顺序执行任务的场景,例如一些对数据一致性要求较高的操作。如下代码所示:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
singleThreadExecutor.submit(() -> {
// 具体任务逻辑
System.out.println("任务在单线程池的线程 " + Thread.currentThread().getName() + " 中执行");
});
}
在这个单线程池的例子中,10 个任务会一个接一个地在唯一的线程中执行,不会出现并发冲突。
(四)调度线程池(Scheduled Thread Pool)
分为两种:
Executors.newScheduledThreadPool(int corePoolSize)创建一个固定大小的线程池,支持定时或周期性任务;
Executors.newSingleThreadScheduledExecutor()创建一个单线程的线程池,同样支持定时或周期性任务。以newScheduledThreadPool为例,它适用于需要定时或周期性执行任务的场景,如定时数据备份、定时发送消息提醒等。代码示例如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
scheduledThreadPool.scheduleAtFixedRate(() -> {
// 具体任务逻辑
System.out.println("定时任务在调度线程池的线程 " + Thread.currentThread().getName() + " 中执行");
}, 1, 3, TimeUnit.SECONDS);
(五)工作窃取线程池(Work Stealing Pool)
通过
Executors.newWorkStealingPool(int parallelism)创建(若不传入参数,默认使用 CPU 核心数作为并行度)。它使用ForkJoinPool实现,支持并行处理大量任务。线程池中的线程会尝试 “窃取” 其他线程队列中的任务来执行,从而充分利用多核处理器的性能。适用于需要并行处理大量任务的场景,例如大规模数据计算。示例代码如下:
ExecutorService workStealingPool = Executors.newWorkStealingPool();
for (int i = 0; i < 10; i++) {
workStealingPool.submit(() -> {
// 具体任务逻辑
System.out.println("任务在工作窃取线程池的线程 " + Thread.currentThread().getName() + " 中执行");
});
}
在工作窃取线程池中,各个线程会积极地从其他线程的任务队列中窃取任务执行,提高整体的任务处理效率。
直接使用 ThreadPoolExecutor 创建线程池
虽然 Executors 工具类创建线程池很方便,但在某些场景下,我们需要更精细地控制线程池的行为,这时就可以直接使用ThreadPoolExecutor类。ThreadPoolExecutor提供了多个重载的构造方法,其中参数最全的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数,是线程池中保持活动的最小线程数,即使这些线程处于空闲状态。当有新任务提交时,如果当前线程数小于corePoolSize,即使有空闲线程,线程池也会创建一个新线程来处理任务。
- maximumPoolSize:线程池中允许的最大线程数,包括核心线程和非核心线程。当任务队列已满且当前线程数小于maximumPoolSize时,线程池会创建新的线程来处理任务。
- keepAliveTime:用于控制线程池中非核心线程的生命周期。如果超过这个时间没有新任务,非核心线程将被终止。
- unit:keepAliveTime参数的时间单位,如TimeUnit.SECONDS表示秒。
- workQueue:任务队列,当所有核心线程都在忙碌时(任务数超过了corePoolSize),任务首先会被放入这个队列中,然后由工作线程从队列中取出并执行。常见的任务队列有LinkedBlockingQueue(基于链表的无界阻塞队列)、ArrayBlockingQueue(基于数组的有界阻塞队列)、SynchronousQueue(不存储元素的阻塞队列,每个插入操作必须等待一个相应的移除操作)等。
- threadFactory:用于创建新线程的工厂,通过它可以定制新创建线程的属性,比如名称、优先级等。
- rejectedExecutionHandler:拒绝策略,当任务队列已满且线程数达到maximumPoolSize时,新提交的任务将被拒绝,由拒绝策略来处理这些被拒绝的任务。常见的拒绝策略有AbortPolicy(默认策略,直接抛出RejectedExecutionException异常)、CallerRunsPolicy(将被拒绝的任务交给调用者线程来执行)、DiscardPolicy(直接丢弃被拒绝的任务)、DiscardOldestPolicy(丢弃任务队列中最老的任务,然后尝试提交新任务)。
例如,我们创建一个自定义的线程池:
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService threadPool = new ThreadPoolExecutor(
2,
4,
10,
unit,
workQueue,
Executors.defaultThreadFactory(),
handler);
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> {
// 具体任务逻辑
System.out.println("任务在线程 " + Thread.currentThread().getName() + " 中执行");
});
}
在这个例子中,我们创建了一个核心线程数为 2,最大线程数为 4,非核心线程存活时间为 10 秒,任务队列容量为 2,采用CallerRunsPolicy拒绝策略的线程池。通过这种方式,我们可以根据具体业务场景灵活调整线程池的各项参数,以达到最优的性能。
ForkJoinPool—— 并行处理的利器
自 Java 7 引入的ForkJoinPool,是为解决并行计算问题而生的特殊线程池。它专为 “分治”(Fork/Join)并行计算设计,能够将一个大任务分解成多个小任务并行处理,然后把结果合并起来。
(一)工作原理
分治思想:
- 拆分(Fork):把一个大任务拆成若干个小任务。例如,在计算一个大数组的总和时,可以将数组拆分成多个小数组,每个小数组由一个子任务负责计算。
- 执行:当任务足够小时直接计算。比如,当拆分后的小数组元素数量较少时,直接计算该小数组的和。
- 合并(Join):把所有小任务的结果合并起来。将各个小数组的计算结果相加,得到最终大数组的总和。
工作窃取(Work - Stealing)机制:
- 每个工作线程都有自己的双端队列(deque)存储任务。
- 线程从自身队列尾部获取任务(后进先出,LIFO),从其他线程队列头部 “窃取” 任务(先进先出,FIFO)。
- 每个工作线程的任务队列是线程私有的,自身取任务(尾部 LIFO)和窃取任务(头部 FIFO)均通过无锁的 CAS 操作实现,避免了全局锁开销,这是ForkJoinPool高并发效率的关键原因。通过这种机制,确保所有线程都保持忙碌状态,避免资源浪费。
(二)适用场景
计算密集型任务:如矩阵运算、图像处理、排序算法等。这些任务通常需要大量的计算资源,通过并行处理可以充分利用多核 CPU 的性能,显著提高计算速度。
可拆分的问题:任务能被拆分为相互独立的子任务。只有任务可以合理拆分,才能发挥ForkJoinPool的优势。
合并成本低:子任务结果合并的时间远小于计算时间。如果合并结果的操作过于复杂或耗时,会抵消并行计算带来的优势。
任务规模可调:可以控制任务的拆分粒度。根据实际情况调整任务的拆分程度,以达到最佳的性能。
(三)与 ThreadPoolExecutor 的区别
ThreadPoolExecutor适合处理独立的任务,每个任务之间没有依赖关系;而ForkJoinPool适合处理可以分解的计算密集型任务,通过分治和工作窃取机制提高并行处理效率。
(四)示例代码
以使用ForkJoinPool实现归并排序为例:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class ParallelMergeSort extends RecursiveAction {
private static final int THRESHOLD = 1000;
private final int[] array;
private final int start;
private final int end;
public ParallelMergeSort(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
// 直接进行插入排序
for (int i = start + 1; i < end; i++) {
int value = array[i];
int j = i - 1;
while (j >= start && array[j] > value) {
array[j + 1] = array[j];
j--;
}
array[j + 1] = value;
}
} else {
int mid = (start + end) / 2;
ParallelMergeSort leftTask = new ParallelMergeSort(array, start, mid);
ParallelMergeSort rightTask = new ParallelMergeSort(array, mid, end);
leftTask.fork();
rightTask.compute();
leftTask.join();
merge(start, mid, end);
}
}
private void merge(int start, int mid, int end) {
int[] temp = new int[end - start];
int i = start;
int j = mid;
int k = 0;
while (i < mid && j < end) {
if (array[i] <= array[j]) {
temp[k++] = array[i++];
} else {
temp[k++] = array[j++];
}
}
while (i < mid) {
temp[k++] = array[i++];
}
while (j < end) {
temp[k++] = array[j++];
}
System.arraycopy(temp, 0, array, start, temp.length);
}
public static void main(String[] args) {
int[] array = {5, 4, 6, 2, 7, 1, 3};
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new ParallelMergeSort(array, 0, array.length));
for (int num : array) {
System.out.print(num + " ");
}
}
}
在这个例子中,我们通过ForkJoinPool实现了并行归并排序。将大数组不断拆分成小数组进行排序,最后合并结果,大大提高了排序效率。
CompletableFuture—— 异步任务的高层次抽象
CompletableFuture提供了异步任务的更高层次抽象,并支持链式处理和组合。它可以与线程池结合使用,进一步提升异步任务的处理能力。例如:
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture.supplyAsync(() -> {
// 异步任务逻辑
return "任务结果";
}, executorService)
.thenApply(result -> {
// 对结果进行处理
return result + " 经过处理";
})
.thenAccept(System.out::println);
在这段代码中,我们首先创建了一个固定大小为 4 的线程池,然后使用
CompletableFuture.supplyAsync方法提交一个异步任务到线程池中执行。任务执行完成后,通过thenApply方法对结果进行转换,最后使用thenAccept方法消费最终的结果。通过这种方式,我们可以方便地管理和组合多个异步任务,提高代码的可读性和可维护性。
综上所述,Java 中线程池的实现方式多种多样,每种方式都有其适用的场景和特点。在实际的互联网软件开发中,我们需要根据具体的业务需求、任务类型和系统资源等因素,选择合适的线程池实现方式,以实现高效的并发编程,提升软件系统的性能和稳定性。希望本文能为大家在使用 Java 线程池方面提供有益的参考和帮助。