架构方案设计总结
IM整体架构设计总结
一款全世界都在使用的企业IM系统
指标假设
- DAU 5亿+,每人每天100条消息,收发消息p99 200ms以下,QPS平均50w,峰值75w
- 每条消息10KB,则每日存储增长量约 450TB $(100*10*5*10^8KB)$
实现的功能
- 单聊/群聊/聊天室/多设备登陆/在线状态
- 文本消息/多媒体消息/离线同步/历史消息/消息漫游
- 多端同步/消息撤回/已读未读/离线推送
- 添加好友/会话列表/好友列表
服务划分
分层结构
IM架构图
对于接入层,可以将其要解决的问题分为四个部分:
- 获取最佳IP地址,使得客户端接入最佳的网关机,因为未来使得连接的质量更高,选择最近负载最低的网关机将获得收益
- 对连接的管理,保证长连接被可靠的持有,连接断开之后能够快速的重连,维护连接的基本路由信息,实现基本的上行/下行消息的收发功能,以最小的内存占用为优化目标,是的单机存储更多的长连接,尽可能减少有状态服务重启对用户体验的影响
- 隔离变更频繁的控制状态,心跳,重试,回执,路由等状态,尽可能的使用分布式内存,计算与存储分离,保证处理逻辑的频繁迭代不会影响线上功能
- 下行消息下发时,减少网络调用次数,降低单聊下行消息的延迟,提高群聊下行消息的吞吐
对于业务层来说,其为了实现业务功能之外,为应对快速迭代的需求,其在设计上需要更多的考虑可维护性。基于经典的洋葱架构,我们可以简单的拆解为应用层,领域层,基础设施层,对于复杂业务也可以设置BFF服务(Backend for fronted,前后端分离)
- 应用层:对于应用层来说,用来处理简单的面向业务需求的处理逻辑,比如权限管理,业务规则,数据聚合等需求
- 领域层:领域层用来沉淀业务无关可跨领域服用的功能需求,提高核心的业务价值,主要可以分为用户,会话,消息三个部分,用户管理负责用户获取用户信息,用户的配置,生成用户ID等功能,会话管理用来分配会话ID管理群聊和单聊的元信息,组内成员列表,加入群,推出群,禁言等对会话进行管理等功能,消息管理也以消息作为实体,控制消息的同步,存储,状态变更等控制行为
- 基础设施层:基于依赖倒置原则,保证领域层不依赖任何基础设施,而是通过定义标准的适配接口屏蔽对基础设施库的依赖,在基础设施层来封装,Mq,redis,timeline,rpc,服务发现等基础组件的时候,由于定义了适配接口实现了依赖倒置,基础设施层可以随意替换不必修改代码,从业务角度来看,基础设施的代码虽然重要,但不是业务核心,可随时替换最佳组件
对于存储服务来说,大型互联网架构上,为方便数据可复用,都会抽象出独立的数据服务,也更加方便数据团队维护和迭代,通常可以叫做逻辑数据库,面向业务抽象定制化等存储模型,用来更加便携的描述数据特征,对业务提供更灵活高效的数据存储服务。
对存储服务来说,可以划分为,普通结构化数据的存储,例如用户信息,会话信息。对于计数数据,由于其巨量的吞吐我们将其存储在特殊的专用引擎上并封装为counter server,更好的提供极大规模的计数服务,对于消息服务来说其挑战在于短时效消息的及时可靠的多端同步,海量消息的存储和检索,因此抽象出专用的timeline模型,对业务层屏蔽存储细节,存储模型底层基于数据时效实现冷热数据分离存储,权衡高吞吐,低延迟,高可靠,低成本等技术目标的综合效果最大化。
消息状态机
状态机分析法
IM本质是三端通信,从消息的角度去看,一次收发过程,可以得到一个状态机。
当发送方将消息发送给服务端时,如果服务端通过clientID判断了消息幂等,则服务端接收到消息,消息变更为分配ID到状态,在此状态接入层会调用message server,message server会请求seq server获得唯一的seqID,然后异步的写入到timeline server中,并尝试立即进行在线消息的同步。
message server调用state server的push rpc将消息分发下去,此时消息进入下行中状态,如果当前接收方不在线,则下行消息将被拒绝,如果接收方在线,则客户端通过seqID对下行消息进行幂等处理,保证仅发送一次,消息状态变为接收成功,否则由于网络丢包,seqID重复等问题导致客户端多次接收相同消息,则此消息将被拒绝,网络超时等原因是可重试的,则会变为下行失败,交由接入层state server中的消息飞行计时器判断,超时后则会进行重试,消息的状态进入下行重试阶段,重试成功则进入接收成功的状态。
当接收方打开app后,会自动进入消息拉取中,通过api gateway访问message server,其从timeline server中拉取离线消息,timeline server根据策略自动进行冷热数据的分离,无需区别离线还是历史数据的存储
如果拉取的消息过多,客户端会进行分页消息的拉取,拉取失败则会重新刷新,知道将消息补洞为止,保证消息的可靠送达。
如果客户端拉取历史消息,则处理逻辑同上。
接口定义
接入层1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62syntax = "proto3";
package messagepack;
// 作为持有与客户端建立的长连接服务,有状态可靠性要求高
service IMGateway{
// 由客户端调用,上行消息,这里用rpc表达出来,实际物理形式会自己写网络模型处理
rpc uplink(UplinkReq) returns (BaseResponse) {}
// 由state server调用,告知网关机下推push消息给客户端
rpc downlink(DownlinkReq) returns (BaseResponse) {}
}
// 查询IP列表的服务,本质是一个http服务,治理使用rpc进行表达
service IPConfig {
// 由客户端调用,获取ip列表
rpc IPListInfo(IPListInfoReq) returns (IPListInfoRes) {}
}
// 接入层策略服务,用来处理消息
service State{
// 由gateway server调用,将上行消息传入,解析消息类型并做出相应处理
rpc handleMessage (UplinkReq) returns (BaseResponse) {}
// 由下游message server调用,将下行消息发送给接入层
rpc push(DownlinkReq) returns (BaseResponse){}
}
// 此为客户端与服务端消息协议中的变长消息头,解析后保证Qos的处理机制
// 最后逻辑交由state server继续完成
message UplinkMessageHeader{
uint32 client_id = 1;
uint32 session_id = 2;
}
message DownlinkMessageHeader{
uint32 seq_id = 1;
uint32 session_id = 2;
uint32 device_id = 3;
}
message UplinkReq {
UplinkMessageHeader header = 1;
bytes payload = 2;
}
message DownlinkReq {
DownlinkMessageHeader header = 1;
bytes payload = 2;
}
message IPListInfoReq {
string client_ip = 1;
}
message BaseResponse{
int32 errno = 1;
ErrorType errmsg = 2;
}
message IPListInfoRes {
repeated string endpoint_list = 1;
BaseResponse beas_res = 2;
}
业务层1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66syntax = "proto3";
package messagepack;
service Message {
// 发送消息
rpc send(SendReq) returns (BaseResponse) {}
// 拉取消息
rpc pull(PushReq) returns (BaseResponse) {}
// 消息回执
rpc receipt(PushReq) returns (baseResponse) {}
}
enum MessageType {
BASEMSG = 1; // 默认文本消息
RECEIPTMSG = 2; // 回执消息
CONTROLMSG = 2; // 控制消息
}
enum SessoionType {
C2C = 1; // 单聊会话
GROUP = 2; // 群聊会话
}
enum ErrorType {
REFUSE = 1; // 拒绝,一种不可重试性错误
OK = 2; // 代表成功,成功作为一种特殊的失败处理
FAILED = 3; // 失败是可重试的错误
}
message BizMsgHead {
string form_user_id = 1;
string session_id = 2;
string to_user_id = 3;
SessoionType sessoion_type = 4;
MessageType message_type = 5;
}
message IMMessage {
BizMsgHead head = 1;
bytes content = 2;
}
message ACK {
BizMsgHead head 1;
}
message SendReq{
string message_type = 1;
bytes payload = 2;
}
message PushReq {
MessageType message_type = 1;
bytes payload = 2;
}
message PullReq {
MessageType Message_type = 1;
bytes payload = 2;
}
message BaseResponse {
int32 errno = 1;
ErrorType errmsg = 2;
}
存储设计
KV的设计
- 序号生成:redis key=seq_id:{session_id}, value:int64
- 离线消息:
- redis key=offline_msg:u{user_id} value:zset(sorce为seqID,number为message的pb)
- redis key=offline_msg:s{session_id} value:zset(sorce为seqID,number为message的pb)
- 在线消息:
- 单聊(redis key=online_router:session_to_did:{session_id}, value=pb(device_id List))
- 群聊(gateway local men key=sessionID, value=list{connID})
- Counter server:
- 总未读数:key=count_total:{user_id}, value=int64
- 会话未读数:key=count:{user_id}:{session_id}, value=int64
timeline server是一个用来存储消息数据的数据服务,所有对消息的存取都通过timeline server,而timeline server将封装对底层数据库的细节,离线消息将使用redis缓存,历史消息将使用hbase等列存储模型,但上游服务将完全不感知,至于内部的冷热数据交换策略,则由timeline server来维护。
序列号生成器是另一个存储层的组件,专用来生成消息的seqID,会话和用户的userID也可以由此序列号生成器生成,也可以由mysql自增主键生成,在这里使用redis的incrby命令进行严格递增并通过lua脚本进行跳变防止id回退,对于更复杂的消息序列号生成方案后续补充。
Counter server专用于存储计数数据,主要用来解决消息未读数的变更频繁问题,counter server消费message server中消息回执变更消息,通过合并变更来降低最终写入DB的QPS,底层使用分布式rocksdb来做存储引擎,实现高并发的读写。
数据表的设计
1 | -- 一个sql上的逻辑表示 |
总结
当前IM项目是一个不断演进的生产级项目,后面也会不断完善其功能,架构设计的目的并不是追求完美,深度挖掘需求后各种目标的权衡才是关键,通过DDD拆解服务,通过状态机分析拆解接口 ,通过接口的工作负载情况,设计存储结构,这样能得到一个满足需求的解,而通过逐步添加约束条件考虑更大规模的问题,在架构中添加各种优化策略,最终形成一个可落地的方案。