本文将基于 libev 实现一个简单的 worker thread,支持投递任务来异步执行(有空的话会更新如何支持定时器任务已更新,详见这里)。Github 地址:worker_thread。
ev_async
ev_async
是 libev 中的一种事件 handle,可以在运行 ev_loop
之外的线程通过 ev_async_send
来唤醒事件循环,可以用作一种线程之间的 signaling 机制。
这样一来,投递任务可以先将任务塞进队列,再唤醒该线程的事件循环即可。
Main Loop
worker thread 的主循环十分简单,在构造函数初始化 ev_async
handle 之后,主循环直接调用 ev_run
即可:
1 2 3
| void WorkerThread::MainLoop() { ::ev_run(loop_, 0); }
|
初始化时需绑定 handle 与我们的 worker thread 实例,并注册回调,添加进事件循环:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| WorkerThread::WorkerThread() : loop_(::ev_loop_new(0)), async_(std::make_unique<struct ev_async>()), running_(true) { ev_async_init( async_.get(), [](EV_P_ struct ev_async *w, int revents) { auto *wt = reinterpret_cast<WorkerThread *>(w->data); wt->AsyncCb(); } ); ::ev_async_start(loop_, async_.get()); async_->data = this; thread_ = std::make_unique<std::thread>(&WorkerThread::MainLoop, this); NotifyLoop(); }
|
Async Handler Callback
在回调函数中我们需要查看当前队列是否还有任务,若有则执行,否则结束进入下一轮事件循环。在这里需要用锁来保护任务队列,因为其他线程 push task 的时候也会修改该队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| void WorkerThread::AsyncCb() { std::deque<Task> burst_; while (true) { { std::lock_guard<std::mutex> _{lock_}; if (!running_) { ::ev_async_stop(loop_, async_.get()); } if (tasks_.empty()) { break; } burst_.swap(tasks_); } while (!burst_.empty()) { auto task = std::move(burst_.front()); burst_.pop_front(); task(); } } }
|
Push Tasks
这里利用了 packaged_task
,用户可以通过返回的 future 来获取任务的返回值,或用来等待任务结束。在将任务塞进队列后需要调用 ev_async_send
通知事件循环来检查队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| template<class F, class... Args> decltype(auto) PushTask(F&& f, Args&&... args) { using result_type = std::invoke_result_t<F, Args...>; std::packaged_task<result_type(void)> task{std::bind(std::forward<F>(f), std::forward<Args>(args)...)};
auto result = task.get_future(); { std::lock_guard<std::mutex> _{lock_}; if (running_) { tasks_.emplace_back(std::move(task)); } } NotifyLoop();
return result; }
|
Push Timers
请见本文。