好好学习,天天向上

一枚心跳的旅程 — pika心跳机制

简单的背景介绍

  1. pika是AMQP 0-9-1协议的一个纯Python实现。本文使用的pika版本为0.10.0
  2. AMQP 0-9-1提供了心跳机制,来确保应用服务层及时发现已崩溃的连接(以及完全无响应的对端)。心跳机制也能保证进程不被某些网络设备杀掉。
  3. RabbitMQ的心跳机制(搬运自官网)
    • 使用心跳超时间隔(heartbeat timeout interval)。该值定义了经过多长时间,RabbitMQ和客户端库应该认为对端的TCP连接已经失活(dead)。该值是在客户端和RabbitMQ服务器在连接的时候协商决定的。必须配置客户端请求心跳。在RabbitMQ v3.0及之后的版本,broker会尝试协商默认的heartbeat值(客户端仍然可以否决该值)。默认值是60s(3.5.5版本之前则是580s)
    • 每半个超时时间发送一个心跳帧。在两次心跳没有响应后,就会认为对端不可达。

角色说明

HeartbeatChecker:心跳检查器

  • 初始化:__init__(connection, interval, idle_count=MAX_IDLE_COUNT)

    1. 将心跳检查器与connnection对象绑定,设置心跳检查间隔_interval、经过多少个丢失的心跳间隔数后认为连接idle或者断连的心跳间隔数_max_idle_count(默认情况下是MAX_IDLE_COUNT=2)
    2. 初始化多个计数器为0
    3. 为最后一个定时器初始化句柄(None)
    4. 调用_setup_timer()方法设置定时器在每个_interval秒后触发
  • 设置定时器:_setup_timer()

    添加定时器,在每个_interval秒后调用send_and_check()方法检查及发送心跳

  • 定时器方法:send_and_check()

    由定时器调用,需要时发送一个心跳,检查是否错过了任何心跳,并在太长时间处于idle状态的情况下断开连接

    1. 判断连接是否处于idle状态,是则调用_close_connection()方法关闭连接 > 如何判断连接状态:当前的idle计数器大于或等于设置的最大idel计数器时,则认为连接处于idle状态

    2. 如果connection对象并未收到任何数据,则将当前的idle计数器加一;否则清零当前的idle计数器 > 如何判断connection对象收到数据:判断接收到的字节数计数器(_bytes_received)以及连接对象收到的字节数计数器(bytes_received_on_connection)是否相等。

    3. 调用_update_counters()方法更新发送/接收到的字节数计数器

    4. 调用_send_heartbeat_frame()方法发送一个心跳帧给对端(也就是RabbitMQ服务器啦),同时将发送的心跳帧计数器(_heartbeat_frames_sent)加一。

    5. 调用_start_timer()方法启动新一轮的定时器。这个时候会先判断connection对象的定时器是否还是当前这个实例,如果是的话,会调用_setup_timer()方法设置定时器。

  • 接收心跳帧:received()

    将收到的心跳帧计时器加一

  • 停止心跳检查器:stop()

    将该定时器从绑定的connection对象移除

Connection:实现与RabbitMQ通信的核心类

只说明几个跟心跳有关的方法

  • 创建心跳检查器:_create_heartbeat_checker()

    如果参数heartbeat有值并且大于0,则使用当前的connection实例(self)和设置的heartbeat值来初始化一个心跳检查器

  • 停止并删除绑定的心跳检查器:_remove_heartbeat()

  • 协商心跳超时时间:_tune_heartbeat_timeout(client_value, server_value)

    • client_value是客户端给出的心跳超时时间,None表示完全接受服务端给出的值(即server_value),0表示禁用心跳,否则使用一个正整数(s)
    • server_value是broker建议的心跳超时时间,0表示禁用心跳
    • 该方法最终返回一个使用的心跳超时时间,并将其返回给服务器

旅程开始

在pika与RabbitMQ服务器通信最最最开始的时候,是需要创建连接的。这个时候呢,pika作为客户端,需要设置一个heartbeat(heartbeat_interval已弃用)参数。默认情况下,该参数的值由DEFAULT_HEARTBEAT_TIMEOUT(默认是None)指定,表示RabbitMQ你说心跳时间间隔是啥就是啥,我客户端这边完全没意见。

接着,就是连接建立的各种过程中。在这个过程中,当接收到broker发送的Connection.Tune时,客户端会根据对端的心跳时间间隔和客户端定义的心跳间隔(也就是heartbeat参数)协商一个合适的心跳时间间隔(调用_tune_heartbeat_timeout()方法)。协商策略是: * 如果客户端这边的值为None,则心跳时间间隔由RabbitMQ决定; * 如果客户端这边的值或者RabbitMQ那边的值为0,则禁用心跳; * 否则,选客户端这边的值和RabbitMQ那边的值之间的最大值。

然后根据协商后的心跳时间间隔,绑定一个新的心跳检查器。此时,这个新的心跳检查器就会带着它的处理句柄跳进IOLoop,也就是轮询器中,等待它的时间的到来。

后面就是业务交互过程了。

在处理从socket接收到的帧时,如果接收到了心跳,则调用心跳检查器的received()方法,增加心跳检查器的收到心跳帧计时器。

在轮询的过程中,心跳检查器的时间来了,它会执行绑定的处理句柄,也就是send_and_check()方法。此时,它会设置idle次数计数器、更新接收/发送帧计数器、响应一个心跳帧给对端。然后,带着它的处理句柄再次跳进轮询器中,等待新一轮的时间的到来。

当然,这是正常的情况。如果上面这一步中,心跳检查器发现自己绑定的connection已经多次没收到数据了,次数多得已经超过自己的容忍程度(此时,猜测对端已经死掉),这个时候,会主动断开与对端的连接。又或者如果绑定的connection的心跳检查器已经不再是自己了,则不再跳进轮询器。

另外有一点需要注意的是,pika中有个BlockingConnection(它位于pika的异步之上,提供阻塞连接)。因为是阻塞连接,因此定时器在这里并没啥用处,从而导致心跳检查器无法定时触发。这进一步导致了,如果在RabbitMQ多次发送心跳进行探测的时候客户端都阻塞无法响应的话,RabbitMQ会认为客户端已经不在了,从而断开与客户端之间的连接~~

到了最后的最后,告别的时刻来临。此时,会把心跳检查器停止(如果,它还在的话)……

参考

请言小午吃个甜筒~~