文章目录
  1. 1. java IO
  2. 2. IO实例
    1. 2.1. 服务端
    2. 2.2. 客户端
    3. 2.3. 优点和缺点
    4. 2.4. 优化:伪异步/使用线程池处理请求
  3. 3. NIO
    1. 3.1. 诞生
    2. 3.2. NIO和IO对比
      1. 3.2.1. 面向流和面向通道
      2. 3.2.2. 阻塞与非阻塞IO
    3. 3.3. 优点和缺点
      1. 3.3.1. 缺点
    4. 3.4. 组件和概念
      1. 3.4.1. Channel(通道)
      2. 3.4.2. Buffer(缓冲区)
      3. 3.4.3. Selector (选择器)
    5. 3.5. 服务端改造
  4. 4. dapeng框架的IO和线程模型
    1. 4.1. 异步和问题

java IO

java1.4之前,java只支持一种阻塞式IO,可以称之为BIO。对应于网络编程,服务端提供了位置信息(ip和端口),客户端向服务端监听的端口发起连接请求,通过三次握手建立连接,然后通过网络套接字(socket)进行通信。

这是一种同步阻塞式的通信模式,通常由一个独立的Acceptor线程监听客户端连接,收到连接请求后,它为每一个客户端连接创建一个线程进行处理,阻塞式等待输入完成,从输入流获取数据后,处理请求,再通过输出流返回数据,最后关闭连接,销毁线程。

IO实例

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class HelloServer {


static int port = 9090;

public static void main(String[] args) {

final ServerSocket server;
try {
server = new ServerSocket(port);
System.out.println("Hello Server started at " + port);
Socket socket = null;

do {
socket = server.accept();
new Thread(new HelloServerHandler(socket)).start();

} while (true);

} catch (IOException e) {
e.printStackTrace();
}
}


public static class HelloServerHandler implements Runnable {

private Socket socket;

public HelloServerHandler(Socket socket) {
this.socket = socket;
}

@Override
public void run() {


BufferedReader in = null;
PrintWriter out = null;

try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
while (true) {
String message = in.readLine();
if (message == null) break;

System.out.println("message from client: " + message);
out.println("Hello, " + message);
}

} catch (Exception e) {
e.printStackTrace();
if (in != null)
try {
in.close();
} catch (IOException e1) {
e1.printStackTrace();
}

if (out != null) {
out.close();
}

if (this.socket != null) {
try {
socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
socket = null;
}
}

}
}

}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class HelloClient {

static int port = 9090;

public static void main(String[] args) {

Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1", port);

in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);

out.println("JoJo");
System.out.println("Sending message JoJo to server");
String response = in.readLine();
System.out.println("Got response from server: \n" + response);

} catch (IOException e) {
e.printStackTrace();
} finally {

if (in != null)
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}

if (out != null)
out.close();

try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}

}
}

优点和缺点

开发简单,容易维护和调试。然而缺乏弹性伸缩能力,后台处理请求的线程数和请求数量成正比,当同时大量请求发生时,会导致后台线程数急剧增加,最终崩溃。

优化:伪异步/使用线程池处理请求

为了解决这个上述模型的问题,我们可以对它做出优化,后端使用线程池处理客户端请求,M个线程的线程池可以处理N个客户端请求,其中N可以远大于M, 通过这种方式限制了后台线程数量,防止线程资源耗尽崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
final ServerSocket server = new ServerSocket(9090);
server.setReceiveBufferSize(1024 * 4);// 设置缓冲区大小4k

do {
try {
final Socket client = server.accept();
soaTransPool.execute(new SoaTransPool.SoaCodecTask(client));
} catch (IOException e) {
//TODO
e.printStackTrace();
}
} while (true);

...
private int corePoolSize = 100; //线程池维护线程的最少数量
private iint maximumPoolSize = 200; //线程池维护线程的最大数量
private long keepAliveTime = 60; //线程池维护线程所允许的空闲时间
private TimeUnit unit = TimeUnit.SECOND; //空闲时间单位
private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(50); //线程池的缓冲队列
private final ThreadFactory threadFactory = new DefaultThreadFactory("Thread Of Test");

private ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

public class DefaultThreadFactory implements ThreadFactory {

private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String name;

public DefaultThreadFactory(String name) {
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
return new Thread(r, name + "-" + threadNumber.getAndIncrement());
}
}

然而,请求量比较大,网络传输耗时,请求处理复杂时,由于IO模型的限制,后台线程将阻塞在请求上,将线程池和队列占满了,将没有多余的线程来接收请求,对于客户端来说,就会一直连接超时,请求失败,对于客户端来说,服务端已经系统崩溃,拒绝服务。

NIO

诞生

NIO和IO对比

面向流和面向通道

Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向通道的。流是单向的,而通道是双向的,我们可以通过同一个channel read 或者 write, 但是我们只能对inputstream read,也只能对outputstream做write操作。

阻塞与非阻塞IO

Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上。

nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各个水管里流出来的水,都可以得到妥善接纳,这个关键之处就是增加了一个接水工,也就是Selector,他负责协调,也就是看哪根水管有水了的话,在当前水管的水接到一定程度的时候,就切换一下:临时关上当前水龙头,试着打开另一个水龙头(看看有没有水)。

当其他人需要用水的时候,不是直接去接水,而是事前提了一个水桶给接水工,这个水桶就是Buffer。也就是,其他人虽然也可能要等,但不会在现场等,而是回家等,可以做其它事去,水接满了,接水工会通知他们.

优点和缺点

Java Nio使得可以通过一个或几个线程管理成千上万个连接(通道),使用非阻塞式的IO读写,在数据准备好之前不会消耗后台处理的线程和资源,极大提高了系统的并发处理能力,特别适用于大量连接,但是每个连接请求的数据量很小的情况。对于连结数量少,但是每个连接一次性需要传输大量数据的情况并不适用。

缺点

  1. NIO的类库和API繁杂,使用麻烦,需要掌握Selector, ServerSocketChannel, SocketChannel, ByteBuffer等;
  2. 需要熟悉Java多线程编程/网络编程;
  3. 可靠性能力补齐艰难,比如客户端重连,网络闪断,半包读写(以及粘包问题),网络拥塞(会导致写失败)等;
  4. 可能存在bug.

ps: 这也是我们选择成熟的nio框架比如netty而不是自己开发的原因。

例子:

1
2
3
4
5
6
7
8
9
10
11
ByteBuffer buf = ByteBuffer.allocate(48);  

int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
buf.flip();
while(buf.hasRemaining()){
System.out.print((char) buf.get());
}
buf.clear();
bytesRead = inChannel.read(buf);
}

组件和概念

Channel(通道)

NIO的通道,类似于IO的输入输出流,但是有些区别:

  1. 通道是双向的,既可以读取数据,也可以写入数据;
  2. 通道可以异步读写
  3. 通道中的数据必须读取到一个缓冲区,也只能由缓冲区写入

我们一般用的是SocketChannelServerSocketChannel

Buffer(缓冲区)

Buffer用于和Channel交互,它的本质是一块可以读取和写入的内存,被封装成一个Buffer对象,并且提供了一些方法以访问内存,操作数据。

重点关注一下Buffer有3个属性:

  1. capacity: 表示Buffer内存块的容量,一旦分配,对于Buffer来说,这个值就已经固定了;
  2. position:

    • 当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1。
    • 当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。
  3. limit: 在写模式下, limit 等于capicity, 表示你最多能写入多少个值;在读模式下,limit被赋值为写模式时position的值;

通过这三个属性,我们可以对一块内存的读写做出控制:

  1. 初始化时,position = 0, limit = capacity;
  2. inChannel.read(buffer), 将channel的数据写入Buffer, position随着写入递增;
  3. flip: 写模式 => 读模式

    1
    2
    3
    public final Buffer flip() {
    limit = position;
    position = 0;
  4. 读取数据:

    1
    2
    3
    4
    5
    6
    byte[] bytes = new byte[readBuffer.remaining()];
    readBuffer.get(bytes);

    public final int remaining() {
    return limit - position;
    }
  5. 清理buffer: 读模式 => 写模式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public final Buffer clear() {
    position = 0;
    limit = capacity;

    public abstract ByteBuffer compact();
    //compact方法将没有读完的数据移动到Buffer的开头,将position设置为移动后没读完数据的下一位,这样,接下来写入的数据会接着没读完的地方开始填充

    //使用实例:从一个channel将数据写入另一个channel
    buf.clear(); // Prepare buffer for use
    while (in.read(buf) >= 0 || buf.position != 0) {
    buf.flip();
    out.write(buf);
    buf.compact(); // In case of partial write
    }

有ByteBuffer, CharBuffer, DoubleBuffer, IntBuffer, FloatBuffer, LongBuffer, ShortBuffer等, 这些Buffer类型代表了不同的数据类型。也就是可以通过char,short,int,long,float 或 double类型来操作缓冲区中的字节。我们一般使用的是ByteBuffer。

Selector (选择器)

Selector又叫多路复用器,是Java Nio的关键。简单来说,Selector会不断轮训注册在其上的Channel,如果某个Channel上发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后我们通过SelectionKey获取到这些就绪的Channel的集合,进行后续的IO操作。

  1. 创建Selector

    Selector selector = Selector.open();

  2. 向Selector注册Channel

    1
    2
    channel.configureBlocking(false);  
    channel.register(selector, Selectionkey.OP_READ);

    与Selector一起使用时,Channel必须处于非阻塞模式下

    register的第二个参数,是一个”insterest集合”,表示Selector监听这个channel时,对什么类型的事件感兴趣.可以监听以下四种事件:

    1. SelectionKey.OP_CONNECT: 某个channel成功连接到服务端;
    2. SelectionKey.OP_ACCEPT: 服务端的serverSocketChannel准备好接收新进入的连接;
    3. SelectionKey.OP_READ: 一个通道有数据可读,读就绪;
    4. SelectionKey.OP_WRITE: 通道可以写入数据,写就绪;

    如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如:
    int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

  3. SelectionKey
    SelectionKey是一个对象,它包含了一些属性:

    1. insterest集合
    2. ready集合: 已经就绪的操作的集合,但也可以使用类似于selectionKey.isAcceptable来判断是否就绪;
    3. Channel: Channel channel = key.channel();
    4. Selector: Selector selector = key.selector();
  4. 通过Selector选择通道
    一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。

    下面是select()方法:
    int select():select()阻塞到至少有一个通道在你注册的事件上就绪了。
    int select(long timeout): select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
    int selectNow():selectNow()不会阻塞,如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。

select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。

SelectionKeys()
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,获取已经就绪的SelectionKey的集合,然后通过对SelectionKey对象的操作,访问就绪通道.

服务端改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class NioHelloServer implements Runnable {

static int port = 9090;

static private Selector selector;

static private ServerSocketChannel serverSocketChannel;


public static void main(String[] args) {
new Thread(new NioHelloServer()).start();
}

@Override
public void run() {
try {
selector = Selector.open();

serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start in port: " + port);

while (true) {
selector.select(1000);

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;

while (it.hasNext()) {
key = it.next();
it.remove();

try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
}

} catch (IOException e) {
e.printStackTrace();
}
}

private void handleInput(SelectionKey key) throws IOException {

if (key.isValid()) {

if (key.isAcceptable()) {

ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {

SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);

int readBytes = sc.read(readBuffer);

if (readBytes > 0) {

readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String message = new String(bytes, "UTF-8");
System.out.println("Server received message: " + message);
doReply(sc, "Hello," + message);
} else if (readBytes < 0) {
key.cancel();
sc.close();
} else ;
}

}
}


private void doReply(SocketChannel sc, String message) throws IOException {

if (message != null && message.trim().length() > 0) {

byte[] bytes = message.getBytes();

ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
sc.write(writeBuffer);
}
}
}

注意由于SocketChannel是异步非阻塞的,我们不能保证write的时候一次性就将ByteBuffer中的数据写入通道,所以,在实际应用中,我们需要对ByteBuffer做判断,是否还有未写的数据,如果有,继续写。同理,在读取channel到ByteBuffer时,可能一次性没读完,下一次要接着读。这里仅作为例子,没有做这么详细的考虑。

dapeng框架的IO和线程模型

在网络层面,使用的是netty的非阻塞方式。后台netty服务端收到请求,非阻塞的读取数据,将数据写入Buffer, 然后调用线程池解析(反序列化)这段Buffer中的数据,处理请求,并通过channel将请求结果返回。

对于同步请求客户端,它将netty的异步转为了同步, 简单来说,客户端将数据写入channel之后,本来可以该干嘛干嘛去,不要占用这个线程了,等请求结果回来之后再调用回调方法处理结果,但是它不这么干,它发送完数据后就阻塞在这儿等待,一直到请求结果返回(或者超时)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 回调方法,SoaClientHandler收到返回结果,即调用此方法处理返回结果,取出seqid,将结果放入对应的caches中,并释放锁,使得等待的线程返回结果
*/

private SoaClientHandler.CallBack callBack = msg -> {
// length(4) stx(1) version(...) protocol(1) seqid(4) header(...) body(...) etx(1)
int readerIndex = msg.readerIndex();
msg.skipBytes(5);
int len = msg.readInt();
msg.readBytes(new byte[len], 0, len);
msg.skipBytes(1);
int seqid = msg.readInt();
msg.readerIndex(readerIndex);

ByteBuf[] byteBufs = caches.get(String.valueOf(seqid));

if (byteBufs == null) {

if (futureCaches.containsKey(String.valueOf(seqid))) {

CompletableFuture<ByteBuf> future = (CompletableFuture<ByteBuf>) futureCaches.get(String.valueOf(seqid));
future.complete(msg);

futureCaches.remove(String.valueOf(seqid));
} else {
LOGGER.error("返回结果超时,siqid为:" + String.valueOf(seqid));
msg.release();
}
} else {
synchronized (byteBufs) {
byteBufs[0] = msg;

byteBufs.notify();
}
}
};

/**
* 发送请求,阻塞等待结果再返回
*
* @param seqid
* @param request
* @return
*/

public ByteBuf send(int seqid, ByteBuf request) throws Exception {
if (channel == null || !channel.isActive())
connect(host, port);

//means that this channel is not idle and would not managered by IdleConnectionManager
IdleConnectionManager.remove(channel);

ByteBuf[] byteBufs = new ByteBuf[1];

caches.put(String.valueOf(seqid), byteBufs);

try {
channel.writeAndFlush(request);

//等待返回结果,soaClientHandler会将结果写入caches并释放锁,此时返回
synchronized (byteBufs) {
if (byteBufs[0] != null)
return byteBufs[0];

try {
byteBufs.wait(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

return byteBufs[0];
} finally {
caches.remove(String.valueOf(seqid));
}
}

异步和问题

Dapeng已经支持前端和后台的异步, 简单来说:

  • 对于客户端,你可以定义一个回调方法,然后像服务端发送请求,发送完线程就可以结束了,不需要等待返回;当服务器返回了处理结果后,框架会自动调用回调方法,处理这个返回结果;
  • 对于服务端,接收到一个请求后,线程可以将返回结果的准备交给第三方,然后就结束自己的使命。至于第三方,假设是一个线程,它可以同时管理很多个请求,并在结果获取到时“通知”该结果完成了,可以返回了。比如我们的竞拍模块,很多连接查询出价,但是要等到出价变化了才会返回最新的结果,如果是同步模式,前端和后端都需要阻塞等待数据变化,使用了异步模式后,前端发送请求和后端接收请求后可以释放线程,由一个独立的线程管理报价,发生变化时,通知所有请求结果已经准备好了可以返回了。

看起来很美好,但是有一个问题,异步结果的处理,也就是“回调方法”的执行者,跟请求者并不是同一个线程。也就是说它适用于返回结果并不影响后续执行的情况,比如通知,但是不适用于后续处理强依赖于返回结果的情况,也不适用于回调方法需要跟请求方法在同一个事务的情况。当然,请求端也可以发送异步请求后,处理其他工作,然后阻塞等待异步的结果返回…具体情况具体分析。

文章目录
  1. 1. java IO
  2. 2. IO实例
    1. 2.1. 服务端
    2. 2.2. 客户端
    3. 2.3. 优点和缺点
    4. 2.4. 优化:伪异步/使用线程池处理请求
  3. 3. NIO
    1. 3.1. 诞生
    2. 3.2. NIO和IO对比
      1. 3.2.1. 面向流和面向通道
      2. 3.2.2. 阻塞与非阻塞IO
    3. 3.3. 优点和缺点
      1. 3.3.1. 缺点
    4. 3.4. 组件和概念
      1. 3.4.1. Channel(通道)
      2. 3.4.2. Buffer(缓冲区)
      3. 3.4.3. Selector (选择器)
    5. 3.5. 服务端改造
  4. 4. dapeng框架的IO和线程模型
    1. 4.1. 异步和问题