goim源码剖析
Contents
Comet
comet
为用户代理服务器,用于客户端的连接,根据情况可部署多个comet
(部署机房选择以用户接入为基础,如:最近接入、按运营商接入)。
流程图
comet
支持tcp
以及websocket
的方式和客户端交互,而和logic
、job
模块之间的交互采用rpc
的方式,comet
的主要流程图如下:
- 客户端首先连接到
comet
服务,comet
调用logic
来校验用户的合法性,logic
会返回一个subKey
给comet
,该subKey
成为该连接的唯一标示; - 客户端接下来可以发心跳包给
comet
,同时,job
服务将MQ-Kafka
的消息转发到对应comet
,comet
再将其转发到对应的客户端
其中Logic前面可以加上一层4层代理服务器,如LVS。
主要逻辑代码分析
bucket
|
|
房间信息推送流程:
|
|
单播推送流程:
单播相对于”房播(群播)”,简化了房间检索的步骤。
|
|
注:
subKey的生成采用city32的hash算法,bucket是一个大小为n的hashMap slice,其主要目的是将数据切分成更小的块,从而降低资源的竞争。
多播以单播推送流程类似,广播也类似。
room
|
|
next *Channel
是一个双向链表,复杂度为o(1)
,效率比较高。
tcp
协议:
BodySize
范围:0<=BodySize<=MaxBodySize = int32(1<<10)
PackSize
范围:RawHeaderSize<=PackSize<=MacPackSize(RawHeaderSize+BodySize)
websocket
RPC
rpc
用于logic
、job service
模块的通信。
|
|
- func (this *PushRPC) Ping(arg *proto.NoArg, reply *proto.NoReply) error
心跳rpc
- func (this *PushRPC) PushMsg(arg *proto.PushMsgArg, reply *proto.NoReply) (err error)
根据指定的subKey
推送消息
- func (this *PushRPC) MPushMsg(arg *proto.MPushMsgArg, reply *proto.MPushMsgReply) (err error)
根据指定的subKey
一次推送多个消息,主要用于群播
- func (this *PushRPC) MPushMsgs(arg *proto.MPushMsgsArg, reply *proto.MPushMsgsReply) (err error)
多播
- func (this *PushRPC) Broadcast(arg *proto.BoardcastArg, reply *proto.NoReply) (err error)
广播
- func (this *PushRPC) BroadcastRoom(arg *proto.BoardcastRoomArg, reply *proto.NoReply) (err error)
按房间推送
- func (this *PushRPC) Rooms(arg *proto.NoArg, reply *proto.RoomsReply) (err error)
获取所有房间,包括下线的房间
ring
ring
是一个环形对象池,用于管理协议对象-proto
,其内存结构如图所示,rp
为可读游标,wp为可写游标,内存的大小为4
的整数倍。
- func (r *Ring) Get() (proto *proto.Proto, err error)
获取一个proto
对象的引用,用于读,不会移动可读游标
- func (r *Ring) GetAdv()
移动可读游标
- func (r *Ring) Set() (proto *proto.Proto, err error)
获取一个proto
对象的引用,用于写,不会移动可写游标
- func (r *Ring) SetAdv()
移动可写游标
- func (r *Ring) Reset()
重置pool
对象
注:
如果wp移动过快,会影响rp游标指向的数据,如图:
当程序运行到第3
步的时候,wp
已经又重新超过rp
的索引了,这时候,第6
个对象还没被rp
读取过,但是它的数据已经被修改了,这样即使rp
读取第6个对象,也是一个dirty
对象。
Signal
信号处理模块,可用于在线加载配置,配置动态加载的信号为SIGHUP
。
Job
Job
负责消费kafka
消息,然后转发至comet
。
单/多播
广播
按房间推送
主要逻辑代码分析
comet
|
|
- func (c *Comet) Push(arg *proto.MPushMsgArg) (err error)
循环选择一个MPushMsgArg channel
,将消息推送至该信道
- func (c *Comet) BroadcastRoom(arg *proto.BoardcastRoomArg) (err error)
按照房间推送,循环选择一个BoardcastRoomArg Channel
,将消息推送至该信道
- func (c *Comet) Broadcast(arg *proto.BoardcastArg) (err error)
广播,循环选择一个BoardcastArg Channe
,将消息推送至该信道
- func (c *Comet) process(pushChan chan *proto.MPushMsgArg, roomChan chan *proto.BoardcastRoomArg, broadcastChan chan *proto.BoardcastArg)
IO
复用协程,作为pushChan
,roomChan
,broadcastChan
信道的消费者,消费完后,转发comet service
room
|
|
room
模块用于接收kafka
的push
模块发送的消息,每个room
都有一个协程,其协程的信道缓冲区的大小为RoomOptions.BatchNum
的两倍。
注:
1、room协程在接收到的消息条数>=BatchNum*2或者timeout时,才会触发转发消息的行为(转发至broadcastRoutines),即其具有汇总操作。
2、房间的消息使用了Libs/bytes.Writer进行汇总缓存。
push/kafka
push/kafka
模块用于预处理消息,消息从kafka集群
流出,经过kafka
模块转发至push
模块,push
模块对消息预处理/过滤/分类,然后发至不同的comet 信道
中,具体使用请参照Job的
前3章图.
消息的分类:
KAFKA_MESSAGE_MULTI
=>多播KAFKA_MESSAGE_BROADCAST
=>广播KAFKA_MESSAGE_BROADCAST_ROOM
=>群播/房播
Round/RoundOptions
时钟管理器
Logic
Logic
是goim
是主要业务处理模块,负责的内容有:
- 注册/注销
- 验证
- 消息
Push
代理
协议
Push协议
其中ensure
参数是额外的参数,用于控制消息是否必达,为布偶值。
其他http协议
删除comet service
接口名 | URL | 访问方式 |
---|---|---|
删除comet service |
/1/server/del |
POST |
获取所有comet service 或者room 信息 |
/1/count |
POST |
请求例子:
/1/server/del
1
|
curl -XPOST "http://127.0.0.1:7172/1/server/del?server=1 |
/1/count
1
|
curl -XPOST "http://127.0.0.1:7172/1/count?type=room |
主要逻辑代码分析
Auth
|
|
用户验证模块
router
负责与Router Service
交互,多个Router Service
采用的是一致性hash
算法,hash
的输入为Router Serviceid
,默认所有的Router Service
权值是一样的,如果需要控制不同的权值,可以在配置router service
的时候加多个端口实例或者同一个节点配置成多个serviceid
标签。
注:
一致性hash没有实现动态扩容,即没有自动平衡,所以不能够动态改变Router service映射配置,且每个Logic节点的Router Service配置项需保持一致。
- func getRouterByServer(server string) (*xrpc.Clients, error)
通过router serverid
获取对应的router service client
- func getRouterByUID(userID int64) (*xrpc.Clients, error)
通过userID
获取router service client
- func getRouterNode(userID int64) string
通过userID
获取到Router 节点
- func connect(userID int64, server, roomId int32) (seq int32, err error)
注册、验证用户,返回一个自增序列号(cometServiceID_incr
)
- func disconnect(userID int64, seq, roomId int32) (has bool, err error)
注销一个用户
- func delServer(server int32) (err error)
下线一个comet service
- func allRoomCount(client *xrpc.Clients) (counter map[int32]int32, err error
获取所有房间个数
- func allServerCount(client *xrpc.Clients) (counter map[int32]int32, err error)
获取所有comet-service
- func genSubKey(userId int64) (res map[int32][]string)
通过用户ID
生成subKey
;同一个用户可以同时多处登陆或者同处多实例登陆,它们都会被同等对待。
subKey生成代码:
|
|
seq
为每个comet service
的自增长序索引,假设comet service 1
接受到一个用户请求,用户的ID
为10
,而此时Router buckets
的seqIdx
为9
,则subKey = 10_9
- func getSubKeys(res chan *proto.MGetReply, serverId string, userIds []int64) | func genSubKeys(userIds []int64) (divide map[int32][]string)
并行获取多个用户信息,返回值为map[comet.serverId][]subkey
.
rpc
|
|
LogicService
用于心跳的检测以及客户端的注册/注销。
- func (r *RPC) Connect(arg *proto.ConnArg, reply *proto.ConnReply) (err error)
注册
- func (r *RPC) Disconnect(arg *proto.DisconnArg, reply *proto.DisconnReply) (err error)
注销
注:
goim不支持离线消息,如果需要支持离线消息的推送,应hook该模块,实现离线消息推送逻辑
Router
主要逻辑代码分析
session
|
|
客户端会话信息管理,以用户id
为单位,即每个用户有且拥有一个session
,session
包含了用户每个连接的comet service
信息,以及每个连接所属的roomid
。
- func (s *Session) Put(server int32) (seq int32)
关联一个subKey
和session
- func (s *Session) PutRoom(server int32, roomId int32) (seq int32)
关联一个subKey
到comet servicey
以及room
- func (s *Session) Servers() (seqs []int32, servers []int32)
返回session
关联的所有comet service
信息
- func (s *Session) Del(seq int32) (has, empty bool, server int32)
删除指定的subKey
所关联的Session.Servers
- func (s *Session) DelRoom(seq int32, roomId int32) (has, empty bool, server int32)
删除指定的subKey
、roomid
所关联的Session.rooms
- func (s *Session) Count() int
返回session
所关联的所有comet service
信息
bucket
|
|
- func (b *Bucket) counter(userId int64, server int32, roomId int32, incr bool)
增加一个用户或者减少一个用户
- func (b *Bucket) Put(userId int64, server int32, roomId int32) (seq int32)
关联channel(session)
到指定的用户id
,comet id
以及room id
- func (b *Bucket) Get(userId int64) (seqs []int32, servers []int32)
指定用户的session
信息
- func (b *Bucket) GetAll() (userIds []int64, seqs [][]int32, servers [][]int32)
返回该bucket
的所有用户、subKey
、comet
信息
其他函数省略…
cleaner
|
|
lru
对象管理器,只负责管理,不负责触发GC
,GC
交给Runtime
处理。
主要应用于客户端的session
管理,定时处理掉一些过期的session
对象。
1、数据结构使用map和双向列表,map用于快速检索;
2、双向链表用于快速剔除数据:因为从map中剔除数据,map的结构会实时改变,每剔除一个都得再次从起点开始遍历map,而使用链表不用重新遍历,时间复杂度为
O(logN)
Libs
缓冲io-bufio
Reader
|
|
Reader
是一个具有缓存的可读IO
。
主要函数
- func (b *Reader) Reset(r io.Reader)
重置IO
,可读游标重置为0
- func (b *Reader) ResetBuffer(r io.Reader, buf []byte)
重置IO
,可读游标重置为0
,且b.buf
变为buf
- func (b *Reader) Peek(n int) ([]byte, error)
窥探缓存的n
个字节,可读游标维持不变,可写游标可能会改变;
如果可窥探的数据少于n
,则调用b.fill()
尝试读取远端数据用于填充b.buf
.
- func (b *Reader) Pop(n int) ([]byte, error)
返回[b.r:b.r+n]
处的数据,该函数会调用b.Peek
,而且会改变可读游标和科协游标
- func (b *Reader) Discard(n int) (discarded int, err error)
丢弃n
个数据;如果缓冲区的可读数据小于n
,则一致调用b.fill()
尝试读取远程的数据,直到b.buf
的可读缓冲区大于或者等于n
或者出现网络异常为止
- func (b *Reader) Read(p []byte) (n int, err error)
读取缓冲区数据;
如果缓冲区为空,为了提高效率,避免应用层的数据拷贝(kernel net stack=>b.buf==>p
),直接将kernel net stack
拷贝到p []byte
,并且调用f.fill()
整理缓冲区.
- func (b *Reader) fill()
fill
读取远程新块数据到本地缓冲区的b.buf
fill
会有数据的移动,如图 5-2
所示。
Writer
|
|
Writer
是一个具有缓存的可写IO
.
- func (b *Writer) Reset(w io.Writer) {
重置IO
,可写游标重置为0
,句柄变为w
- func (b *Writer) ResetBuffer(w io.Writer, buf []byte)
重置IO
,可写游标重置为0
,句柄变为w
,缓冲区变为buf
- func (b *Writer) flush() error
刷新缓冲区,将本地缓冲区的数据发送至内核网络栈;
游标被重置(不一定会被重置为0,可能为其他值,因为本地缓冲的数据大于内核可写缓冲区,这时还会造成数据的搬迁)。
- func (b *Writer) Available() int
可写字节数
- func (b *Writer) Buffered()
已写缓冲大小
- func (b *Writer) Write(p []byte) (nn int, err error)
将p []byte
写到缓冲区或者直接写到网络内核栈(此时缓冲区已满),可能造成可写游标的移动
- func (b *Writer) WriteRaw(p []byte) (nn int, err error)
和b.Write()类似
- func (b *Writer) Peek(n int) ([]byte, error)
窥探可写缓冲区剩余值,如果可写缓冲区的剩余值小于n
,会调用flush
,可写游标相应可能会改变
timer
timer
是一个最小堆算法实现的时钟对象,串行执行时钟任务,所以时钟任务应该尽量小,timer
不适合太耗时的任务,当然用户可以控制时钟任务的并发。
bytes
Pool
|
|
pool
内存组织如下,pool
是一个链式存储的栈
,数据从栈顶出,同时数据也从栈顶回收。
- func (p *Pool) Get() (b *Buffer)
获取一个缓冲区
- func (p *Pool) Put(b *Buffer)
归还一个缓冲区
- func (p *Pool) grow()
重置Pool
对象
注:
pool 是一个不限大小的内存池,如果栈没有数据了,那么pool会调用glow()重新生成数据,所以最后可能造成的内存架构如下图所示
如果租借的速度大于归还的速度,会造成内存的溢出。
Writer
|
|
writer
是一个具有缓冲的可写IO
。
- func (w *Writer) Size()
缓冲区的大小
- func (w *Writer) Reset()
重置缓冲区,游标重置为0
- func (w *Writer) Buffer() []byte
返回缓冲区的内容
- func (w *Writer) Peek(n int) []byte
窥探缓冲区的n
个字节,如果缓冲区的剩余空间小于n
,则会调用w.grow()
自增长数据缓冲区,游标会移动
- func (w *Writer) grow(n int)
按照2
倍的大小增长缓冲区,会发生数据的移动
net
xrpc
根据原生的rpc
封装的,其调用采用异步的方式,具有重连功能。
xrpc
并没有做负载均衡的工作,只是简单做一下容灾而已,相对来时不是很灵活,用户可以稍微修改一下,就支持负载均衡了。
同时xrpc
也没有在代码层面上实现[host:por]
连接池,即同一个[host:port]
配置只会有一个rpc
长链接,除非增加多配置。
proto
主要的rpc
协议说明:
job
|
|
logic
客户端上线:
|
|
客户端下线:
|
|
comet
Push RPC
模块
心跳:
|
|
Job---->comet
|
|
router
增加用户:
|
|
移除用户:
|
|
其他:
|
|
推送协议
参照官网