简单的背景介绍
- pika是AMQP 0-9-1协议的一个纯Python实现。本文使用的pika版本为0.10.0
- AMQP 0-9-1提供了心跳机制,来确保应用服务层及时发现已崩溃的连接(以及完全无响应的对端)。心跳机制也能保证进程不被某些网络设备杀掉。
- RabbitMQ的心跳机制(搬运自官网)
- 使用心跳超时间隔(heartbeat timeout interval)。该值定义了经过多长时间,RabbitMQ和客户端库应该认为对端的TCP连接已经失活(dead)。该值是在客户端和RabbitMQ服务器在连接的时候协商决定的。必须配置客户端请求心跳。在RabbitMQ v3.0及之后的版本,broker会尝试协商默认的heartbeat值(客户端仍然可以否决该值)。默认值是60s(3.5.5版本之前则是580s)
- 每半个超时时间发送一个心跳帧。在两次心跳没有响应后,就会认为对端不可达。
角色说明
HeartbeatChecker
:心跳检查器
初始化:
__init__(connection, interval, idle_count=MAX_IDLE_COUNT)
- 将心跳检查器与connnection对象绑定,设置心跳检查间隔_interval、经过多少个丢失的心跳间隔数后认为连接idle或者断连的心跳间隔数_max_idle_count(默认情况下是MAX_IDLE_COUNT=2)
- 初始化多个计数器为0
- 为最后一个定时器初始化句柄(None)
- 调用
_setup_timer()
方法设置定时器在每个_interval秒后触发
设置定时器:
_setup_timer()
添加定时器,在每个_interval秒后调用
send_and_check()
方法检查及发送心跳定时器方法:
send_and_check()
由定时器调用,需要时发送一个心跳,检查是否错过了任何心跳,并在太长时间处于idle状态的情况下断开连接
判断连接是否处于idle状态,是则调用
_close_connection()
方法关闭连接 > 如何判断连接状态:当前的idle计数器大于或等于设置的最大idel计数器时,则认为连接处于idle状态如果connection对象并未收到任何数据,则将当前的idle计数器加一;否则清零当前的idle计数器 > 如何判断connection对象收到数据:判断接收到的字节数计数器(
_bytes_received
)以及连接对象收到的字节数计数器(bytes_received_on_connection
)是否相等。调用
_update_counters()
方法更新发送/接收到的字节数计数器调用
_send_heartbeat_frame()
方法发送一个心跳帧给对端(也就是RabbitMQ服务器啦),同时将发送的心跳帧计数器(_heartbeat_frames_sent
)加一。调用
_start_timer()
方法启动新一轮的定时器。这个时候会先判断connection对象的定时器是否还是当前这个实例,如果是的话,会调用_setup_timer()
方法设置定时器。
接收心跳帧:
received()
将收到的心跳帧计时器加一
停止心跳检查器:
stop()
将该定时器从绑定的connection对象移除
Connection
:实现与RabbitMQ通信的核心类
只说明几个跟心跳有关的方法
创建心跳检查器:
_create_heartbeat_checker()
如果参数heartbeat有值并且大于0,则使用当前的connection实例(self)和设置的heartbeat值来初始化一个心跳检查器
停止并删除绑定的心跳检查器:
_remove_heartbeat()
协商心跳超时时间:
_tune_heartbeat_timeout(client_value, server_value)
- client_value是客户端给出的心跳超时时间,None表示完全接受服务端给出的值(即server_value),0表示禁用心跳,否则使用一个正整数(s)
- server_value是broker建议的心跳超时时间,0表示禁用心跳
- 该方法最终返回一个使用的心跳超时时间,并将其返回给服务器
旅程开始
在pika与RabbitMQ服务器通信最最最开始的时候,是需要创建连接的。这个时候呢,pika作为客户端,需要设置一个heartbeat
(heartbeat_interval
已弃用)参数。默认情况下,该参数的值由DEFAULT_HEARTBEAT_TIMEOUT
(默认是None
)指定,表示RabbitMQ你说心跳时间间隔是啥就是啥,我客户端这边完全没意见。
接着,就是连接建立的各种过程中。在这个过程中,当接收到broker发送的Connection.Tune时,客户端会根据对端的心跳时间间隔和客户端定义的心跳间隔(也就是heartbeat
参数)协商一个合适的心跳时间间隔(调用_tune_heartbeat_timeout()
方法)。协商策略是: * 如果客户端这边的值为None,则心跳时间间隔由RabbitMQ决定; * 如果客户端这边的值或者RabbitMQ那边的值为0,则禁用心跳; * 否则,选客户端这边的值和RabbitMQ那边的值之间的最大值。
然后根据协商后的心跳时间间隔,绑定一个新的心跳检查器。此时,这个新的心跳检查器就会带着它的处理句柄跳进IOLoop,也就是轮询器中,等待它的时间的到来。
后面就是业务交互过程了。
在处理从socket接收到的帧时,如果接收到了心跳,则调用心跳检查器的received()
方法,增加心跳检查器的收到心跳帧计时器。
在轮询的过程中,心跳检查器的时间来了,它会执行绑定的处理句柄,也就是send_and_check()
方法。此时,它会设置idle次数计数器、更新接收/发送帧计数器、响应一个心跳帧给对端。然后,带着它的处理句柄再次跳进轮询器中,等待新一轮的时间的到来。
当然,这是正常的情况。如果上面这一步中,心跳检查器发现自己绑定的connection已经多次没收到数据了,次数多得已经超过自己的容忍程度(此时,猜测对端已经死掉),这个时候,会主动断开与对端的连接。又或者如果绑定的connection的心跳检查器已经不再是自己了,则不再跳进轮询器。
另外有一点需要注意的是,pika中有个BlockingConnection
(它位于pika的异步之上,提供阻塞连接)。因为是阻塞连接,因此定时器在这里并没啥用处,从而导致心跳检查器无法定时触发。这进一步导致了,如果在RabbitMQ多次发送心跳进行探测的时候客户端都阻塞无法响应的话,RabbitMQ会认为客户端已经不在了,从而断开与客户端之间的连接~~
到了最后的最后,告别的时刻来临。此时,会把心跳检查器停止(如果,它还在的话)……