线程池的作用 如果某业务场景需要创建大量的线程且线程实际的工作时间短暂,那么势必会耗费大量的资源在线程的创建、切换、销毁等上面。 线程池的作用就是减少资源浪费,统筹管理多线程。java中的线程池为ThreadPoolExecutor。
线程池运行状态 线程池字段
1 2 3 4 5 6 7 8 9 10 11 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING:运行。能接受新的任务也能处理阻塞队列中的任务
SHUTDOWN:关闭。不接受新任务但能处理阻塞队列中的任务
STOP:停止。不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
TIDYING:整理。线程池进入该状态后会调用terminated() 方法进入终止态。
TERMINATED:终止。
状态转换如下:
RUNNING-(调用shutdown())->SHUTDOWN-(阻塞队列为空且工作线程数量为0)->TIDYING-(调用terminated方法)->TERMINATED
RUNNING-(调用shutdownNow())->STOP-(工作线程数量为0)->TIDYING-(调用terminated方法)->TERMINATED
ThreadPoolExecutor构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
corePoolSize :核心线程数量,当有新任务在execute()方法提交时,会执行以下判断:
如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;
所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。
maximumPoolSize :最大线程数,这个参数会根据你使用的workQueue
任务队列的类型,决定线程池会开辟的最大线程数量。需要注意的是只有当workQueue队列填满时才会创建多于corePoolSize的线程
workQueue :阻塞队列,存放着等待执行的Runnable
任务对象。它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列、延迟队列几种。
1 2 3 4 5 6 7 8 9 SynchronousQueue:同步队列,它没有容量,m每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。 使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略; ArrayBlockingQueue:数组阻塞队列,底层数据结构是数组,需要指定队列的大小。若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。 LinkedBlockingQueue:链式阻塞队列,底层数据结构是链表,默认大小是Integer.MAX_VALUE,也可以指定大小。线程池创建的最大线程数量就是corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时要特别注意内存泄漏问题。 PriorityBlockingQueue:基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),内部控制线程同步的锁采用的是非公平锁。它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。 DelayQueue:延迟队列,是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注入其中的元素必须实现 java.util.concurrent.Delayed接口。
keepAliveTime :非核心线程最大存活时长。非核心线程如果处于闲置状态超过该值,就会被销毁.注意当corePoolSize=maxPoolSize时,keepAliveTime参数也就不起作用了(因为不存在非核心线程)
unit :keepAliveTime的单位。
threadFactory :它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
handler :它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
AbortPolicy:直接抛出异常,这是默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;
ThreadPoolExecutor扩展 ThreadPoolExecutor
扩展主要是围绕beforeExecute()
、afterExecute()
和terminated()
三个接口实现的,
1、beforeExecute
:线程池中任务运行前执行
2、afterExecute
:线程池中任务运行完毕后执行
3、terminated
:线程池退出后执行
worker类 线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
firstTask用它来保存传入的任务
thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程
四种常见线程池
newCachedThreadPool可缓存线程池
这是一个线程只要空闲60秒
就会被回收的线程池,适用于短时间高并发的处理业务,而在峰值过后并不会占用系统资源。
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
核心线程数量和总线程数量相等,都是传入的参数nThreads,因为LinkedBlockingQueue阻塞队列的大小默认是Integer.MAX_VALUE,如果使用不当,很可能导致内存溢出。
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
创建一个定长线程池,支持定时及周期性任务执行,这是一个支持延时任务 执行的线程池。
1 2 3 4 5 6 7 8 9 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue()); }
有且仅有一个核心线程的线程池( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。
1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
版权声明:本文为博主原创文章,欢迎转载,转载请注明作者、原文超链接,感谢各位看官!!!
本文出自: monkeyGeek
座右铭: 生于忧患,死于安乐
欢迎志同道合的朋友一起交流、探讨!
monkeyGeek