简介
说明
线程池是Java多线程常用的技术,本文介绍线程池的种类,用示例介绍其用法。
相关网址
Java多线程–阻塞队列(BlockingQueue)–使用/教程/实例 – 自学精灵
常见线程池种类
种类 | 核心线程数 | 最大线程数 | 队列长度 | 描述 |
固定大小(fixed) | n | n(虽然有但实际不起作用) | 无限 | |
单个(single) | 1 | 1 | 无限 | 只一个线程在工作,相当于单线程顺序串行执行所有任务。 |
定时(scheduled) | n | 无限大 | 无限 | 周期性执行任务 |
缓存(cached) | 0 | 无限大 | 0 | 动态增删线程数 |
创建的方法
方法 | 作用 | 说明 | ThreadPoolExecutor参数 |
Executors.newFixed ThreadPool | 创建固定大小的线程池。 | 提交一个任务创建一个线程,直到最大数。 线程池数一旦达到最大值就会保持不变。 若某线程因异常而结束,会有新线程替代。 | nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() //nThreads是传进来的参数 |
Executors.newSingle ThreadExecutor | 创建一个单线程的线程池。 | 只一个线程工作,相当于单线程串行执行。 若此线程因异常而结束,会有新线程替代。 保证任务的执行顺序按任务提交顺序执行。 | 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() |
Executors.newScheduled ThreadPool | 创建一个固定大小的定时线程池。 | 支持定时以及周期性执行任务的需求。 | corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue() //corePoolSize是传进来的参数 |
Executors.newSingleThread ScheduledExecutor | 创建一个单线程的定时线程池。 | 支持定时以及周期性执行任务的需求。 | return new DelegatedScheduledExecutorService (newScheduledThreadPoolExecutor(1)); |
Executors.newCached ThreadPool | 创建一个可缓存的线程池。 | 若线程池的数量超过了处理任务所需要的线程,就回收部分空闲(默认60秒不执行任务)的线程。 当任务数增加时,此线程池又可以智能的添加新线程来处理任务。 | 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>() |
这些方法的返回值是ExecutorService对象,该对象表示一个线程池,可以执行Runnable对象或者Callable对象代表的线程。
固定大小(fixed)
简介
execute流程
对上图的说明如下。
- 如果当前运行的线程数少于corePoolSize,则创建一个新线程来执行任务。
- 在线程池完成线程的创建之后,将任务加入Linked- BlockingQueue。
- 线程会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。
固定大小的线程池使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响:
- 线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
- 由于1,使用无界队列时maximumPoolSize将是一个无效参数。
- 由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
- 由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或 shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。
实例
代码
package org.example.a; import java.util.concurrent.*; class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " " + "正在执行task "+taskNum); try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " " + "--执行task "+taskNum + " 完毕!!!"); } } public class Demo { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3); for(int i=0;i<6;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); } executor.shutdown(); } }
执行结果
pool-1-thread-2 正在执行task 1 pool-1-thread-3 正在执行task 2 pool-1-thread-1 正在执行task 0 pool-1-thread-3 --执行task 2 完毕!!! pool-1-thread-3 正在执行task 3 pool-1-thread-2 --执行task 1 完毕!!! pool-1-thread-2 正在执行task 4 pool-1-thread-1 --执行task 0 完毕!!! pool-1-thread-1 正在执行task 5 pool-1-thread-1 --执行task 5 完毕!!! pool-1-thread-3 --执行task 3 完毕!!! pool-1-thread-2 --执行task 4 完毕!!!
单个(single)
简介
execute流程
对上图的说明如下。
- 如果当前运行的线程数少于corePoolSize,则创建一个新线程来执行任务。
- 在线程池完线程的创建之后,将任务加入Linked- BlockingQueue。
- 线程会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。
实例
代码
package org.example.a; import java.util.concurrent.*; class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " " + "正在执行task "+taskNum); try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " " + "--执行task "+taskNum + " 完毕!!!"); } } public class Demo { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor(); for(int i=0;i<6;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); } executor.shutdown(); } }
执行结果
pool-1-thread-1 正在执行task 0 pool-1-thread-1 --执行task 0 完毕!!! pool-1-thread-1 正在执行task 1 pool-1-thread-1 --执行task 1 完毕!!! pool-1-thread-1 正在执行task 2 pool-1-thread-1 --执行task 2 完毕!!! pool-1-thread-1 正在执行task 3 pool-1-thread-1 --执行task 3 完毕!!! pool-1-thread-1 正在执行task 4 pool-1-thread-1 --执行task 4 完毕!!! pool-1-thread-1 正在执行task 5 pool-1-thread-1 --执行task 5 完毕!!!
定时(scheduled)
简介
创建
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
- 此法获得的对象为ScheduledThreadPoolExecutor,它实现了ScheduledExecutorService,继承了ThreadPoolExecutor。所以,它比法1多了ThreadPoolExecutor的一些方法,例如:
- public boolean remove(Runnable task)
- public void purge()
- public int getActiveCount()
- 此法获得的对象为ScheduledThreadPoolExecutor,它实现了ScheduledExecutorService,继承了ThreadPoolExecutor。所以,它比法1多了ThreadPoolExecutor的一些方法,例如:
方法
方法 | 描述 |
schedule(Callable<V> callable, long delay, TimeUnit unit) | 创建任务:在给定延迟后只执行一次。 |
schedule(Runnable command, long delay, TimeUnit unit) | 创建任务:在给定延迟后只执行一次。 |
scheduleAtFixedRate(Runnable command, long initialDelay,long period, TimeUnitunit) | 创建任务:固定周期执行。在 initialDelay 后开始执行,然后在initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。 |
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit) | 创建任务:上个任务执行完后,固定周期执行。在 initialDelay 后开始执行,运行完后,延时period 后执行,执行完后,再延时 period 后执行,依此类推。 |
ScheduledThreadPoolExecutor与Timer对比
项 | Timer | ScheduledThreadPoolExecutor |
线程数量 | 单线程。 单线程执行所有TimerTask,若某TimerTask任务执行时间比较久,会影响其他任务的调度执行。 | 多线程。 重用线程池,某个ScheduledFutureTask任务执行的时间比较久,不会影响到其他任务的调度执行。 |
系统时间敏感度 | 对操作系统的时间敏感。 基于操作系统的绝对时间,若操作系统的时间改变,则Timer的调度不再精确。 | 不受操作系统时间改变的影响。 基于相对时间的,不受操作系统时间改变的影响。 |
是否捕获异常 | 不捕获TimerTask抛出的异常。 加上Timer又是单线程的。一旦某个调度任务出现异常,则整个线程就终止,其他需要调度的任务也不再执行。 | 不捕获异常。 基于线程池来实现调度功能,某个任务抛出异常后,其他任务仍能正常执行。 |
任务是否具备优先级 | 没有优先级。 只是按照系统的绝对时间来执行任务。 | 有优先级。 ScheduledFutureTask类实现了java.lang.Comparable接口和java.util.concurrent.Delayed接口。Comparable#compareTo方法实现了任务的比较,距离下次执行的时间间隔短的任务会排在前面。而Delayed#getDelay方法:能够返回距离下次任务执行的时间间隔。 |
是否支持对任务排序 | 不支持对任务的排序。 | 支持。 ScheduledThreadPoolExecutor类中定义了一个静态内部类DelayedWorkQueue,它是一个有序队列,为需要调度的每个任务按照距离下次执行时间间隔的大小来排序 |
能否获取返回的结果 | 不能。 TimerTask类只是实现了java.lang.Runnable接口,无法从TimerTask中获取返回的结果。 | 能。 ScheduledFutureTask类继承了FutureTask类,能够通过Future来获取返回的结果。 |
实例(单定时)
示例1:(间隔时间(3s)大于任务的执行时间(1s))
package com.knife.example.controller; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.*; class MyTask implements Runnable { @Override public void run() { try { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); System.out.println(sdf.format(new Date()) + " " + "正在执行task"); Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("--执行task " + " 完毕!!!"); } } public class Demo { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate(new MyTask(), 0, 3, TimeUnit.SECONDS); // executor.scheduleWithFixedDelay(new MyTask(), 0, 3, TimeUnit.SECONDS); //不能加这句 //executor.shutdown(); } }
运行结果
scheduleAtFixedRate:可见,每3秒运行一次
09:21:38 正在执行task --执行task 完毕!!! 09:21:41 正在执行task --执行task 完毕!!! 09:21:44 正在执行task --执行task 完毕!!! 09:21:47 正在执行task --执行task 完毕!!! 09:21:50 正在执行task --执行task 完毕!!!
scheduleWithFixedDelay:可见,每4秒运行一次
09:22:31 正在执行task --执行task 完毕!!! 09:22:35 正在执行task --执行task 完毕!!! 09:22:39 正在执行task --执行task 完毕!!!
示例2 :(间隔时间(3s)小于任务的执行时间(4s))
package org.example.a; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.*; class MyTask implements Runnable { @Override public void run() { try { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); System.out.println(sdf.format(new Date()) + " " + "正在执行task"); Thread.currentThread().sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("--执行task " + " 完毕!!!"); } } public class Demo { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedDelay(new MyTask(), 0, 3, TimeUnit.SECONDS); //executor.scheduleWithFixedDelay(new MyTask(), 0, 3, TimeUnit.SECONDS); //这句不要写 //executor.shutdown(); } }
运行结果
scheduleAtFixedRate:可见,此时间隔时间失效,会以任务执行时间为准。上一个任务一结束就开始下一个任务
22:58:53 正在执行task --执行task 完毕!!! 22:58:57 正在执行task --执行task 完毕!!! 22:59:01 正在执行task --执行task 完毕!!! 22:59:05 正在执行task --执行task 完毕!!! 22:59:09 正在执行task --执行task 完毕!!!
scheduleWithFixedRate:
23:08:31 正在执行task --执行task 完毕!!! 23:08:38 正在执行task --执行task 完毕!!! 23:08:45 正在执行task --执行task 完毕!!! 23:08:52 正在执行task --执行task 完毕!!! 23:08:59 正在执行task --执行task 完毕!!!
异常处理
若程序中捕获了异常,则不影响下一个任务执行,若不捕获,则会影响下一个任务执行。
示例1:不捕获异常
package org.example.a; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.*; class MyTask implements Runnable { int i = 0; int[] a = {1, 2, 3}; @Override public void run() { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); System.out.println(sdf.format(new Date()) + " " + "正在执行task"); System.out.println(a[i++]); System.out.println("-------------------执行task " + " 完毕!!!"); } } public class Demo { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleWithFixedDelay(new MyTask(), 0, 1, TimeUnit.SECONDS); //这句不要写 //executor.shutdown(); } }
运行结果(异常后,直接卡死)
23:15:28 正在执行task 1 -------------------执行task 完毕!!! 23:15:29 正在执行task 2 -------------------执行task 完毕!!! 23:15:30 正在执行task 3 -------------------执行task 完毕!!! 23:15:31 正在执行task
示例2:捕获异常
package org.example.a; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.*; class MyTask implements Runnable { int i = 0; int[] a = {1, 2, 3}; @Override public void run() { try { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); System.out.println(sdf.format(new Date()) + " " + "正在执行task"); System.out.println(a[i++]); System.out.println("-------------------执行task " + " 完毕!!!"); } catch (Exception e) { e.printStackTrace(); } } } public class Demo { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleWithFixedDelay(new MyTask(), 0, 1, TimeUnit.SECONDS); //这句不要写 //executor.shutdown(); } }
执行结果(一直往下执行)
23:17:10 正在执行task 1 -------------------执行task 完毕!!! 23:17:11 正在执行task 2 -------------------执行task 完毕!!! 23:17:12 正在执行task 3 -------------------执行task 完毕!!! 23:17:13 正在执行task java.lang.ArrayIndexOutOfBoundsException: 3 at org.example.a.MyTask.run(Demo.java:15) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 23:17:14 正在执行task java.lang.ArrayIndexOutOfBoundsException: 4 at org.example.a.MyTask.run(Demo.java:15) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 23:17:15 正在执行task java.lang.ArrayIndexOutOfBoundsException: 5 at org.example.a.MyTask.run(Demo.java:15) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
缓存(cached)
简介
CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThreadPool的源代码。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue()); }
CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为 Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着 CachedThreadPool中的线程如果空闲(没有任务执行)超过60秒后将会被终止。
FixedThreadPool和SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但 CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下, CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
详解
CachedThreadPool的execute流程
对上图的说明如下。
首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行下面的步骤
当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤(1)将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。
在步骤(2)中新创建的线程将任务执行完后,会执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤(1)),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。
前面提到过,SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。CachedThreadPool中任务传递的示意图如图所示。
请先
!