概要

  • I/O模型

    • 同步阻塞I/O
    • 同步非阻塞I/O
    • I/O多路复用
    • 异步 I/O
  • NioEndPoint

    • Acceptor
    • Poller
    • SocketProcessor

I/O模型

UNIX 系统下的 I/O 模型有 5 种:

  • 同步阻塞 I/O

  • 同步非阻塞I/O

  • I/O多路复用

  • 信号驱动I/O(不了解)

  • 异步I/O

所谓I/O,就是计算机内存与外部设备之间拷贝数据的过程。

JAVA I/O模型

当用记发起I/O操作后,经历2个步骤

  1. 用户线程等待内核将数据从网卡(外设)中拷贝到内核空间

  2. 内核将数据从内核空间拷贝到用户空间

1.同步阻塞I/O

  1. 用户线程发起read调用后就阻塞了,让出CPU。
  2. 内核等待网卡数据到来,把数据从网卡拷贝到内核空间,接着把数据拷贝到用户空间,再把用户线程叫醒
  3. 用户线程读取数据
image-20200110142558659

2.同步非阻塞I/O

  1. 用户线程不断的发起read调用,数据没到内核空间时,每次都返回失败
  2. 内核等待网卡数据到来,把数据从网卡拷贝到内核空间
  3. 这一次read调用后线程开始阻塞。在等待数据从内核空间拷贝到用户空间,等数据到用户空间后再把线程叫醒
image-20200110142746256

3.I/O多路复用

  1. 通过select询问内核数据是否已经到达,select()方法是阻塞的。
  2. 通过read调用命令内核把网卡的数据拷贝到用户空间下,在内核拷贝数据到用户空间的这段时间内线程是阻塞的。一般这种情况下都是用selector在一个死循环内来实现的。

之所以称为多路复用,是因为一个selector可以询问多个连接的数据是否已经到达。

image-20200108223222709

4.异步 I/O

  1. 用户线程发起 read 调用的同时注册一个回调函数,,read 立即返回,
  2. 等内核将数据准备好后,再调用指定的回调函数完成处理,在这个过程中,用户线程一直没有阻塞
image-20200110142905964

NioEndPoint

它有三大线程组分别用于处理不同的逻辑:

  • Acceptor线程:负责监听请求,等待和接收客户端连接。在接收到连接后,创建SocketChannel并将其注册到poller线程。
  • Poller线程:将SocketChannel放到selector上注册读事件,轮询selector,获取就绪的SelectionKey,并将就绪的SelectionKey(或SocketChannel)委托给工作线程。
  • SocketProcessor(work):执行真正的业务逻辑。
    Acceptor线程和poller线程之间有一个SocketChannel队列,Acceptor线程负责将SocketChannel推送到队列,poller线程负责从队列取出SocketChannel。poller线程从队列取出SocketChannel后,紧接着会把它放到selector上注册读事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class NioEndpoint {
/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {

....
// Create worker collection
if (getExecutor() == null) {
createExecutor();
}
initializeConnectionLatch();

// Start poller thread
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
//调用SynchronizedQueue.poll(),直到拿到PollerEvent
pollerThread.start();

//创建接收器,在AbstractEndpoint#startAcceptorThread()中创建Acceptor,然后启动
//最终调用NioEndpoint#serverSocketAccept(),启动socketServer
//开始监听,到这Connector启动就结束了
//Acceptor接收到请求封装成PollerEvent然后放到SynchronizedQueue中
//NioEndPoint最核心之处,启动Acceptor
startAcceptorThread();

}

//创建了Acceptor线程并启动
protected void startAcceptorThread() {
acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor";
acceptor.setThreadName(threadName);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}

}

NioEndPoint的启动,最主要是创建Acceptor线程池,同时监听新请求。