线程池的简单实现

最近看了下JAVA线程相关的资料,顺便写了个自己的线程池的实现方式,记录一下。

简单任务的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** 
* Task
* Created on: 2008-9-29 上午10:29:18
* Description:
*/
package cn.edu.ccnu.inc.ivan.thread;

/**
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
*/
public interface Task {

public int getNum();
public void execute();
}

工作线程的定义

用于执行任务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* WorkThread
* Created on: 2008-9-29 上午10:30:06
* Description:
*/
package cn.edu.ccnu.inc.ivan.thread;

import java.util.Queue;

/**
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
*/
public class WorkThread extends Thread {

/**
* 线程关闭的标识位
*/
private boolean shutDown = false;

/**
* 线程池管理器
*/
ThreadPoolManager mgr;

/**
* 任务队列
*/
private Queue<Task> taskQueue;

public WorkThread(ThreadPoolManager mgr, Queue<Task> taskQueue, String name) {
super(name);
this.mgr = mgr;
this.taskQueue = taskQueue;
}

public void run() {
while(!shutDown) {
Task task;
// 如果任务队列不为空,则取出一个任务并开始执行,否则线程等等
if(!taskQueue.isEmpty()) {
synchronized(taskQueue) {
task = taskQueue.poll();
}
task.execute();
// 任务执行完毕之后释放线程到空闲线程队列中
mgr.releaseThread(this);
} else {
try {
synchronized(taskQueue) {
taskQueue.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public void shutDown() {
this.shutDown = true;
}
}

线程池管理器

核心的线程管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
* ThreadPoolManager
* Created on: 2008-9-29 上午10:34:09
* Description:
*/
package cn.edu.ccnu.inc.ivan.thread;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
*/
public class ThreadPoolManager {

public static int DEFAULT_POOL_SIZE = 5;
public static int POOL_SIZE = 0;

/**
* 空闲线程
*/
private Queue<WorkThread> idleThread;

/**
* 任务队列
*/
private Queue<Task> taskQueue;

/**
* 线程池大小
*/
private int poolSize;

public ThreadPoolManager() {
this(DEFAULT_POOL_SIZE);
}

public ThreadPoolManager(int poolSize) {
if(poolSize < 0) {
this.poolSize = DEFAULT_POOL_SIZE;
} else {
this.poolSize = poolSize;
}
idleThread = new ConcurrentLinkedQueue<WorkThread>();
taskQueue = new ConcurrentLinkedQueue<Task>();
init();
}

/**
* 初始化线程池,新建 N 个空闲线程
*
*/
private void init() {
System.out.println("Start up thread pool...");
synchronized(taskQueue) {
for(int i=0; i < poolSize; i++) {
WorkThread workThread = new WorkThread(this, taskQueue, "Thread " + i);
idleThread.add(workThread);
POOL_SIZE++;
workThread.start();
}
}
}

/**
* 关闭线程池,关闭线程池中各个线程
* 在调用该方法后,线程并没有马上关闭,而是在线程任务执行完之后关闭
*
*/
public void shutDown() {
System.out.println("Shut down all work thread...");
synchronized(taskQueue) {
for(WorkThread thread : idleThread) {
thread.shutDown();
}
}
}

/**
* 添加任务并唤醒各因无任务而等待的空闲线程
* @param task
* @throws Exception
*/
public void addTask(Task task) throws Exception {
synchronized(taskQueue) {
taskQueue.add(task);
taskQueue.notifyAll();
}
}

// public void schedule() throws Exception {
// while(!shutDown) {
// if(!taskQueue.isEmpty()) {
// WorkThread workThread = getIdleThread();
// synchronized(workThread) {
// workThread.notifyAll();
// }
// } else {
// taskQueue.wait();
// }
// }
// }

/**
* 获取空闲线程,当线程池内无空闲线程时等待
* @return
* @throws Exception
*/
public WorkThread getIdleThread() throws Exception {
if(idleThread.isEmpty()) {
System.out.println("No idle thread in pool, please wait...");
idleThread.wait();
}
synchronized(idleThread) {
return idleThread.poll();
}
}

/**
* 释放线程
* @param thread
*/
public void releaseThread(WorkThread thread) {
System.out.println("Release the thread [" + thread.getName() + "] to the pool...");
synchronized(idleThread) {
idleThread.add(thread);
idleThread.notifyAll();
}
}
}

单元测试

测试起来就比较简单,为了模拟线程效果,在任务中只是打印一行执行过程,并让线程睡眠一段时间,同样,在所有任务执行完成后,让线程池睡眠一段时间再关闭:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* SimpleTask
* Created on: 2008-9-29 上午10:47:07
* Description:
*/
package cn.edu.ccnu.inc.ivan.thread;

import edu.ccnu.inc.ivan.util.DateUtils;

/**
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
*/
public class SimpleTask implements Task {

int num = 0;

public SimpleTask(int num) {
this.num = num;
}

public void execute() {
try {
System.out.println("[" + DateUtils.getTimeNow() + "] Task[" + getNum() + "]:I have worked in thread [" + Thread.currentThread().getName() + "]");
Thread.sleep(Math.round(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public int getNum() {
return num;
}

public void setNum(int num) {
this.num = num;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* TestThreadPoolManager
* Created on: 2008-9-29 上午10:45:35
* Description:
*/
package cn.edu.ccnu.inc.ivan.thread;

import junit.framework.TestCase;

/**
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
*/
public class TestThreadPoolManager extends TestCase {

public void testManager() throws Exception {
ThreadPoolManager pool = new ThreadPoolManager();
for(int i = 0; i < 100; i++) {
pool.addTask(new SimpleTask(i));
}
Thread.sleep(1000 * 10);
pool.shutDown();
}

}