发布于 

NIO学习

啥都别说了,就记录一下NIO的相关学习内容。

java IO

java1.4之前,java只支持一种阻塞式IO,可以称之为BIO。对应于网络编程,服务端提供了位置信息(ip和端口),客户端向服务端监听的端口发起连接请求,通过三次握手建立连接,然后通过网络套接字(socket)进行通信。

这是一种同步阻塞式的通信模式,通常由一个独立的Acceptor线程监听客户端连接,收到连接请求后,它为每一个客户端连接创建一个线程进行处理,阻塞式等待输入完成,从输入流获取数据后,处理请求,再通过输出流返回数据,最后关闭连接,销毁线程。

IO实例

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class HelloServer {


static int port = 9090;

public static void main(String[] args) {

final ServerSocket server;
try {
server = new ServerSocket(port);
System.out.println("Hello Server started at " + port);
Socket socket = null;

do {
socket = server.accept();
new Thread(new HelloServerHandler(socket)).start();

} while (true);

} catch (IOException e) {
e.printStackTrace();
}
}


public static class HelloServerHandler implements Runnable {

private Socket socket;

public HelloServerHandler(Socket socket) {
this.socket = socket;
}

@Override
public void run() {

BufferedReader in = null;
PrintWriter out = null;

try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
while (true) {
String message = in.readLine();
if (message == null) break;

System.out.println("message from client: " + message);
out.println("Hello, " + message);
}

} catch (Exception e) {
e.printStackTrace();
if (in != null)
try {
in.close();
} catch (IOException e1) {
e1.printStackTrace();
}

if (out != null) {
out.close();
}

if (this.socket != null) {
try {
socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
socket = null;
}
}

}
}

}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class HelloClient {

static int port = 9090;

public static void main(String[] args) {

Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1", port);

in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);

out.println("JoJo");
System.out.println("Sending message JoJo to server");
String response = in.readLine();
System.out.println("Got response from server: \n" + response);

} catch (IOException e) {
e.printStackTrace();
} finally {

if (in != null)
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}

if (out != null)
out.close();

try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}

}
}

优点和缺点

开发简单,容易维护和调试。然而缺乏弹性伸缩能力,后台处理请求的线程数和请求数量成正比,当同时大量请求发生时,会导致后台线程数急剧增加,最终崩溃。

优化:伪异步/使用线程池处理请求

为了解决这个上述模型的问题,我们可以对它做出优化,后端使用线程池处理客户端请求,M个线程的线程池可以处理N个客户端请求,其中N可以远大于M, 通过这种方式限制了后台线程数量,防止线程资源耗尽崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
final ServerSocket server = new ServerSocket(9090);
server.setReceiveBufferSize(1024 * 4);// 设置缓冲区大小4k

do {
try {
final Socket client = server.accept();
soaTransPool.execute(new SoaTransPool.SoaCodecTask(client));
} catch (IOException e) {
//TODO
e.printStackTrace();
}
} while (true);

...
private int corePoolSize = 100; //线程池维护线程的最少数量
private iint maximumPoolSize = 200; //线程池维护线程的最大数量
private long keepAliveTime = 60; //线程池维护线程所允许的空闲时间
private TimeUnit unit = TimeUnit.SECOND; //空闲时间单位
private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(50); //线程池的缓冲队列
private final ThreadFactory threadFactory = new DefaultThreadFactory("Thread Of Test");

private ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

public class DefaultThreadFactory implements ThreadFactory {

private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String name;

public DefaultThreadFactory(String name) {
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
return new Thread(r, name + "-" + threadNumber.getAndIncrement());
}
}

然而,请求量比较大,网络传输耗时,请求处理复杂时,由于IO模型的限制,后台线程将阻塞在请求上,将线程池和队列占满了,将没有多余的线程来接收请求,对于客户端来说,就会一直连接超时,请求失败,对于客户端来说,服务端已经系统崩溃,拒绝服务。

NIO

诞生

NIO和IO对比

面向流和面向通道

Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向通道的。流是单向的,而通道是双向的,我们可以通过同一个channel read 或者 write, 但是我们只能对inputstream read,也只能对outputstream做write操作。

阻塞与非阻塞IO

Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上。

nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各个水管里流出来的水,都可以得到妥善接纳,这个关键之处就是增加了一个接水工,也就是Selector,他负责协调,也就是看哪根水管有水了的话,在当前水管的水接到一定程度的时候,就切换一下:临时关上当前水龙头,试着打开另一个水龙头(看看有没有水)。

当其他人需要用水的时候,不是直接去接水,而是事前提了一个水桶给接水工,这个水桶就是Buffer。也就是,其他人虽然也可能要等,但不会在现场等,而是回家等,可以做其它事去,水接满了,接水工会通知他们.

优点和缺点

Java Nio使得可以通过一个或几个线程管理成千上万个连接(通道),使用非阻塞式的IO读写,在数据准备好之前不会消耗后台处理的线程和资源,极大提高了系统的并发处理能力,特别适用于大量连接,但是每个连接请求的数据量很小的情况。对于连结数量少,但是每个连接一次性需要传输大量数据的情况并不适用。

缺点

  1. NIO的类库和API繁杂,使用麻烦,需要掌握Selector, ServerSocketChannel, SocketChannel, ByteBuffer等;
  2. 需要熟悉Java多线程编程/网络编程;
  3. 可靠性能力补齐艰难,比如客户端重连,网络闪断,半包读写(以及粘包问题),网络拥塞(会导致写失败)等;
  4. 可能存在bug.

ps: 这也是我们选择成熟的nio框架比如netty而不是自己开发的原因。

例子:

1
2
3
4
5
6
7
8
9
10
11
ByteBuffer buf = ByteBuffer.allocate(48);  

int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
buf.flip();
while(buf.hasRemaining()){
System.out.print((char) buf.get());
}
buf.clear();
bytesRead = inChannel.read(buf);
}

组件和概念

Channel(通道)

NIO的通道,类似于IO的输入输出流,但是有些区别:

  1. 通道是双向的,既可以读取数据,也可以写入数据;
  2. 通道可以异步读写
  3. 通道中的数据必须读取到一个缓冲区,也只能由缓冲区写入

我们一般用的是SocketChannelServerSocketChannel

Buffer(缓冲区)

Buffer用于和Channel交互,它的本质是一块可以读取和写入的内存,被封装成一个Buffer对象,并且提供了一些方法以访问内存,操作数据。

重点关注一下Buffer有3个属性:

  1. capacity: 表示Buffer内存块的容量,一旦分配,对于Buffer来说,这个值就已经固定了;

  2. position:

    • 当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1。
    • 当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。
  3. limit: 在写模式下, limit 等于capicity, 表示你最多能写入多少个值;在读模式下,limit被赋值为写模式时position的值;

通过这三个属性,我们可以对一块内存的读写做出控制:

  1. 初始化时,position = 0, limit = capacity;
  2. inChannel.read(buffer), 将channel的数据写入Buffer, position随着写入递增;
  3. flip: 写模式 => 读模式
    1
    2
    3
    public final Buffer flip() {
    limit = position;
    position = 0;
  4. 读取数据:
    1
    2
    3
    4
    5
    6
    byte[] bytes = new byte[readBuffer.remaining()];
    readBuffer.get(bytes);

    public final int remaining() {
    return limit - position;
    }
  5. 清理buffer: 读模式 => 写模式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public final Buffer clear() {
    position = 0;
    limit = capacity;

    public abstract ByteBuffer compact();
    //compact方法将没有读完的数据移动到Buffer的开头,将position设置为移动后没读完数据的下一位,这样,接下来写入的数据会接着没读完的地方开始填充

    //使用实例:从一个channel将数据写入另一个channel
    buf.clear(); // Prepare buffer for use
    while (in.read(buf) >= 0 || buf.position != 0) {
    buf.flip();
    out.write(buf);
    buf.compact(); // In case of partial write
    }

有ByteBuffer, CharBuffer, DoubleBuffer, IntBuffer, FloatBuffer, LongBuffer, ShortBuffer等, 这些Buffer类型代表了不同的数据类型。也就是可以通过char,short,int,long,float 或 double类型来操作缓冲区中的字节。我们一般使用的是ByteBuffer。

Selector (选择器)

Selector又叫多路复用器,是Java Nio的关键。简单来说,Selector会不断轮训注册在其上的Channel,如果某个Channel上发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后我们通过SelectionKey获取到这些就绪的Channel的集合,进行后续的IO操作。

  1. 创建Selector

Selector selector = Selector.open();

  1. 向Selector注册Channel
    1
    2
    channel.configureBlocking(false);  
    channel.register(selector, Selectionkey.OP_READ);

    与Selector一起使用时,Channel必须处于非阻塞模式下

register的第二个参数,是一个”insterest集合”,表示Selector监听这个channel时,对什么类型的事件感兴趣.可以监听以下四种事件:

  1. SelectionKey.OP_CONNECT: 某个channel成功连接到服务端;
  2. SelectionKey.OP_ACCEPT: 服务端的serverSocketChannel准备好接收新进入的连接;
  3. SelectionKey.OP_READ: 一个通道有数据可读,读就绪;
  4. SelectionKey.OP_WRITE: 通道可以写入数据,写就绪;

如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

  1. SelectionKey
    SelectionKey是一个对象,它包含了一些属性:

  2. insterest集合

  3. ready集合: 已经就绪的操作的集合,但也可以使用类似于selectionKey.isAcceptable来判断是否就绪;

  4. Channel: Channel channel = key.channel();

  5. Selector: Selector selector = key.selector();

  6. 通过Selector选择通道
    一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。

下面是select()方法:
int select():select()阻塞到至少有一个通道在你注册的事件上就绪了。
int select(long timeout): select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
int selectNow():selectNow()不会阻塞,如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。

select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。

SelectionKeys()
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,获取已经就绪的SelectionKey的集合,然后通过对SelectionKey对象的操作,访问就绪通道.

服务端改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class NioHelloServer implements Runnable {

static int port = 9090;

static private Selector selector;

static private ServerSocketChannel serverSocketChannel;


public static void main(String[] args) {
new Thread(new NioHelloServer()).start();
}

@Override
public void run() {
try {
selector = Selector.open();

serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start in port: " + port);

while (true) {
selector.select(1000);

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;

while (it.hasNext()) {
key = it.next();
it.remove();

try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
}

} catch (IOException e) {
e.printStackTrace();
}
}

private void handleInput(SelectionKey key) throws IOException {

if (key.isValid()) {

if (key.isAcceptable()) {

ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {

SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);

int readBytes = sc.read(readBuffer);

if (readBytes > 0) {

readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String message = new String(bytes, "UTF-8");
System.out.println("Server received message: " + message);
doReply(sc, "Hello," + message);
} else if (readBytes < 0) {
key.cancel();
sc.close();
} else ;
}

}
}


private void doReply(SocketChannel sc, String message) throws IOException {

if (message != null && message.trim().length() > 0) {

byte[] bytes = message.getBytes();

ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
sc.write(writeBuffer);
}
}
}

注意由于SocketChannel是异步非阻塞的,我们不能保证write的时候一次性就将ByteBuffer中的数据写入通道,所以,在实际应用中,我们需要对ByteBuffer做判断,是否还有未写的数据,如果有,继续写。同理,在读取channel到ByteBuffer时,可能一次性没读完,下一次要接着读。这里仅作为例子,没有做这么详细的考虑。

dapeng框架的IO和线程模型

在网络层面,使用的是netty的非阻塞方式。后台netty服务端收到请求,非阻塞的读取数据,将数据写入Buffer, 然后调用线程池解析(反序列化)这段Buffer中的数据,处理请求,并通过channel将请求结果返回。

对于同步请求客户端,它将netty的异步转为了同步, 简单来说,客户端将数据写入channel之后,本来可以该干嘛干嘛去,不要占用这个线程了,等请求结果回来之后再调用回调方法处理结果,但是它不这么干,它发送完数据后就阻塞在这儿等待,一直到请求结果返回(或者超时)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 回调方法,SoaClientHandler收到返回结果,即调用此方法处理返回结果,取出seqid,将结果放入对应的caches中,并释放锁,使得等待的线程返回结果
*/
private SoaClientHandler.CallBack callBack = msg -> {
// length(4) stx(1) version(...) protocol(1) seqid(4) header(...) body(...) etx(1)
int readerIndex = msg.readerIndex();
msg.skipBytes(5);
int len = msg.readInt();
msg.readBytes(new byte[len], 0, len);
msg.skipBytes(1);
int seqid = msg.readInt();
msg.readerIndex(readerIndex);

ByteBuf[] byteBufs = caches.get(String.valueOf(seqid));

if (byteBufs == null) {

if (futureCaches.containsKey(String.valueOf(seqid))) {

CompletableFuture<ByteBuf> future = (CompletableFuture<ByteBuf>) futureCaches.get(String.valueOf(seqid));
future.complete(msg);

futureCaches.remove(String.valueOf(seqid));
} else {
LOGGER.error("返回结果超时,siqid为:" + String.valueOf(seqid));
msg.release();
}
} else {
synchronized (byteBufs) {
byteBufs[0] = msg;

byteBufs.notify();
}
}
};

/**
* 发送请求,阻塞等待结果再返回
*
* @param seqid
* @param request
* @return
*/
public ByteBuf send(int seqid, ByteBuf request) throws Exception {
if (channel == null || !channel.isActive())
connect(host, port);

//means that this channel is not idle and would not managered by IdleConnectionManager
IdleConnectionManager.remove(channel);

ByteBuf[] byteBufs = new ByteBuf[1];

caches.put(String.valueOf(seqid), byteBufs);

try {
channel.writeAndFlush(request);

//等待返回结果,soaClientHandler会将结果写入caches并释放锁,此时返回
synchronized (byteBufs) {
if (byteBufs[0] != null)
return byteBufs[0];

try {
byteBufs.wait(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

return byteBufs[0];
} finally {
caches.remove(String.valueOf(seqid));
}
}

异步和问题

Dapeng已经支持前端和后台的异步, 简单来说:

  • 对于客户端,你可以定义一个回调方法,然后像服务端发送请求,发送完线程就可以结束了,不需要等待返回;当服务器返回了处理结果后,框架会自动调用回调方法,处理这个返回结果;
  • 对于服务端,接收到一个请求后,线程可以将返回结果的准备交给第三方,然后就结束自己的使命。至于第三方,假设是一个线程,它可以同时管理很多个请求,并在结果获取到时“通知”该结果完成了,可以返回了。比如我们的竞拍模块,很多连接查询出价,但是要等到出价变化了才会返回最新的结果,如果是同步模式,前端和后端都需要阻塞等待数据变化,使用了异步模式后,前端发送请求和后端接收请求后可以释放线程,由一个独立的线程管理报价,发生变化时,通知所有请求结果已经准备好了可以返回了。

看起来很美好,但是有一个问题,异步结果的处理,也就是“回调方法”的执行者,跟请求者并不是同一个线程。也就是说它适用于返回结果并不影响后续执行的情况,比如通知,但是不适用于后续处理强依赖于返回结果的情况,也不适用于回调方法需要跟请求方法在同一个事务的情况。当然,请求端也可以发送异步请求后,处理其他工作,然后阻塞等待异步的结果返回…具体情况具体分析。