前言 链接到标题

到 day9 的时候,一个单线程的服务器已经算写好了。Reactor 驱动大致成型。

服务器的启动流程大致如下,先创建 EventLoop 对象 loop(里面包含了 Epoll 对象),然后 Server 会利用 loop 实例化对象 server,Server 对象实例化时,Acceptor 类型的 acceptor_ 对象会被初始化, Acceptor 对象用于建立连接,会在 Server 的构造函数里回调函数 acceptor_->new_conn_callback_ 会被注册为 server->NewConn(Socket *clnt_sock) ,由 acceptor_->AcceptConn() 来调用。而 Acceptor 的构造函数中 Channel 类的实例 accept_ch_ 会被初始化,而回调函数 acceptr_ch->callback_ 会被注册为 acceptor_->AcceptConn(),最终还是调用的 server->NewConn(Socket *clnt_sock)

Server::Server(EventLoop *loop) :
    loop_(loop), acceptor_(nullptr) {
    acceptor_ = new Acceptor(loop);
    std::function<void(Socket *)> callback = [this](auto &&PH1) { NewConn(std::forward<decltype(PH1)>(PH1)); };
    acceptor_->set_new_conn_callback(callback);
}

void Server::NewConn(Socket *sock) {
    auto *conn = new Connection(loop_, sock); // 这里应该是 clnt_sock
    std::function<void(Socket *)> callback = [this](auto &&PH1) { DeleteConn(std::forward<decltype(PH1)>(PH1)); };
    conn->set_delete_conn_callback(callback);
    connections_[sock->getfd()] = conn;
}

void Acceptor::AcceptConn() {
    auto *clnt_addr = new InetAddress();
    auto *clnt_sock = new Socket(sock_->Accpet(clnt_addr));
    printf("new client fd %d! IP: %s Port: %d\n", clnt_sock->getfd(), inet_ntoa(clnt_addr->get_addr().sin_addr), ntohs(clnt_addr->get_addr().sin_port));
    clnt_sock->Setnonblocking();
    new_conn_callback_(clnt_sock);
    delete clnt_addr;
}

void Acceptor::set_new_conn_callback(std::function<void(Socket *)> &callback) {
    new_conn_callback_ = callback;
}

当建立连接的时候,会生成一个 Connection 实例,存储在 Sever 的 connections_ 变量中。connections_ 是 key 为 client fd,value 为建立的连接的指针 Connection *。并在,在建立连接时,Server::DelteConn 会被注册为 Connection 实例的回调函数 delete_conn_call_back_。当读取数据时,如果发现 read_bytes == 0,就会执行 delete_conn_call_back_ 来断开连接。

void Server::DeleteConn(Socket *sock) {
    Connection *conn = connections_[sock->getfd()];
    connections_.erase(sock->getfd());
    delete conn;
}

而 Connection 实例 conn 建立时,会创建一个 Channel 类的实例 clnt_chclng_ch->callback_ 被设置为 conn->echo()。调用 clnt_ch->EnableReading() 会将该 clnt_ch->fd 添加到 epoll 关注的文件描述符中去,对应的 ev 中会包含指向该 clnt_ch 的指针。

之后便是 loop->Loop() 一直循环了。

void EventLoop::Loop() {
    while (!quit_) {
        auto chs = ep_->Poll(); // Poll 返回的是活跃的 Channel 的集合
        for (auto *ch : chs) {
            ch->HandleEvent();
        }
    }
}

最后注意一下生命周期的管理,server 的析构函数会 delete acceptor_,而 acceptor_ 的析构函数 会 delete sock_, addr_, accept_ch_loop 的析构函数会 delete ep_

断开连接时,会 delete conn,从而 delete channel_, sock_, read_buffer_

在 main 函数的 loop->Loop() 循环结束时,会 delete serverdelete loop

理论上 loop->Loop() 一直不会结束。

添加线程池 链接到标题

线程池说到底就是一个 vector<std::thread> 加上一个任务队列组成。Channel 可以说是生产者,每次发生事件,就往任务队列中添加要执行的函数 callback_,添加完之后要唤醒因条件变量阻塞的线程(notify_one 或者 notify_all)。而线程创建的时候,绑定的函数就是往任务队列里面取任务,如果队列为空,就会阻塞在条件变量 cv_ 上,而释放锁。

ThreadPool::ThreadPool(int size) :
    stop_(false) {
    for (int i = 0; i < size; ++i) {
        // 线程创建成功
        printf("create thread!\n");
        auto func = [this]() {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(tasks_mtx_);
                    cv_.wait(lock, [this]() {
                        return stop_ || !tasks_.empty();
                    });
                    if (stop_ && tasks_.empty()) {
                        return;
                    }
                    task = tasks_.front();
                    printf("get task\n");
                    tasks_.pop();
                }
                printf("run task!\n");
                task();
            }
        }; // 创建线程时绑定的函数
        threads_.emplace_back(func);
    }
    printf("thread count: %d\n", threads_.size());
}

void Channel::HandleEvent() {
    loop_->add_thread(callback_);
    /* callback_(); */
}

void EventLoop::add_thread(std::function<void()> func) {
    thread_pool_->add_task(func);
}

void ThreadPool::add_task(std::function<void()> func) {
    {
        std::unique_lock<std::mutex> lock(tasks_mtx_);
        printf("add task\n");
        if (stop_) {
            throw std::runtime_error("ThreadPool already stop, can't add task any more");
        }
        tasks_.emplace(func);
        cv_.notify_one(); // 记得要唤醒线程
    }
}

说到底,就是把本来应该由 Channel 直接执行的任务,给挪到任务队列里面让线程池中的线程取出来去执行。

这里还有一个问题,那就是 Acceptor 的建立连接的任务,Channel 也会放到线程池里面让线程竞争得到执行权再执行,这样是不应该的。应该直接由主线程负责。