NIO学习
java IO
java1.4之前,java只支持一种阻塞式IO,可以称之为BIO。对应于网络编程,服务端提供了位置信息(ip和端口),客户端向服务端监听的端口发起连接请求,通过三次握手建立连接,然后通过网络套接字(socket)进行通信。
这是一种同步阻塞式的通信模式,通常由一个独立的Acceptor线程监听客户端连接,收到连接请求后,它为每一个客户端连接创建一个线程进行处理,阻塞式等待输入完成,从输入流获取数据后,处理请求,再通过输出流返回数据,最后关闭连接,销毁线程。
IO实例
服务端
1 | public class HelloServer { |
客户端
1 | public class HelloClient { |
优点和缺点
开发简单,容易维护和调试。然而缺乏弹性伸缩能力,后台处理请求的线程数和请求数量成正比,当同时大量请求发生时,会导致后台线程数急剧增加,最终崩溃。
优化:伪异步/使用线程池处理请求
为了解决这个上述模型的问题,我们可以对它做出优化,后端使用线程池处理客户端请求,M个线程的线程池可以处理N个客户端请求,其中N可以远大于M, 通过这种方式限制了后台线程数量,防止线程资源耗尽崩溃。
1 | final ServerSocket server = new ServerSocket(9090); |
然而,请求量比较大,网络传输耗时,请求处理复杂时,由于IO模型的限制,后台线程将阻塞在请求上,将线程池和队列占满了,将没有多余的线程来接收请求,对于客户端来说,就会一直连接超时,请求失败,对于客户端来说,服务端已经系统崩溃,拒绝服务。
NIO
诞生
NIO和IO对比
面向流和面向通道
Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向通道的。流是单向的,而通道是双向的,我们可以通过同一个channel read 或者 write, 但是我们只能对inputstream read,也只能对outputstream做write操作。
阻塞与非阻塞IO
Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上。
nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各个水管里流出来的水,都可以得到妥善接纳,这个关键之处就是增加了一个接水工,也就是Selector,他负责协调,也就是看哪根水管有水了的话,在当前水管的水接到一定程度的时候,就切换一下:临时关上当前水龙头,试着打开另一个水龙头(看看有没有水)。
当其他人需要用水的时候,不是直接去接水,而是事前提了一个水桶给接水工,这个水桶就是Buffer。也就是,其他人虽然也可能要等,但不会在现场等,而是回家等,可以做其它事去,水接满了,接水工会通知他们.
优点和缺点
Java Nio使得可以通过一个或几个线程管理成千上万个连接(通道),使用非阻塞式的IO读写,在数据准备好之前不会消耗后台处理的线程和资源,极大提高了系统的并发处理能力,特别适用于大量连接,但是每个连接请求的数据量很小的情况。对于连结数量少,但是每个连接一次性需要传输大量数据的情况并不适用。
缺点
- NIO的类库和API繁杂,使用麻烦,需要掌握Selector, ServerSocketChannel, SocketChannel, ByteBuffer等;
- 需要熟悉Java多线程编程/网络编程;
- 可靠性能力补齐艰难,比如客户端重连,网络闪断,半包读写(以及粘包问题),网络拥塞(会导致写失败)等;
- 可能存在bug.
ps: 这也是我们选择成熟的nio框架比如netty而不是自己开发的原因。
例子:1
2
3
4
5
6
7
8
9
10
11ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
buf.flip();
while(buf.hasRemaining()){
System.out.print((char) buf.get());
}
buf.clear();
bytesRead = inChannel.read(buf);
}
组件和概念
Channel(通道)
NIO的通道,类似于IO的输入输出流,但是有些区别:
- 通道是双向的,既可以读取数据,也可以写入数据;
- 通道可以异步读写
- 通道中的数据必须读取到一个缓冲区,也只能由缓冲区写入
我们一般用的是SocketChannel
和ServerSocketChannel
Buffer(缓冲区)
Buffer用于和Channel交互,它的本质是一块可以读取和写入的内存,被封装成一个Buffer对象,并且提供了一些方法以访问内存,操作数据。
重点关注一下Buffer有3个属性:
- capacity: 表示Buffer内存块的容量,一旦分配,对于Buffer来说,这个值就已经固定了;
position:
- 当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1。
- 当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。
limit: 在写模式下, limit 等于capicity, 表示你最多能写入多少个值;在读模式下,limit被赋值为写模式时position的值;
通过这三个属性,我们可以对一块内存的读写做出控制:
- 初始化时,position = 0, limit = capacity;
- inChannel.read(buffer), 将channel的数据写入Buffer, position随着写入递增;
flip: 写模式 => 读模式
1
2
3public final Buffer flip() {
limit = position;
position = 0;读取数据:
1
2
3
4
5
6byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
public final int remaining() {
return limit - position;
}清理buffer: 读模式 => 写模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14public final Buffer clear() {
position = 0;
limit = capacity;
public abstract ByteBuffer compact();
//compact方法将没有读完的数据移动到Buffer的开头,将position设置为移动后没读完数据的下一位,这样,接下来写入的数据会接着没读完的地方开始填充
//使用实例:从一个channel将数据写入另一个channel
buf.clear(); // Prepare buffer for use
while (in.read(buf) >= 0 || buf.position != 0) {
buf.flip();
out.write(buf);
buf.compact(); // In case of partial write
}
有ByteBuffer, CharBuffer, DoubleBuffer, IntBuffer, FloatBuffer, LongBuffer, ShortBuffer等, 这些Buffer类型代表了不同的数据类型。也就是可以通过char,short,int,long,float 或 double类型来操作缓冲区中的字节。我们一般使用的是ByteBuffer。
Selector (选择器)
Selector又叫多路复用器,是Java Nio的关键。简单来说,Selector会不断轮训注册在其上的Channel,如果某个Channel上发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后我们通过SelectionKey获取到这些就绪的Channel的集合,进行后续的IO操作。
创建Selector
Selector selector = Selector.open();
向Selector注册Channel
1
2channel.configureBlocking(false);
channel.register(selector, Selectionkey.OP_READ);与Selector一起使用时,Channel必须处于非阻塞模式下
register的第二个参数,是一个”insterest集合”,表示Selector监听这个channel时,对什么类型的事件感兴趣.可以监听以下四种事件:
- SelectionKey.OP_CONNECT: 某个channel成功连接到服务端;
- SelectionKey.OP_ACCEPT: 服务端的serverSocketChannel准备好接收新进入的连接;
- SelectionKey.OP_READ: 一个通道有数据可读,读就绪;
- SelectionKey.OP_WRITE: 通道可以写入数据,写就绪;
如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
SelectionKey
SelectionKey是一个对象,它包含了一些属性:- insterest集合
- ready集合: 已经就绪的操作的集合,但也可以使用类似于
selectionKey.isAcceptable
来判断是否就绪; - Channel:
Channel channel = key.channel();
- Selector:
Selector selector = key.selector();
通过Selector选择通道
一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。下面是select()方法:
int select()
:select()阻塞到至少有一个通道在你注册的事件上就绪了。int select(long timeout)
: select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。int selectNow()
:selectNow()不会阻塞,如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。
select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。
SelectionKeys()
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,获取已经就绪的SelectionKey的集合,然后通过对SelectionKey对象的操作,访问就绪通道.
服务端改造
1 | public class NioHelloServer implements Runnable { |
注意由于SocketChannel是异步非阻塞的,我们不能保证write的时候一次性就将ByteBuffer中的数据写入通道,所以,在实际应用中,我们需要对ByteBuffer做判断,是否还有未写的数据,如果有,继续写。同理,在读取channel到ByteBuffer时,可能一次性没读完,下一次要接着读。这里仅作为例子,没有做这么详细的考虑。
dapeng框架的IO和线程模型
在网络层面,使用的是netty的非阻塞方式。后台netty服务端收到请求,非阻塞的读取数据,将数据写入Buffer, 然后调用线程池解析(反序列化)这段Buffer中的数据,处理请求,并通过channel将请求结果返回。
对于同步请求客户端,它将netty的异步转为了同步, 简单来说,客户端将数据写入channel之后,本来可以该干嘛干嘛去,不要占用这个线程了,等请求结果回来之后再调用回调方法处理结果,但是它不这么干,它发送完数据后就阻塞在这儿等待,一直到请求结果返回(或者超时)。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74/**
* 回调方法,SoaClientHandler收到返回结果,即调用此方法处理返回结果,取出seqid,将结果放入对应的caches中,并释放锁,使得等待的线程返回结果
*/
private SoaClientHandler.CallBack callBack = msg -> {
// length(4) stx(1) version(...) protocol(1) seqid(4) header(...) body(...) etx(1)
int readerIndex = msg.readerIndex();
msg.skipBytes(5);
int len = msg.readInt();
msg.readBytes(new byte[len], 0, len);
msg.skipBytes(1);
int seqid = msg.readInt();
msg.readerIndex(readerIndex);
ByteBuf[] byteBufs = caches.get(String.valueOf(seqid));
if (byteBufs == null) {
if (futureCaches.containsKey(String.valueOf(seqid))) {
CompletableFuture<ByteBuf> future = (CompletableFuture<ByteBuf>) futureCaches.get(String.valueOf(seqid));
future.complete(msg);
futureCaches.remove(String.valueOf(seqid));
} else {
LOGGER.error("返回结果超时,siqid为:" + String.valueOf(seqid));
msg.release();
}
} else {
synchronized (byteBufs) {
byteBufs[0] = msg;
byteBufs.notify();
}
}
};
/**
* 发送请求,阻塞等待结果再返回
*
* @param seqid
* @param request
* @return
*/
public ByteBuf send(int seqid, ByteBuf request) throws Exception {
if (channel == null || !channel.isActive())
connect(host, port);
//means that this channel is not idle and would not managered by IdleConnectionManager
IdleConnectionManager.remove(channel);
ByteBuf[] byteBufs = new ByteBuf[1];
caches.put(String.valueOf(seqid), byteBufs);
try {
channel.writeAndFlush(request);
//等待返回结果,soaClientHandler会将结果写入caches并释放锁,此时返回
synchronized (byteBufs) {
if (byteBufs[0] != null)
return byteBufs[0];
try {
byteBufs.wait(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return byteBufs[0];
} finally {
caches.remove(String.valueOf(seqid));
}
}
异步和问题
Dapeng已经支持前端和后台的异步, 简单来说:
- 对于客户端,你可以定义一个回调方法,然后像服务端发送请求,发送完线程就可以结束了,不需要等待返回;当服务器返回了处理结果后,框架会自动调用回调方法,处理这个返回结果;
- 对于服务端,接收到一个请求后,线程可以将返回结果的准备交给第三方,然后就结束自己的使命。至于第三方,假设是一个线程,它可以同时管理很多个请求,并在结果获取到时“通知”该结果完成了,可以返回了。比如我们的竞拍模块,很多连接查询出价,但是要等到出价变化了才会返回最新的结果,如果是同步模式,前端和后端都需要阻塞等待数据变化,使用了异步模式后,前端发送请求和后端接收请求后可以释放线程,由一个独立的线程管理报价,发生变化时,通知所有请求结果已经准备好了可以返回了。
看起来很美好,但是有一个问题,异步结果的处理,也就是“回调方法”的执行者,跟请求者并不是同一个线程。也就是说它适用于返回结果并不影响后续执行的情况,比如通知,但是不适用于后续处理强依赖于返回结果的情况,也不适用于回调方法需要跟请求方法在同一个事务的情况。当然,请求端也可以发送异步请求后,处理其他工作,然后阻塞等待异步的结果返回…具体情况具体分析。