高手的存在,就是让服务10亿人的时候,你感觉只是为你一个人服务......

java线程池

目录
  1. 1. 使用线程池的好处
  2. 2. ThreadPoolExecutor
    1. 2.1. 简单的例子:
    2. 2.2. 线程池的创建
    3. 2.3. 线程任务的执行与终止
      1. 2.3.1. 执行过程
  3. 3. 常见线程池
    1. 3.1. newFixedThreadPool
    2. 3.2. newSingleThreadExecutor
    3. 3.3. newCachedThreadPool
    4. 3.4. 线程池大小

我们使用线程的时候就去创建一个线程,这样实现起来比较简单。
但如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

使用线程池的好处

合理的使用线程池能够带来3个很明显的好处:

  1. 降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗
  2. 提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
  3. 提高线程的可管理性:线程池可以统一管理、分配、调优和监控。

ThreadPoolExecutor

java的线程池支持主要通过ThreadPoolExecutor来实现,

简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

public class ThreadTest {

public static void main(String[] args) {

int threadNum =20;
useThreadPool(threadNum);

}

public static void useThreadPool(int ThreadNum) {
// 核心线程池的大小
int corePoolSize = 20;
// 线程池最大线程数
int maximumPoolSize = ThreadNum;
// 表示线程没有任务执行时最多保持多久时间会终止;默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用
long keepAliveTime = 200;
// 参数keepAliveTime的时间单位
TimeUnit unit = TimeUnit.MILLISECONDS;
// 一个阻塞队列,用来存储等待执行的任务
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(5);

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

threadPoolExecutor.execute(new MyTask(num));

threadPoolExecutor.shutdown();
}

static class MyTask implements Runnable {
private int num;
MyTask(int num) {
this.num = num;
}
@Override
public void run() {
System.out.println(num);
}

}
}

线程池的创建

线程池的创建可以通过ThreadPoolExecutor的构造方法实现,有四个构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
{

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
{

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
{

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{

if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

前面三个都是调用第四个构造函数来实现的。

具体解释一下上面的参数:

corePoolSize: 核心线程池大小。初始为0,当任务来了之后会创建一个线程去执行任务;当线程数大于corePoolSize后,把到达的任务放入缓存任务队列中。

maximumPoolSize: 线程池最大容量。

keepAliveTime :线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。

TimeUnit : KeepAliveTime的时间单位,7种取值:

TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒

ThreadFactory :线程工厂,主要用来创建线程。

BlockingQueue :缓存任务队列。常见的有以下四种队列:

ArrayBlockingQueue —基于数组的先进先出队列
LinkedBlockingQueue — 基于链表的先进先出队列
PriorityBlockingQueue
SynchronizedQueue

RejectedExecutionHandler 线程拒绝策略,有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务。
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。


线程任务的执行与终止

在ThreadPoolExecutor类中有几个非常重要的方法:
execute() 这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit() 这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果
shutdown()和shutdownNow()方法 用来关闭线程池。

shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,不会接受新的任务.
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务.

执行过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果线程数小于基本线程数,则创建线程并执行当前任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}

里面涉及的点比较多,简单的讲:

首先检查CorePool,如果CorePool内的线程小于CorePoolSize,新创建线程执行任务。
如果当前CorePool内的线程大于等于CorePoolSize,那么将线程加入到BlockingQueue。
如果不能加入BlockingQueue,在小于MaxPoolSize的情况下创建线程执行任务。
如果线程数大于等于MaxPoolSize,那么执行拒绝策略.


常见线程池

在java doc帮助文档中,有如此一段话:
强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。


newFixedThreadPool

1
2
3
4
5
6

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。

FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。
但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。


newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
如果这个线程异常结束,会有另一个取代它,保证顺序执行。
单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。


newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

这种类型的线程池特点是:

工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。


实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。


线程池大小

可以参考,具体还需要测试:
如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
如果是IO密集型任务,参考值可以设置为2*NCPU


参考资料:
http://www.importnew.com/19011.html
http://blog.csdn.net/nk_tf/article/details/51959276
http://www.jianshu.com/p/ade771d2c9c0