NIO学习
啥都别说了,就记录一下NIO的相关学习内容。
java IO java1.4之前,java只支持一种阻塞式IO,可以称之为BIO。对应于网络编程,服务端提供了位置信息(ip和端口),客户端向服务端监听的端口发起连接请求,通过三次握手建立连接,然后通过网络套接字(socket)进行通信。
这是一种同步阻塞式的通信模式,通常由一个独立的Acceptor线程监听客户端连接,收到连接请求后,它为每一个客户端连接创建一个线程进行处理,阻塞式等待输入完成,从输入流获取数据后,处理请求,再通过输出流返回数据,最后关闭连接,销毁线程。
IO实例 服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public class HelloServer { static int port = 9090; public static void main(String[] args) { final ServerSocket server; try { server = new ServerSocket(port); System.out.println("Hello Server started at " + port); Socket socket = null; do { socket = server.accept(); new Thread(new HelloServerHandler(socket)).start(); } while (true); } catch (IOException e) { e.printStackTrace(); } } public static class HelloServerHandler implements Runnable { private Socket socket; public HelloServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); while (true) { String message = in.readLine(); if (message == null) break; System.out.println("message from client: " + message); out.println("Hello, " + message); } } catch (Exception e) { e.printStackTrace(); if (in != null) try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } if (out != null) { out.close(); } if (this.socket != null) { try { socket.close(); } catch (IOException e1) { e1.printStackTrace(); } socket = null; } } } } }
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class HelloClient { static int port = 9090; public static void main(String[] args) { Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket("127.0.0.1", port); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println("JoJo"); System.out.println("Sending message JoJo to server"); String response = in.readLine(); System.out.println("Got response from server: \n" + response); } catch (IOException e) { e.printStackTrace(); } finally { if (in != null) try { in.close(); } catch (IOException e) { e.printStackTrace(); } if (out != null) out.close(); try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
优点和缺点 开发简单,容易维护和调试。然而缺乏弹性伸缩能力,后台处理请求的线程数和请求数量成正比,当同时大量请求发生时,会导致后台线程数急剧增加,最终崩溃。
优化:伪异步/使用线程池处理请求 为了解决这个上述模型的问题,我们可以对它做出优化,后端使用线程池处理客户端请求,M个线程的线程池可以处理N个客户端请求,其中N可以远大于M, 通过这种方式限制了后台线程数量,防止线程资源耗尽崩溃。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 final ServerSocket server = new ServerSocket(9090); server.setReceiveBufferSize(1024 * 4);// 设置缓冲区大小4k do { try { final Socket client = server.accept(); soaTransPool.execute(new SoaTransPool.SoaCodecTask(client)); } catch (IOException e) { //TODO e.printStackTrace(); } } while (true); ... private int corePoolSize = 100; //线程池维护线程的最少数量 private iint maximumPoolSize = 200; //线程池维护线程的最大数量 private long keepAliveTime = 60; //线程池维护线程所允许的空闲时间 private TimeUnit unit = TimeUnit.SECOND; //空闲时间单位 private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(50); //线程池的缓冲队列 private final ThreadFactory threadFactory = new DefaultThreadFactory("Thread Of Test"); private ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); public class DefaultThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String name; public DefaultThreadFactory(String name) { this.name = name; } @Override public Thread newThread(Runnable r) { return new Thread(r, name + "-" + threadNumber.getAndIncrement()); } }
然而,请求量比较大,网络传输耗时,请求处理复杂时,由于IO模型的限制,后台线程将阻塞在请求上,将线程池和队列占满了,将没有多余的线程来接收请求,对于客户端来说,就会一直连接超时,请求失败,对于客户端来说,服务端已经系统崩溃,拒绝服务。
NIO 诞生 NIO和IO对比 面向流和面向通道
Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向通道的。流是单向的,而通道是双向的,我们可以通过同一个channel read 或者 write, 但是我们只能对inputstream read,也只能对outputstream做write操作。
阻塞与非阻塞IO
Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上。
nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各个水管里流出来的水,都可以得到妥善接纳,这个关键之处就是增加了一个接水工,也就是Selector,他负责协调,也就是看哪根水管有水了的话,在当前水管的水接到一定程度的时候,就切换一下:临时关上当前水龙头,试着打开另一个水龙头(看看有没有水)。
当其他人需要用水的时候,不是直接去接水,而是事前提了一个水桶给接水工,这个水桶就是Buffer。也就是,其他人虽然也可能要等,但不会在现场等,而是回家等,可以做其它事去,水接满了,接水工会通知他们.
优点和缺点 Java Nio使得可以通过一个或几个线程管理成千上万个连接(通道),使用非阻塞式的IO读写,在数据准备好之前不会消耗后台处理的线程和资源,极大提高了系统的并发处理能力,特别适用于大量连接,但是每个连接请求的数据量很小的情况。对于连结数量少,但是每个连接一次性需要传输大量数据的情况并不适用。
缺点
NIO的类库和API繁杂,使用麻烦,需要掌握Selector, ServerSocketChannel, SocketChannel, ByteBuffer等;
需要熟悉Java多线程编程/网络编程;
可靠性能力补齐艰难,比如客户端重连,网络闪断,半包读写(以及粘包问题),网络拥塞(会导致写失败)等;
可能存在bug.
ps: 这也是我们选择成熟的nio框架比如netty而不是自己开发的原因。
例子:
1 2 3 4 5 6 7 8 9 10 11 ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf); while (bytesRead != -1) { buf.flip(); while(buf.hasRemaining()){ System.out.print((char) buf.get()); } buf.clear(); bytesRead = inChannel.read(buf); }
组件和概念 Channel(通道) NIO的通道,类似于IO的输入输出流,但是有些区别:
通道是双向的,既可以读取数据,也可以写入数据;
通道可以异步读写
通道中的数据必须读取到一个缓冲区,也只能由缓冲区写入
我们一般用的是SocketChannel
和ServerSocketChannel
Buffer(缓冲区) Buffer用于和Channel交互,它的本质是一块可以读取和写入的内存,被封装成一个Buffer对象,并且提供了一些方法以访问内存,操作数据。
重点关注一下Buffer有3个属性:
capacity : 表示Buffer内存块的容量,一旦分配,对于Buffer来说,这个值就已经固定了;
position :
当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1。
当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。
limit : 在写模式下, limit 等于capicity, 表示你最多能写入多少个值;在读模式下,limit被赋值为写模式时position的值;
通过这三个属性,我们可以对一块内存的读写做出控制:
初始化时,position = 0, limit = capacity;
inChannel.read(buffer), 将channel的数据写入Buffer, position随着写入递增;
flip: 写模式 => 读模式1 2 3 public final Buffer flip() { limit = position; position = 0;
读取数据:1 2 3 4 5 6 byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); public final int remaining() { return limit - position; }
清理buffer: 读模式 => 写模式1 2 3 4 5 6 7 8 9 10 11 12 13 14 public final Buffer clear() { position = 0; limit = capacity; public abstract ByteBuffer compact(); //compact方法将没有读完的数据移动到Buffer的开头,将position设置为移动后没读完数据的下一位,这样,接下来写入的数据会接着没读完的地方开始填充 //使用实例:从一个channel将数据写入另一个channel buf.clear(); // Prepare buffer for use while (in.read(buf) >= 0 || buf.position != 0) { buf.flip(); out.write(buf); buf.compact(); // In case of partial write }
有ByteBuffer, CharBuffer, DoubleBuffer, IntBuffer, FloatBuffer, LongBuffer, ShortBuffer等, 这些Buffer类型代表了不同的数据类型。也就是可以通过char,short,int,long,float 或 double类型来操作缓冲区中的字节。我们一般使用的是ByteBuffer。
Selector (选择器) Selector又叫多路复用器,是Java Nio的关键。简单来说,Selector会不断轮训注册在其上的Channel,如果某个Channel上发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后我们通过SelectionKey获取到这些就绪的Channel的集合,进行后续的IO操作。
创建Selector
Selector selector = Selector.open();
向Selector注册Channel 1 2 channel.configureBlocking(false); channel.register(selector, Selectionkey.OP_READ);
与Selector一起使用时,Channel必须处于非阻塞模式下
register的第二个参数,是一个”insterest集合”,表示Selector监听这个channel时,对什么类型的事件感兴趣.可以监听以下四种事件:
SelectionKey.OP_CONNECT: 某个channel成功连接到服务端;
SelectionKey.OP_ACCEPT: 服务端的serverSocketChannel准备好接收新进入的连接;
SelectionKey.OP_READ: 一个通道有数据可读,读就绪;
SelectionKey.OP_WRITE: 通道可以写入数据,写就绪;
如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如: int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
SelectionKey SelectionKey是一个对象,它包含了一些属性:
insterest集合
ready集合: 已经就绪的操作的集合,但也可以使用类似于selectionKey.isAcceptable
来判断是否就绪;
Channel: Channel channel = key.channel();
Selector: Selector selector = key.selector();
通过Selector选择通道 一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。
下面是select()方法: int select()
:select()阻塞到至少有一个通道在你注册的事件上就绪了。 int select(long timeout)
: select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。 int selectNow()
:selectNow()不会阻塞,如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。
select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。
SelectionKeys() 一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,获取已经就绪的SelectionKey的集合,然后通过对SelectionKey对象的操作,访问就绪通道.
服务端改造 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 public class NioHelloServer implements Runnable { static int port = 9090; static private Selector selector; static private ServerSocketChannel serverSocketChannel; public static void main(String[] args) { new Thread(new NioHelloServer()).start(); } @Override public void run() { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Server start in port: " + port); while (true) { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String message = new String(bytes, "UTF-8"); System.out.println("Server received message: " + message); doReply(sc, "Hello," + message); } else if (readBytes < 0) { key.cancel(); sc.close(); } else ; } } } private void doReply(SocketChannel sc, String message) throws IOException { if (message != null && message.trim().length() > 0) { byte[] bytes = message.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); sc.write(writeBuffer); } } }
注意由于SocketChannel是异步非阻塞的,我们不能保证write的时候一次性就将ByteBuffer中的数据写入通道,所以,在实际应用中,我们需要对ByteBuffer做判断,是否还有未写的数据,如果有,继续写。同理,在读取channel到ByteBuffer时,可能一次性没读完,下一次要接着读。这里仅作为例子,没有做这么详细的考虑。
dapeng框架的IO和线程模型 在网络层面,使用的是netty的非阻塞方式。后台netty服务端收到请求,非阻塞的读取数据,将数据写入Buffer, 然后调用线程池解析(反序列化)这段Buffer中的数据,处理请求,并通过channel将请求结果返回。
对于同步请求客户端,它将netty的异步转为了同步, 简单来说,客户端将数据写入channel之后,本来可以该干嘛干嘛去,不要占用这个线程了,等请求结果回来之后再调用回调方法处理结果,但是它不这么干,它发送完数据后就阻塞在这儿等待,一直到请求结果返回(或者超时)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 /** * 回调方法,SoaClientHandler收到返回结果,即调用此方法处理返回结果,取出seqid,将结果放入对应的caches中,并释放锁,使得等待的线程返回结果 */ private SoaClientHandler.CallBack callBack = msg -> { // length(4) stx(1) version(...) protocol(1) seqid(4) header(...) body(...) etx(1) int readerIndex = msg.readerIndex(); msg.skipBytes(5); int len = msg.readInt(); msg.readBytes(new byte[len], 0, len); msg.skipBytes(1); int seqid = msg.readInt(); msg.readerIndex(readerIndex); ByteBuf[] byteBufs = caches.get(String.valueOf(seqid)); if (byteBufs == null) { if (futureCaches.containsKey(String.valueOf(seqid))) { CompletableFuture<ByteBuf> future = (CompletableFuture<ByteBuf>) futureCaches.get(String.valueOf(seqid)); future.complete(msg); futureCaches.remove(String.valueOf(seqid)); } else { LOGGER.error("返回结果超时,siqid为:" + String.valueOf(seqid)); msg.release(); } } else { synchronized (byteBufs) { byteBufs[0] = msg; byteBufs.notify(); } } }; /** * 发送请求,阻塞等待结果再返回 * * @param seqid * @param request * @return */ public ByteBuf send(int seqid, ByteBuf request) throws Exception { if (channel == null || !channel.isActive()) connect(host, port); //means that this channel is not idle and would not managered by IdleConnectionManager IdleConnectionManager.remove(channel); ByteBuf[] byteBufs = new ByteBuf[1]; caches.put(String.valueOf(seqid), byteBufs); try { channel.writeAndFlush(request); //等待返回结果,soaClientHandler会将结果写入caches并释放锁,此时返回 synchronized (byteBufs) { if (byteBufs[0] != null) return byteBufs[0]; try { byteBufs.wait(50000); } catch (InterruptedException e) { e.printStackTrace(); } } return byteBufs[0]; } finally { caches.remove(String.valueOf(seqid)); } }
异步和问题 Dapeng已经支持前端和后台的异步, 简单来说:
对于客户端,你可以定义一个回调方法,然后像服务端发送请求,发送完线程就可以结束了,不需要等待返回;当服务器返回了处理结果后,框架会自动调用回调方法,处理这个返回结果;
对于服务端,接收到一个请求后,线程可以将返回结果的准备交给第三方,然后就结束自己的使命。至于第三方,假设是一个线程,它可以同时管理很多个请求,并在结果获取到时“通知”该结果完成了,可以返回了。比如我们的竞拍模块,很多连接查询出价,但是要等到出价变化了才会返回最新的结果,如果是同步模式,前端和后端都需要阻塞等待数据变化,使用了异步模式后,前端发送请求和后端接收请求后可以释放线程,由一个独立的线程管理报价,发生变化时,通知所有请求结果已经准备好了可以返回了。
看起来很美好,但是有一个问题,异步结果的处理,也就是“回调方法”的执行者,跟请求者并不是同一个线程。也就是说它适用于返回结果并不影响后续执行的情况,比如通知,但是不适用于后续处理强依赖于返回结果的情况,也不适用于回调方法需要跟请求方法在同一个事务的情况。当然,请求端也可以发送异步请求后,处理其他工作,然后阻塞等待异步的结果返回…具体情况具体分析。