1.使用
ExecutorService

CountDownLatch
的方法示例

在Java中,当我们使用线程池(如
ExecutorService
)来执行异步任务时,常常需要知道所有任务是否都已经完成。
ExecutorService
接口提供了几种方式来处理这种情况,但最常用的是
shutdown()

awaitTermination()
方法的组合,或者使用
Future

CompletionService
。这里我将提供一个使用
ExecutorService

CountDownLatch
的示例,因为
CountDownLatch
提供了一种直观的方式来等待一组线程完成。

首先,我们定义几个任务,然后使用
ExecutorService
来异步执行它们,并使用
CountDownLatch
来等待所有任务完成。

import java.util.concurrent.*;  
  
public class ThreadPoolExample {  
  
    public static void main(String[] args) throws InterruptedException {  
        // 创建一个包含固定数量线程的线程池  
        ExecutorService executorService = Executors.newFixedThreadPool(4);  
  
        // 定义任务数量  
        int taskCount = 10;  
  
        // 使用CountDownLatch来等待所有任务完成  
        final CountDownLatch latch = new CountDownLatch(taskCount);  
  
        // 提交任务到线程池  
        for (int i = 0; i < taskCount; i++) {  
            int taskId = i;  
            executorService.submit(() -> {  
                // 模拟任务执行  
                try {  
                    Thread.sleep(1000); // 假设每个任务需要1秒  
                } catch (InterruptedException e) {  
                    Thread.currentThread().interrupt();  
                }  
                System.out.println("任务 " + taskId + " 完成");  
                // 每完成一个任务,计数减一  
                latch.countDown();  
            });  
        }  
  
        // 等待所有任务完成  
        System.out.println("等待所有任务完成...");  
        latch.await(); // 阻塞当前线程,直到latch的计数达到零  
        System.out.println("所有任务完成!");  
  
        // 关闭线程池  
        executorService.shutdown();  
  
        // 可选:等待线程池中的线程都执行完毕  
        try {  
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {  
                // 线程池没有在规定时间内关闭,则强制关闭  
                executorService.shutdownNow();  
            }  
        } catch (InterruptedException e) {  
            // 当前线程在等待过程中被中断  
            executorService.shutdownNow();  
            Thread.currentThread().interrupt();  
        }  
    }  
}

在这个例子中,我们首先创建了一个固定大小的线程池(这里使用4个线程)。然后,我们定义了一个
CountDownLatch
,其计数被初始化为任务的数量(这里为10)。对于每个任务,我们都向线程池提交了一个
Runnable
,其中包含了任务的执行逻辑和
latch.countDown()
调用,以确保每次任务完成时都会减少
CountDownLatch
的计数。

主线程通过调用
latch.await()
来等待,直到所有任务都调用了
countDown()
(即计数达到零),然后才能继续执行。这确保了主线程会等待所有任务完成后再继续。

最后,我们关闭了线程池,并通过调用
awaitTermination()
来可选地等待线程池中的所有线程都执行完毕。如果线程池没有在指定时间内关闭,则调用
shutdownNow()
来尝试立即停止所有正在执行的任务。

这个示例提供了处理异步任务并等待它们完成的一种有效方式,适用于需要等待所有任务完成再继续的场景。

2.使用
ExecutorService

invokeAll
方法和
Future
列表的方法示例

除了使用
CountDownLatch
之外,还有其他方法可以判断线程池中的所有任务是否执行完成。以下是一个使用
ExecutorService

invokeAll
方法和
Future
列表的示例,这种方法适用于我们有一组已知的任务(
Callable
)需要并行执行,并且我们需要等待所有任务完成并获取它们的结果。

import java.util.ArrayList;  
import java.util.List;  
import java.util.concurrent.*;  
  
public class ThreadPoolFutureExample {  
  
    public static void main(String[] args) throws InterruptedException, ExecutionException {  
        // 创建一个包含固定数量线程的线程池  
        ExecutorService executorService = Executors.newFixedThreadPool(4);  
  
        // 创建一个Callable任务列表  
        List<Callable<String>> tasks = new ArrayList<>();  
        for (int i = 0; i < 10; i++) {  
            final int taskId = i;  
            tasks.add(() -> {  
                // 模拟任务执行  
                Thread.sleep(1000); // 假设每个任务需要1秒  
                return "任务 " + taskId + " 完成";  
            });  
        }  
  
        // 使用invokeAll提交所有任务,这将返回一个Future列表  
        List<Future<String>> futures = executorService.invokeAll(tasks);  
  
        // 遍历Future列表,获取每个任务的结果  
        for (Future<String> future : futures) {  
            // get()会阻塞,直到对应的任务完成  
            System.out.println(future.get());  
        }  
  
        // 关闭线程池  
        executorService.shutdown();  
  
        // 可选:等待线程池中的线程都执行完毕  
        try {  
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {  
                // 线程池没有在规定时间内关闭,则强制关闭  
                executorService.shutdownNow();  
            }  
        } catch (InterruptedException e) {  
            // 当前线程在等待过程中被中断  
            executorService.shutdownNow();  
            Thread.currentThread().interrupt();  
        }  
    }  
}  
  
// 注意:这里使用了Lambda表达式和方法引用来简化Callable的创建  
// 实际使用中,你可能需要实现Callable接口或使用匿名内部类

在这个例子中,我们创建了一个
ExecutorService
和一个
Callable
任务列表。每个
Callable
任务都会返回一个字符串,表示任务完成的信息。我们使用
invokeAll
方法提交了所有任务,并立即获得了一个
Future
列表,每个
Future
都代表了一个任务的执行结果。

然后,我们遍历这个
Future
列表,并对每个
Future
调用
get()
方法。
get()
方法会阻塞当前线程,直到对应的任务完成并返回结果。这样,我们就能确保在继续执行之前,所有任务都已经完成。

最后,我们关闭了线程池,并等待所有线程都执行完毕(或超时后强制关闭)。

请注意,虽然这个示例使用了
Callable

Future
,但它并没有直接提供一个“是否所有任务都已完成”的布尔值。然而,通过遍历
Future
列表并调用
get()
,我们实际上已经达到了等待所有任务完成的效果。如果我们只需要知道是否所有任务都已开始执行(而不是等待它们完成),那么我们可能需要采用不同的策略,比如使用
execute
方法结合其他同步机制(如
CountDownLatch
)。

3.使用
ExecutorService
来异步执行多个
Callable
任务方法示例

以下是一个详细完整的代码示例,该示例使用了
ExecutorService
来异步执行多个
Callable
任务,并通过遍历
Future
列表来等待所有任务完成并获取它们的结果。

import java.util.ArrayList;  
import java.util.List;  
import java.util.concurrent.*;  
  
public class ThreadPoolFutureCompleteExample {  
  
    public static void main(String[] args) {  
        // 创建一个包含固定数量线程的线程池  
        ExecutorService executorService = Executors.newFixedThreadPool(4);  
  
        // 创建一个Callable任务列表  
        List<Callable<String>> tasks = new ArrayList<>();  
        for (int i = 0; i < 10; i++) {  
            final int taskId = i;  
            tasks.add(new Callable<String>() {  
                @Override  
                public String call() throws Exception {  
                    // 模拟任务执行  
                    TimeUnit.SECONDS.sleep(1); // 假设每个任务需要1秒  
                    return "任务 " + taskId + " 完成";  
                }  
            });  
  
            // 或者使用Lambda表达式(如果你使用的是Java 8或更高版本)  
            // tasks.add(() -> {  
            //     TimeUnit.SECONDS.sleep(1);  
            //     return "任务 " + taskId + " 完成";  
            // });  
        }  
  
        try {  
            // 使用invokeAll提交所有任务,这将返回一个Future列表  
            List<Future<String>> futures = executorService.invokeAll(tasks);  
  
            // 遍历Future列表,获取每个任务的结果  
            for (Future<String> future : futures) {  
                // get()会阻塞,直到对应的任务完成  
                System.out.println(future.get());  
            }  
  
            // 关闭线程池  
            executorService.shutdown();  
  
            // 等待线程池中的所有线程都执行完毕(可选)  
            // 注意:由于我们已经调用了invokeAll并等待了所有Future的完成,这一步通常是多余的  
            // 但为了完整性,我还是展示了如何等待线程池关闭  
            boolean terminated = executorService.awaitTermination(60, TimeUnit.SECONDS);  
            if (!terminated) {  
                // 如果线程池没有在规定时间内关闭,则强制关闭  
                System.err.println("线程池没有在规定时间内关闭,尝试强制关闭...");  
                executorService.shutdownNow();  
                // 注意:shutdownNow()不保证已经提交的任务会被取消  
                // 它会尝试停止正在执行的任务,但已经开始执行的任务可能无法被中断  
            }  
  
        } catch (InterruptedException | ExecutionException e) {  
            // 处理异常  
            e.printStackTrace();  
  
            // 如果当前线程在等待过程中被中断,尝试关闭线程池  
            if (!executorService.isShutdown()) {  
                executorService.shutdownNow();  
            }  
  
            // 根据需要,可能还需要重新设置中断状态  
            Thread.currentThread().interrupt();  
        }  
    }  
}

在这个示例中,我使用了传统的匿名内部类来创建
Callable
任务(同时也提供了Lambda表达式的注释),以便与各种Java版本兼容。然而,如果我们正在使用Java 8或更高版本,我强烈推荐我们使用Lambda表达式来简化代码。

请注意,
invokeAll
方法会阻塞调用它的线程,直到所有任务都完成,或者直到等待超时(如果我们提供了超时时间)。但是,在这个示例中,我们没有为
invokeAll
提供超时时间,因此它会一直等待,直到所有任务都完成。

另外,请注意,在
catch
块中,如果捕获到
InterruptedException
,我们检查了线程池是否已经被关闭(使用
isShutdown
方法)。如果没有,我们调用
shutdownNow
方法来尝试关闭线程池并停止正在执行的任务。然而,需要注意的是,
shutdownNow
方法并不保证能够停止所有已经开始执行的任务,因为某些任务可能无法被中断。

最后,如果在捕获到
InterruptedException
后,我们确定当前线程需要被重新中断(比如,我们在一个循环中等待某个条件,而中断是用来退出循环的),那么我们应该调用
Thread.currentThread().interrupt()
来重新设置中断状态。在这个示例中,我们没有这样做,因为
main
方法不需要重新中断。但是,在更复杂的场景中,这可能是必要的。

标签: none

添加新评论