前言
最近刚加入了rockgo开源项目,一边阅读源码一边记录。该项目是基于ECS思想,go语言实现的游戏服务端框架。截至今日,master分支实现的功能还不完全基于ECS,并且还有一些要完善。先对master分支现存的代码分析,后续完善代码补充文档。
模块
根据example中的SingleNodeWithActor例子分析,该例子比较全面,各个模块均使用到
conf & launcher
conf作为配置文件,用于指定当前节点是什么角色。具体实现是launcher功能, Server = RockGO.DefaultServer()中的Server实际上是launcherComponent.
1
2
3
Server.AddComponentGroup("gate", []ecs.IComponent{g})
Server.AddComponentGroup("room", []ecs.IComponent{&RoomManagerComponent{}})
Server.Serve()
通过AddComponentGroup将组件放在componentGroup成员变量中,下面的Serve函数名感觉有有点误导,所做的是事情是默认将NodeComponent和ActorProxyComponent也放在componentGroup中。所以componentGroup存放的是整个项目所用到的所有的component。指定具体某个节点的功能是AttachGroupsTo的作用,参数则是作为命令行SingleNodeWithActor.exe -node node_master的node参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (this *LauncherComponent) Serve() {
//添加NodeComponent组件,使对象成为分布式节点
this.Root().AddComponent(&Cluster.NodeComponent{})
//添加ActorProxy组件,组织节点间的通信
this.Root().AddComponent(&Actor.ActorProxyComponent{})
//添加组件到待选组件列表,默认添加master,child组件
this.AddComponentGroup("master", []ecs.IComponent{&Cluster.MasterComponent{}})
this.AddComponentGroup("child", []ecs.IComponent{&Cluster.ChildComponent{}})
if config.Config.ClusterConfig.IsLocationMode && config.Config.ClusterConfig.Role[0] != "single" {
this.AddComponentGroup("location", []ecs.IComponent{&Cluster.LocationComponent{}})
}
//处理single模式
if len(config.Config.ClusterConfig.Role) == 0 || config.Config.ClusterConfig.Role[0] == "single" {
config.Config.ClusterConfig.Role = this.componentGroup.AllGroupsName()
}
//添加基础组件组,一般通过组建组的定义决定服务器节点的服务角色
err := this.componentGroup.AttachGroupsTo(config.Config.ClusterConfig.Role, this.Root())
...
}
AttachGroupsTo会统合普遍情况并且从componentGroup选出符合conf的component, attachGroupTo则是为每一个componentGroup创建一个含有该group所有component的Object。所以结果如图是树状结构。

1
2
3
4
5
6
7
8
9
10
func (this *ComponentGroups) AttachGroupsTo(groupName []string, target *ecs.Object) error {
...
for _, name := range groupName {
if g, ok := this.group[name]; ok {
g.attachGroupTo(target)
}
...
}
return nil
}
gate & network
gate是客户端和服务端通信的入口。客户端和服务端通信方式写在network,服务端node之间的通信使用rpc。自定义的服务TestApi需要继承ApiBase,ApiBase再注册TestApi的服务,所用的方法大体一致。则是在基类中存一个父类的变量instance interface{},父类在初始化的时候将instance设置为自己。
1
2
3
4
5
6
7
8
9
10
11
12
13
type ApiBase struct {
//注入子类
instance interface{}
protoc MessageProtocol
parent *ecs.Object
isInit bool
}
func NewTestApi() *TestApi {
r := &TestApi{}
r.Instance(r).SetMT2ID(Testid2mt).SetProtocol(&MessageProtocol.JsonProtocol{})
return r
}
注册的具体过程,在netapi中存两个变量,用户再提供对应协议的对应关系。用于提供的对应关系保存在mt2id中。
1
2
3
4
5
6
7
8
var route = map[uint32]*methodType{}
var mt2id = map[reflect.Type]uint32{}
var Testid2mt = map[reflect.Type]uint32{
reflect.TypeOf(&TestMessage{}): 1,
reflect.TypeOf(&TestCreateRoom{}): 2,
reflect.TypeOf(&CreateResult{}): 3,
}
在Init过程中, 通过Init和已经得到的mt2id,初始化route。这里的用法见附的通过reflect调用函数, 筛选出符合条件的函数,满足条件是传入参数为3个,0是调用者本身,1为session类型,2为导出类型。这样可以route得到从index到方法的对应关系,方法包含调用者,函数本身,参数类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
func (this *DefaultGateComponent) Awake(ctx *ecs.Context) {
...
this.NetAPI.Init(this.Parent())
...
}
func (this *ApiBase) Init(parent ...*ecs.Object) {
...
this.RegisterGroup(this.instance)
...
}
func (this *ApiBase) RegisterGroup(api interface{}) {
this.checkInit()
typ := reflect.TypeOf(api)
//检查类型,如果是处理函数,改用 Register
switch typ.Kind() {
case reflect.Func:
this.Register(api)
return
}
logger.Info(fmt.Sprintf("====== start to register API group: [ %s ] ======", typ.Elem().Name()))
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}
numin := mtype.NumIn()
if numin != 3 {
continue
}
sessType := mtype.In(1)
if sessType != st {
continue
}
argsType := mtype.In(2)
if !utils.IsExportedOrBuiltinType(argsType) {
continue
}
if index, ok := mt2id[argsType]; ok {
if _, exist := route[index]; exist {
panic(ErrApiRepeated)
} else {
route[index] = &methodType{
resv: reflect.ValueOf(api),
method: method.Func,
argsType: argsType,
}
}
logger.Info(fmt.Sprintf("Add api: [ %s ], handler: [ %s.%s(*network.Session,*%s) ]", argsType.Elem().Name(), typ.Elem().Name(), mname, argsType.Elem().Name()))
}
}
logger.Info(fmt.Sprintf("====== register API group: [ %s ] end ======", typ.Elem().Name()))
}
服务端服务循环, 取得handle并且监听处理, 这里可以自行扩展协议。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//Serve listen and handle
func (ts *Server) Serve() error {
h := ts.getHandler()
if err := h.Listen(); err != nil {
return err
}
return h.Handle()
}
func (ts *Server) getHandler() (sh ServerHandler) {
if ts.conf.Proto == "tcp" {
sh = &tcpHandler{conf: ts.conf, ts: ts}
} else if ts.conf.Proto == "udp" {
sh = &udpHandler{conf: ts.conf, ts: ts}
} else if ts.conf.Proto == "ws" {
sh = &websocketHandler{conf: ts.conf, ts: ts}
} else {
panic("unsupport protocol: " + ts.conf.Proto)
}
return
}
客户端发送协议和服务端注册协议一致,打解包也需要一致。客户端发送格式包长度-消息ID-消息体,解包调用取得对应的消息ID和消息体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//发送创建房间消息
{
var message = "{\"UID\":123456}"
//此处便宜行事,消息id写死,推荐做法,使用协议对应工具,使用消息结构体
//本客户端使用websocket,协议类型:4-4-n 包长度-消息ID-消息体
messageType := uint32(2)
msg := make([]byte, 4)
msg = append(msg, []byte(message)...)
binary.BigEndian.PutUint32(msg[:4], messageType)
}
func (h *websocketHandler) recv(sess *Session, conn *websocket.Conn) {
handler := func(poolCtx []interface{},args ...interface{}) {
...
if h.conf.Handler != nil {
h.conf.Handler(args[0].(*Session), args[1].([]byte))
} else {
mid, mes := h.conf.PackageProtocol.ParseMessage(ctx, args[1].([]byte))
if h.conf.NetAPI != nil && mid != nil {
h.ts.invoke(ctx, mid[0], mes)
}
...
}
}
for !h.ts.isClosed {
_, pkg, err := conn.ReadMessage()
...
// use goroutine pool
if h.conf.PoolMode {
var wid int32
var ok bool
m,PropertyOk:=sess.GetProperty("workerID")
if wid,ok=m.(int32);!PropertyOk || !ok{
wid = WORKER_ID_RANDOM
}
h.gpool.AddJobFixed(handler, []interface{}{sess, pkg}, wid)
} else {
go handler(nil,sess, pkg)
}
}
}
通过invoke->Route,再对消息体解包,这样函数和参数类型都可以通过route得到,完成函数调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (this *ApiBase) Route(sess *Session, messageID uint32, data []byte) {
this.checkInit()
defer utils.CheckError()
if mt, ok := route[messageID]; ok {
v := reflect.New(mt.argsType.Elem())
err := this.protoc.Unmarshal(data, v.Interface())
if err != nil {
logger.Debug(fmt.Sprintf("unmarshal message failed :%s ,%s", mt.argsType.Elem().Name(), err))
return
}
args := []reflect.Value{
mt.resv,
reflect.ValueOf(sess),
v,
}
mt.method.Call(args)
return
}
...
}
rpc & cluster
此项目的rpc是go自带的rpc,在此基础上做了些封装和修改。采用的是特有gob编码。rpc是cluster内部采用的通信方式。一个Service所能提供的服务通过NodeComponent的注册功能实现。
1
2
3
func (this *NodeComponent) Register(rcvr interface{}) error {
return this.rpcServer.Register(rcvr)
}
Server内部使用serviceMap sync.Map // map[string]*service存储方法名对应方法。这里用的方法和network基本一致,格式稍有差别,1参数要求导出,2参数要求导出,返回值是error类型。类似func (this *MasterService) ReportNodeInfo(args *NodeInfo, reply *bool) error {...}即可,这样建立函数名与函数的对应关系。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
...
// Install the methods
s.method = suitableMethods(s.typ, true)
...
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() < 2 {
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
continue
}
numin := mtype.NumIn()
var replyType reflect.Type
if numin > 2 {
// Second arg must be a pointer.
replyType = mtype.In(2)
if replyType.Kind() != reflect.Ptr {
continue
}
// reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
continue
}
} else {
replyType = nil
}
// Method needs one out.
if mtype.NumOut() != 1 {
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
logger.Info(fmt.Sprintf("rpc.RegisterGroup: service: [ %s ], method [ %s ] is registed", typ.Elem().Name(), mname))
}
return methods
}
一个服务调用另一个服务,首选需要与该服务建立连接。比如调用MasterService,则需要ConnectToMaster,连接完成之后会保存对应的rpcClient,不需要每次call重新建立连。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (this *ChildComponent) ConnectToMaster() {
addr := config.Config.ClusterConfig.MasterAddress
...
for {
...
this.rpcMaster, err = this.nodeComponent.ConnectToNode(addr, callback)
...
}
...
}
func (this *NodeComponent) ConnectToNode(addr string, callback func(event string, data ...interface{})) (*rpc.TcpClient, error) {
client, err := rpc.NewTcpClient("tcp", addr, callback)
if err != nil {
return nil, err
}
this.rpcClient.Store(addr, client)
logger.Info(fmt.Sprintf(" connect to node: [ %s ] success", addr))
return client, nil
}
取得一个rpcClient一般情况则是通过master或者location中转,比如err = loginNode.Call("LoginComponent.Login", message.Account, &pInfo),先取得loginNode(rpcClient),通过 node, err := nodec.GetNode("login", Cluster.SELECTOR_TYPE_MIN_LOAD)取得(选择策略可以自定义)。优先查询LocationService,查不到再查MasterService, 再通过上述的ConnectToNode取的rpcClient。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (this *NodeComponent) GetNode(role string, selectorType ...SelectorType) (*NodeID, error) {
var nodeID *NodeID
var err error
//优先查询位置服务器
if this.islocationMode {
nodeID, err = this.GetNodeFromLocation(role, selectorType...)
if err == nil {
return nodeID, nil
}
}
//位置服务器不存在或不可用时在master上查询
nodeID, err = this.GetNodeFromMaster(role, selectorType...)
if err != nil {
return nil, err
}
return nodeID, nil
}
新建一个rpcClient, rpcClient存有pending: make(map[uint64]*Call),作用是为了有回包的Call做记录,并且自动开启两个线程go client.input,go client.StartHeartBeat(client.closeHeartbeat), 心跳不必说,go client.input则是处理回包。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func NewClientWithConn(conn net.Conn, callback ...func(event string, data ...interface{})) *TcpClient {
encBuf := bufio.NewWriter(conn)
codec := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
client := &TcpClient{
conn: conn,
codec: codec,
pending: make(map[uint64]*Call),
}
...
go client.input()
client.closeHeartbeat = make(chan struct{})
go client.StartHeartBeat(client.closeHeartbeat)
return client
}
go client.input则是读取回包, 根据发包的seq取出之前的call,将返回信息解码到call的reply,同时标记call.done()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (client *TcpClient) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
...
case response.Error != "":
...
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
...
}
一个服务调用另一个服务流程如下,使用连接得到的rpcClient发起调用,根据传入的&reply判断是否需要回包。如果需要回包则会调用client.send(call)函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
err := this.rpcMaster.Call("MasterService.ReportNodeInfo", args, &reply)
func (client *TcpClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
...
call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))
...
}
func (client *TcpClient) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
...
call.Done = done
if reply == nil {
call.Type = RPC_CALL_TYPE_WITHOUTREPLY
client.sendWithoutReply(call)
} else {
call.Type = RPC_CALL_TYPE_NORMAL
client.send(call)
}
return call
}
client.send和client.sendWithoutReply基本上一致, go原生的rpc没有client.sendWithoutReply方法。区别在于client.send根据seq在pending中缓存当前call。client.sendWithoutReply在发包没有错误之后标记call.done()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (client *TcpClient) send(call *Call) {
...
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
client.request.Type = call.Type
err := client.codec.WriteRequest(&client.request, call.Args)
...
}
发起Call之后, Call函数阻塞在Call中,等待call.Done,无需回包的调用在调用结束之后直接标记call.done(),需要回包则在client.input中等待处理回包,处理完成后标记call.done()一次call的流程结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
func (client *TcpClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
if client.IsClosed() {
return ErrShutdown
}
call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))
select {
case <-timer.After(CallTimeout):
call.Error = ErrTimeout
return ErrTimeout
case <-call.Done:
return call.Error
}
}
作为rpc的server,则是解码,调用业务逻辑,然后再编码返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
...
codec.IOCallback()
wg.Add(1)
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
...
}
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
if wg != nil {
defer wg.Done()
}
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// ParseMessage the method, providing a new value for the reply.
args := []reflect.Value{s.rcvr, argv}
if mtype.ReplyType != nil {
args = append(args, replyv)
}
returnValues := function.Call(args)
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
if req.Type == RPC_CALL_TYPE_NORMAL {
reqi := replyv.Interface()
server.sendResponse(sending, req, reqi, codec, errmsg)
}
server.freeRequest(req)
}
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod
if errmsg != "" {
resp.Error = errmsg
reply = invalidRequest
}
resp.Seq = req.Seq
sending.Lock()
if reply == nil {
reply = invalidRequest
}
err := codec.WriteResponse(resp, reply)
if debugLog && err != nil {
logger.Info("rpc: writing response:", err)
}
sending.Unlock()
server.freeResponse(resp)
}
cluster
集群中的rpc方式如上。一个集群默认有MasterService。如果是单节点,那么服务都在一起,同时具有MasterComponent,ChildComponent,如果采用Actor模式,则还会有ActorProxyComponent,一个集群默认有一个MasterService,其他所有的节点都是ChildComponent。ChildComponent的功能在于连接主节点,并且每隔一段时间上报信息。
1
2
3
4
5
func (this *ChildComponent) Awake(ctx *ecs.Context) {
...
go this.ConnectToMaster()
go this.DoReport()
}
主节点维持与所有节点的链接,并且存储所有子节点的信息,根据子节点的上报更新节点信息。如果负载过大则可以启动LocationService,LocationService将master存储的节点信息同步到自己。LocationService可以横向扩展,承载更多的节点。虽然会牺牲一些数据一致性,但是没有太大的影响,如果查询LocationService查不到,则还是查master然后同步到LocationService。如果master节点失效,LocationService还是可以提供服务,等待master重启即可。
1
2
3
4
5
func (this *LocationComponent) Awake(ctx *ecs.Context) {
...
go this.DoLocationSync()
}
ecs
这版的ecs形式有点类似unity。整个服务的循环如下。系统内注册的ISystem有AwakeSystem{}, StartSystem{}, UpdateSystem{}, DestroySystem{},然后依次调用每个System的UpdateFrame函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func NewServerNode() *Server {
//构造运行时
runtime := ecs.NewRuntime(ecs.Config{ThreadPoolSize: runtime.NumCPU()})
runtime.UpdateFrameByInterval(time.Millisecond * 100)
...
return launcher
}
func NewRuntime(config Config) *Runtime {
validateConfig(&config)
runtime := &Runtime{
root: NewObject("runtime"),
updateLock: &sync.Mutex{},
workers: threadpool.New(),
factory: config.Factory,
innerSystems: []ISystem{&AwakeSystem{}, &StartSystem{}, &UpdateSystem{}, &DestroySystem{}},
locker: &sync.RWMutex{},
}
runtime.root.runtime = runtime
runtime.workers.MaxThreads = config.ThreadPoolSize
for _, s := range runtime.innerSystems {
s.Init(runtime)
}
return runtime
}
func (this *Runtime) UpdateFrameByInterval(duration time.Duration) chan<- struct{} {
shutdown := make(chan struct{})
c := time.Tick(duration)
go func() {
ticking := false
for {
select {
case <-shutdown:
return
case <-c:
if ticking {
continue
}
//上一帧还未执行完毕时,跳过一帧,避免帧滚雪球
ticking = true
this.UpdateFrame()
ticking = false
}
}
}()
return shutdown
}
func (this *Runtime) UpdateFrame() {
this.locker.RLock()
defer this.locker.RUnlock()
//内部系统间是有序的,awake->start->update->destroy
for _, s := range this.innerSystems {
s.UpdateFrame()
}
//自定义系统整体顺序在内部系统之后,是否需要同系统独立执行,在updateFrame接口实现
for _, s := range this.customSystems {
s.UpdateFrame()
}
}
每个System的接口如下,
1
2
3
4
5
6
7
type ISystem interface {
Init(runtime *Runtime)
UpdateFrame()
Filter(component IComponent)
IndependentFilter(op int, component IComponent)
Name() string
}
每个System内部维护着Componentlist,在Init时初始化,在Filter时判断Component是否实现了该System的接口,比如LoginComponent实现了Awake方法,那么LoginComponent就会入AwakeSystem中的component list,在执行的时候,取出每个component,并执行其中的Awake函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (this *AwakeSystem) UpdateFrame() {
this.locker.Lock()
this.components, this.temp = this.temp, this.components
this.locker.Unlock()
for c := this.temp.Front(); c != nil; c = this.temp.Front() {
this.wg.Add(1)
ctx := &Context{
Runtime: this.runtime,
}
v := c.Value.(IAwake)
//name:=c.Value.(IComponent).Type().String()
this.runtime.workers.Run(func() {
//logger.Debug("awake: "+name)
v.Awake(ctx)
this.wg.Done()
})
this.temp.Remove(c)
}
this.wg.Wait()
}
Awake,Start,Destroy都是只执行一次,Update每帧都要执行,所以UpdateSystem的UpdateFrame的实现略有不同,需要考虑到新加入和删除的component,同时每次执行后不执行remove操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (this *UpdateSystem) UpdateFrame() {
this.locker.Lock()
//添加上一帧新添加的组件
if this.push.Len() > 0 {
this.components.PushBackList(this.push)
this.push = list.New()
}
//移除上一帧删除的组件,添加在前,移除在后,同一帧添加又移除,不执行update
if this.pop.Len() > 0 {
for p := this.pop.Front(); p != nil; p = p.Next() {
for c := this.components.Front(); c != nil; c = c.Next() {
if c.Value == p.Value {
this.components.Remove(c)
}
}
}
this.pop = list.New()
}
this.locker.Unlock()
Iter := iter.FromList(this.components)
for c, err := Iter.Next(); err == nil; c, err = Iter.Next() {
this.wg.Add(1)
ctx := &Context{
Runtime: this.runtime,
}
v := c.(IUpdate)
this.runtime.workers.Run(func() {
v.Update(ctx)
this.wg.Done()
})
}
this.wg.Wait()
}
component最基本的方法是AddComponent, 在AddComponent中通过GetRequire接口判断组件的依赖关系,依赖关系需要用户自己指明。然后通过SystemFilter将component放到对应的System中的list中,这样可以在UpdateFrame中得到执行。最后执行init.Initialize()函数。总结:当执行AddComponent时会立即执行其Initialize函数,并且Awake,Start,Update,Destroy会在后续由系统统一调度。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (o *Object) AddComponent(component IComponent) *Object {
component.Init(reflect.TypeOf(component), o.Runtime(), o)
err := o.WithLock(func() error {
...
if require, ok := component.(IRequire); ok {
for obj, requires := range require.GetRequire() {
for _, require := range requires {
if !obj.HasComponent(require) {
return ErrMissingComponent
}
}
}
}
o.components = append(o.components, component)
runtime := o._runtime()
if runtime != nil {
runtime.SystemFilter(component)
}
return nil
})
if err != nil {
logger.Error(err)
}
utils.Try(func() {
if init, ok := component.(IInit); ok {
init.Initialize()
}
})
return o
}
actor
actor模型对于整个的架构作用在于所有的远程的调用会以Actor为通信的起始点,只要知道ActorID就可以通过Tell方法通信。
ActorID是对NodeID的扩展,通过ActorID可以得到NodeID
1
2
Target such as "127.0.0.1:8888:XXXXXXX",
means "IP:PORT:LOCALACTORID"
Actor建立服务初始化服务接口使用AddHandler,每一个服务创建一个ActorComponent,并将ActorComponent(也可以看作是Actor)和ActorService到ActorComponentProxy中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (this *RoomManagerComponent) Awake(ctx *ecs.Context) {
...
//注册actor消息
this.AddHandler(Service_RoomMgr_NewRoom, this.NewRoom, true)
}
func (this *ActorBase) AddHandler(service string, handler func(message *ActorMessageInfo) error, isService ...bool) {
...
this.MessageHandler[service] = handler
actor := this.Actor()
if actor != nil && len(isService) > 0 {
err := actor.RegisterService(service)
...
}
}
func (this *ActorBase) Actor() *ActorComponent {
...
if this.actor == nil && this.parent != nil {
err := this.parent.Find(&this.actor)
if err != nil {
actor := NewActorComponent(this.actorType)
this.parent.AddComponent(actor)
this.actor = actor
}
}
return this.actor
}
一个Actor向另一个Actor总分为两种情况,一个是本地Actor调用,还有远程Actor调用。
gate向room发起请求,首先将自己提升为ActorServiceCaller,只是在ActorProxyComponent上封装一层缓存. Call的过程在没有缓存时首先获取ActorService.在AddHandler的时候每个ActorProxyComponent已经存有了ActorService,所以优先本地找服务,如果查找不到就需要到到MasterService或者LocationService查找(在rpc模块已经说过了),将查找到的ActorService返回,缓存后调用。这里最终的是本地ActorService和rpc返回的ActorService中的actor不是同一个类型。因为如果在本地的话,可以通过proxy直接获取ActorService,这时ActorService中的Actor实际上是ActorComponent,而通过rpc得到是actorID,重新初始化成的Actor。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (this *TestApi) CreateRoom(sess *network.Session, message *TestCreateRoom) {
//升级session为actor服务调用器
serviceCaller, err := this.Upgrade(sess)
...
//调用创建房间服务
reply, err := serviceCaller.Call("room", Service_RoomMgr_NewRoom, sess.ID)
}
func (this *ActorServiceCaller) Call(role string, serviceName string, args ...interface{}) ([]interface{}, error) {
var err error
//优先尝试缓存客户端,避免反复查询,尽量去中心化
service, ok := this.services[serviceName]
if ok {
res, err := service.Call(args...)
if err != nil {
delete(this.services, serviceName)
} else {
return res, err
}
}
//无缓存,或者通过缓存调用失败,重新查询调用
service, err = this.proxy.GetActorService(role, serviceName)
if err != nil {
return nil, err
}
this.services[serviceName] = service
res, err := service.Call(args...)
if err != nil {
delete(this.services, serviceName)
}
return res, err
}
func (this *ActorProxyComponent) GetActorService(role string, serviceName string) (*ActorService, error) {
var service *ActorService
var err error
//优先尝试本地服务
service, err = this.GetLocalActorService(serviceName)
if err == nil {
return service, nil
}
//获取远程服务
if role == LOCAL_SERVICE {
return nil, errors.New("role is empty")
}
client, err := this.nodeComponent.GetNodeClientByRole(role)
if err != nil {
return nil, err
}
var reply ActorID
err = client.Call("ActorProxyService.ServiceInquiry", serviceName, &reply)
if err != nil {
return nil, err
}
return NewActorService(NewActor(reply, this), serviceName), nil
}
res, err := service.Call(args...)这里service中actor的实际类型是不确定的,如果是本地的Call,则Actor是ActorComponent,如果是远程Call,则Actor(概念)是Actor(interface)
1
2
3
4
5
6
7
8
9
func (this *ActorService) Call(args ...interface{}) ([]interface{}, error) {
mes := NewActorMessage(this.Service, args...)
reply := &ActorMessage{}
err := this.actor.Tell(nil, mes, &reply)
if err != nil {
return nil, err
}
return reply.Data, nil
}
当是ActorComponent时,直接将消息放在本地的队列里。
1
2
3
4
5
6
7
8
9
10
func (this *ActorComponent) Tell(sender IActor, message *ActorMessage, reply ...**ActorMessage) error {
messageInfo := &ActorMessageInfo{
Sender: sender,
Message: message,
}
...
this.queueReceive <- messageInfo
...
return messageInfo.err
}
当是Actor时,调用Emit,Emit先判断是否本地,如果本地流程和上面一样,如果不是本地就走rpc。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (this *Actor) Tell(sender IActor, message *ActorMessage, reply ...**ActorMessage) error {
messageInfo := &ActorMessageInfo{
Sender: sender,
Message: message,
}
if len(reply) != 0 {
messageInfo.reply = reply[0]
}
return this.proxy.Emit(this.ID(), messageInfo)
}
func (this *ActorProxyComponent) Emit(actorID ActorID, messageInfo *ActorMessageInfo) error {
senderID := "unknown"
if messageInfo.Sender != nil {
senderID = messageInfo.Sender.ID().String()
}
logger.Debug(fmt.Sprintf("actor: [ %s ] send message [ %s ] to actor [ %s ]", senderID, messageInfo.Message.Service, actorID.String()))
nodeID := actorID.GetNodeID()
//本地消息不走网络
if nodeID == this.nodeID {
return this.LocalTell(actorID, messageInfo)
}
//非本地消息走网络代理
client, err := this.nodeComponent.GetNodeClient(nodeID)
if err != nil {
return err
}
var sender ActorID
if messageInfo.Sender != nil {
sender = messageInfo.Sender.ID()
}
err = client.Call("ActorProxyService.Tell", &ActorRpcMessageInfo{
Target: actorID,
Sender: sender,
Message: messageInfo.Message}, messageInfo.reply)
if err != nil {
return err
}
return nil
}
这里应该会有疑问,当时Actor已经确定是远程调用了,为什么还要判断是否是本地。原因在于远程调用的函数ActorProxyService.Tell调用的也是Emit,对于本地来说是远端,对于远端是本地,代码直接复用。
1
2
3
4
5
6
7
8
func (this *ActorProxyService) Tell(args *ActorRpcMessageInfo, reply *ActorMessage) error {
minfo := &ActorMessageInfo{
Sender: NewActor(args.Sender, this.proxy),
Message: args.Message,
reply: &reply,
}
return this.proxy.Emit(args.Target, minfo)
}
接收方是ActorComponent中的queueReceive chan *ActorMessageInfo //接收消息队列,在ActorComponentInitialize时go this.dispatch()。接收到消息就会开始处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (this *ActorComponent) dispatch() {
var messageInfo *ActorMessageInfo
var ok bool
for {
select {
case <-this.close:
atomic.StoreInt32(&this.active, 0)
close(this.queueReceive)
//收到关闭信号后会继续处理完剩余消息
case messageInfo, ok = <-this.queueReceive:
if !ok {
return
}
switch this.ActorType {
case ACTOR_TYPE_SYNC:
this.handle(messageInfo)
case ACTOR_TYPE_ASYNC:
go this.handle(messageInfo)
default:
}
}
}
}
这里的处理对所有Component做筛选,所以处理函数都是形如func(message *ActorMessageInfo) error,然后具体执行函数体。执行结束标记this.isDone = true,回包是rpc实现的,所以即使本身节点没有提供服务的Actor,但是可以向其他Actor请求并正确回包。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (this *ActorComponent) handle(messageInfo *ActorMessageInfo) {
cps := this.Parent().AllComponents()
var err error = nil
var val interface{}
for val, err = cps.Next(); err == nil; val, err = cps.Next() {
if messageHandler, ok := val.(IActorMessageHandler); ok {
if handler, ok := messageHandler.MessageHandlers()[messageInfo.Message.Service]; ok {
this.Catch(handler, messageInfo)
}
}
}
}
func (this *ActorComponent) Catch(handler func(message *ActorMessageInfo) error, m *ActorMessageInfo) {
...
err := handler(m)
if err != nil {
m.replyError(err)
} else {
m.replySuccess()
}
}
func (this *ActorMessageInfo) replySuccess() {
if this.done != nil && !this.isDone {
this.done <- struct{}{}
this.isDone = true
}
}
附
通过reflect调用函数
通过下面两种方式调用都可以,第一种不用传调用本身,第二种需要。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Stu struct {
Name string
}
func (this *Stu) Fn(p1, p2 int) int {
return p1 + p2
}
func main() {
s := &Stu{"Hank"}
valueS := reflect.ValueOf(s)
method := valueS.MethodByName("Fn")
paramList := []reflect.Value{
reflect.ValueOf(22),
reflect.ValueOf(20),
}
resultList := method.Call(paramList)
fmt.Println(resultList[0].Int()) // 42
paramList2 := []reflect.Value{
reflect.ValueOf(s),
reflect.ValueOf(22),
reflect.ValueOf(20),
}
typ := reflect.TypeOf(s)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
fmt.Println(mtype, " ", mname)
fmt.Println(method.Func.Call(paramList2)[0].Int()) // 42
}
}