事件驱动 链接到标题

原先的代码中,不管是接受客户端连接还是处理客户端事件,都是围绕epoll来编程,可以说epoll是整个程序的核心,服务器做的事情就是监听epoll上的事件,然后对不同事件类型进行不同的处理。这种以事件为核心的模式又叫事件驱动,事实上几乎所有的现代服务器都是事件驱动的。和传统的请求驱动模型有很大不同,事件的捕获、通信、处理和持久保留是解决方案的核心结构。

将服务器改造成 Reactor 模式 链接到标题

我们将服务器抽象成一个 Server 类,类中有一个 main-reactor,main-reactor 的核心是一个 EventLoop,即不断循环,一旦有事件发生,我们就会通过 ep->Poll 知晓,然后作出对应的处理。

Channel 的修改 链接到标题

day5 中,每个 channel 里面都含有一个 Epoll * 指针,表示它在哪个 epoll 实例中被关注,这里我们把 Epoll * 替换成了 EventLoop *,表示该 channel 处于哪个事件循环中,事实上 EventLoop 的关键成员就是 Epoll *

初始化流程 链接到标题

首先创建 EventLoop 对象 loop,它会创建一个 Epoll 实例,核心就是 epfd,然后利用 loop 初始化 Server 对象,在这个过程中,会完成服务器的 serv_fd 的创建以及 bindlisten,同时将对应着 serv_fd 以及新建连接事件的 Channel 创建出来,将 Channel 的处理事件的回调函数设置为 NewConn;调用 serv_ch->EnableReading 会将关注的事件类型设置为 EPOLLIN | EPOLLET 并调用 loop->UpdateChannel(this) 最终调用 ep->UpdateChannel(ch),从而将 serv_ch 对应的文件描述符添加到 epoll 关注的文件描述符列表或者修改。

Server::Server(EventLoop *loop) :
    loop_(loop) {
    Socket *serv_sock = new Socket();
    InetAddress *serv_addr = new InetAddress("127.0.0.1", 8888);
    serv_sock->Bind(serv_addr);
    serv_sock->Listen();
    serv_sock->Setnonblocking();
    Channel *serv_ch = new Channel(loop, serv_sock->getfd());
    std::function<void()> cb = [this, serv_sock] { NewConn(serv_sock); };
    serv_ch->set_callback(cb);
    serv_ch->EnableReading();
}

void Channel::EnableReading() {
    events = EPOLLIN | EPOLLET;
    loop_->UpdateChannel(this);
}

void EventLoop::UpdateChannel(Channel *ch) {
    ep_->UpdateChannel(ch);
}

void Epoll::UpdateChannel(Channel *ch) {
    int fd = ch->getfd();
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.data.ptr = ch;
    ev.events = ch->get_events();
    if (!ch->get_in_epoll()) {
        // 添加到 epoll 中
        errif(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1, "epoll add error!\n");
        ch->set_in_epoll();
    } else {
        errif(epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1, "epoll mod error!\n");
    }
}

void Server::NewConn(Socket *serv_sock) {
    InetAddress *clnt_addr = new InetAddress();
    Socket *clnt_sock = new Socket(serv_sock->Accpet(clnt_addr));
    printf("new client fd %d! IP: %s Port: %d\n", clnt_sock->getfd(), inet_ntoa(clnt_addr->addr.sin_addr), ntohs(clnt_addr->addr.sin_port));
    clnt_sock->Setnonblocking();
    int sockfd = clnt_sock->getfd();
    Channel *clnt_ch = new Channel(loop_, sockfd);
    std::function<void()> callback = [this, sockfd] { HandleReadEvent(sockfd); }; // 设定建立了连接的 ch 对应的回调函数
    clnt_ch->set_callback(callback);
    clnt_ch->EnableReading();
}

上述代码就是有新连接到来时的函数调用流程。

新连接到来后 链接到标题

当有新连接到来后,就会调用 ch->HandleEvent(),即我们绑定的回调函数 Server::NewConn(Socket *serv_sock),它会建立连接,利用 accept 返回的文件描述符,创建 clnt_ch,并设置 clnt_ch 的回调函数为 Server::HandleReadEvent

这样,当活跃的 channel 是 clnt_ch 时,就会执行读写事件。

void EventLoop::Loop() {
    while (!quit_) {
        auto chs = ep_->Poll(); // Poll 返回的是活跃的 Channel 的集合
        for (auto *ch : chs) {
            ch->HandleEvent();
        }
    }
}