前言 链接到标题

在 Day11 中,我们实现了一种最容易想到的 Reactor 多线程模式,即将每个 Channel 的任务分配给一个线程执行。

这个模式逻辑上有不少问题,例如线程池由 EventLoop 来持有,按理来说应该由 Server 类来管理,这是受到了 Channel 类的限制,Channel 类仅有 EventLoop 成员。

主从 Reactor 模式 链接到标题

主从 Reactor 模式有以下几个特点:

  1. 服务器一般只有一个 main reactor,可以有很多个 sub reactor;
  2. 服务器管理一个线程池,每个线程对应一个 sub reactor,每个 sub reactor 负责一部分 Connections 的事件循环,事件执行也在这个线程完成;
  3. main reactor 只负责 Acceptor 建立新连接,然后将这个连接分配给一个 sub Reactor。

Server 成员 链接到标题

/todo,测试 accept 和 connect 的时候区分非阻塞与阻塞

Server 的成员包括一个 main_reactor 和 多个 sub_reactors,每个 sub_reactor 对应一个独有的 EventLoop,每个 sub_reactor 由一个线程负责。这就是所谓的 One Loop per Thread

class Server {
  private:
    EventLoop *main_reactor_;
    Acceptor *acceptor_;
    std::map<int, Connection *> connections_;
    std::vector<EventLoop *> sub_reactors_;
    ThreadPool *thpool_;

  public:
    Server(EventLoop *evl);
    ~Server();

    void HandleReadEvent(int fd);
    void NewConn(Socket *serv_sock);
    void DeleteConn(int sockfd);
};

main reactor 的工作流程 链接到标题

Server 创建的时候,会利用 main 函数的 loop 来初始化 Server,并利用 loop 来初始化 main_reactor_,和 acceptor_acceptor_ 会有绑定了服务器 ip 和端口的 socket。初始化 acceptor_ 的时候,会将 acceptor_->new_conn_callback_ 注册为 Server::NewConn(Socket *clnt_sock),当有连接时,acceptor_ 调用 Acceptor::AcceptConn(),该函数会调用 Socket::Accept(InetAddress *) 来接受连接,以及调用 new_conn_callback_(clnt_sock),实际上就是调用 Server::NewConn(Socket *clnt_sock)

main_reactor_ 就是接受、建立连接的事件循环,

Server::Server(EventLoop *loop) :
    main_reactor_(loop), acceptor_(nullptr) {
    acceptor_ = new Acceptor(main_reactor_);
    std::function<void(Socket *)> callback = [this](auto &&PH1) { NewConn(std::forward<decltype(PH1)>(PH1)); };
    acceptor_->set_new_conn_callback(callback); // 注册回调函数
    auto size = std::thread::hardware_concurrency(); // 获取 CPU 核心数?
    thpool_ = new ThreadPool(size);
    for (int i = 0; i < size; ++i) {
        sub_reactors_.push_back(new EventLoop());
    }

    for (int i = 0; i < size; ++i) {
        auto sub_loop = [capture0 = sub_reactors_[i]] { capture0->Loop(); };
        thpool_->add_task(sub_loop);
    }
}
Acceptor::Acceptor(EventLoop *loop) :
    loop_(loop), sock_(nullptr), accept_ch_(nullptr) {
    sock_ = new Socket();
    auto *addr = new InetAddress("127.0.0.1", 6789); // 6789679
    sock_->Bind(addr);
    sock_->Listen();
    sock_->Setnonblocking(); // Acceptor 建议使用阻塞式
    accept_ch_ = new Channel(loop_, sock_->getfd());
    std::function<void()> callback = [this] {
        AcceptConn();
    };
    // accept_ch_->set_use_threadpool(false); // 主 Acceptor 不使用线程池
    accept_ch_->set_read_callback(callback);
    accept_ch_->EnableReading();
    delete addr;
}
void Acceptor::AcceptConn() {
    auto *clnt_addr = new InetAddress();
    auto *clnt_sock = new Socket(sock_->Accpet(clnt_addr));
    printf("new client fd %d! IP: %s Port: %d\n", clnt_sock->getfd(), inet_ntoa(clnt_addr->get_addr().sin_addr), ntohs(clnt_addr->get_addr().sin_port));
    clnt_sock->Setnonblocking();
    new_conn_callback_(clnt_sock);
    delete clnt_addr;
}
void Server::NewConn(Socket *sock) {
    if (sock->getfd() != -1) {
        auto idx = sock->getfd() % sub_reactors_.size(); // 将连接随机分配到 sub_reactor
        auto *conn = new Connection(sub_reactors_[idx], sock); // 新建对应的 Connection 类
        auto callback = [this](auto &&ph1) { DeleteConn(std::forward<decltype(ph1)>(ph1)); };
        conn->set_delete_conn_callback(callback);
        connections_[sock->getfd()] = conn;
    }
}

一个连接对应一个 Connection 类,而 Connection 类创建的时候,会创建对应的 Channel,并注册 Channel 的回调函数,这个回调函数就是 Channel 用于处理读写事件的。

sub_reactor 的工作流程 链接到标题

当 Server 被初始化时,会创建 k 个 sub_reactors,每个 sub_reactor 就是一个 EventLoop,这里没有什么 sub_acceptor。同时有 k 个子线程被创建出来,每个线程对应一个 sub_reactor。

子线程的任务很简单,每一个子线程对应着一个 EventLoop,任务就是一直执行 sub_reactors[i]->Loop()

void Server::NewConn(Socket *sock) {
    if (sock->getfd() != -1) {
        auto idx = sock->getfd() % sub_reactors_.size(); // 将连接随机分配到 sub_reactor
        auto *conn = new Connection(sub_reactors_[idx], sock); // 新建对应的 Connection 类
        auto callback = [this](auto &&ph1) { DeleteConn(std::forward<decltype(ph1)>(ph1)); };
        conn->set_delete_conn_callback(callback);
        connections_[sock->getfd()] = conn;
    }
}

始终要注意,一个 reactor 就是一个 EventLoop!

这里可以再注意一下建立连接的时候发生的事情,建立的 Conn 会根据取模的结果,分配给一个 sub_reactor,每个 sub_reactor 只会关系属于自己的那些 Connection。在 Loop() 中得到关注的 Connections 中的活跃的 Channel,然后执行这些 Channel 的回调函数。

Channel 的回调函数在 Connection 创建的时候被注册。