Worker Thread Based on libev

本文将基于 libev 实现一个简单的 worker thread,支持投递任务来异步执行(有空的话会更新如何支持定时器任务已更新,详见这里)。Github 地址:worker_thread

ev_async

ev_async 是 libev 中的一种事件 handle,可以在运行 ev_loop 之外的线程通过 ev_async_send 来唤醒事件循环,可以用作一种线程之间的 signaling 机制。

这样一来,投递任务可以先将任务塞进队列,再唤醒该线程的事件循环即可。

Main Loop

worker thread 的主循环十分简单,在构造函数初始化 ev_async handle 之后,主循环直接调用 ev_run 即可:

1
2
3
void WorkerThread::MainLoop() {
::ev_run(loop_, 0);
}

初始化时需绑定 handle 与我们的 worker thread 实例,并注册回调,添加进事件循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
WorkerThread::WorkerThread()
: loop_(::ev_loop_new(0)),
async_(std::make_unique<struct ev_async>()),
running_(true) {
ev_async_init(
async_.get(),
[](EV_P_ struct ev_async *w, int revents) {
auto *wt = reinterpret_cast<WorkerThread *>(w->data);
wt->AsyncCb();
}
);
::ev_async_start(loop_, async_.get());
async_->data = this;
thread_ = std::make_unique<std::thread>(&WorkerThread::MainLoop, this);
NotifyLoop();
}

Async Handler Callback

在回调函数中我们需要查看当前队列是否还有任务,若有则执行,否则结束进入下一轮事件循环。在这里需要用锁来保护任务队列,因为其他线程 push task 的时候也会修改该队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void WorkerThread::AsyncCb() {
std::deque<Task> burst_;
while (true) {
{
std::lock_guard<std::mutex> _{lock_};
if (!running_) {
::ev_async_stop(loop_, async_.get());
}
if (tasks_.empty()) {
break;
}
burst_.swap(tasks_);
}
while (!burst_.empty()) {
auto task = std::move(burst_.front());
burst_.pop_front();
task();
}
}
}

Push Tasks

这里利用了 packaged_task ,用户可以通过返回的 future 来获取任务的返回值,或用来等待任务结束。在将任务塞进队列后需要调用 ev_async_send 通知事件循环来检查队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<class F, class... Args>
decltype(auto) PushTask(F&& f, Args&&... args) {
using result_type = std::invoke_result_t<F, Args...>;
std::packaged_task<result_type(void)>
task{std::bind(std::forward<F>(f), std::forward<Args>(args)...)};

auto result = task.get_future();
{
std::lock_guard<std::mutex> _{lock_};
if (running_) {
tasks_.emplace_back(std::move(task));
}
}
NotifyLoop();

return result;
}

Push Timers

请见本文