线程池

线程池的作用

如果某业务场景需要创建大量的线程且线程实际的工作时间短暂,那么势必会耗费大量的资源在线程的创建、切换、销毁等上面。线程池的作用就是减少资源浪费,统筹管理多线程。java中的线程池为ThreadPoolExecutor。


线程池运行状态

线程池字段

1
2
3
4
5
6
7
8
9
10
11
//ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//表示位数,此时为29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//线程池内有效线程的数量 (workerCount)上线,大约是5亿

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
  • RUNNING:运行。能接受新的任务也能处理阻塞队列中的任务
  • SHUTDOWN:关闭。不接受新任务但能处理阻塞队列中的任务
  • STOP:停止。不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
  • TIDYING:整理。线程池进入该状态后会调用terminated() 方法进入终止态。
  • TERMINATED:终止。

状态转换如下:

RUNNING-(调用shutdown())->SHUTDOWN-(阻塞队列为空且工作线程数量为0)->TIDYING-(调用terminated方法)->TERMINATED

RUNNING-(调用shutdownNow())->STOP-(工作线程数量为0)->TIDYING-(调用terminated方法)->TERMINATED


ThreadPoolExecutor构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

corePoolSize:核心线程数量,当有新任务在execute()方法提交时,会执行以下判断:

  1. 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
  2. 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
  3. 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
  4. 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;

所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。

maximumPoolSize:最大线程数,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量。需要注意的是只有当workQueue队列填满时才会创建多于corePoolSize的线程

workQueue:阻塞队列,存放着等待执行的Runnable任务对象。它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列、延迟队列几种。

1
2
3
4
5
6
7
8
9
SynchronousQueue:同步队列,它没有容量,m每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。 使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;

ArrayBlockingQueue:数组阻塞队列,底层数据结构是数组,需要指定队列的大小。若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。

LinkedBlockingQueue:链式阻塞队列,底层数据结构是链表,默认大小是Integer.MAX_VALUE,也可以指定大小。线程池创建的最大线程数量就是corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时要特别注意内存泄漏问题。

PriorityBlockingQueue:基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),内部控制线程同步的锁采用的是非公平锁。它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

DelayQueue:延迟队列,是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注入其中的元素必须实现 java.util.concurrent.Delayed接口。

keepAliveTime:非核心线程最大存活时长。非核心线程如果处于闲置状态超过该值,就会被销毁.注意当corePoolSize=maxPoolSize时,keepAliveTime参数也就不起作用了(因为不存在非核心线程)

unit:keepAliveTime的单位。

threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。

handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:

  1. AbortPolicy:直接抛出异常,这是默认策略;
  2. CallerRunsPolicy:用调用者所在的线程来执行任务;
  3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  4. DiscardPolicy:直接丢弃任务;

ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()afterExecute()terminated()三个接口实现的,

1、beforeExecute:线程池中任务运行前执行

2、afterExecute:线程池中任务运行完毕后执行

3、terminated:线程池退出后执行


worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
  • firstTask用它来保存传入的任务
  • thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程

四种常见线程池

  • newCachedThreadPool可缓存线程池

这是一个线程只要空闲60秒就会被回收的线程池,适用于短时间高并发的处理业务,而在峰值过后并不会占用系统资源。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • newFixedThreadPool定长线程池

核心线程数量和总线程数量相等,都是传入的参数nThreads,因为LinkedBlockingQueue阻塞队列的大小默认是Integer.MAX_VALUE,如果使用不当,很可能导致内存溢出。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行,这是一个支持延时任务执行的线程池。

1
2
3
4
5
6
7
8
9
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
//注意这里使用延迟队列
new DelayedWorkQueue());
}
  • newSingleThreadExecutor

有且仅有一个核心线程的线程池( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}


版权声明:本文为博主原创文章,欢迎转载,转载请注明作者、原文超链接,感谢各位看官!!!

本文出自:monkeyGeek

座右铭:生于忧患,死于安乐

欢迎志同道合的朋友一起交流、探讨!

monkeyGeek
#

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×