万字详解 java 线程池,让我们的高并发程序更稳健

JAVA 2023-08-07 18:45:28
209阅读

一、线程池的概念和作用

什么是线程池?

线程池是一种管理和复用线程资源的机制,它可以在应用程序中创建一组预先初始化的线程,以执行多个任务。线程池维护着一个线程队列,其中包含一定数量的空闲线程。当有新的任务到达时,线程池中的空闲线程会立即执行任务,而不需要每次都创建和销毁线程,从而提高了应用程序的性能和效率。

使用线程池的主要目的是为了避免频繁地创建和销毁线程带来的开销,同时有效地控制并发线程的数量,防止过多的线程造成系统资源的浪费和线程切换的开销。

线程池的优势

  • 提高系统性能:线程池可避免频繁地创建和销毁线程,减少了线程创建和上下文切换的开销。
  • 提高响应速度:线程池中的空闲线程可以立即执行新任务,减少了任务等待的时间。
  • 控制并发资源:通过调整线程池的参数,可以限制并发线程的数量,防止系统资源被过度占用。
  • 提供管理和监控机制:线程池提供了对线程的统一管理和监控,可以方便地管理线程的状态、执行情况和异常处理。

线程池的原理和工作方式

线程池通常由以下几个组件构成:

  1. 线程池管理器(ThreadPoolExecutor):负责整个线程池的创建、销毁和管理,并根据任务的提交情况动态地调整线程数量。
  2. 工作线程(Worker Threads):实际执行任务的线程,它们被线程池所管理,可以复用,并且在任务执行完毕后进入空闲状态等待新的任务。
  3. 任务队列(Task Queue):用于存放未执行的任务,线程池会从任务队列中获取任务,然后交给空闲线程执行。
  4. 线程池参数(Thread Pool Parameters):包括核心线程数量、最大线程数量、线程存活时间等参数,用于控制线程池的行为和性能。

线程池的工作流程如下:

  1. 初始化线程池,创建指定数量的工作线程,并将它们添加到线程池中。
  2. 当有任务提交到线程池时,线程池会从任务队列中获取一个任务,并把该任务分配给空闲的工作线程来执行。
  3. 如果当前没有空闲线程,且工作线程的数量未达到最大线程数,则会创建新的线程来执行任务。
  4. 如果任务队列已满,且工作线程的数量已达到最大线程数,根据预先设定的拒绝策略来处理无法执行的任务。
  5. 工作线程执行完任务后,会返回线程池并等待新的任务到来。
  6. 当线程池不再需要执行任务时,可以关闭线程池,释放资源。

线程池是多线程编程中一个重要的概念,通过有效地复用线程以及合理地管理并发线程的数量,可以提高程序的性能、响应速度和资源利用率。

二、创建线程池

使用 Executors 类创建线程池

Executors 类是 Java 标准库中提供的一个工具类,用于创建和管理线程池。它提供了一些静态方法,可以方便地创建不同类型的线程池。

  1. 创建固定大小的线程池(FixedThreadPool):该线程池会创建一个固定数量的线程来执行任务,如果线程池中的线程都处于忙碌状态,新的任务会进入任务队列等待执行。
 
 
ExecutorService executor = Executors.newFixedThreadPool(int nThreads);

其中,nThreads 指定了线程池中的线程数量。

  1. 创建单线程的线程池(SingleThreadExecutor):该线程池只会创建一个线程来执行任务,保证所有任务按照指定顺序(FIFO)执行。
ExecutorService executor = Executors.newSingleThreadExecutor();
  1. 创建可缓存的线程池(CachedThreadPool):该线程池根据需要创建新的线程来执行任务,并且会在线程空闲一段时间后自动回收线程资源。
ExecutorService executor = Executors.newCachedThreadPool();
  1. 创建定时任务的线程池(ScheduledThreadPool):该线程池可用于执行定时任务和周期性任务,可以设置延迟执行任务、定时执行任务、以及按固定频率执行任务。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(int corePoolSize);

其中,corePoolSize 指定了核心线程的数量。

通过上述方法创建的线程池都实现了 ExecutorService 接口,可以使用该接口提供的方法来提交任务、关闭线程池等操作。

例如,可以使用 execute(Runnable command) 方法提交一个任务给线程池执行:

executor.execute(new Runnable() {
    public void run() {
        // 任务的具体逻辑
    }
});

还可以通过调用 shutdown() 方法关闭线程池,使其不再接受新任务,并且会等待已提交的任务执行完毕后再关闭:

executor.shutdown();

Executors 类提供了便捷的方法用于创建线程池,简化了线程池的创建过程,同时也提供了灵活的配置选项,方便根据应用场景选择合适的线程池类型。但需要注意的是,对于长时间运行的应用程序,建议手动创建线程池并进行精细的配置,以满足应用程序的性能和资源管理需求。

自定义线程池的参数设置和创建过程

自定义线程池可以通过 ThreadPoolExecutor 类来实现,它提供了更灵活的参数设置和创建过程。

  1. 创建 ThreadPoolExecutor 对象:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue
);
  • corePoolSize:指定线程池的核心线程数量,即线程池中最小的线程数。
  • maximumPoolSize:指定线程池的最大线程数量,即线程池中最大允许创建的线程数。
  • keepAliveTime:指定超过核心线程数量的空闲线程的存活时间,即当线程池中的线程数量超过 corePoolSize 时,多余的空闲线程在一定时间内未被使用,则会被销毁。
  • unit:指定 keepAliveTime 的时间单位,可选择 TimeUnit.SECONDS、TimeUnit.MILLISECONDS 等。
  • workQueue:指定任务队列,用于存放还未执行的任务。
  1. 设置拒绝策略(可选): 如果线程池的任务队列已满且无法再接受新任务时,可以通过设置拒绝策略来处理无法执行的任务。ThreadPoolExecutor 提供了四种预定义的拒绝策略:
  • AbortPolicy(默认):直接抛出 RejectedExecutionException 异常,拒绝执行新任务。
  • CallerRunsPolicy:将任务返回给调用者执行,由提交任务的线程来执行该任务。
  • DiscardPolicy:直接丢弃无法执行的任务,没有任何异常抛出。
  • DiscardOldestPolicy:丢弃队列中最旧的任务,然后重新尝试执行新任务。

可以通过 setRejectedExecutionHandler() 方法来设置拒绝策略:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
  1. 可选地设置线程工厂(可选): 线程工厂用于创建新的线程,默认使用默认的线程工厂(DefaultThreadFactory)。如果需要自定义线程工厂,可以实现 ThreadFactory 接口,并通过 setThreadFactory() 方法设置:
executor.setThreadFactory(runnable -> new Thread(runnable, "MyThread"));

三、线程池的核心组件:

ThreadPoolExecutor 类的介绍

ThreadPoolExecutor 是 Java 中用于创建和管理线程池的类。它是 ExecutorService 接口的一个具体实现,提供了灵活的线程池参数配置和任务执行管理。

ThreadPoolExecutor 类的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

参数说明:

  • corePoolSize:线程池的核心线程数。当有新任务提交到线程池时,如果当前线程数小于 corePoolSize,会立即创建新线程来处理任务;
  • maximumPoolSize:线程池允许的最大线程数。当队列已满并且当前线程数小于 maximumPoolSize,会创建新线程来处理任务;
  • keepAliveTime:多余的空闲线程的存活时间,即当线程池中的线程数量大于 corePoolSize 时,多余的空闲线程在一定时间内未被使用,则会被销毁;
  • unit:keepAliveTime 的时间单位;
  • workQueue:用于存放待执行任务的阻塞队列。

ThreadPoolExecutor 提供了以下方法来管理线程池:

  1. execute(Runnable command):提交一个 Runnable 任务给线程池执行,该方法是非阻塞的,任务会被添加到工作队列中等待执行。

  2. submit(Callable task):提交一个 Callable 任务给线程池执行,并返回一个表示任务结果的 Future 对象。

  3. shutdown():平滑地关闭线程池。线程池将不再接受新任务,但会等待已提交的任务执行完毕后再关闭。

  4. shutdownNow():立即关闭线程池。线程池将尝试中断正在执行的任务,并返回未执行的任务列表。

  5. setCorePoolSize(int corePoolSize):设置线程池的核心线程数。

  6. setMaximumPoolSize(int maximumPoolSize):设置线程池的最大线程数。

  7. setKeepAliveTime(long keepAliveTime, TimeUnit unit):设置空闲线程的存活时间。

  8. setThreadFactory(ThreadFactory threadFactory):设置线程工厂,用于创建新线程。

  9. setRejectedExecutionHandler(RejectedExecutionHandler handler):设置拒绝策略,当工作队列已满且无法接受新任务时,决定如何处理无法执行的任务。

  10. getQueue():获取线程池使用的工作队列。

  11. getActiveCount():获取当前活动线程的数量。

  12. getCompletedTaskCount():获取已完成的任务数量。

  13. getTaskCount():获取已提交的任务数量。

  14. isShutdown():判断线程池是否已经关闭。

  15. isTerminated():判断线程池是否已经终止。

核心线程池大小、最大线程池大小和线程存活时间的设置

在 ThreadPoolExecutor 中,核心线程池大小(corePoolSize)、最大线程池大小(maximumPoolSize)和线程存活时间(keepAliveTime)是非常重要的参数,它们可以根据实际需求来进行设置。

核心线程池大小(corePoolSize):

核心线程池大小指定了线程池中保持活动状态的线程数量。当有新任务提交给线程池时,如果当前线程数小于核心线程池大小,会立即创建新线程来处理任务。如果线程池中的线程数量已经达到核心线程池大小,新任务就会被放入工作队列中等待执行。

核心线程池大小的设置需要根据任务的类型和负载情况进行合理的估计。如果任务执行时间较长或者任务的数量较多,可以适当增加核心线程池大小以提高任务的响应速度。

最大线程池大小(maximumPoolSize):

最大线程池大小指定了线程池允许的最大线程数量。当工作队列已满并且当前线程数小于最大线程池大小,会创建新线程来处理任务。如果工作队列已满且当前线程数已达到最大线程池大小,则根据拒绝策略来处理无法执行的任务。

最大线程池大小的设置需要根据系统资源、任务的性质和负载情况进行合理调整。如果任务数量经常超过核心线程池大小,可以适当增加最大线程池大小来处理峰值任务。

线程存活时间(keepAliveTime):

线程存活时间指定了多余的空闲线程在被销毁前的存活时间。当线程池中的线程数量超过核心线程池大小时,多余的空闲线程会在一定时间内未被使用时被销毁。

线程存活时间的设置可以根据任务的持续时间和业务需求来进行调整。如果任务的执行时间较短,可以将线程存活时间设置为较短的时间,以便及时释放多余的线程资源。如果任务的执行时间较长,可以将线程存活时间设置为较长的时间,以避免频繁地创建和销毁线程。

需要注意的是,在 ThreadPoolExecutor 中,默认情况下,空闲的线程只有在核心线程池大小被占满后,才会根据线程存活时间来销毁多余的空闲线程。可以通过调用 allowCoreThreadTimeOut(boolean value) 方法来设置核心线程是否也可以被回收。

任务队列和拒绝策略的选择

任务队列和拒绝策略是 ThreadPoolExecutor 中非常重要的概念,对于线程池的性能和稳定性起着至关重要的作用。

  1. 任务队列: 任务队列(workQueue)用于存放待执行的任务。当线程池中的线程数已达到核心线程池大小时,新任务会被放入任务队列中等待执行,直到队列已满。线程池提供了多种类型的任务队列,常见的有以下几种:
  • 直接提交队列(SynchronousQueue):这是一个没有容量的队列,每次只能存放一个任务。如果当前线程池中没有可用的线程来执行任务,则会立即创建一个新线程。适用于负载较轻且任务执行时间短的场景。

  • 有界队列(ArrayBlockingQueue、LinkedBlockingQueue):这些队列有固定的容量,可以存放一定数量的任务。一旦队列已满,新任务就会等待,直到有线程可用来执行。可根据任务量和系统资源进行合理设置。ArrayBlockingQueue 是一个基于数组的有界队列,LinkedBlockingQueue 是一个基于链表的有界(或无界)队列。

  • 无界队列(LinkedBlockingQueue):这是一个容量可选的队列,可以存放任意数量的任务。只要系统资源允许,任务就会被添加到队列中,直到队列耗尽系统内存。适用于任务量较大且系统资源充足的场景。

根据具体需求和场景,我们可以选择适合的任务队列类型来管理待执行的任务。

  1. 拒绝策略: 当任务队列已满且线程池中的线程数已达到最大线程池大小时,新任务无法被添加到队列中。这时就需要使用拒绝策略来决定如何处理无法执行的任务。ThreadPoolExecutor 提供了多种预定义的拒绝策略,以及自定义拒绝策略的选项。
  • AbortPolicy(默认):直接抛出 RejectedExecutionException 异常,阻止任务的提交。

  • CallerRunsPolicy:将任务返回给调用者进行处理。在交互式应用程序中,这个策略可能会导致主线程被阻塞,但它可以保证不会丢失任何任务。

  • DiscardPolicy:默默地丢弃无法执行的任务,不提供任何反馈。

  • DiscardOldestPolicy:丢弃队列中最旧的任务,然后尝试再次提交新任务。

除了预定义的拒绝策略,还可以通过实现 RejectedExecutionHandler 接口来自定义拒绝策略,根据业务需求进行特定处理。

四、线程池的任务执行:

Callable 和 Future 的使用

Callable 和 Future 是 Java 多线程编程中用于处理带有返回结果的任务的接口。通过 Callable 接口可以定义具有返回值的任务,而 Future 接口则用来表示任务的执行结果。

  1. 定义 Callable 任务: 首先,需要定义一个实现了 Callable 接口的任务类。Callable 接口中只有一个方法 call(),该方法用于执行任务并返回一个结果。
import java.util.concurrent.Callable;

class MyCallableTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        // 执行任务逻辑并返回结果
        return "任务执行结果";
    }
}
  1. 提交 Callable 任务到线程池: 使用 ThreadPoolExecutor 的 submit() 方法来提交 Callable 任务到线程池中执行,并返回一个 Future 对象,用于表示任务的执行结果。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

ExecutorService executor = Executors.newFixedThreadPool(1);
Callable<String> task = new MyCallableTask();
Future<String> future = executor.submit(task);
  1. 处理任务执行结果: 可以通过 Future 对象来获取任务的执行结果。Future 接口提供了多个方法用于处理任务执行结果:
  • get():阻塞当前线程,直到任务执行完成并返回结果。如果任务尚未完成,get() 方法会一直等待。可以通过 get(long timeout, TimeUnit unit) 方法设置等待时间。
String result = future.get();  // 获取任务执行结果
  • isDone():判断任务是否已经完成。如果任务完成,即使抛出异常,也返回 true。
boolean isDone = future.isDone();  // 判断任务是否完成
  • cancel():尝试取消任务的执行。如果任务还没有开始执行,则会成功取消任务;如果任务已经开始执行或已经完成,则无法取消任务。
boolean isCancelled = future.cancel(true);  // 尝试取消任务
  • isCancelled():判断任务是否已经被取消。
boolean isCancelled = future.isCancelled();  // 判断任务是否被取消

需要注意的是,调用 get() 方法会阻塞当前线程,直到任务执行完成并返回结果。如果不想阻塞当前线程,可以使用 isDone() 方法先判断任务是否已经完成,或者使用 get(long timeout, TimeUnit unit) 方法设置等待超时时间。

通过 Callable 和 Future 接口,可以很方便地定义具有返回值的任务,并且获取任务的执行结果。这在需要进行耗时计算、并发请求数据等场景下非常有用。

执行定时任务和周期性任务

在 Java 中,可以使用 java.util.concurrent 包中的 ScheduledExecutorService 接口来执行定时任务和周期性任务。ScheduledExecutorService 提供了以下几种方法来提交定时任务和周期性任务:

  1. schedule(Runnable task, long delay, TimeUnit unit): 在给定的延迟之后执行一次任务。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
    // 执行任务逻辑
};
executor.schedule(task, 5, TimeUnit.SECONDS);  // 延迟 5 秒后执行任务
  1. schedule(Callable<V> task, long delay, TimeUnit unit): 在给定的延迟之后执行一次 Callable 任务,并返回一个 Future 对象,用于获取任务的执行结果。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Callable<String> task = () -> {
    // 执行任务逻辑并返回结果
    return "任务执行结果";
};
ScheduledFuture<String> future = executor.schedule(task, 5, TimeUnit.SECONDS);  // 延迟 5 秒后执行任务
// 获取任务执行结果
String result = future.get();
  1. scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit): 在给定的初始延迟之后开始周期性地执行任务,每次执行之间固定等待给定的时间间隔。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
    // 执行任务逻辑
};
executor.scheduleAtFixedRate(task, 1, 5, TimeUnit.SECONDS);  // 初始延迟 1 秒后开始执行任务,每次执行之间间隔 5 秒
  1. scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit): 在给定的初始延迟之后开始周期性地执行任务,每次任务执行完成之后等待给定的时间间隔再执行下一次。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
    // 执行任务逻辑
};
executor.scheduleWithFixedDelay(task, 1, 5, TimeUnit.SECONDS);  // 初始延迟 1 秒后开始执行任务,每次任务完成后等待 5 秒再执行下一次

需要注意的是,以上方法中的时间参数可以使用 TimeUnit 枚举类来指定时间单位,例如 TimeUnit.SECONDS 表示秒。此外,ScheduledExecutorService 还提供了 shutdown() 方法来关闭线程池,在不再需要执行定时任务和周期性任务时释放资源。

executor.shutdown();

五、线程池的状态和监控:

线程池的运行状态和转换过程

线程池的运行状态和转换过程主要由 ThreadPoolExecutor 类中的 ctl (控制状态)字段来管理。该字段使用一个 int 类型的变量来表示线程池的状态和活动线程数量。

线程池的运行状态包括以下几种:

  1. RUNNING(运行中):

    • 线程池初始状态,接受新任务并处理已添加到队列中的任务。
    • 对应的值是:RUNNING = -1 << COUNT_BITS(高 29 位为 0,低 3 位为 111)
  2. SHUTDOWN(关闭中):

    • 不再接受新任务,但会继续处理已添加到队列中的任务。
    • 对应的值是:SHUTDOWN = 0 << COUNT_BITS(高 29 位为 0,低 3 位为 000)
  3. STOP(停止中):

    • 不再接受新任务,也不会处理已添加到队列中的任务,同时中断正在执行的任务。
    • 对应的值是:STOP = 1 << COUNT_BITS(高 29 位为 0,低 3 位为 001)
  4. TIDYING(整理中):

    • 所有任务已经终止,活动线程数量为 0,进入该状态后会执行 terminated() 方法进行一些后续操作。
    • 对应的值是:TIDYING = 2 << COUNT_BITS(高 29 位为 0,低 3 位为 010)
  5. TERMINATED(已终止):

    • 线程池彻底终止,处于该状态后,线程池将不再改变状态。
    • 对应的值是:TERMINATED = 3 << COUNT_BITS(高 29 位为 0,低 3 位为 011)

转换过程如下:

  1. 初始状态:RUNNING

  2. shutdown() 方法被调用,进入 SHUTDOWN 状态:

    • 不再接受新任务,但会继续处理已添加到队列中的任务。
  3. shutdownNow() 方法被调用,进入 STOP 状态:

    • 不再接受新任务,也不会处理已添加到队列中的任务,中断正在执行的任务。
  4. 所有任务完成后,活动线程数量为 0,进入 TIDYING 状态:

    • 执行 terminated() 方法,进行一些后续操作, 如关闭线程池。
  5. 线程池彻底终止,进入 TERMINATED 状态:

    • 处于该状态后,线程池将不再改变状态。

监控线程池的活动线程数、任务队列状态

要监控线程池的活动线程数和任务队列状态,可以使用 ThreadPoolExecutor 类提供的一些方法和属性。

  1. 活动线程数:

    • 使用 getActiveCount() 方法可以获取线程池中当前活动的线程数量。
    • 这个方法返回的是当前正在执行任务的线程数量,不包括处于空闲状态的线程。
  2. 任务队列状态:

    • 使用 getQueue() 方法可以获取线程池使用的任务队列。
    • LinkedBlockingQueueThreadPoolExecutor 默认使用的队列实现,它是一个无界阻塞队列。
    • 可以通过 size() 方法获取队列中的任务数量。
    • 如果你使用的是其他类型的队列,可以根据相应的方法来获取队列状态。

示例代码如下所示:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize,   // 核心线程数
    maximumPoolSize,   // 最大线程数
    keepAliveTime, TimeUnit.SECONDS,   // 线程空闲时间
    new LinkedBlockingQueue<Runnable>()   // 任务队列
);

// 获取活动线程数
int activeThreadCount = executor.getActiveCount();

// 获取任务队列状态
BlockingQueue<Runnable> queue = executor.getQueue();
int taskCount = queue.size();
int remainingCapacity = queue.remainingCapacity();  // 队列剩余容量

注意,以上方法获取的活动线程数和任务队列状态是即时的。在多线程的环境下,这些值可能会发生变化。因此,如果需要监控线程池的活动线程数和任务队列状态,应该定期调用这些方法来获取最新的值。

使用监视器接口和钩子方法进行监控和自定义操作

在 Java 中,可以使用监视器接口(Monitor Interface)和钩子方法(Hook Method)对线程池进行监控和自定义操作。

  1. 监视器接口:

    • 监视器接口定义了回调方法,用于在特定事件发生时通知监听者。
    • Java 提供了 ThreadPoolExecutor 类的子类 ThreadPoolExecutorMBean,它实现了 java.util.concurrent.ThreadPoolExecutorMBean 接口。
    • 通过注册监视器接口的实现类,可以在线程池的各个阶段收到事件通知。
  2. 钩子方法:

    • 钩子方法是指在特定时间点被调用的方法,可以被子类重写以自定义一些操作。
    • Java 提供了 ThreadPoolExecutor 类的三个可重写的钩子方法:beforeExecute()afterExecute()terminated()
    • 这些钩子方法允许你在线程池的每个任务执行之前、之后以及线程池终止时进行一些自定义操作。

下面是对这些接口和方法的详细介绍:

  1. 监视器接口的使用方法:

    • 创建一个实现了 java.util.concurrent.ThreadPoolExecutorMBean 接口的类,并实现其中的回调方法。
    • 将该实现类注册到 MBean Server,通过 MBean Server 可以获取和管理线程池的相关信息。
    • 可以收到的监视器事件包括线程池的状态转换、活动线程数的变化和任务队列的状态等。
  2. 钩子方法的使用方法:

    • beforeExecute(Thread, Runnable):在每个任务执行之前被调用。
      • 可以用于记录任务的执行信息、设置一些上下文环境等。
    • afterExecute(Runnable, Throwable):在每个任务执行之后被调用。
      • 可以用于资源清理、日志记录、异常处理等。
    • terminated():在线程池终止时被调用。
      • 可以用于执行一些线程池终止后的操作,例如释放资源、发送通知等。

示例代码如下所示:

public class MyThreadPool extends ThreadPoolExecutor {

    public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // 在任务执行之前进行一些自定义操作
        System.out.println("Task " + r.toString() + " is about to execute.");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 在任务执行之后进行一些自定义操作
        System.out.println("Task " + r.toString() + " has finished.");
        if (t != null) {
            System.out.println("An exception occurred during task execution: " + t.getMessage());
        }
    }

    @Override
    protected void terminated() {
        // 在线程池终止时进行一些自定义操作
        System.out.println("ThreadPool has terminated.");
    }
}

// 创建线程池实例
MyThreadPool threadPool = new MyThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());

// 提交任务
threadPool.execute(task);

// 关闭线程池
threadPool.shutdown();

通过重写钩子方法,可以在适当的时间点执行自定义操作。这样可以实现线程池的监控和一些额外的动作,比如记录日志、异常处理、资源释放等。

六、并发工具类和线程池的结合:

CountDownLatch 的使用

CountDownLatch(倒计时门闩)是 Java 并发包中一个同步工具类,它可以让一个或多个线程在达到指定的计数之前等待。

CountDownLatch 主要有两个方法:

  1. countDown() 方法:

    • 每次调用 countDown() 方法,计数器就减少 1。
    • 当计数器减少到 0 时,所有等待的线程将被释放。
  2. await() 方法:

    • 调用 await() 方法的线程会等待,直到计数器减少到 0。
    • 如果计数器已经为 0,则会立即返回。

CountDownLatch 的使用场景:

  • 主线程等待所有子线程完成才继续执行。
  • 协调多个并发操作,确保某个操作在其他操作完成之后再执行。

下面是一个使用 CountDownLatch 和线程池的示例:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Example {

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

        // 提交任务到线程池
        for (int i = 0; i < threadCount; i++) {
            executorService.execute(new Worker(latch));
        }

        // 等待所有任务完成
        latch.await();

        // 关闭线程池
        executorService.shutdown();

        System.out.println("All workers have completed their tasks.");
    }

    static class Worker implements Runnable {
        private final CountDownLatch latch;

        public Worker(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                // 模拟任务执行
                Thread.sleep((long) (Math.random() * 5000));
                System.out.println("Worker thread has completed its task.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 完成任务后调用 countDown() 方法
                latch.countDown();
            }
        }
    }
}

在上面的示例中,我们创建了一个包含 5 个线程的线程池,并将任务提交到线程池中。每个任务完成时,会调用 countDown() 方法来减少计数器的值。主线程调用 latch.await() 来等待所有任务完成,当计数器减少到 0 时,主线程继续执行,并输出 "All workers have completed their tasks."。

需要注意的是,在使用线程池时,我们需要在任务完成后手动调用 countDown() 方法,以确保计数器递减。另外,在使用完线程池后,应该调用 shutdown() 方法来关闭线程池。

通过使用 CountDownLatch 和线程池,我们可以方便地实现任务之间的同步和协作,使得主线程能够在所有任务完成之后继续执行。

CyclicBarrier 的使用

CyclicBarrier(循环屏障)是 Java 并发包中的一个同步工具类,它可以让一组线程在达到指定数量之前等待,并且可以重复使用。

CyclicBarrier 主要有两个构造方法:

  1. CyclicBarrier(int parties):指定需要等待的线程数量,即 parties 数量。
  2. CyclicBarrier(int parties, Runnable barrierAction):除了指定需要等待的线程数量外,还可以指定当所有线程都达到屏障时需要执行的动作。

CyclicBarrier 主要有两个方法:

  1. await():调用线程在屏障处等待,直到 parties 数量的线程都调用了该方法。
  2. await(long timeout, TimeUnit unit):与 await() 方法类似,但是可以指定等待的最长时间。

CyclicBarrier 的使用场景:

  • 在并行任务中,某个任务需要等待其他所有任务都完成后才能继续执行,类似于等待其他线程的结果。
  • 在任务分解和计算的场景中,将大任务拆分成多个子任务,每个子任务分别执行,然后等待所有子任务完成后,进行结果汇总。

下面是一个使用 CyclicBarrier 的示例:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Example {

    public static void main(String[] args) {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("All parties have reached the barrier.");
        });

        for (int i = 0; i < parties; i++) {
            new Thread(new Worker(barrier)).start();
        }
    }

    static class Worker implements Runnable {
        private final CyclicBarrier barrier;

        public Worker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                // 模拟任务执行
                Thread.sleep((long) (Math.random() * 5000));
                System.out.println(Thread.currentThread().getName() + " has reached the barrier.");

                // 等待其他线程到达屏障
                barrier.await();

                System.out.println(Thread.currentThread().getName() + " continues to execute.");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

在上面的示例中,我们创建了一个 CyclicBarrier,指定需要等待的线程数量为 3,并且提供了一个在所有线程都达到屏障时执行的动作。然后,我们创建了 3 个线程(parties 数量),每个线程都执行一些任务,并在某个点处调用 barrier.await() 方法等待其他线程。当所有线程都达到屏障时,动作将被执行。

需要注意的是,CyclicBarrier 是可以重复使用的。每次所有线程都达到屏障后,计数器会被重置,并且新一轮的等待开始。

通过使用 CyclicBarrier,我们可以实现多个线程之间的同步,使得这些线程可以在达到指定数量之前等待,并在达到数量后继续执行后续操作。这对于任务拆分和结果合并的场景非常有用。

Semaphore 的使用

Semaphore(信号量)是 Java 并发包中的一个同步工具类,它可以用于控制同时访问某个资源的线程数量。

Semaphore 通过维护一组许可证(permits),来限制同时访问某个资源的线程数量。线程可以通过 acquire() 方法获取许可证,如果许可证数量不足,则线程将被阻塞。线程在使用完资源后,可以通过 release() 方法释放许可证,使得其他等待许可证的线程可以继续执行。

Semaphore 主要有两个构造方法:

  1. Semaphore(int permits):指定初始许可证的数量。
  2. Semaphore(int permits, boolean fair):除了指定初始许可证的数量外,还可以指定公平性,即是否按照先进先出的顺序分配许可证。

Semaphore 主要有三个核心方法:

  1. acquire():获取一个许可证,如果没有可用的许可证,则调用线程将被阻塞。
  2. acquire(int permits):获取指定数量的许可证,如果没有足够的许可证,则调用线程将被阻塞。
  3. release():释放一个许可证。
  4. release(int permits):释放指定数量的许可证。

Semaphore 的使用场景:

  • 控制同时访问某个资源的线程数量,例如连接池、线程池等。
  • 实现限流操作,限制某个操作的并发数。

下面是一个使用 Semaphore 的示例:

import java.util.concurrent.Semaphore;

public class Example {

    public static void main(String[] args) {
        int permits = 2; // 允许同时访问的线程数量
        Semaphore semaphore = new Semaphore(permits);

        for (int i = 0; i < 5; i++) {
            new Thread(new Worker(semaphore)).start();
        }
    }

    static class Worker implements Runnable {
        private final Semaphore semaphore;

        public Worker(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                // 获取许可证
                semaphore.acquire();
                
                // 模拟任务执行
                System.out.println(Thread.currentThread().getName() + " is executing.");

                // 释放许可证
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在上面的示例中,我们创建了一个 Semaphore,指定初始许可证的数量为 2。然后,我们创建了 5 个线程,每个线程在执行任务之前通过 semaphore.acquire() 方法获取许可证,如果没有可用的许可证,则线程将被阻塞。任务执行完毕后,线程通过 semaphore.release() 方法释放许可证。

需要注意的是,Semaphore 的许可证数量会动态变化,acquire() 方法获取许可证会使计数减少,而 release() 方法释放许可证会使计数增加。如果在某个时刻许可证数量为 0 且所有线程都在等待,那么一旦有一个线程释放了许可证,将会有一个等待的线程获取到许可证。

通过使用 Semaphore,我们可以有效地限制同时访问某个资源的线程数量,从而实现并发控制和限流操作。

CompletableFuture 的使用

CompletableFuture 是 Java 并发包中的一个类,它提供了一种简化异步编程的方式,可以用于执行异步任务并处理任务结果。

CompletableFuture 可以通过编程方式创建、组合和执行异步任务,同时支持链式操作和回调函数。它可以在任务完成时触发后续的操作,也可以在所有任务完成时触发某个动作。使用 CompletableFuture 可以更加方便地处理异步任务的结果,避免了传统的回调风格代码编写的复杂性。

CompletableFuture 的主要特性和用法如下:

  1. 创建 CompletableFuture

    • CompletableFuture.runAsync(Runnable runnable):创建一个 CompletableFuture 并异步执行给定的 Runnable 任务。
    • CompletableFuture.supplyAsync(Supplier<U> supplier):创建一个 CompletableFuture 并异步执行给定的 Supplier 任务,并返回计算结果。
  2. 异步任务完成时的操作

    • thenApply(Function<? super T,? extends U> fn):在上一步任务完成后应用给定的函数,并返回一个新的 CompletableFuture。
    • thenAccept(Consumer<? super T> action):在上一步任务完成后应用给定的消费函数,没有返回值。
    • thenRun(Runnable action):在上一步任务完成后执行给定的 Runnable,没有返回值。
  3. 异步任务完成时的回调函数

    • whenComplete(BiConsumer<? super T,? super Throwable> action):在上一步任务完成后执行给定的回调函数,无论是否发生异常。
    • exceptionally(Function<Throwable,? extends T> fn):在上一步任务发生异常时执行给定的回调函数,并返回一个新的 CompletableFuture。
  4. 组合多个 CompletableFuture

    • thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn):当两个 CompletableFuture 完成时,应用给定的函数处理结果,并返回一个新的 CompletableFuture。
    • thenCompose(Function<? super T,? extends CompletionStage<U>> fn):在上一步任务完成后,将结果传递给给定的函数并返回一个新的 CompletableFuture。
  5. 等待所有 CompletableFuture 完成

    • allOf(CompletableFuture<?>... cfs):返回一个新的 CompletableFuture,当所有给定的 CompletableFuture 完成时触发动作。
    • anyOf(CompletableFuture<?>... cfs):返回一个新的 CompletableFuture,当任意一个给定的 CompletableFuture 完成时触发动作。

下面是一个使用 CompletableFuture 的示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Example {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " World!");

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
        System.out.println(combinedFuture.get());
    }
}

在上面的示例中,我们创建了两个异步任务的 CompletableFuture,分别返回字符串 "Hello" 和 " World!"。然后,我们通过 thenCombine() 方法将这两个 CompletableFuture 组合起来,当两个 CompletableFuture 都完成时,将它们的结果拼接并返回。最后,我们通过 get() 方法来获取最终的计算结果。

需要注意的是,CompletableFuture 是可变的,它允许我们在异步任务完成后对结果进行一系列的操作,包括数据转换、回调函数和多个 CompletableFuture 的组合等操作。它提供了一种更加简洁和灵活的方式来处理异步任务的结果和流程控制。

七、Java 8+ 中的并行流和线程池:

并行流的概念和工作方式

并行流是 Java 8 引入的一种用于并行处理集合数据的工具。它可以将一个顺序流(Sequential Stream)转换为一个并行流(Parallel Stream),从而利用多线程来加速集合数据的处理。

并行流的概念和工作方式如下:

  1. 概念: 并行流是对集合数据的一种并行处理方式,它将集合分割成多个小块,并使用多个线程同时处理这些小块的数据。通过并行处理,可以提高处理大量数据的效率。

  2. 工作方式: 首先,在使用并行流之前,需要将集合数据转换为一个顺序流。可以通过 stream() 方法或者集合类中的方法如 parallelStream() 来获取一个顺序流。顺序流是一种逐个处理元素的流,它的操作是串行执行的。

    然后,通过调用 parallel() 方法将顺序流转换为一个并行流。并行流的操作会在多个线程中同时进行,以实现对集合数据的并行处理。

    并行流的工作方式是将数据分成多个小块,并将每个小块分配给不同的线程进行处理。每个线程独立地处理自己分配到的小块,并将处理结果合并起来,最终得到整体的处理结果。这样就可以充分利用多核处理器和多线程来加速集合数据的处理过程。

    并行流的具体工作流程如下:

    • 将集合数据分割成多个小块。
    • 为每个小块创建一个任务,并将任务分配给不同的线程进行处理。
    • 线程独立地处理自己分配到的小块,生成中间结果。
    • 合并所有线程的中间结果,得到最终的处理结果。

需要注意的是,并行流的使用需要注意以下几点:

  • 并行流适用于处理大量数据、复杂的计算或者耗时的操作,对于处理简单的任务,顺序流更加高效。
  • 在使用并行流时,需要考虑数据的分割和合并对性能的影响。如果数据量太小或者数据分割和合并开销过大,可能导致并行流的性能不如顺序流。
  • 在使用并行流时,需要注意共享状态的问题。并行流操作是多线程同时进行的,因此对共享状态的修改需要进行同步或使用并发容器来保证线程安全。

使用线程池提高并行流的性能

使用线程池可以进一步提高并行流的性能。线程池是一种管理和复用线程的机制,它可以有效地控制并发线程的数量,并提供线程的重用和调度功能,从而减少线程创建和销毁的开销。

下面是详细介绍如何使用线程池提高并行流的性能:

  1. 创建线程池: 首先,需要创建一个线程池来管理并发执行的任务。可以通过 Executors.newFixedThreadPool() 方法来创建一个固定大小的线程池,或者使用 Executors.newCachedThreadPool() 方法来创建一个根据需要自动调整大小的线程池。

  2. 使用线程池并行执行任务: 通过并行流的 parallelStream() 方法将顺序流转换为并行流,并在其中使用线程池来执行并行任务。可以通过 withExecutor() 方法将自定义的线程池传递给并行流。

    例如,可以这样使用线程池来执行并行流操作:

    ExecutorService executorService = Executors.newFixedThreadPool(4);
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
    
    list.parallelStream()
        .withExecutor(executorService)
        .forEach(System.out::println);
    
    executorService.shutdown();
    
  3. 控制线程池的大小: 通过设置线程池的大小,可以控制并行流中同时执行任务的线程数量。线程池的大小取决于可用的处理器核心数量、可用内存以及任务的特性。

    一般来说,如果任务是 CPU 密集型的,线程池的大小可以设置为处理器核心数。而如果任务是 IO 密集型的,可以根据实际情况适当增加线程池的大小以提高并行度。

  4. 关闭线程池: 在使用完线程池后,需要调用 shutdown() 方法来关闭线程池,释放资源。可以在所有任务完成后手动关闭线程池,或者使用 shutdownNow() 方法立即关闭线程池。

使用线程池可以提高并行流的性能的原因如下:

  • 线程池可以重用线程,避免了线程创建和销毁的开销,减少了系统资源的消耗。
  • 线程池可以控制并发线程的数量,避免了同时启动大量线程造成的线程调度开销和资源竞争。
  • 线程池可以提供线程的调度和管理功能,使得并发任务的执行更加可控和灵活。

 

作者:蜀山剑客李沐白
链接:https://juejin.cn/post/7264383071318491197
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
the end
免责声明:本文不代表本站的观点和立场,如有侵权请联系本站删除!本站仅提供信息存储空间服务。