网络IO

网络IO

阻塞与非阻塞io

read / write 有两个职责(检测与拷贝),read检测读buffer中是否有可读数据并根据情况完成内核到用户的拷贝,write检测写buffer中是否有可写位置并根据情况完成用户到内核数据的拷贝

连接的 fd 阻塞属性决定了 io 函数是否阻塞,对于阻塞IO检测到读buffer中无数据 或 写buffer中已满,会阻塞等待知道读buffer中有数据或写buffer中有位置后在完成数据的拷贝,当然这可能会出现,实际读写的数据少于想要读写的数据即:ret = read(fd , buf , sz) ret < sz。对于非阻塞IO检测到读buffer中无数据 或 写buffer中已满时会立即返回。具体差异在:IO 函数在数据未就绪时是否立刻返回

非阻塞IO处理方式

所有的IO函数都有一个参数fd,这意味着IO函数只能检测一条连接的就绪状态以及操作一条IO的数据

连接的建立

connect:

其中connect分为两种,一种是接收客户端的连接,另一种是服务器作为客户端主动去连接其他服务器如mysql服务器

客户端非阻塞IO连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    int fd = socket(AF_INET , SOCK_STREAM , 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(atoi(argv[2]));
inet_pton(AF_INET,argv[1],&servaddr.sin_addr);
int flag = fcntl(fd , F_GETFL);
fcntl(fd , F_SETFL , flag | O_NONBLOCK);
while(1) {
connect(fd, &servaddr, sizeof(servaddr));
printf("connect:%s errno:%d \n",strerror(errno) , errno);
sleep(1);
}
输出:
connect:Operation now in progress errno:115
connect:Operation now in progress errno:115
connect:Transport endpoint is already connected errno:106
connect:Transport endpoint is already connected errno:106

对于非阻塞IO的connect需要循环连接,其中errnoEINPROGRESS(正在建立)转变为EISCONN(已经连接)

listen:

会创建半连接队列和全连接队列

accept:

对于非阻塞fd,首先检测全连接队列中没有可用的连接,会返回-1并设置errnoEWOULDBLOCK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    int listenfd = socket(AF_INET , SOCK_STREAM , 0);
int flag = fcntl(listenfd , F_GETFL);
fcntl(listenfd , F_SETFL , flag | O_NONBLOCK);
struct sockaddr_in addr;
memset(&addr , 0 , sizeof addr);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_family = AF_INET;
addr.sin_port = htons(8888);
if( -1 == bind(listenfd , &addr , sizeof(addr)) ) perror("bind");
if(-1 == listen(listenfd , 20)) perror("listen");
struct sockaddr_in client;
socklen_t len = sizeof client;

while(1) {
int cfd = accept(listenfd, &client, &len);
if(errno == EWOULDBLOCK)
printf("cfd:%d accept:%s errno:%d \n",cfd,strerror(errno) , errno);
sleep(1);
}
输出:
cfd:-1 accept:Resource temporarily unavailable errno:11
cfd:-1 accept:Resource temporarily unavailable errno:11
cfd:-1 accept:Resource temporarily unavailable errno:11

连接断开

连接建立,若某一端关闭连接,而另一端仍然向它写数据,第一次写数据后会收到RST(Reset the connection)响应,此后再写数据,内核将向进程发出SIGPIPE信号,通知进程此连接已经断开。而SIGPIPE信号的默认处理是终止程序

主动断开:close shutdown

被动断开:对端读端关闭 read() = 0 对端写端关闭 write() = -1 && errno = EPIPE

reactor

reactor将对IO的操作转化为了对事件的处理

reactor由io多路复用和非阻塞IO组成,IO多路复用负责检测IO事件,非阻塞IO用于操作IO

reactor为什么要搭配非阻塞IO?

  1. 多线程环境下, 会将一个listenfd添加到多个epoll中,这里只有一个listenfd,对应只会有一个全连接队列,但是有多个epoll从这个全连接队列中获取节点,当有多个线程同时对一个节点进行accept时,只会有一个accept成功,若listenfd是阻塞IO其他线程会阻塞在accept上等待返回,若是非阻塞会返回-1并将errno设为EWOULDBLOCK
  2. 在边缘触发下必须使用非阻塞IO,边缘触发要求每次将readbuf中的数据读完,需要借助非阻塞IO read返回 -1 时的errno来判断结束条件
  3. 当reactor使用select时,select存在一个bug,当某个socket接收缓冲区有新数据分节到达,然后select报告这个socket描述符可读,但随后协议栈检查到这个新节点检验和错误,然后丢弃这个节点,这时候调用read则无数据可读,如果socket没有被设置为非阻塞,则此read会阻塞线程

无论是C++还是Java编写的网络框架,大多数都是基于Reactor模型进行设计和开发,Reactor模型基于事件驱动,特别适合处理海量的I/O事件。

Reactor模型中定义的三种角色:

  • Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。新的事件包含连接建立就绪、读就绪、写就绪等。
  • Acceptor:处理客户端新连接,并分派请求到处理器链中。
  • Handler:将自身与事件绑定,执行非阻塞读/写任务,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。可用资源池来管理。

Reactor处理请求的流程:

读取操作:

  1. 应用程序注册读就绪事件和相关联的事件处理器
  2. 事件分离器等待事件的发生
  3. 当发生读就绪事件的时候,事件分离器调用第一步注册的事件处理器

写入操作类似于读取操作,只不过第一步注册的是写就绪事件。

1.单Reactor单线程模型

Reactor线程负责多路分离套接字,accept新连接,并分派请求到handler。Redis使用单Reactor单进程的模型。

消息处理流程:

  1. Reactor对象通过select监控连接事件,收到事件后通过dispatch进行转发。
  2. 如果是连接建立的事件,则由acceptor接受连接,并创建handler处理后续事件。
  3. 如果不是建立连接事件,则Reactor会分发调用Handler来响应。
  4. handler会完成read->业务处理->send的完整业务流程。

单Reactor单线程模型只是在代码上进行了组件的区分,但是整体操作还是单线程,不能充分利用硬件资源。handler业务处理部分没有异步。

对于一些小容量应用场景,可以使用单Reactor单线程模型。但是对于高负载、大并发的应用场景却不合适,主要原因如下:

  1. 即便Reactor线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送。
  2. 当Reactor线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重Reactor线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈。
  3. 一旦Reactor线程意外中断或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。

为了解决这些问题,演进出单Reactor多线程模型。

2.单Reactor多线程模型

该模型在事件处理器(Handler)部分采用了多线程(线程池)。

img

消息处理流程:

  1. Reactor对象通过Select监控客户端请求事件,收到事件后通过dispatch进行分发。
  2. 如果是建立连接请求事件,则由acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接完成后续的各种事件。
  3. 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应。
  4. Handler只负责响应事件,不做具体业务处理,通过Read读取数据后,会分发给后面的Worker线程池进行业务处理。
  5. Worker线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给Handler进行处理。
  6. Handler收到响应结果后通过send将响应结果返回给Client。

相对于第一种模型来说,在处理业务逻辑,也就是获取到IO的读写事件之后,交由线程池来处理,handler收到响应后通过send将响应结果返回给客户端。这样可以降低Reactor的性能开销,从而更专注的做事件分发工作了,提升整个应用的吞吐。

但是这个模型存在的问题:

  1. 多线程数据共享和访问比较复杂。如果子线程完成业务处理后,把结果传递给主线程Reactor进行发送,就会涉及共享数据的互斥和保护机制。
  2. Reactor承担所有事件的监听和响应,只在主线程中运行,可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。

为了解决性能问题,产生了第三种主从Reactor多线程模型。

3.主从Reactor多线程模型

比起第二种模型,它是将Reactor分成两部分:

  1. mainReactor负责监听server socket,用来处理网络IO连接建立操作,将建立的socketChannel指定注册给subReactor。
  2. subReactor主要做和建立起来的socket做数据交互和事件业务处理操作。通常,subReactor个数上可与CPU个数等同。

Nginx、Swoole、Memcached和Netty都是采用这种实现。

消息处理流程:

  1. 从主线程池中随机选择一个Reactor线程作为acceptor线程,用于绑定监听端口,接收客户端连接
  2. acceptor线程接收客户端连接请求之后创建新的SocketChannel,将其注册到主线程池的其它Reactor线程上,由其负责接入认证、IP黑白名单过滤、握手等操作
  3. 步骤2完成之后,业务层的链路正式建立,将SocketChannel从主线程池的Reactor线程的多路复用器上摘除,重新注册到Sub线程池的线程上,并创建一个Handler用于处理各种连接事件
  4. 当有新的事件发生时,SubReactor会调用连接对应的Handler进行响应
  5. Handler通过Read读取数据后,会分发给后面的Worker线程池进行业务处理
  6. Worker线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给Handler进行处理
  7. Handler收到响应结果后通过Send将响应结果返回给Client

总结

Reactor模型具有如下的优点:

  1. 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
  2. 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
  3. 可扩展性,可以方便地通过增加Reactor实例个数来充分利用CPU资源;
  4. 可复用性,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性。

部分转载:https://cloud.tencent.com/developer/article/1488120