说明:当前使用的版本为uwsgi 2.0
目前在网络上能够找到的uwsgi源码相关的文档少之又少(当然,也不排除是官方文档大家觉得已经足够,或者是如我一般愚笨的人少之又少╮(╯_╰)╭)。在这个背景下,作为uwsgi小白的我,由于种种原因,决定自食其力。因此,准备开一个uwsgi源码阅读系列,结合官网,整理自己阅读uwsgi源码过程中的所思所想所得,作为不备之需。本系列没有deadline,不定时更新。
免责声明:由于本文作者(也就是我)目前尚未全面掌握uwsgi,C代码勉强能读懂,因此假若本文有错误的地方,请不要犹豫,不要客气,尽情在评论中提出!
好了,废话说完了,进入正题︿( ̄︶ ̄)︿
uWSGI中的信号及其处理
uWSGI信号量与UNIX/Posix信号量毫无共同之处。你可以定义一个信号,绑定该信号接收到时应该触发的操作(信号处理函数)。通过uwsgi中的信号,你可以监控到文件/目录的修改,可以定时进行一些处理工作等等等等。
相关结构说明
uwsgi_signal_entry
1 | struct uwsgi_signal_entry { |
信号表保存在
uwsgi_shared
结构中,定义为struct uwsgi_signal_entry *signal_table;
信号表由所有worker共享 (并通过共享锁对抗竞争条件)。当一个workder接收到一个信号,它会搜索信号表,查找对应的处理程序并执行。
定时器
1 | // 普通定时器 |
监控器
1 | // 文件监控器 |
uWSGI中的信号API
信号API位于core/signal.c文件中。
处理uwsgi信号: uwsgi_signal_handler
1 | int uwsgi_signal_handler(struct wsgi_request *wsgi_req, uint8_t sig) { |
uwsgi_signal_registered
检查信号在当前worker内是否注册了处理函数。如果注册了,返回1,否则返回0 1
2
3
4
5
6
7
8int uwsgi_signal_registered(uint8_t sig) {
int pos = (uwsgi.mywid * 256) + sig;
if (uwsgi.shared->signal_table[pos].handler != NULL)
return 1;
return 0;
}uwsgi_register_signal
注册信号以及相应的处理函数等。返回0表示注册成功,非0表示有异常
1 | int uwsgi_register_signal(uint8_t sig, char *receiver, void *handler, uint8_t modifier1) { |
添加文件监控器: uwsgi_add_file_monitor
1 | // sig: 信号值 |
- 检查参数filename的长度,不能超过254
- 通过文件监控表锁
uwsgi.fmon_table_lock
来处理竞争 - 如果当前已注册的文件监控器大小
ushared->files_monitored_cnt
已经是64(可添加的最大数),则退出 - 将信号sig和文件名filename信息附加到已监控文件数组
ushared->files_monitored
的尾部,修改注册状态。
添加定时器: uwsgi_add_timer
返回0表示表示添加成功,非0表示失败。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25int uwsgi_add_timer(uint8_t sig, int secs) {
// 不在master模式下,则返回错误。
if (!uwsgi.master_process) return -1;
uwsgi_lock(uwsgi.timer_table_lock);
// 判断是否已经添加了64个定时器,有则退出,否则将sig和secs注册到定时器表(master会用它来添加项到时间队列中)中
if (ushared->timers_cnt < 64) {
// fill the timer table, the master will use it to add items to the event queue
ushared->timers[ushared->timers_cnt].value = secs;
ushared->timers[ushared->timers_cnt].registered = 0;
ushared->timers[ushared->timers_cnt].sig = sig;
ushared->timers_cnt++;
}
else {
uwsgi_log("you can register max 64 timers !!!\n");
uwsgi_unlock(uwsgi.timer_table_lock);
return -1;
}
uwsgi_unlock(uwsgi.timer_table_lock);
return 0;
}uwsgi_signal_add_rb_timer
返回0表示表示添加成功,非0表示失败。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28int uwsgi_signal_add_rb_timer(uint8_t sig, int secs, int iterations) {
// 不在master模式下,则返回错误。
if (!uwsgi.master_process)
return -1;
uwsgi_lock(uwsgi.rb_timer_table_lock);
// 判断是否已经添加了64个定时器,有则退出,否则将sig和secs注册到定时器表(master会用它来添加项到时间队列中)中
if (ushared->rb_timers_cnt < 64) {
// fill the timer table, the master will use it to add items to the event queue
ushared->rb_timers[ushared->rb_timers_cnt].value = secs;
ushared->rb_timers[ushared->rb_timers_cnt].registered = 0;
ushared->rb_timers[ushared->rb_timers_cnt].iterations = iterations;
ushared->rb_timers[ushared->rb_timers_cnt].iterations_done = 0;
ushared->rb_timers[ushared->rb_timers_cnt].sig = sig;
ushared->rb_timers_cnt++;
}
else {
uwsgi_log("you can register max 64 rb_timers !!!\n");
uwsgi_unlock(uwsgi.rb_timer_table_lock);
return -1;
}
uwsgi_unlock(uwsgi.rb_timer_table_lock);
return 0;
}uwsgi_route_signal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107void uwsgi_route_signal(uint8_t sig) {
// 找到信号sig在信号表中对应的信号项
int pos = (uwsgi.mywid * 256) + sig;
struct uwsgi_signal_entry *use = &ushared->signal_table[pos];
int i;
// 如果信号接收者是worker/worker0(默认),则发送给第一个可用worker
if (use->receiver[0] == 0 || !strcmp(use->receiver, "worker") || !strcmp(use->receiver, "worker0")) {
if (uwsgi_signal_send(ushared->worker_signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to workers pool\n", sig);
}
}
// 如果信号接收者是workers,则发送给所有worker
else if (!strcmp(use->receiver, "workers")) {
for (i = 1; i <= uwsgi.numproc; i++) {
if (uwsgi_signal_send(uwsgi.workers[i].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to worker %d\n", sig, i);
}
}
}
// 如果信号接收者是active-workers,则发送给所有活跃worker(active worker,判定标准是worker不是cheaped/suspended)
else if (!strcmp(use->receiver, "active-workers")) {
for (i = 1; i <= uwsgi.numproc; i++) {
if (uwsgi.workers[i].pid > 0 && !uwsgi.workers[i].cheaped && !uwsgi.workers[i].suspended) {
if (uwsgi_signal_send(uwsgi.workers[i].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to worker %d\n", sig, i);
}
}
}
}
// 如果信号接收者是指定的worker,则发送给指定的worker(对于workerN,N不能大于最大的worker数)
else if (!strncmp(use->receiver, "worker", 6)) {
i = atoi(use->receiver + 6);
if (i > uwsgi.numproc) {
uwsgi_log("invalid signal target: %s\n", use->receiver);
}
if (uwsgi_signal_send(uwsgi.workers[i].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to worker %d\n", sig, i);
}
}
// 如果信号接收者是subscribed,则发送给订阅了这个信号的进程(当前版本未实现)
else if (!strcmp(use->receiver, "subscribed")) {
}
// 如果信号接收者是spooler,则发送给spooler
else if (!strcmp(use->receiver, "spooler")) {
if (ushared->worker_signal_pipe[0] != -1) {
if (uwsgi_signal_send(ushared->spooler_signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to the spooler\n", sig);
}
}
}
// 如果信号接收者是mules,则发送给所有的mule
else if (!strcmp(use->receiver, "mules")) {
for (i = 0; i < uwsgi.mules_cnt; i++) {
if (uwsgi_signal_send(uwsgi.mules[i].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to mule %d\n", sig, i + 1);
}
}
}
// 如果信号接收者是指定的mule,则发送给指定的mule(对于muleN,N不能大于最大的mule数)
else if (!strncmp(use->receiver, "mule", 4)) {
i = atoi(use->receiver + 4);
if (i > uwsgi.mules_cnt) {
uwsgi_log("invalid signal target: %s\n", use->receiver);
}
else if (i == 0) {
if (uwsgi_signal_send(ushared->mule_signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to a mule\n", sig);
}
}
else {
if (uwsgi_signal_send(uwsgi.mules[i - 1].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to mule %d\n", sig, i);
}
}
}
// 如果信号接收者是farm_name,那么会根据name获取uwsg_farm实例,然后将信号发送给name指定的farm
else if (!strncmp(use->receiver, "farm_", 5)) {
char *name = use->receiver + 5;
struct uwsgi_farm *uf = get_farm_by_name(name);
if (!uf) {
uwsgi_log("unknown farm: %s\n", name);
return;
}
if (uwsgi_signal_send(uf->signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to farm %d (%s)\n", sig, uf->id, uf->name);
}
}
// 如果信号接收者是指定的farm,则发送给指定的farm(对于farmN,N不能大于最大的farm数)
else if (!strncmp(use->receiver, "farm", 4)) {
i = atoi(use->receiver + 4);
if (i > uwsgi.farms_cnt || i <= 0) {
uwsgi_log("invalid signal target: %s\n", use->receiver);
}
else {
if (uwsgi_signal_send(uwsgi.farms[i - 1].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to farm %d (%s)\n", sig, i, uwsgi.farms[i - 1].name);
}
}
}
// 如果信号接收者不是上面任意一个,则输出日志后退出
else {
// unregistered signal, sending it to all the workers
uwsgi_log("^^^ UNSUPPORTED SIGNAL TARGET: %s ^^^\n", use->receiver);
}
}create_signal_pipe
1
void create_signal_pipe(int *sigpipe)
uwsgi_remote_signal_send
1
int uwsgi_remote_signal_send(char *addr, uint8_t sig)
uwsgi_signal_send
1
int uwsgi_signal_send(int fd, uint8_t sig)