好好学习,天天向上

uwsgi代码小札|信号与signal.c

说明:当前使用的版本为uwsgi 2.0

目前在网络上能够找到的uwsgi源码相关的文档少之又少(当然,也不排除是官方文档大家觉得已经足够,或者是如我一般愚笨的人少之又少╮(╯_╰)╭)。在这个背景下,作为uwsgi小白的我,由于种种原因,决定自食其力。因此,准备开一个uwsgi源码阅读系列,结合官网,整理自己阅读uwsgi源码过程中的所思所想所得,作为不备之需。本系列没有deadline,不定时更新。

免责声明:由于本文作者(也就是我)目前尚未全面掌握uwsgi,C代码勉强能读懂,因此假若本文有错误的地方,请不要犹豫,不要客气,尽情在评论中提出!

好了,废话说完了,进入正题︿( ̄︶ ̄)︿

uWSGI中的信号及其处理

uWSGI信号量与UNIX/Posix信号量毫无共同之处。你可以定义一个信号,绑定该信号接收到时应该触发的操作(信号处理函数)。通过uwsgi中的信号,你可以监控到文件/目录的修改,可以定时进行一些处理工作等等等等。

相关结构说明

uwsgi_signal_entry

1
2
3
4
5
6
struct uwsgi_signal_entry {
int wid; // worker id
uint8_t modifier1;
char receiver[64]; // 信号目标,也就是signal targer
void *handler; // 信号处理程序
};

信号表保存在uwsgi_shared结构中,定义为struct uwsgi_signal_entry *signal_table; 信号表由所有worker共享 (并通过共享锁对抗竞争条件)。当一个workder接收到一个信号,它会搜索信号表,查找对应的处理程序并执行。

定时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 普通定时器
struct uwsgi_timer {
int value; // 定时时间
int fd;
int id;
int registered;
uint8_t sig; // 绑定的信号量
};
// 红黑定时器(rbtimer)
struct uwsgi_signal_rb_timer {
int value; // 定时时间
int registered;
int iterations;
int iterations_done;
uint8_t sig; // 绑定的信号量
struct uwsgi_rb_timer *uwsgi_rb_timer;
};

监控器

1
2
3
4
5
6
7
8
// 文件监控器
struct uwsgi_fmon {
char filename[0xff]; // 监控的文件名
int fd;
int id;
int registered;
uint8_t sig; // 绑定的信号值
};

uWSGI中的信号API

信号API位于core/signal.c文件中。

处理uwsgi信号: uwsgi_signal_handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
int uwsgi_signal_handler(struct wsgi_request *wsgi_req, uint8_t sig) {
// 搜索信号表(signal_table),找到对应的信号项use
struct uwsgi_signal_entry *use = NULL;

int pos = (uwsgi.mywid * 256) + sig;

use = &uwsgi.shared->signal_table[pos];
// 执行处理函数前的一系列校验
if (!use->handler)
return -1;

if (!uwsgi.p[use->modifier1]->signal_handler) {
return -1;
}

// master模式下,如果注册该信号的worker id和当前的worker id不一致,并且注册该信号的worker id又不为0(表示master进程吗?)的话,返回错误。
if (uwsgi.master_process) {
if (use->wid != 0 && use->wid != uwsgi.mywid) {
uwsgi_log("[uwsgi-signal] you have registered this signal in worker %d memory area, only that process will be able to run it\n", use->wid);
return -1;
}
}
// lazy模式下(没有master),只有相同的worker(注册该信号的worker id和当前的worker id一致)才能运行处理函数
else if (uwsgi.lazy) {
if (use->wid != uwsgi.mywid) {
uwsgi_log("[uwsgi-signal] you have registered this signal in worker %d memory area, only that process will be able to run it\n", use->wid);
return -1;
}
}
else {
// 当master不活跃时,那么worker1就是主导COW的进程
if (use->wid != 1 && use->wid != uwsgi.mywid) {
uwsgi_log("[uwsgi-signal] you have registered this signal in worker %d memory area, only that process will be able to run it\n", use->wid);
return -1;
}
}

// 这里,设置harakiri (如果需要)

if (uwsgi.mywid > 0 && wsgi_req) {// 当前是worker的情况下
uwsgi.workers[uwsgi.mywid].sig = 1;
uwsgi.workers[uwsgi.mywid].signum = sig;
uwsgi.workers[uwsgi.mywid].signals++;
if (uwsgi.harakiri_options.workers > 0) {
set_harakiri(wsgi_req, uwsgi.harakiri_options.workers);
}
}
else if (uwsgi.muleid > 0) { // 当前是mule的情况下
uwsgi.mules[uwsgi.muleid - 1].sig = 1;
uwsgi.mules[uwsgi.muleid - 1].signum = sig;
uwsgi.mules[uwsgi.muleid - 1].signals++;
if (uwsgi.harakiri_options.mules > 0) {
set_mule_harakiri(uwsgi.harakiri_options.mules);
}
}
else if (uwsgi.i_am_a_spooler && (getpid() == uwsgi.i_am_a_spooler->pid)) { // 当前是spooler的情况下
if (uwsgi.harakiri_options.spoolers > 0) {
set_spooler_harakiri(uwsgi.harakiri_options.spoolers);
}
}
// 执行信号处理函数,返回值保存在ret中。
int ret = uwsgi.p[use->modifier1]->signal_handler(sig, use->handler);

if (uwsgi.mywid > 0 && wsgi_req) {// 当前是worker的情况下
uwsgi.workers[uwsgi.mywid].sig = 0;
if (uwsgi.workers[uwsgi.mywid].cores[wsgi_req->async_id].harakiri > 0) {
set_harakiri(wsgi_req, 0);
}
}
else if (uwsgi.muleid > 0) { // 当前是mule的情况下
uwsgi.mules[uwsgi.muleid - 1].sig = 0;
if (uwsgi.mules[uwsgi.muleid - 1].harakiri > 0) {
set_mule_harakiri(0);
}
}
else if (uwsgi.i_am_a_spooler && (getpid() == uwsgi.i_am_a_spooler->pid)) { // 当前是spooler的情况下
if (uwsgi.harakiri_options.spoolers > 0) {
set_spooler_harakiri(0);
}
}

return ret;
}

uwsgi_signal_registered

检查信号在当前worker内是否注册了处理函数。如果注册了,返回1,否则返回0

1
2
3
4
5
6
7
8
int uwsgi_signal_registered(uint8_t sig) {

int pos = (uwsgi.mywid * 256) + sig;
if (uwsgi.shared->signal_table[pos].handler != NULL)
return 1;

return 0;
}
### uwsgi_register_signal 注册信号以及相应的处理函数等。返回0表示注册成功,非0表示有异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int uwsgi_register_signal(uint8_t sig, char *receiver, void *handler, uint8_t modifier1) {
// 初始化一个信号项
struct uwsgi_signal_entry *use = NULL;
// 相关校验:必须有个master,并且只有master/worker可以注册信号处理函数,另外,receiver长度必须不大于63
if (!uwsgi.master_process) {
uwsgi_log("you cannot register signals without a master\n");
return -1;
}

if (uwsgi.mywid == 0 && uwsgi.workers[0].pid != uwsgi.mypid) {
uwsgi_log("only the master and the workers can register signal handlers\n");
return -1;
}

if (strlen(receiver) > 63)
return -1;
// 通过共享锁对抗竞争条件
uwsgi_lock(uwsgi.signal_table_lock);
// 找到信号表中注册该信号的位置
int pos = (uwsgi.mywid * 256) + sig;
use = &uwsgi.shared->signal_table[pos];
// 判断master是否已经注册了这个信号,如果已注册,则返回错误
if (use->handler && uwsgi.mywid == 0) {
uwsgi_log("[uwsgi-signal] you cannot re-register a signal as the master !!!\n");
uwsgi_unlock(uwsgi.signal_table_lock);
return -1;
}
// 信号项的成员赋值
strncpy(use->receiver, receiver, strlen(receiver) + 1);
use->handler = handler;
use->modifier1 = modifier1;
use->wid = uwsgi.mywid;

if (use->receiver[0] == 0) {// 未指定signal接受者,使用默认接收者
uwsgi_log("[uwsgi-signal] signum %d registered (wid: %d modifier1: %d target: default, any worker)\n", sig, uwsgi.mywid, modifier1);
}
else { // 指定了signal接受者
uwsgi_log("[uwsgi-signal] signum %d registered (wid: %d modifier1: %d target: %s)\n", sig, uwsgi.mywid, modifier1, receiver);
}

// 对于cow(即写即拷)的特殊处理:如果是注册在master上,则将它给每个worker拷贝一份
if (uwsgi.mywid == 0) {
int i;
for(i=1;i<=uwsgi.numproc;i++) {
int pos = (i * 256);
memcpy(&uwsgi.shared->signal_table[pos], &uwsgi.shared->signal_table[0], sizeof(struct uwsgi_signal_entry) * 256);
}
}

uwsgi_unlock(uwsgi.signal_table_lock); // 释放锁

return 0;
}

添加文件监控器: uwsgi_add_file_monitor

1
2
3
// sig: 信号值
// filename:文件名
int uwsgi_add_file_monitor(uint8_t sig, char *filename)
  1. 检查参数filename的长度,不能超过254
  2. 通过文件监控表锁uwsgi.fmon_table_lock来处理竞争
  3. 如果当前已注册的文件监控器大小ushared->files_monitored_cnt已经是64(可添加的最大数),则退出
  4. 将信号sig和文件名filename信息附加到已监控文件数组ushared->files_monitored的尾部,修改注册状态。

添加定时器: uwsgi_add_timer

返回0表示表示添加成功,非0表示失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int uwsgi_add_timer(uint8_t sig, int secs) {
// 不在master模式下,则返回错误。
if (!uwsgi.master_process) return -1;

uwsgi_lock(uwsgi.timer_table_lock);
// 判断是否已经添加了64个定时器,有则退出,否则将sig和secs注册到定时器表(master会用它来添加项到时间队列中)中
if (ushared->timers_cnt < 64) {

// fill the timer table, the master will use it to add items to the event queue
ushared->timers[ushared->timers_cnt].value = secs;
ushared->timers[ushared->timers_cnt].registered = 0;
ushared->timers[ushared->timers_cnt].sig = sig;
ushared->timers_cnt++;
}
else {
uwsgi_log("you can register max 64 timers !!!\n");
uwsgi_unlock(uwsgi.timer_table_lock);
return -1;
}

uwsgi_unlock(uwsgi.timer_table_lock);

return 0;

}
### 添加rb定时器: uwsgi_signal_add_rb_timer 返回0表示表示添加成功,非0表示失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int uwsgi_signal_add_rb_timer(uint8_t sig, int secs, int iterations) {
// 不在master模式下,则返回错误。
if (!uwsgi.master_process)
return -1;

uwsgi_lock(uwsgi.rb_timer_table_lock);
// 判断是否已经添加了64个定时器,有则退出,否则将sig和secs注册到定时器表(master会用它来添加项到时间队列中)中
if (ushared->rb_timers_cnt < 64) {

// fill the timer table, the master will use it to add items to the event queue
ushared->rb_timers[ushared->rb_timers_cnt].value = secs;
ushared->rb_timers[ushared->rb_timers_cnt].registered = 0;
ushared->rb_timers[ushared->rb_timers_cnt].iterations = iterations;
ushared->rb_timers[ushared->rb_timers_cnt].iterations_done = 0;
ushared->rb_timers[ushared->rb_timers_cnt].sig = sig;
ushared->rb_timers_cnt++;
}
else {
uwsgi_log("you can register max 64 rb_timers !!!\n");
uwsgi_unlock(uwsgi.rb_timer_table_lock);
return -1;
}

uwsgi_unlock(uwsgi.rb_timer_table_lock);

return 0;

}
### 路由信号量: uwsgi_route_signal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
void uwsgi_route_signal(uint8_t sig) {
// 找到信号sig在信号表中对应的信号项
int pos = (uwsgi.mywid * 256) + sig;
struct uwsgi_signal_entry *use = &ushared->signal_table[pos];
int i;

// 如果信号接收者是worker/worker0(默认),则发送给第一个可用worker
if (use->receiver[0] == 0 || !strcmp(use->receiver, "worker") || !strcmp(use->receiver, "worker0")) {
if (uwsgi_signal_send(ushared->worker_signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to workers pool\n", sig);
}
}
// 如果信号接收者是workers,则发送给所有worker
else if (!strcmp(use->receiver, "workers")) {
for (i = 1; i <= uwsgi.numproc; i++) {
if (uwsgi_signal_send(uwsgi.workers[i].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to worker %d\n", sig, i);
}
}
}
// 如果信号接收者是active-workers,则发送给所有活跃worker(active worker,判定标准是worker不是cheaped/suspended)
else if (!strcmp(use->receiver, "active-workers")) {
for (i = 1; i <= uwsgi.numproc; i++) {
if (uwsgi.workers[i].pid > 0 && !uwsgi.workers[i].cheaped && !uwsgi.workers[i].suspended) {
if (uwsgi_signal_send(uwsgi.workers[i].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to worker %d\n", sig, i);
}
}
}
}
// 如果信号接收者是指定的worker,则发送给指定的worker(对于workerN,N不能大于最大的worker数)
else if (!strncmp(use->receiver, "worker", 6)) {
i = atoi(use->receiver + 6);
if (i > uwsgi.numproc) {
uwsgi_log("invalid signal target: %s\n", use->receiver);
}
if (uwsgi_signal_send(uwsgi.workers[i].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to worker %d\n", sig, i);
}
}
// 如果信号接收者是subscribed,则发送给订阅了这个信号的进程(当前版本未实现)
else if (!strcmp(use->receiver, "subscribed")) {
}
// 如果信号接收者是spooler,则发送给spooler
else if (!strcmp(use->receiver, "spooler")) {
if (ushared->worker_signal_pipe[0] != -1) {
if (uwsgi_signal_send(ushared->spooler_signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to the spooler\n", sig);
}
}
}
// 如果信号接收者是mules,则发送给所有的mule
else if (!strcmp(use->receiver, "mules")) {
for (i = 0; i < uwsgi.mules_cnt; i++) {
if (uwsgi_signal_send(uwsgi.mules[i].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to mule %d\n", sig, i + 1);
}
}
}
// 如果信号接收者是指定的mule,则发送给指定的mule(对于muleN,N不能大于最大的mule数)
else if (!strncmp(use->receiver, "mule", 4)) {
i = atoi(use->receiver + 4);
if (i > uwsgi.mules_cnt) {
uwsgi_log("invalid signal target: %s\n", use->receiver);
}
else if (i == 0) {
if (uwsgi_signal_send(ushared->mule_signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to a mule\n", sig);
}
}
else {
if (uwsgi_signal_send(uwsgi.mules[i - 1].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to mule %d\n", sig, i);
}
}
}
// 如果信号接收者是farm_name,那么会根据name获取uwsg_farm实例,然后将信号发送给name指定的farm
else if (!strncmp(use->receiver, "farm_", 5)) {
char *name = use->receiver + 5;
struct uwsgi_farm *uf = get_farm_by_name(name);
if (!uf) {
uwsgi_log("unknown farm: %s\n", name);
return;
}
if (uwsgi_signal_send(uf->signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to farm %d (%s)\n", sig, uf->id, uf->name);
}
}
// 如果信号接收者是指定的farm,则发送给指定的farm(对于farmN,N不能大于最大的farm数)
else if (!strncmp(use->receiver, "farm", 4)) {
i = atoi(use->receiver + 4);
if (i > uwsgi.farms_cnt || i <= 0) {
uwsgi_log("invalid signal target: %s\n", use->receiver);
}
else {
if (uwsgi_signal_send(uwsgi.farms[i - 1].signal_pipe[0], sig)) {
uwsgi_log("could not deliver signal %d to farm %d (%s)\n", sig, i, uwsgi.farms[i - 1].name);
}
}
}
// 如果信号接收者不是上面任意一个,则输出日志后退出
else {
// unregistered signal, sending it to all the workers
uwsgi_log("^^^ UNSUPPORTED SIGNAL TARGET: %s ^^^\n", use->receiver);
}

}
### 其他一些函数 * 创建信号管道: create_signal_pipe
1
void create_signal_pipe(int *sigpipe)
* 发送远程信号: uwsgi_remote_signal_send
1
int uwsgi_remote_signal_send(char *addr, uint8_t sig)
* 发送信号: uwsgi_signal_send
1
int uwsgi_signal_send(int fd, uint8_t sig)

  • 等待信号: uwsgi_signal_wait
    1
    int uwsgi_signal_wait(struct wsgi_request *wsgi_req, int signum)
  • 接收信号: uwsgi_receive_signal
    1
    int uwsgi_receive_signal(struct wsgi_request *wsgi_req, int fd, char *name, int id) 
    # 参考
  • uWSGI信号框架:官方 | 自译中文版
请言小午吃个甜筒~~