Worker Thread Based on libev - Supporting Timers

前文,本文将在其基础上添加对定时器的支持。Github 地址:worker_thread

Timer Structure

我们需要一个辅助结构来存储定时器的状态等信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct Timer : public std::enable_shared_from_this<Timer> {
enum { ADD, DELETE, NONE };

Timer(long delay, long repeat, struct ev_loop *loop);
~Timer();

void Start();
void Stop();

void Invoke();
void InvokeOnce();

long delay;
long repeat;
struct ev_loop *loop;
struct ev_timer *timer;
int action;
Task task;
};

由于在 libev 不保证线程安全,因此ev_<handle>_startev_<handle>_stop 必须保证在事件循环所在的线程中调用,在上述结构中即是 Timer::StartTimer::Stop 必须在事件循环中调用。

Modified Async Handler Callback

ev_async 的回调中添加处理定时器的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void WorkerThread::AsyncCb() {
std::deque<Task> burst_task;
std::list<std::shared_ptr<Timer>> pending_timer;
while (true) {
{
std::lock_guard<std::mutex> _{lock_};
if (!running_) {
::ev_async_stop(loop_, async_.get());
}
if (tasks_.empty() && pending_timers_.empty()) {
break;
}
burst_task.swap(tasks_);
pending_timer.swap(pending_timers_);
}
while (!burst_task.empty()) {
auto task = std::move(burst_task.front());
burst_task.pop_front();
task();
}
while (!pending_timer.empty()) {
auto timer = pending_timer.front();
pending_timer.pop_front();
if (timer->action == Timer::ADD) {
RegisterTimer(timer);
} else {
UnregisterTimer(timer);
}
}
}
}

其中 pending_timers_ 成员变量即保存了待处理的 Timer 变量,处理的动作由其 action 成员来指明。

Push Timers

这里即是准备等待添加到事件循环的定时器结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template<class F, class... Args>
decltype(auto) PushTimer(long delay, long repeat, F&& f, Args&&... args) {
using result_type = std::invoke_result_t<F, Args...>;
std::packaged_task<result_type(void)>
task{std::bind(std::forward<F>(f), std::forward<Args>(args)...)};

auto result = task.get_future();
auto timer = std::make_shared<Timer>(delay, repeat, loop_);

timer->task = std::move(task);
timer->action = Timer::ADD;
{
std::lock_guard<std::mutex> _{lock_};
if (running_) {
pending_timers_.emplace_back(timer);
}
}
NotifyLoop();

return result;
}