libcurl Multi Interface Together With libev


这是一个非常简单的 libev 和 libcurl 协作的例子。

libcurl Nonblocking Multi Interface

有时我们想在某个 event loop 中发送大量 http 请求,我们可以使用 libcurl 提供的 curl_multi_socket_action 机制来实现。要使用这个函数,需要设置两个主要选项: CURLMOPT_SOCKETFUNCTION 和 CURLMOPT_TIMERFUNCTION 。

CURLMOPT_TIMERFUNCTION

函数原型:

1
2
3
int timer_callback(CURLM *multi,    /* multi handle */
long timeout_ms, /* timeout in number of ms */
void *userp); /* private callback pointer */


该函数被用于通知用户设置一个一次性的超时为 timeout_ms 的定时器。若 timeout_ms 的值是 -1 则代表用户可以移除该定时器。


参数 userp 可以通过 CURLMOPT_TIMERDATA 选项来设置。

CURLMOPT_SOCKETFUNCTION

函数原型:

1
2
3
4
5
int socket_callback(CURL *easy,      /* easy handle */
curl_socket_t s, /* socket */
int what, /* describes the socket */
void *userp, /* private callback pointer */
void *socketp); /* private socket pointer */


该函数被用于通知用户 libcurl 感兴趣的 socket 事件。 s 即是关心的 socket 描述符, what 就是需要监听的事件,可能的值有 CURL_POLL_IN , CURL_POLL_OUT , CURL_POLL_INOUT 和 CURL_POLL_REMOVE 。顾名思义,前三个值即是需要事件循环监听的 I/O 事件;而第四个值则是通知用户可以移除与 s 关联的事件监听器。


参数 userp 可以通过 CURLMOPT_SOCKETDATA 选项来设置。而 socketp 则是与 s 关联的自定义数据,初始值是 NULL ,可以通过 curl_multi_assign 函数来设置,有了这个参数可以更方便地将 event loop 的 I/O 事件监听器和 s 进行关联。

Implementation

我们将使用 libev 作为主事件循环,这样一来定时器即是 struct ev_timer ,I/O 监听器即是 struct ev_io 。


定时器超时或者监听的事件出现后,我们需要调用前述的 curl_multi_socket_action 来让 libcurl 进行下一步操作。


那么最初始的事件如何设置呢,在使用 curl_multi_add_handle 添加请求时会调用定时器的回调函数来设置一个超时为 0ms 的定时器,接下来直接启动事件循环即可。


若在 curl_multi_add_handle 之前没有设置 CURLMOPT_TIMERFUNCTION ,那么可以在之后手动调用 action 函数:

1
curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &running);


或者直接手动注册一个超时为 0 的定时器,因为最终在定时器超时后也会调用上述函数。

Demo

Source Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/* file: curl-libev-demo.c */

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

#include <curl/curl.h>
#include <ev.h>

#define _STR(x) #x
#define STR(x) _STR(x)

#define _LOGAT __FILE__ ":" STR(__LINE__) "] "
#define LOG(fmt, ...) fprintf(stdout, _LOGAT fmt "\n", ##__VA_ARGS__)

static int curl_sock_cb(CURL *e, curl_socket_t s, int what, void *userp, void *sockp);
static int curl_timer_cb(CURLM *m, long tmo, void *userp);
static void ev_timer_cb(EV_P_ struct ev_timer *w, int revents);

static void add_url(CURLM *m, const char *url);

int main() {
CURLM *m;
CURLMsg *msg;
int remain_msg;
struct ev_loop *loop;
struct ev_timer timer;

curl_global_init(CURL_GLOBAL_ALL);

m = curl_multi_init();
if (!m) {
LOG("failed to init multi");
goto __no_multi;
}

loop = ev_default_loop(0);
ev_set_userdata(loop, m);

ev_timer_init(&timer, ev_timer_cb, 0., 0.);
timer.data = loop;

curl_multi_setopt(m, CURLMOPT_SOCKETFUNCTION, curl_sock_cb);
curl_multi_setopt(m, CURLMOPT_SOCKETDATA, loop);
curl_multi_setopt(m, CURLMOPT_TIMERFUNCTION, curl_timer_cb);
curl_multi_setopt(m, CURLMOPT_TIMERDATA, &timer);

add_url(m, "https://github.com");
add_url(m, "https://blog.icpz.dev");

ev_run(loop, 0);

while ((msg = curl_multi_info_read(m, &remain_msg)) != NULL) {
CURL *e = NULL;
long http_code;
if (msg->msg != CURLMSG_DONE) {
LOG("invalid multi info msg");
continue;
}
e = msg->easy_handle;
if (msg->data.result != CURLE_OK) {
LOG("easy handle %p error with code %d", e, msg->data.result);
} else {
curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &http_code);
LOG("easy handle %p done with http code %ld", e, http_code);
}
curl_multi_remove_handle(m, e);
curl_easy_cleanup(e);
}

ev_loop_destroy(loop);
curl_multi_cleanup(m);
__no_multi:
curl_global_cleanup();

return 0;
}

static void add_url(CURLM *m, const char *url) {
CURL *e = curl_easy_init();

curl_easy_setopt(e, CURLOPT_URL, url);
curl_easy_setopt(e, CURLOPT_HEADER, 1L);
curl_easy_setopt(e, CURLOPT_NOBODY, 1L);
curl_multi_add_handle(m, e);
LOG("easy handle %p added", e);
}

static void ev_sock_cb(EV_P_ struct ev_io *w, int revents) {
int running;
int events = 0;
CURLM *m = (CURLM *)ev_userdata(EV_A);

events |= (revents & EV_READ ? CURL_CSELECT_IN : 0);
events |= (revents & EV_WRITE ? CURL_CSELECT_OUT : 0);
events |= (revents & EV_ERROR ? CURL_CSELECT_ERR : 0);

curl_multi_socket_action(m, w->fd, events, &running);
}

static int curl_sock_cb(CURL *e, curl_socket_t s, int what, void *userp, void *sockp) {
struct ev_io *io = (struct ev_io *)sockp;
struct ev_loop *loop = (struct ev_loop *)userp;
CURLM *m = (CURLM *)ev_userdata(loop);
int events = 0;

if (io) {
ev_io_stop(loop, io);
}
if (what == CURL_POLL_REMOVE) {
free(io);
curl_multi_assign(m, s, NULL);
goto __out;
}

if (!io) {
io = (struct ev_io *)malloc(sizeof *io);
curl_multi_assign(m, s, io);
io->data = m;
}
switch (what) {
case CURL_POLL_IN:
events |= EV_READ;
break;
case CURL_POLL_OUT:
events |= EV_WRITE;
break;
case CURL_POLL_INOUT:
events |= EV_READ;
events |= EV_WRITE;
break;
default:
break;
}
ev_io_init(io, ev_sock_cb, s, events);
ev_io_start(loop, io);

__out:
return 0;
}

static void ev_timer_cb(EV_P_ struct ev_timer *w, int revents) {
CURLM *m = (CURLM *)ev_userdata(EV_A);
int running;

curl_multi_socket_action(m, CURL_SOCKET_TIMEOUT, 0, &running);
}

static int curl_timer_cb(CURLM *m, long tmo, void *userp) {
struct ev_timer *timer = (struct ev_timer *)userp;
struct ev_loop *loop = (struct ev_loop *)timer->data;

ev_timer_stop(loop, timer);
if (tmo == -1) {
goto __out;
}
ev_timer_set(timer, (double)tmo / 1000., 0.);
ev_timer_start(loop, timer);

__out:
return 0;
}

Build & Run

1
2
3
gcc curl-libev-demo.c -o demo -lcurl -lev

./demo