goim源码剖析
Contents
Comet
comet为用户代理服务器,用于客户端的连接,根据情况可部署多个comet(部署机房选择以用户接入为基础,如:最近接入、按运营商接入)。
流程图
comet支持tcp以及websocket的方式和客户端交互,而和logic、job模块之间的交互采用rpc的方式,comet的主要流程图如下:


- 客户端首先连接到
comet服务,comet调用logic来校验用户的合法性,logic会返回一个subKey给comet,该subKey成为该连接的唯一标示; - 客户端接下来可以发心跳包给
comet,同时,job服务将MQ-Kafka的消息转发到对应comet,comet再将其转发到对应的客户端
其中Logic前面可以加上一层4层代理服务器,如LVS。
主要逻辑代码分析
bucket
|
|

房间信息推送流程:
|
|
单播推送流程:
单播相对于”房播(群播)”,简化了房间检索的步骤。
|
|
注:
subKey的生成采用city32的hash算法,bucket是一个大小为n的hashMap slice,其主要目的是将数据切分成更小的块,从而降低资源的竞争。
多播以单播推送流程类似,广播也类似。
room
|
|
next *Channel是一个双向链表,复杂度为o(1),效率比较高。
tcp
协议:

BodySize范围:0<=BodySize<=MaxBodySize = int32(1<<10)PackSize范围:RawHeaderSize<=PackSize<=MacPackSize(RawHeaderSize+BodySize)
websocket
RPC
rpc用于logic、job service模块的通信。
|
|
- func (this *PushRPC) Ping(arg *proto.NoArg, reply *proto.NoReply) error
心跳rpc
- func (this *PushRPC) PushMsg(arg *proto.PushMsgArg, reply *proto.NoReply) (err error)
根据指定的subKey推送消息
- func (this *PushRPC) MPushMsg(arg *proto.MPushMsgArg, reply *proto.MPushMsgReply) (err error)
根据指定的subKey一次推送多个消息,主要用于群播
- func (this *PushRPC) MPushMsgs(arg *proto.MPushMsgsArg, reply *proto.MPushMsgsReply) (err error)
多播
- func (this *PushRPC) Broadcast(arg *proto.BoardcastArg, reply *proto.NoReply) (err error)
广播
- func (this *PushRPC) BroadcastRoom(arg *proto.BoardcastRoomArg, reply *proto.NoReply) (err error)
按房间推送
- func (this *PushRPC) Rooms(arg *proto.NoArg, reply *proto.RoomsReply) (err error)
获取所有房间,包括下线的房间
ring

ring是一个环形对象池,用于管理协议对象-proto,其内存结构如图所示,rp为可读游标,wp为可写游标,内存的大小为4的整数倍。
- func (r *Ring) Get() (proto *proto.Proto, err error)
获取一个proto对象的引用,用于读,不会移动可读游标
- func (r *Ring) GetAdv()
移动可读游标
- func (r *Ring) Set() (proto *proto.Proto, err error)
获取一个proto对象的引用,用于写,不会移动可写游标
- func (r *Ring) SetAdv()
移动可写游标
- func (r *Ring) Reset()
重置pool对象
注:
如果wp移动过快,会影响rp游标指向的数据,如图:

当程序运行到第3步的时候,wp已经又重新超过rp的索引了,这时候,第6个对象还没被rp读取过,但是它的数据已经被修改了,这样即使rp读取第6个对象,也是一个dirty对象。
Signal
信号处理模块,可用于在线加载配置,配置动态加载的信号为SIGHUP。
Job
Job负责消费kafka消息,然后转发至comet。
单/多播

广播

按房间推送

主要逻辑代码分析
comet

|
|
- func (c *Comet) Push(arg *proto.MPushMsgArg) (err error)
循环选择一个MPushMsgArg channel,将消息推送至该信道
- func (c *Comet) BroadcastRoom(arg *proto.BoardcastRoomArg) (err error)
按照房间推送,循环选择一个BoardcastRoomArg Channel,将消息推送至该信道
- func (c *Comet) Broadcast(arg *proto.BoardcastArg) (err error)
广播,循环选择一个BoardcastArg Channe,将消息推送至该信道
- func (c *Comet) process(pushChan chan *proto.MPushMsgArg, roomChan chan *proto.BoardcastRoomArg, broadcastChan chan *proto.BoardcastArg)
IO复用协程,作为pushChan,roomChan,broadcastChan信道的消费者,消费完后,转发comet service
room
|
|
room模块用于接收kafka的push模块发送的消息,每个room都有一个协程,其协程的信道缓冲区的大小为RoomOptions.BatchNum的两倍。
注:
1、room协程在接收到的消息条数>=BatchNum*2或者timeout时,才会触发转发消息的行为(转发至broadcastRoutines),即其具有汇总操作。
2、房间的消息使用了Libs/bytes.Writer进行汇总缓存。
push/kafka
push/kafka模块用于预处理消息,消息从kafka集群流出,经过kafka模块转发至push模块,push模块对消息预处理/过滤/分类,然后发至不同的comet 信道中,具体使用请参照Job的前3章图.
消息的分类:
KAFKA_MESSAGE_MULTI=>多播KAFKA_MESSAGE_BROADCAST=>广播KAFKA_MESSAGE_BROADCAST_ROOM=>群播/房播
Round/RoundOptions
时钟管理器
Logic

Logic是goim是主要业务处理模块,负责的内容有:
- 注册/注销
- 验证
- 消息
Push代理
协议
Push协议
其中ensure参数是额外的参数,用于控制消息是否必达,为布偶值。
其他http协议
删除comet service
| 接口名 | URL | 访问方式 |
|---|---|---|
删除comet service |
/1/server/del |
POST |
获取所有comet service或者room信息 |
/1/count |
POST |
请求例子:
/1/server/del
1
|
curl -XPOST "http://127.0.0.1:7172/1/server/del?server=1 |
/1/count
1
|
curl -XPOST "http://127.0.0.1:7172/1/count?type=room |
主要逻辑代码分析
Auth
|
|
用户验证模块
router
负责与Router Service交互,多个Router Service采用的是一致性hash算法,hash的输入为Router Serviceid,默认所有的Router Service权值是一样的,如果需要控制不同的权值,可以在配置router service的时候加多个端口实例或者同一个节点配置成多个serviceid标签。
注:
一致性hash没有实现动态扩容,即没有自动平衡,所以不能够动态改变Router service映射配置,且每个Logic节点的Router Service配置项需保持一致。
- func getRouterByServer(server string) (*xrpc.Clients, error)
通过router serverid获取对应的router service client
- func getRouterByUID(userID int64) (*xrpc.Clients, error)
通过userID获取router service client
- func getRouterNode(userID int64) string
通过userID获取到Router 节点
- func connect(userID int64, server, roomId int32) (seq int32, err error)
注册、验证用户,返回一个自增序列号(cometServiceID_incr)
- func disconnect(userID int64, seq, roomId int32) (has bool, err error)
注销一个用户
- func delServer(server int32) (err error)
下线一个comet service
- func allRoomCount(client *xrpc.Clients) (counter map[int32]int32, err error
获取所有房间个数
- func allServerCount(client *xrpc.Clients) (counter map[int32]int32, err error)
获取所有comet-service
- func genSubKey(userId int64) (res map[int32][]string)
通过用户ID生成subKey;同一个用户可以同时多处登陆或者同处多实例登陆,它们都会被同等对待。
subKey生成代码:
|
|
seq为每个comet service的自增长序索引,假设comet service 1接受到一个用户请求,用户的ID为10,而此时Router buckets的seqIdx为9,则subKey = 10_9
- func getSubKeys(res chan *proto.MGetReply, serverId string, userIds []int64) | func genSubKeys(userIds []int64) (divide map[int32][]string)
并行获取多个用户信息,返回值为map[comet.serverId][]subkey.
rpc
|
|
LogicService用于心跳的检测以及客户端的注册/注销。
- func (r *RPC) Connect(arg *proto.ConnArg, reply *proto.ConnReply) (err error)
注册
- func (r *RPC) Disconnect(arg *proto.DisconnArg, reply *proto.DisconnReply) (err error)
注销
注:
goim不支持离线消息,如果需要支持离线消息的推送,应hook该模块,实现离线消息推送逻辑
Router
主要逻辑代码分析
session
|
|
客户端会话信息管理,以用户id为单位,即每个用户有且拥有一个session,session包含了用户每个连接的comet service信息,以及每个连接所属的roomid。
- func (s *Session) Put(server int32) (seq int32)
关联一个subKey和session
- func (s *Session) PutRoom(server int32, roomId int32) (seq int32)
关联一个subKey到comet servicey 以及room
- func (s *Session) Servers() (seqs []int32, servers []int32)
返回session关联的所有comet service信息
- func (s *Session) Del(seq int32) (has, empty bool, server int32)
删除指定的subKey所关联的Session.Servers
- func (s *Session) DelRoom(seq int32, roomId int32) (has, empty bool, server int32)
删除指定的subKey、roomid所关联的Session.rooms
- func (s *Session) Count() int
返回session所关联的所有comet service信息
bucket
|
|
- func (b *Bucket) counter(userId int64, server int32, roomId int32, incr bool)
增加一个用户或者减少一个用户
- func (b *Bucket) Put(userId int64, server int32, roomId int32) (seq int32)
关联channel(session)到指定的用户id,comet id以及room id
- func (b *Bucket) Get(userId int64) (seqs []int32, servers []int32)
指定用户的session信息
- func (b *Bucket) GetAll() (userIds []int64, seqs [][]int32, servers [][]int32)
返回该bucket的所有用户、subKey、comet信息
其他函数省略…
cleaner
|
|
lru对象管理器,只负责管理,不负责触发GC,GC交给Runtime处理。
主要应用于客户端的session管理,定时处理掉一些过期的session对象。
1、数据结构使用map和双向列表,map用于快速检索;
2、双向链表用于快速剔除数据:因为从map中剔除数据,map的结构会实时改变,每剔除一个都得再次从起点开始遍历map,而使用链表不用重新遍历,时间复杂度为
O(logN)
Libs
缓冲io-bufio
Reader
|
|
Reader是一个具有缓存的可读IO。

主要函数
- func (b *Reader) Reset(r io.Reader)
重置IO,可读游标重置为0
- func (b *Reader) ResetBuffer(r io.Reader, buf []byte)
重置IO,可读游标重置为0,且b.buf变为buf
- func (b *Reader) Peek(n int) ([]byte, error)
窥探缓存的n个字节,可读游标维持不变,可写游标可能会改变;
如果可窥探的数据少于n,则调用b.fill()尝试读取远端数据用于填充b.buf.
- func (b *Reader) Pop(n int) ([]byte, error)
返回[b.r:b.r+n]处的数据,该函数会调用b.Peek,而且会改变可读游标和科协游标
- func (b *Reader) Discard(n int) (discarded int, err error)
丢弃n个数据;如果缓冲区的可读数据小于n,则一致调用b.fill()尝试读取远程的数据,直到b.buf的可读缓冲区大于或者等于n或者出现网络异常为止
- func (b *Reader) Read(p []byte) (n int, err error)
读取缓冲区数据;
如果缓冲区为空,为了提高效率,避免应用层的数据拷贝(kernel net stack=>b.buf==>p),直接将kernel net stack拷贝到p []byte,并且调用f.fill()整理缓冲区.
- func (b *Reader) fill()

fill读取远程新块数据到本地缓冲区的b.buf
fill会有数据的移动,如图 5-2所示。
Writer
|
|
Writer是一个具有缓存的可写IO.
- func (b *Writer) Reset(w io.Writer) {
重置IO,可写游标重置为0,句柄变为w
- func (b *Writer) ResetBuffer(w io.Writer, buf []byte)
重置IO,可写游标重置为0,句柄变为w,缓冲区变为buf
- func (b *Writer) flush() error
刷新缓冲区,将本地缓冲区的数据发送至内核网络栈;
游标被重置(不一定会被重置为0,可能为其他值,因为本地缓冲的数据大于内核可写缓冲区,这时还会造成数据的搬迁)。

- func (b *Writer) Available() int
可写字节数
- func (b *Writer) Buffered()
已写缓冲大小
- func (b *Writer) Write(p []byte) (nn int, err error)
将p []byte写到缓冲区或者直接写到网络内核栈(此时缓冲区已满),可能造成可写游标的移动
- func (b *Writer) WriteRaw(p []byte) (nn int, err error)
和b.Write()类似
- func (b *Writer) Peek(n int) ([]byte, error)
窥探可写缓冲区剩余值,如果可写缓冲区的剩余值小于n,会调用flush,可写游标相应可能会改变
timer
timer是一个最小堆算法实现的时钟对象,串行执行时钟任务,所以时钟任务应该尽量小,timer不适合太耗时的任务,当然用户可以控制时钟任务的并发。
bytes
Pool
|
|
pool内存组织如下,pool是一个链式存储的栈,数据从栈顶出,同时数据也从栈顶回收。

- func (p *Pool) Get() (b *Buffer)
获取一个缓冲区
- func (p *Pool) Put(b *Buffer)
归还一个缓冲区
- func (p *Pool) grow()
重置Pool对象
注:
pool 是一个不限大小的内存池,如果栈没有数据了,那么pool会调用glow()重新生成数据,所以最后可能造成的内存架构如下图所示

如果租借的速度大于归还的速度,会造成内存的溢出。
Writer
|
|
writer是一个具有缓冲的可写IO。
- func (w *Writer) Size()
缓冲区的大小
- func (w *Writer) Reset()
重置缓冲区,游标重置为0
- func (w *Writer) Buffer() []byte
返回缓冲区的内容
- func (w *Writer) Peek(n int) []byte
窥探缓冲区的n个字节,如果缓冲区的剩余空间小于n,则会调用w.grow()自增长数据缓冲区,游标会移动
- func (w *Writer) grow(n int)
按照2倍的大小增长缓冲区,会发生数据的移动
net
xrpc
根据原生的rpc封装的,其调用采用异步的方式,具有重连功能。
xrpc并没有做负载均衡的工作,只是简单做一下容灾而已,相对来时不是很灵活,用户可以稍微修改一下,就支持负载均衡了。
同时xrpc也没有在代码层面上实现[host:por]连接池,即同一个[host:port]配置只会有一个rpc长链接,除非增加多配置。
proto
主要的rpc协议说明:
job
|
|
logic
客户端上线:
|
|
客户端下线:
|
|
comet
Push RPC模块
心跳:
|
|
Job---->comet
|
|
router
增加用户:
|
|
移除用户:
|
|
其他:
|
|
推送协议
参照官网