CompletionService 是 Java 并发编程中的一个接口,位于 java.util.concurrent 包中。它的主要作用是将 任务的提交(submit)和 任务结果的获取(take/poll) 解耦,使得你可以更灵活地处理多个并发任务的结果。
简介
CompletionService 的核心思想是:当你提交多个异步任务后,可以按照这些任务完成的顺序来获取它们的结果,而不是按照提交的顺序。
常用方法:
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
- submit():用于提交任务。
- take():阻塞直到有一个任务完成,并返回其 Future。
- poll():非阻塞地尝试获取已完成的任务的 Future;如果没有就返回 null。
- poll(timeout, unit):在指定时间内等待任务完成。
为什么需要 CompletionService?
假设你有多个并行任务,比如下载多个网页或执行多个计算任务,你希望一旦某个任务完成就可以立即处理它,而不需要等待所有任务都完成。
在这种情况下,使用 ExecutorService + Future.get() 就不太方便,因为你必须按顺序调用 get(),可能要等待前面未完成的任务。
而 CompletionService 可以让你按照任务完成的顺序来处理结果,提升响应速度和资源利用率。
使用示例
简单的 CompletionService示例:
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
// 提交多个任务
for (int i = 1; i <= 5; i++) {
final int taskId = i;
completionService.submit(() -> {
Thread.sleep((long) (Math.random() * 3000));
return "Task " + taskId + " completed";
});
}
// 获取结果(按完成顺序)
for (int i = 1; i <= 5; i++) {
Future<String> future = completionService.take(); // 阻塞直到有任务完成
System.out.println(future.get()); // 打印结果
}
executor.shutdown();
}
}
输出示例(顺序不一定与提交顺序一致):
Task 1 completed
Task 4 completed
Task 5 completed
Task 3 completed
Task 2 completed
实现原理简述
ExecutorCompletionService 是 CompletionService的标准实现类,内部维护了一个阻塞队列(BlockingQueue<Future<V>>),每当有任务完成时,就会把该任务的 Future 放入这个队列中。外部通过 take() 或 poll() 方法从队列中取出已完成的 Future。
应用场景
1. 并行搜索:比如同时向多个搜索引擎发起请求,哪个先返回就先显示。
2. 批量任务处理:如批量下载文件、图像处理等。
3. 超时控制:配合 poll(timeout, ...) 实现对任务的限时处理。
4. 优先级调度:虽然不直接支持优先级,但结合自定义队列可以实现。
注意事项
- 使用完 CompletionService 后记得关闭底层的 ExecutorService。
- 如果任务抛出异常,future.get() 会抛出 ExecutionException。
- 多线程环境下注意同步问题。