发布于 

ServerSocket 和 ThreadPoolExecutor学习笔记

ServerSocket部分

1
2
3
4
5
6
7
8
9
10
11
12
final ServerSocket server = new ServerSocket(9090);
server.setReceiveBufferSize(1024 * 4);// 设置缓冲区大小4k

do {
try {
final Socket client = server.accept();
soaTransPool.execute(new SoaTransPool.SoaCodecTask(client));
} catch (IOException e) {
//TODO
e.printStackTrace();
}
} while (true);

其中,soaTransPool是一个自定义的类,类中定义了ThreadPoolExecutor,以及实现了Runnable的静态类SoaCodecTask.

ServerSocket监听9090端口,每当有请求到达,就创建一个socket,使用这个socket构造一个Runnable对象,即SoaCodecTask,再使用ThreadPoolExecutor来执行这个Runnable对象。也就是说,对每个请求创建一个任务,由线程池来执行这个任务。

事实上,ServerSocket的功能就是向系统注册一个服务,然后等待客户端请求,或者从请求队列中取出Socket,至于数据的传递则由Socket完成。

上面的例子中,使用的是线程池的处理方式,接收到的请求由线程池来处理;如果是简单的单线程ServerSocket,则可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ServerSocket server = new ServerSocket(9090);
Socket client = null;
do {
try {
client = server.accept();
System.out.println("新增链接: " + client.getInetAddress() + ": " + client.getPort());
// ...接收、处理、发送数据

} catch (IOException e) {
//TODO
e.printStackTrace();
}finally{
if(client != null){
client.close();
}
}
} while (true);

如果是想用多线程,就如第一个例子中那样,将socket作为参数创建一个Runnable对象,并新建一个线程并start以执行run方法。

ThreadPoolExecutor部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private int corePoolSize = 100; //线程池维护线程的最少数量
private iint maximumPoolSize = 200; //线程池维护线程的最大数量
private long keepAliveTime = 60; //线程池维护线程所允许的空闲时间
private TimeUnit unit = TimeUnit.SECOND; //空闲时间单位
private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(50); //线程池的缓冲队列
private final ThreadFactory threadFactory = new DefaultThreadFactory("Thread Of Test");

private ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

public class DefaultThreadFactory implements ThreadFactory {

private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String name;

public DefaultThreadFactory(String name) {
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
return new Thread(r, name + "-" + threadNumber.getAndIncrement());
}
}

一个任务通过execute(Runnable)方法被添加到线程池,任务就是一个Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。

当一个任务通过execute(Runnable)方法添加到线程池时:

  1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
  2. 如果此时线程池中的数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。
  3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
  4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
    这里的defaultHandler就是实现了RejectedExecutionHandler的一个类,它会抛出任务被拒绝的异常。
  5. 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

上面说的,使用handler处理被拒绝的任务,查看代码如下:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

这里的defaultHandler就是实现了RejectedExecutionHandler的一个类,它会抛出任务被拒绝的异常。

handler有四个选择:

  1. ThreadPoolExecutor.AbortPolicy()
    抛出java.util.concurrent.RejectedExecutionException异常

  2. ThreadPoolExecutor.CallerRunsPolicy()
    重试添加当前的任务,他会自动重复调用execute()方法

  3. ThreadPoolExecutor.DiscardOldestPolicy()
    抛弃旧的任务

  4. ThreadPoolExecutor.DiscardPolicy()
    抛弃当前的任务