state-消息可用性

背景分析

当长连接保证了稳定性,那么下一步就是要保证消息的可靠性,通常对于消息来说,用户视角要求消息从A客户端到B客户端发出的顺序和接受到的顺序一致,不重不漏有序及时

技术约束

  1. 高可用,首先就是要足够的高可用,对于最基本的收发消息来说保证其核心链路的可用性非常重要,属于IM产品的生命线,对此必须提供5个9
  2. 低延迟,即时通讯系统强调的是即时,因此将消息毫秒级的发送到对端是基本诉求
  3. 高吞吐,对于极端的群聊场景,万人群聊在特定时间全部活跃没发一条消息都是一次DDos攻击。

技术方案

消息的可靠性可以分为,上行消息可靠和下行消息可靠,客户端A发送的消息发送到服务端后被回复ack则必然送达到对端B上,这就是上行消息可靠性,消息从服务端下发给B客户端时,只要客户端B回复了ack则说明消息一定能送达到客户端B,并且是按照规定的顺序,这就是下行消息可靠性。

消息可靠性 = 不漏+不重+有序+及时

最简单的消息传输就是两端的消息通信,a将消息跨越不可靠的网络传输给b时,就会出现四种情况

  1. 消息被遗漏
  2. 消息重复
  3. 消息到达不及时
  4. 消息乱序到达

消息遗漏:A通过策略即可,单造成的问题是如果网络不可靠消息并没有丢失,而是延迟达到,则b会接受两次重复的消息。

消息重复:对此,需要一种策略能够识别两次消息是重复的,则需要定义具有唯一性的id,服务端通过这个id可以判断消息是否被接受过。

消息延迟:会导致被误认为消息丢失,进而触发超时重试机制,就会倒置消息乱序,为此这个上行消息的id必须还要可比较,基于其递增性来进行排序,确保消息的有序性。b端存储接受过的max_id,只有接受到消息id==max_id+1时才会确认回复ack,其余情况都会被拦截忽略,但是在网络不稳定情况下,超时重试的概率增加,那么这种无效性消息通信将会影响整个网络的负载,整体网络延迟就会增高。通常的做法是可以批处理,服务端维护一个消息队列来实现tcp的滑动窗口的功能,来保证消息有序,并且当小雨max_id的id过来时,直接回复对方取消对方的重试定时器无效的网络通信,当接收到比max_id+1还要大的id时,可以直接告诉对方有消息漏洞,直接一个批量请求将消息拉过来即可。

这都有助于降低整个网络延迟,但是对于当前IM项目来说,他是构建在tcp上的协议,tcp至少保证了消息从a端服务器协议栈到达b端服务器协议栈,那么消息丢失的可能性已经非常非常小了,维护如此复杂的通信协议,对于业务层来说时不必要的,所以最简单的做法就是check max_id+1这一条规则即可,大于和小于max_id+1的id均被忽略即可。

但是想要做到及时性,那么消息在服务器的转发过程中就不能有同步落库的操作,因为落库会严重限制并发度(数据层时共享资源),比粗保证整个收发消息的流程中不会有全局共享的资源,以便于高度兵法,这其中的瓶颈就需要异步落库的方式来保证,将消息存入MQ中(MQ保证消息不丢失)后就回复上行消息ack,然后MQ异步消费此消息时对其分配msg_id,然后发送push,下推到客户端B完成下行消息。

上行消息可靠

基于上述描述,消息可靠性模型来看,需要一个具有唯一且严格递增到消息ID才行,这个id必须由消息产生的源头来生成,否则无法定序。

所以对上行消息来说,客户端要维护一个client_id,他是客户端自行生成的,在会话维度的一个严格递增的uint64的id

当客户端于服务端建立长连接后进行登陆时,必须把client_id携带过来,这样就完成来服务端的初始化,state server会将其记录到自身的session state中,作为max_id保证上行消息可靠。

当断线消息重连时,session state被复用,max_id也会被用来复用,不受影响

当用户换了一个登陆设备时,由于也创建了一个新的连接,不会有之前发送过的消息存在,此时client_id从0开始自增即可。

每次state server都会比较max_client_id+1是否等于当前的client_id,如果是则方形次消息写入MQ,并回复ack,否则直接忽略即可(简单可靠)

下行消息可靠

当消息进入MQ后就已经进入下行消息的阶段,此时由MQ保证消息的可靠性;消息最终会被消费,业务层消费消息,为其分配msg_id,msg_id用来保证消息的可靠性,其唯一性时在一次session范围内的,毕竟一跳消息脱离了会话框时没有意义的。

msg_id由服务器分配,服务端就必须考虑竞争全局资源造成的并发问题,每发一个消息就要分发一个msg_id,QPS至少时万亿级别,肯定是性能瓶颈所在,其次服务端要维护大量会话的msg_id的严格递增,在可用性上也有巨大的挑战,这是一个分布式ID生成的经典场景。

对于服务端来说,要维护整个会话消息的有序性,这雨上行消息中的client_id极其不同,client_id在设备切换后建立新的连接后可以从0开始,而不用一只保持着有序性

推送给客户端B时,客户端B用其进行排序,如果发现有确实ID,则主动发送pull请求进行拉取,每次客户端进接受msg_id+1的id消息,一旦存在消息漏洞,则直接拉取消息,这样的好处是可以减少下行消息的重试机制(这需要大量的内存资源),消息补洞操作和消息漫游同步历史消息的接口可以是一个,这样简化来整体设计,用拉模式作为推送失败的兜底手段,可以保证消息的有序性。

但是仅靠拉模式还是不够,没有消息回执的话,将会倒置消息不能及时送达,比如客户端B收到客户端A发送的第一条消息,就在服务端下行push的时候丢失拉,客户端B并不知道最新的msg_id是什么,他也就不知道消息存在漏洞,此时要么他还是轮训来拉,但是这样就会导致无效的网络请求变多,失去了push消息的意义,要么就让服务端进行超时重试,这就会对state server增加内存成本,但这是必要的,否则会影响下行消息的及时性,极度影响用户体验。

一种有效的优化方案是,在state server中对conn state进保证一个msgTimer,那就是对于当前连接最新的push消息,仅对这个push消息进行超时重试即可,如果新的push消息到来可以直接将之前的定时器取消(这就是为什么时间轮的优势比较大,当然这里后面可以再优化),此时state server内存中仅保留最新push的msg,这样既解决拉最后push消息丢失客户端无感知的问题,也解决来维护大量消息重发定时器的资源浪费,此时客户端B仅需要回复最新消息的ack即可,其他消息不需要回复,一旦出现消息漏洞直接走兜底的pull模式拉取数据即可,这样降低了对长连接的依赖,弱网环境下更好。

在极端情况下,像微信等应用为了应对弱网环境,都会退化为pull模式,所以pull模式会作为一种兜底手段,较好的补充来长连接的不稳定性。

这种方式最大的问题就是需要一个严格递增的消息ID,这就是一个老大难的问题来。。

先要确认的是这个ID的生成不能是单点的,否则会有可用性问题,因此需要分布式ID生成,分布式ID的生成很难保证严格递增这个特性,因为ID生成通常在10ms以下进行相应,这倒置数据密集型系统中常用复制来存储多个数据副本来提高可用性这个手段无效(任何复制都需要通过网络,难以保证在10ms以下的p99)

首先,必须要明确,这个ID生成器的生命周期,在一个会话创建时呗创建,在会话终止时被回收,所以ID生成器只需要保证在一个会话范畴内严格递增即可,那么uint64的id足够来

seq_id时不需要担心由于其顺序性被人爬去,会通过其他一次一密的策略保证消息安全性。

其次,我们不需要完全严格递增的id,下行消息的通信过程中可以偶尔的调变,因为可以使用拉模式进行消息补洞,那么如果某一次msg_id 123紧接着msg_id 300,那么客户端只需要进行一次拉取请求即可,从time line中拉取消息进行消息补洞(当然不存在msg_id 124的消息,拉取回事失败的,但是此时我们时可以定义某种错误码来通知客户端的),所以我们仅需要在一个全生命周期中大多数时候保持养鹅递增而在少数场景允许跳变的id,当然跳变不允许出现回退,这将倒置系统崩溃乱序。

那如何保证连续递增呢,可以使用redis,使用incrby这个命令可以实现一个连续递增的ID,当redis的master切换时将受限于redis的主从同步延迟,出现ID的跳变问题,解决办法通常时通过lua脚本来实现,但这样有悖于云原生的概念,通常使用redis做分布式ID生成,但是这样不太好维护,因为没有持久化,稍微有点问题,可能数据就丢失了,造成消息混乱,其次内存容量大小优先,单据存不了太多,分布式的话,lua机哦啊笨很难保证云原生的特性(频繁的容器调度会导致一些不一致性,lua脚本容易造成崩溃)

解决办法时设计一个完善的分布书ID生成系统,这个系统一个专门为消息ID来设计,考虑连续性和递增性,同时一个msgID的有序性仅在一个会话的生命周期中存在。

使用存算分离方式,底层使用raft维护分布式kv保证持久数据的一致性并突破单机容量的限制,对于sessionID对应作为一个key,value是一个uint64值,每次执行incrby操作。

为了对应万亿级兵法,需要有一个缓存层,但是需要避免缓存层与存储层的交互减少IO放大,在缓存节点中每次进行分段划分,对于每个key存储cru_seq与max_seq,每次当cur_seq自增到max_seq的时候,再去存储层按照step去请求一次分段,max_seq=cur_seq+step。

对于sdk层,使用向redis cluster模式的slot概念进行key的hash划分即可,这样当进行扩容或者失败重试时直接去存储层load一次max_seq即可,此时会发生跳变,因此我们能做到给业务方的承诺就是大多数情况下seq_id时严格递增的,在扩容与重启的时候会发生跳变但是一定保证递增性。

cache server是可以水平扩展的,根据业务的容量规划,划分合理的slot,理论上可以无限扩展,存储层使用raft算法联合成kv集群提供分布式存储能力。

因此业务上需要通过客户端主动拉取的方式来进行消息补洞,这一操作即可作为消息漫游时的历史消息同步,也可以作为群聊消息风暴时的批处理优化,又可以作为下行消息丢失时的消息补洞,一举三得。

任务分解

  1. 实现上行消息可靠
    1. 客户端生成client_id
    2. 登陆时记录到state server的状态中
    3. 短线重连,max_client_id被复用(connID复用,那么其中的状态也被复用)
    4. 断开链接后,state server的conn应该被回收
    5. state server check client_id调用rpc写入业务
    6. 在业务rpc回复ok后对max_client_id自增
    7. 并回复客户端ack
  2. 实现下行消息可靠
    1. 业务push消息发送给state server,只需要直接转发即可
    2. 客户端检查下行消息的msg_id是否等于max_msg_id+1,如果是则直接显示
    3. 如果不是则调用pull请求进行兜底拉取,通过msg_id作为offset拉取历史同步接口进行拉取
    4. 客户端每次都要将接口请求的msg_id进行持久化保存(有的一些im会将其称之为seq_id)