架构方案设计总结

IM整体架构设计总结

一款全世界都在使用的企业IM系统

指标假设

  • DAU 5亿+,每人每天100条消息,收发消息p99 200ms以下,QPS平均50w,峰值75w
  • 每条消息10KB,则每日存储增长量约 450TB $(100*10*5*10^8KB)$

实现的功能

  1. 单聊/群聊/聊天室/多设备登陆/在线状态
  2. 文本消息/多媒体消息/离线同步/历史消息/消息漫游
  3. 多端同步/消息撤回/已读未读/离线推送
  4. 添加好友/会话列表/好友列表

服务划分

分层结构

IM架构图


对于接入层,可以将其要解决的问题分为四个部分:

  1. 获取最佳IP地址,使得客户端接入最佳的网关机,因为未来使得连接的质量更高,选择最近负载最低的网关机将获得收益
  2. 对连接的管理,保证长连接被可靠的持有,连接断开之后能够快速的重连,维护连接的基本路由信息,实现基本的上行/下行消息的收发功能,以最小的内存占用为优化目标,是的单机存储更多的长连接,尽可能减少有状态服务重启对用户体验的影响
  3. 隔离变更频繁的控制状态,心跳,重试,回执,路由等状态,尽可能的使用分布式内存,计算与存储分离,保证处理逻辑的频繁迭代不会影响线上功能
  4. 下行消息下发时,减少网络调用次数,降低单聊下行消息的延迟,提高群聊下行消息的吞吐

对于业务层来说,其为了实现业务功能之外,为应对快速迭代的需求,其在设计上需要更多的考虑可维护性。基于经典的洋葱架构,我们可以简单的拆解为应用层,领域层,基础设施层,对于复杂业务也可以设置BFF服务(Backend for fronted,前后端分离)

  1. 应用层:对于应用层来说,用来处理简单的面向业务需求的处理逻辑,比如权限管理,业务规则,数据聚合等需求
  2. 领域层:领域层用来沉淀业务无关可跨领域服用的功能需求,提高核心的业务价值,主要可以分为用户,会话,消息三个部分,用户管理负责用户获取用户信息,用户的配置,生成用户ID等功能,会话管理用来分配会话ID管理群聊和单聊的元信息,组内成员列表,加入群,推出群,禁言等对会话进行管理等功能,消息管理也以消息作为实体,控制消息的同步,存储,状态变更等控制行为
  3. 基础设施层:基于依赖倒置原则,保证领域层不依赖任何基础设施,而是通过定义标准的适配接口屏蔽对基础设施库的依赖,在基础设施层来封装,Mq,redis,timeline,rpc,服务发现等基础组件的时候,由于定义了适配接口实现了依赖倒置,基础设施层可以随意替换不必修改代码,从业务角度来看,基础设施的代码虽然重要,但不是业务核心,可随时替换最佳组件

对于存储服务来说,大型互联网架构上,为方便数据可复用,都会抽象出独立的数据服务,也更加方便数据团队维护和迭代,通常可以叫做逻辑数据库,面向业务抽象定制化等存储模型,用来更加便携的描述数据特征,对业务提供更灵活高效的数据存储服务。

对存储服务来说,可以划分为,普通结构化数据的存储,例如用户信息,会话信息。对于计数数据,由于其巨量的吞吐我们将其存储在特殊的专用引擎上并封装为counter server,更好的提供极大规模的计数服务,对于消息服务来说其挑战在于短时效消息的及时可靠的多端同步,海量消息的存储和检索,因此抽象出专用的timeline模型,对业务层屏蔽存储细节,存储模型底层基于数据时效实现冷热数据分离存储,权衡高吞吐,低延迟,高可靠,低成本等技术目标的综合效果最大化。

消息状态机

状态机分析法

IM本质是三端通信,从消息的角度去看,一次收发过程,可以得到一个状态机。

当发送方将消息发送给服务端时,如果服务端通过clientID判断了消息幂等,则服务端接收到消息,消息变更为分配ID到状态,在此状态接入层会调用message server,message server会请求seq server获得唯一的seqID,然后异步的写入到timeline server中,并尝试立即进行在线消息的同步。

message server调用state server的push rpc将消息分发下去,此时消息进入下行中状态,如果当前接收方不在线,则下行消息将被拒绝,如果接收方在线,则客户端通过seqID对下行消息进行幂等处理,保证仅发送一次,消息状态变为接收成功,否则由于网络丢包,seqID重复等问题导致客户端多次接收相同消息,则此消息将被拒绝,网络超时等原因是可重试的,则会变为下行失败,交由接入层state server中的消息飞行计时器判断,超时后则会进行重试,消息的状态进入下行重试阶段,重试成功则进入接收成功的状态。

当接收方打开app后,会自动进入消息拉取中,通过api gateway访问message server,其从timeline server中拉取离线消息,timeline server根据策略自动进行冷热数据的分离,无需区别离线还是历史数据的存储

如果拉取的消息过多,客户端会进行分页消息的拉取,拉取失败则会重新刷新,知道将消息补洞为止,保证消息的可靠送达。

如果客户端拉取历史消息,则处理逻辑同上。

接口定义

接入层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
syntax = "proto3";

package messagepack;

// 作为持有与客户端建立的长连接服务,有状态可靠性要求高
service IMGateway{
// 由客户端调用,上行消息,这里用rpc表达出来,实际物理形式会自己写网络模型处理
rpc uplink(UplinkReq) returns (BaseResponse) {}
// 由state server调用,告知网关机下推push消息给客户端
rpc downlink(DownlinkReq) returns (BaseResponse) {}
}

// 查询IP列表的服务,本质是一个http服务,治理使用rpc进行表达
service IPConfig {
// 由客户端调用,获取ip列表
rpc IPListInfo(IPListInfoReq) returns (IPListInfoRes) {}
}

// 接入层策略服务,用来处理消息
service State{
// 由gateway server调用,将上行消息传入,解析消息类型并做出相应处理
rpc handleMessage (UplinkReq) returns (BaseResponse) {}
// 由下游message server调用,将下行消息发送给接入层
rpc push(DownlinkReq) returns (BaseResponse){}
}

// 此为客户端与服务端消息协议中的变长消息头,解析后保证Qos的处理机制
// 最后逻辑交由state server继续完成
message UplinkMessageHeader{
uint32 client_id = 1;
uint32 session_id = 2;
}

message DownlinkMessageHeader{
uint32 seq_id = 1;
uint32 session_id = 2;
uint32 device_id = 3;
}

message UplinkReq {
UplinkMessageHeader header = 1;
bytes payload = 2;
}

message DownlinkReq {
DownlinkMessageHeader header = 1;
bytes payload = 2;
}

message IPListInfoReq {
string client_ip = 1;
}

message BaseResponse{
int32 errno = 1;
ErrorType errmsg = 2;
}

message IPListInfoRes {
repeated string endpoint_list = 1;
BaseResponse beas_res = 2;
}

业务层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
syntax = "proto3";

package messagepack;

service Message {
// 发送消息
rpc send(SendReq) returns (BaseResponse) {}
// 拉取消息
rpc pull(PushReq) returns (BaseResponse) {}
// 消息回执
rpc receipt(PushReq) returns (baseResponse) {}
}

enum MessageType {
BASEMSG = 1; // 默认文本消息
RECEIPTMSG = 2; // 回执消息
CONTROLMSG = 2; // 控制消息
}

enum SessoionType {
C2C = 1; // 单聊会话
GROUP = 2; // 群聊会话
}

enum ErrorType {
REFUSE = 1; // 拒绝,一种不可重试性错误
OK = 2; // 代表成功,成功作为一种特殊的失败处理
FAILED = 3; // 失败是可重试的错误
}

message BizMsgHead {
string form_user_id = 1;
string session_id = 2;
string to_user_id = 3;
SessoionType sessoion_type = 4;
MessageType message_type = 5;
}

message IMMessage {
BizMsgHead head = 1;
bytes content = 2;
}

message ACK {
BizMsgHead head 1;
}

message SendReq{
string message_type = 1;
bytes payload = 2;
}

message PushReq {
MessageType message_type = 1;
bytes payload = 2;
}

message PullReq {
MessageType Message_type = 1;
bytes payload = 2;
}

message BaseResponse {
int32 errno = 1;
ErrorType errmsg = 2;
}

存储设计

KV的设计

  • 序号生成:redis key=seq_id:{session_id}, value:int64
  • 离线消息:
    • redis key=offline_msg:u{user_id} value:zset(sorce为seqID,number为message的pb)
    • redis key=offline_msg:s{session_id} value:zset(sorce为seqID,number为message的pb)
  • 在线消息:
    • 单聊(redis key=online_router:session_to_did:{session_id}, value=pb(device_id List))
    • 群聊(gateway local men key=sessionID, value=list{connID})
  • Counter server:
    • 总未读数:key=count_total:{user_id}, value=int64
    • 会话未读数:key=count:{user_id}:{session_id}, value=int64

timeline server是一个用来存储消息数据的数据服务,所有对消息的存取都通过timeline server,而timeline server将封装对底层数据库的细节,离线消息将使用redis缓存,历史消息将使用hbase等列存储模型,但上游服务将完全不感知,至于内部的冷热数据交换策略,则由timeline server来维护。

序列号生成器是另一个存储层的组件,专用来生成消息的seqID,会话和用户的userID也可以由此序列号生成器生成,也可以由mysql自增主键生成,在这里使用redis的incrby命令进行严格递增并通过lua脚本进行跳变防止id回退,对于更复杂的消息序列号生成方案后续补充。

Counter server专用于存储计数数据,主要用来解决消息未读数的变更频繁问题,counter server消费message server中消息回执变更消息,通过合并变更来降低最终写入DB的QPS,底层使用分布式rocksdb来做存储引擎,实现高并发的读写。

数据表的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- 一个sql上的逻辑表示
-- 真是场景下不会使用mysql
CREATE TABLE `message` (
`id` bigint(20) unsigned NOT NULL COMMENT `主键ID`,
`seq_id` bigint(64) NOT NULL DEFAULT '' COMMENT '序列号ID',
`session_id` bigint(64) NOT NULL DEFAULT '' COMMENT '会话ID',
`session_type` bigint(8) NOT NULL DEFAULT '' COMMENT '会话类型',
`from_user_id` bigint(64) NOT NULL DEFAULT '' COMMENT '消息的发送者',
`content` TEXT NOT NULL DEFAULT '' COMMENT '消息内容',
`statue` bigint(8) NOT NULL DEFAULT '' COMMENT '消息状态',
`create_time` bigint(20) NOT NULL COMMENT '创建事件',
`modify_time` bigint(20) NOT NULL COMMENT '更改时间',
`delete_time` bigint(20) NOT NULL COMMENT '删除事件',
PRIMARY KET (`id`),
UNIQUE KEY `uniq_seq_id` (`seq_id`)
) ENGINE=InnoDB DEFAULT CHARSET=urf8mb4 COMMENT=`历史消息表`

总结

当前IM项目是一个不断演进的生产级项目,后面也会不断完善其功能,架构设计的目的并不是追求完美,深度挖掘需求后各种目标的权衡才是关键,通过DDD拆解服务,通过状态机分析拆解接口 ,通过接口的工作负载情况,设计存储结构,这样能得到一个满足需求的解,而通过逐步添加约束条件考虑更大规模的问题,在架构中添加各种优化策略,最终形成一个可落地的方案。