醋醋百科网

Good Luck To You!

Java并发工具:CompletionService

CompletionService 是 Java 并发编程中的一个接口,位于 java.util.concurrent 包中。它的主要作用是将 任务的提交(submit)和 任务结果的获取(take/poll) 解耦,使得你可以更灵活地处理多个并发任务的结果。


简介

CompletionService 的核心思想是:当你提交多个异步任务后,可以按照这些任务完成的顺序来获取它们的结果,而不是按照提交的顺序。

常用方法:

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;


- submit():用于提交任务。

- take():阻塞直到有一个任务完成,并返回其 Future。

- poll():非阻塞地尝试获取已完成的任务的 Future;如果没有就返回 null。

- poll(timeout, unit):在指定时间内等待任务完成。


为什么需要 CompletionService?

假设你有多个并行任务,比如下载多个网页或执行多个计算任务,你希望一旦某个任务完成就可以立即处理它,而不需要等待所有任务都完成。

在这种情况下,使用 ExecutorService + Future.get() 就不太方便,因为你必须按顺序调用 get(),可能要等待前面未完成的任务。

而 CompletionService 可以让你按照任务完成的顺序来处理结果,提升响应速度和资源利用率。


使用示例

简单的 CompletionService示例:

import java.util.concurrent.*;

public class CompletionServiceExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

        // 提交多个任务
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            completionService.submit(() -> {
                Thread.sleep((long) (Math.random() * 3000));
                return "Task " + taskId + " completed";
            });
        }

        // 获取结果(按完成顺序)
        for (int i = 1; i <= 5; i++) {
            Future<String> future = completionService.take(); // 阻塞直到有任务完成
            System.out.println(future.get()); // 打印结果
        }

        executor.shutdown();
    }
}


输出示例(顺序不一定与提交顺序一致):

Task 1 completed
Task 4 completed
Task 5 completed
Task 3 completed
Task 2 completed


实现原理简述

ExecutorCompletionService 是 CompletionService的标准实现类,内部维护了一个阻塞队列(BlockingQueue<Future<V>>),每当有任务完成时,就会把该任务的 Future 放入这个队列中。外部通过 take() 或 poll() 方法从队列中取出已完成的 Future。


应用场景

1. 并行搜索:比如同时向多个搜索引擎发起请求,哪个先返回就先显示。

2. 批量任务处理:如批量下载文件、图像处理等。

3. 超时控制:配合 poll(timeout, ...) 实现对任务的限时处理。

4. 优先级调度:虽然不直接支持优先级,但结合自定义队列可以实现。


注意事项

- 使用完 CompletionService 后记得关闭底层的 ExecutorService。

- 如果任务抛出异常,future.get() 会抛出 ExecutionException。

- 多线程环境下注意同步问题。

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言