rockgo实现思路

分布式游戏服务器框架

Posted by Yaoxh6 on September 1, 2022

前言

最近刚加入了rockgo开源项目,一边阅读源码一边记录。该项目是基于ECS思想,go语言实现的游戏服务端框架。截至今日,master分支实现的功能还不完全基于ECS,并且还有一些要完善。先对master分支现存的代码分析,后续完善代码补充文档。

模块

根据example中的SingleNodeWithActor例子分析,该例子比较全面,各个模块均使用到

conf & launcher

conf作为配置文件,用于指定当前节点是什么角色。具体实现是launcher功能, Server = RockGO.DefaultServer()中的Server实际上是launcherComponent.

1
2
3
    // Register the candidate component groups under the names "gate" and "room",
    // then hand control to the launcher's Serve.
    Server.AddComponentGroup("gate", []ecs.IComponent{g})
    Server.AddComponentGroup("room", []ecs.IComponent{&RoomManagerComponent{}})
    Server.Serve()

通过AddComponentGroup将组件放在componentGroup成员变量中。下面的Serve函数名感觉有点误导,它所做的事情是:默认将NodeComponent和ActorProxyComponent添加到根对象(Root)上,并把master、child(以及location模式下的location)组件组也放进componentGroup中。所以componentGroup存放的是整个项目所用到的所有component。指定具体某个节点的功能是AttachGroupsTo的作用,其参数config.Config.ClusterConfig.Role由命令行SingleNodeWithActor.exe -node node_master的node参数决定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Serve adds the cluster components to the root object, registers the built-in
// component groups, and attaches the groups selected by the configured roles.
//
// An empty role list, or a first role of "single", means single-node mode: every
// registered group is attached. The remainder of the body is elided ("...") in
// this excerpt.
func (this *LauncherComponent) Serve() {
	// Add NodeComponent so this object becomes a distributed node.
	this.Root().AddComponent(&Cluster.NodeComponent{})

	// Add ActorProxyComponent to organise communication between nodes.
	this.Root().AddComponent(&Actor.ActorProxyComponent{})

	// Decide single mode once, up front. The length check must come before
	// Role[0] is read; indexing an empty Role slice would panic.
	roles := config.Config.ClusterConfig.Role
	isSingle := len(roles) == 0 || roles[0] == "single"

	// Add the default candidate groups: master and child, plus location when
	// location mode is enabled and the node is not running in single mode.
	this.AddComponentGroup("master", []ecs.IComponent{&Cluster.MasterComponent{}})
	this.AddComponentGroup("child", []ecs.IComponent{&Cluster.ChildComponent{}})
	if config.Config.ClusterConfig.IsLocationMode && !isSingle {
		this.AddComponentGroup("location", []ecs.IComponent{&Cluster.LocationComponent{}})
	}

	// Single mode: the node takes every registered group as its role list.
	if isSingle {
		config.Config.ClusterConfig.Role = this.componentGroup.AllGroupsName()
	}

	// Attach the groups named by the role list; the group definitions decide
	// which services this node provides.
	err := this.componentGroup.AttachGroupsTo(config.Config.ClusterConfig.Role, this.Root())
    ...
}

AttachGroupsTo会统合普遍情况并且从componentGroup选出符合conf的component, attachGroupTo则是为每一个componentGroup创建一个含有该group所有component的Object。所以结果如图是树状结构。 object

1
2
3
4
5
6
7
8
9
10
// AttachGroupsTo attaches every group in groupName that is registered in
// this.group to target. Parts of the body are elided ("...") in this excerpt.
func (this *ComponentGroups) AttachGroupsTo(groupName []string, target *ecs.Object) error {
    ...
	for _, name := range groupName {
		// Only names present in this.group are attached by this branch.
		if g, ok := this.group[name]; ok {
			g.attachGroupTo(target)
		}
        ...
	}
	return nil
}

gate & network

gate是客户端和服务端通信的入口。客户端和服务端通信方式写在network,服务端node之间的通信使用rpc。自定义的服务TestApi需要继承ApiBase,再由ApiBase注册TestApi的服务,不同服务所用的方法大体一致。做法是在基类ApiBase中存一个指向子类的变量instance interface{},子类在初始化的时候将instance设置为自己。

1
2
3
4
5
6
7
8
9
10
11
12
13
// ApiBase is the base type that API groups such as TestApi build on.
type ApiBase struct {
	// instance is the injected concrete (sub-type) API object; Init passes it to RegisterGroup.
	instance interface{}
	// protoc is used by Route to unmarshal incoming message bodies.
	protoc   MessageProtocol
	parent   *ecs.Object
	isInit   bool
}

// NewTestApi creates a TestApi and configures it in one chain: it passes the new
// object to Instance, sets Testid2mt as the message-type table, and selects the
// JSON message protocol.
func NewTestApi() *TestApi {
	api := new(TestApi)
	api.Instance(api).SetMT2ID(Testid2mt).SetProtocol(&MessageProtocol.JsonProtocol{})
	return api
}

注册的具体过程:在netapi中存两个变量,用户再提供消息类型与消息ID的对应关系。用户提供的对应关系保存在mt2id中。

1
2
3
4
5
6
7
8
// route maps a message id to the handler method registered for it.
var route = map[uint32]*methodType{}

// mt2id maps a message's reflect.Type to its message id.
var mt2id = map[reflect.Type]uint32{}

// Testid2mt is the example table: despite the name it maps message type -> id,
// using the pointer types of the three test messages.
var Testid2mt = map[reflect.Type]uint32{
	reflect.TypeOf(&TestMessage{}):    1,
	reflect.TypeOf(&TestCreateRoom{}): 2,
	reflect.TypeOf(&CreateResult{}):   3,
}

Init过程中,通过RegisterGroup和已经得到的mt2id初始化route。这里的用法见文末附的“通过reflect调用函数”:筛选出符合条件的方法,条件是传入参数为3个,0是调用者本身,1为session类型,2为导出类型。这样就可以通过route得到从消息ID到方法的对应关系,方法信息包含调用者、函数本身和参数类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Awake initialises the gate's NetAPI with the component's parent object.
// The rest of the body is elided ("...") in this excerpt.
func (this *DefaultGateComponent) Awake(ctx *ecs.Context) {
	...
	this.NetAPI.Init(this.Parent())
	...
}

// Init registers the injected instance as an API group.
// The rest of the body is elided ("...") in this excerpt.
func (this *ApiBase) Init(parent ...*ecs.Object) {
	...
	this.RegisterGroup(this.instance)
	...
}

// RegisterGroup scans the method set of api and stores every suitable method in
// route, keyed by the message id that mt2id assigns to the method's message type.
//
// A method is suitable when it is exported, takes exactly three inputs
// (receiver, session of type st, message), its message type passes
// utils.IsExportedOrBuiltinType, and that type has an entry in mt2id.
// If api is a plain function it is forwarded to Register instead.
// Registering two handlers for the same message id panics with ErrApiRepeated.
func (this *ApiBase) RegisterGroup(api interface{}) {
	this.checkInit()

	apiType := reflect.TypeOf(api)

	// A bare handler function is not a group; Register handles it.
	if apiType.Kind() == reflect.Func {
		this.Register(api)
		return
	}

	groupName := apiType.Elem().Name()
	logger.Info(fmt.Sprintf("====== start to register API group: [ %s ] ======", groupName))
	for i := 0; i < apiType.NumMethod(); i++ {
		candidate := apiType.Method(i)
		signature := candidate.Type

		// Must be exported and shaped as (receiver, session, message).
		if candidate.PkgPath != "" || signature.NumIn() != 3 {
			continue
		}
		if signature.In(1) != st {
			continue
		}
		msgType := signature.In(2)
		if !utils.IsExportedOrBuiltinType(msgType) {
			continue
		}

		// Only message types the user listed in mt2id can be routed.
		id, known := mt2id[msgType]
		if !known {
			continue
		}
		if _, taken := route[id]; taken {
			panic(ErrApiRepeated)
		}
		route[id] = &methodType{
			resv:     reflect.ValueOf(api),
			method:   candidate.Func,
			argsType: msgType,
		}
		logger.Info(fmt.Sprintf("Add api: [ %s ], handler: [ %s.%s(*network.Session,*%s) ]", msgType.Elem().Name(), groupName, candidate.Name, msgType.Elem().Name()))
	}
	logger.Info(fmt.Sprintf("======   register API group: [ %s ] end   ======", groupName))
}

服务端服务循环, 取得handle并且监听处理, 这里可以自行扩展协议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Serve picks the handler for the configured protocol, starts listening, and
// then runs the handler. A Listen error is returned as is; otherwise the result
// of Handle is returned.
func (ts *Server) Serve() error {
	handler := ts.getHandler()
	err := handler.Listen()
	if err != nil {
		return err
	}
	return handler.Handle()
}

// getHandler returns the ServerHandler that matches ts.conf.Proto
// ("tcp", "udp" or "ws"). Any other value panics.
func (ts *Server) getHandler() (sh ServerHandler) {
	switch ts.conf.Proto {
	case "tcp":
		sh = &tcpHandler{conf: ts.conf, ts: ts}
	case "udp":
		sh = &udpHandler{conf: ts.conf, ts: ts}
	case "ws":
		sh = &websocketHandler{conf: ts.conf, ts: ts}
	default:
		panic("unsupport protocol: " + ts.conf.Proto)
	}
	return sh
}

客户端发送协议和服务端注册协议一致,打解包也需要一致。客户端发送格式为包长度-消息ID-消息体,服务端解包时调用ParseMessage取得对应的消息ID和消息体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
	// Send the "create room" message.
	{
		var message = "{\"UID\":123456}"
		// Shortcut for the demo: the message id is hard-coded. The recommended way is
		// to use a protocol mapping tool together with the message structs.
		// This client uses websocket; protocol layout 4-4-n: package length - message ID - message body.
		// NOTE(review): the code below only writes the 4-byte message ID in front of
		// the body and builds no length prefix; confirm against the package protocol.
		messageType := uint32(2)
		msg := make([]byte, 4)
		msg = append(msg, []byte(message)...)
		binary.BigEndian.PutUint32(msg[:4], messageType)
	}

	// recv reads messages from conn until the server is closed and dispatches each
	// one to handler, through the goroutine pool or a fresh goroutine.
	// Parts of the body are elided ("...") in this excerpt.
	func (h *websocketHandler) recv(sess *Session, conn *websocket.Conn) {
		// handler processes one package: args[0] is the *Session, args[1] the raw bytes.
		handler := func(poolCtx []interface{},args ...interface{}) {
			...
			if h.conf.Handler != nil {
				// A user-supplied handler takes over the raw package entirely.
				h.conf.Handler(args[0].(*Session), args[1].([]byte))
			} else {
				// Otherwise split the package into message id and body and route it through NetAPI.
				mid, mes := h.conf.PackageProtocol.ParseMessage(ctx, args[1].([]byte))
				if h.conf.NetAPI != nil && mid != nil {
					h.ts.invoke(ctx, mid[0], mes)
				}
				...
			}
		}
		for !h.ts.isClosed {
			_, pkg, err := conn.ReadMessage()
			...
			// use goroutine pool
			if h.conf.PoolMode {
				var wid int32
				var ok bool
				// Use the session's "workerID" property when it is set and is an int32;
				// otherwise fall back to WORKER_ID_RANDOM.
				m,PropertyOk:=sess.GetProperty("workerID")
				if wid,ok=m.(int32);!PropertyOk || !ok{
					wid = WORKER_ID_RANDOM
				}
				h.gpool.AddJobFixed(handler, []interface{}{sess, pkg}, wid)
			} else {
				go handler(nil,sess, pkg)
			}
		}
	}

通过invoke->Route,再对消息体解包,这样函数和参数类型都可以通过route得到,完成函数调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Route looks up the handler registered for messageID, unmarshals data into a
// new value of the handler's message type, and calls the handler with
// (receiver, sess, message). The fall-through path for unknown ids is elided
// ("...") in this excerpt.
func (this *ApiBase) Route(sess *Session, messageID uint32, data []byte) {
	this.checkInit()
	defer utils.CheckError()

	if mt, ok := route[messageID]; ok {
		// argsType is a pointer type, so allocate its element and decode into it.
		v := reflect.New(mt.argsType.Elem())
		err := this.protoc.Unmarshal(data, v.Interface())
		if err != nil {
			logger.Debug(fmt.Sprintf("unmarshal message failed :%s ,%s", mt.argsType.Elem().Name(), err))
			return
		}
		// method is the unbound method func, so the receiver goes first.
		args := []reflect.Value{
			mt.resv,
			reflect.ValueOf(sess),
			v,
		}
		mt.method.Call(args)
		return
	}
	...
}

rpc & cluster

此项目的rpc是go自带的rpc,在此基础上做了些封装和修改。采用的是特有gob编码。rpc是cluster内部采用的通信方式。一个Service所能提供的服务通过NodeComponent的注册功能实现。

1
2
3
// Register exposes rcvr as an RPC service by delegating to the node's rpc server
// and returns that server's result unchanged.
func (this *NodeComponent) Register(rcvr interface{}) error {
	err := this.rpcServer.Register(rcvr)
	return err
}

Server内部使用serviceMap sync.Map // map[string]*service存储服务名到服务(及其方法表)的对应关系。这里筛选方法的方式和network基本一致,格式稍有差别:第1个参数要求是导出类型,第2个参数(可选)要求是导出类型的指针,返回值是error类型。类似func (this *MasterService) ReportNodeInfo(args *NodeInfo, reply *bool) error {...}即可,这样建立函数名与函数的对应关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// register builds a service for rcvr, collects its suitable methods, and stores
// it in server.serviceMap under the receiver's type name. Registering the same
// name twice returns an error. Parts of the body are elided ("...") in this
// excerpt, including whatever uses name and useName.
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
	s := new(service)
	s.typ = reflect.TypeOf(rcvr)
	s.rcvr = reflect.ValueOf(rcvr)
	// Indirect strips one pointer level so the struct's own name is used.
	sname := reflect.Indirect(s.rcvr).Type().Name()
	...
	// Install the methods
	s.method = suitableMethods(s.typ, true)
	...
	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
		return errors.New("rpc: service already defined: " + sname)
	}
	return nil
}

// suitableMethods returns the methods of typ that can be served over RPC, keyed
// by method name.
//
// A method qualifies when it is exported, has at least two inputs (receiver and
// args) with an args type accepted by isExportedOrBuiltinType, optionally a
// third input that is a pointer accepted by isExportedOrBuiltinType (the reply),
// and exactly one result of type error. Methods without a reply parameter get a
// nil ReplyType.
//
// reportErr is kept for signature compatibility; this body does not use it.
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
	methods := make(map[string]*methodType)

	// The service name is only used for logging. Type.Elem panics on kinds that
	// have no element type (for example a struct passed by value), so it is only
	// dereferenced for pointer receivers.
	serviceName := typ.Name()
	if typ.Kind() == reflect.Ptr {
		serviceName = typ.Elem().Name()
	}

	for m := 0; m < typ.NumMethod(); m++ {
		method := typ.Method(m)
		mtype := method.Type
		mname := method.Name
		// Method must be exported.
		if method.PkgPath != "" {
			continue
		}
		// Method needs at least two ins: receiver and args; *reply is optional.
		numin := mtype.NumIn()
		if numin < 2 {
			continue
		}
		// First arg need not be a pointer.
		argType := mtype.In(1)
		if !isExportedOrBuiltinType(argType) {
			continue
		}

		var replyType reflect.Type
		if numin > 2 {
			// Second arg must be a pointer.
			replyType = mtype.In(2)
			if replyType.Kind() != reflect.Ptr {
				continue
			}
			// reply type must be exported.
			if !isExportedOrBuiltinType(replyType) {
				continue
			}
		}

		// Method needs one out.
		if mtype.NumOut() != 1 {
			continue
		}
		// The return type of the method must be error.
		if returnType := mtype.Out(0); returnType != typeOfError {
			continue
		}
		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
		logger.Info(fmt.Sprintf("rpc.RegisterGroup: service: [ %s ], method [ %s ] is registed", serviceName, mname))
	}
	return methods
}

一个服务调用另一个服务,首先需要与该服务建立连接。比如调用MasterService,则需要ConnectToMaster,连接完成之后会保存对应的rpcClient,不需要每次call都重新建立连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ConnectToMaster connects to the master address taken from the cluster config
// and keeps the resulting client in this.rpcMaster. The retry loop's conditions
// and the callback are elided ("...") in this excerpt.
func (this *ChildComponent) ConnectToMaster() {
	addr := config.Config.ClusterConfig.MasterAddress
	...
	for {
		...
		this.rpcMaster, err = this.nodeComponent.ConnectToNode(addr, callback)
		...
	}
	...
}

// ConnectToNode opens a TCP rpc client to addr, caches it in this.rpcClient
// under addr, and returns it. When the connection fails nothing is cached and
// the error is returned.
func (this *NodeComponent) ConnectToNode(addr string, callback func(event string, data ...interface{})) (*rpc.TcpClient, error) {
	conn, err := rpc.NewTcpClient("tcp", addr, callback)
	if err != nil {
		return nil, err
	}

	// Keep the client so later calls to the same address can reuse it.
	this.rpcClient.Store(addr, conn)

	msg := fmt.Sprintf("  connect to node: [ %s ] success", addr)
	logger.Info(msg)
	return conn, nil
}

取得一个rpcClient一般情况则是通过master或者location中转,比如err = loginNode.Call("LoginComponent.Login", message.Account, &pInfo),先取得loginNode(rpcClient),通过 node, err := nodec.GetNode("login", Cluster.SELECTOR_TYPE_MIN_LOAD)取得(选择策略可以自定义)。优先查询LocationService,查不到再查MasterService, 再通过上述的ConnectToNode取的rpcClient。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// GetNode resolves a node for role. In location mode it asks the location
// service first; if that is disabled or returns an error it falls back to the
// master. An error from the master is returned with a nil node id.
func (this *NodeComponent) GetNode(role string, selectorType ...SelectorType) (*NodeID, error) {
	var (
		found *NodeID
		err   error
	)

	// Prefer the location server when it is enabled.
	if this.islocationMode {
		if found, err = this.GetNodeFromLocation(role, selectorType...); err == nil {
			return found, nil
		}
	}

	// Location server absent or unavailable: query the master instead.
	if found, err = this.GetNodeFromMaster(role, selectorType...); err != nil {
		return nil, err
	}
	return found, nil
}

新建一个rpcClient, rpcClient存有pending: make(map[uint64]*Call),作用是为了有回包的Call做记录,并且自动开启两个线程go client.input,go client.StartHeartBeat(client.closeHeartbeat), 心跳不必说,go client.input则是处理回包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// NewClientWithConn wraps conn in a gob-encoded rpc client, then starts the
// response reader and the heartbeat goroutines. Handling of callback is elided
// ("...") in this excerpt.
func NewClientWithConn(conn net.Conn, callback ...func(event string, data ...interface{})) *TcpClient {
	// Writes go through a buffered writer; the decoder reads from conn directly.
	encBuf := bufio.NewWriter(conn)
	codec := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
	client := &TcpClient{
		conn:    conn,
		codec:   codec,
		// pending holds in-flight calls keyed by sequence number.
		pending: make(map[uint64]*Call),
	}
	...
	go client.input()

	client.closeHeartbeat = make(chan struct{})
	go client.StartHeartBeat(client.closeHeartbeat)

	return client
}

go client.input则是读取回包,根据发包时的seq取出之前的call,将返回信息解码到call.Reply,同时标记call.done()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// input is the response loop: it reads response headers until an error occurs,
// matches each one to its pending call by sequence number, decodes the body into
// call.Reply, and marks the call done. The nil-call and error-response branches
// and the shutdown handling after the loop are elided ("...") in this excerpt.
func (client *TcpClient) input() {
	var err error
	var response Response
	for err == nil {
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		seq := response.Seq
		// Take the call out of the pending map under the lock.
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		case call == nil:
			...
		case response.Error != "":
			...
		default:
			err = client.codec.ReadResponseBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}
	...
}

一个服务调用另一个服务流程如下,使用连接得到的rpcClient发起调用,根据传入的&reply判断是否需要回包。如果需要回包则会调用client.send(call)函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Example call site: report node info to the master and wait for the reply.
err := this.rpcMaster.Call("MasterService.ReportNodeInfo", args, &reply)

// Call starts the request through Go with a one-slot done channel.
// The waiting logic is elided ("...") in this excerpt.
func (client *TcpClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
	...
	call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))
	...
}

// Go builds a Call for serviceMethod and sends it. A nil reply selects the
// no-reply path (sendWithoutReply); otherwise the normal send path is used.
// Parts of the body are elided ("...") in this excerpt.
func (client *TcpClient) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	...
	call.Done = done
	if reply == nil {
		call.Type = RPC_CALL_TYPE_WITHOUTREPLY
		client.sendWithoutReply(call)
	} else {
		call.Type = RPC_CALL_TYPE_NORMAL
		client.send(call)
	}
	return call
}

client.send和client.sendWithoutReply基本上一致,go原生的rpc没有client.sendWithoutReply方法。区别在于client.send会根据seq在pending中缓存当前call,而client.sendWithoutReply在发包没有错误之后直接标记call.done()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// send assigns the next sequence number to call, records it in client.pending,
// and writes the request through the codec. The locking before this point and
// the error handling afterwards are elided ("...") in this excerpt.
func (client *TcpClient) send(call *Call) {
	...
	seq := client.seq
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()

	// Encode and send the request.
	// NOTE(review): client.request is shared and written after client.mutex is
	// released; presumably the elided prologue holds another lock. Confirm.
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	client.request.Type = call.Type
	err := client.codec.WriteRequest(&client.request, call.Args)
	...
}

发起Call之后,调用方阻塞在Call函数中,等待call.Done。无需回包的调用在发送结束之后直接标记call.done();需要回包的调用则在client.input中等待处理回包,处理完成后标记call.done()。至此一次call的流程结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Call sends the request and blocks until the call completes or CallTimeout
// elapses. It returns ErrShutdown when the client is already closed, ErrTimeout
// on timeout, and otherwise the call's own error.
func (client *TcpClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
	if client.IsClosed() {
		return ErrShutdown
	}
	// The done channel has one slot, so completing the call does not block the sender.
	call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))
	select {
	case <-timer.After(CallTimeout):
		// NOTE(review): nothing here removes the call from the client's pending
		// map on timeout; confirm whether it is cleaned up elsewhere.
		call.Error = ErrTimeout
		return ErrTimeout
	case <-call.Done:
		return call.Error
	}
}

作为rpc的server,则是解码,调用业务逻辑,然后再编码返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// ServeCodec reads requests from codec in a loop and runs each one in its own
// goroutine. All calls share one sending mutex and one WaitGroup. Error handling
// and the code after the loop are elided ("...") in this excerpt.
func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	wg := new(sync.WaitGroup)
	for {
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		...
		codec.IOCallback()
		wg.Add(1)
		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
	}
	...
}

// call invokes one RPC method through reflection and, for normal calls, sends
// the reply back through codec. It counts the call on mtype, marks wg done on
// return when wg is non-nil, and frees req at the end.
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
	if wg != nil {
		defer wg.Done()
	}
	mtype.Lock()
	mtype.numCalls++
	mtype.Unlock()
	function := mtype.method.Func
	// Invoke the method; the reply value is passed only when the method declares a reply parameter.
	args := []reflect.Value{s.rcvr, argv}
	if mtype.ReplyType != nil {
		args = append(args, replyv)
	}
	returnValues := function.Call(args)
	// The return value for the method is an error.
	errInter := returnValues[0].Interface()
	errmsg := ""
	if errInter != nil {
		errmsg = errInter.(error).Error()
	}
	// Only normal calls get a response; no-reply calls end here.
	// NOTE(review): replyv.Interface() assumes replyv is valid for every normal
	// call, even when ReplyType is nil; confirm against readRequest.
	if req.Type == RPC_CALL_TYPE_NORMAL {
		reqi := replyv.Interface()
		server.sendResponse(sending, req, reqi, codec, errmsg)
	}
	server.freeRequest(req)
}

// sendResponse writes the response for req through codec while holding the
// sending mutex, then frees the response object.
//
// When errmsg is non-empty it is stored in the response header. In that case,
// or when reply is nil, invalidRequest is written as the body. A write error is
// only logged, and only when debugLog is set.
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
	resp := server.getResponse()

	// Fill in the response header.
	resp.ServiceMethod = req.ServiceMethod
	resp.Seq = req.Seq
	failed := errmsg != ""
	if failed {
		resp.Error = errmsg
	}

	// Failed calls and nil replies both send the placeholder body.
	if failed || reply == nil {
		reply = invalidRequest
	}

	sending.Lock()
	writeErr := codec.WriteResponse(resp, reply)
	if writeErr != nil && debugLog {
		logger.Info("rpc: writing response:", writeErr)
	}
	sending.Unlock()

	server.freeResponse(resp)
}

cluster

集群中的rpc方式如上。一个集群默认有一个MasterService。如果是单节点,那么服务都在一起,同时具有MasterComponent和ChildComponent;如果采用Actor模式,则还会有ActorProxyComponent。其他所有的节点都是ChildComponent。ChildComponent的功能在于连接主节点,并且每隔一段时间上报信息。

1
2
3
4
5
// Awake starts two goroutines: one that connects to the master and one that
// runs DoReport. The setup before them is elided ("...") in this excerpt.
func (this *ChildComponent) Awake(ctx *ecs.Context) {
	...
	go this.ConnectToMaster()
	go this.DoReport()
}

主节点维持与所有节点的连接,并且存储所有子节点的信息,根据子节点的上报更新节点信息。如果负载过大则可以启动LocationService,LocationService会将master存储的节点信息同步到自己。LocationService可以横向扩展,承载更多的节点。虽然会牺牲一些数据一致性,但是没有太大的影响:如果在LocationService查不到,则还是查master,然后同步到LocationService。如果master节点失效,LocationService还是可以提供服务,等待master重启即可。

1
2
3
4
5
// Awake starts the DoLocationSync goroutine.
// The setup before it is elided ("...") in this excerpt.
func (this *LocationComponent) Awake(ctx *ecs.Context) {
	...
	go this.DoLocationSync()
}

ecs

这版的ecs形式有点类似unity。整个服务的循环如下:系统内注册的ISystem有AwakeSystem{}、StartSystem{}、UpdateSystem{}、DestroySystem{},每一帧依次调用每个System的UpdateFrame函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// NewServerNode builds the ecs runtime with a thread pool sized to the CPU count
// and starts a 100ms frame loop. Construction of the returned launcher is elided
// ("...") in this excerpt.
func NewServerNode() *Server {
	// Build the runtime. The local name shadows the runtime package only after this statement.
	runtime := ecs.NewRuntime(ecs.Config{ThreadPoolSize: runtime.NumCPU()})
	// The shutdown channel returned here is discarded in the visible code.
	runtime.UpdateFrameByInterval(time.Millisecond * 100)
	...
	return launcher
}

// NewRuntime validates config and builds a Runtime with a root object named
// "runtime", a worker pool limited to config.ThreadPoolSize threads, and the
// four inner systems in the order awake, start, update, destroy. Each inner
// system is initialised with the new runtime before it is returned.
func NewRuntime(config Config) *Runtime {
	validateConfig(&config)

	rt := &Runtime{
		root:         NewObject("runtime"),
		updateLock:   &sync.Mutex{},
		workers:      threadpool.New(),
		factory:      config.Factory,
		innerSystems: []ISystem{&AwakeSystem{}, &StartSystem{}, &UpdateSystem{}, &DestroySystem{}},
		locker:       &sync.RWMutex{},
	}
	// Link the root object back to the runtime that owns it.
	rt.root.runtime = rt
	rt.workers.MaxThreads = config.ThreadPoolSize

	for i := range rt.innerSystems {
		rt.innerSystems[i].Init(rt)
	}
	return rt
}

// UpdateFrameByInterval starts a goroutine that calls UpdateFrame once per
// duration and returns a channel that stops the loop when a value is sent on it
// (or it is closed).
//
// Frames run synchronously inside the loop goroutine, so two frames never
// overlap. A frame that takes longer than duration does not cause a backlog:
// the ticker drops ticks for a slow receiver, which skips frames.
//
// As with time.Tick, a non-positive duration produces no ticks; the goroutine
// then only waits for shutdown.
func (this *Runtime) UpdateFrameByInterval(duration time.Duration) chan<- struct{} {
	shutdown := make(chan struct{})

	// Use an explicit ticker instead of time.Tick so it can be stopped once the
	// loop exits. NewTicker panics on non-positive durations, hence the guard.
	var (
		ticker *time.Ticker
		ticks  <-chan time.Time
	)
	if duration > 0 {
		ticker = time.NewTicker(duration)
		ticks = ticker.C
	}

	go func() {
		if ticker != nil {
			defer ticker.Stop()
		}
		for {
			select {
			case <-shutdown:
				return
			case <-ticks:
				this.UpdateFrame()
			}
		}
	}()
	return shutdown
}

// UpdateFrame runs one frame under the runtime's read lock: first every inner
// system, then every custom system.
func (this *Runtime) UpdateFrame() {
	this.locker.RLock()
	defer this.locker.RUnlock()

	// Inner systems run in a fixed order: awake -> start -> update -> destroy.
	for i := range this.innerSystems {
		this.innerSystems[i].UpdateFrame()
	}

	// Custom systems always run after the inner ones. Whether a system executes
	// independently is decided inside its own UpdateFrame implementation.
	for _, custom := range this.customSystems {
		custom.UpdateFrame()
	}
}

每个System的接口如下,

1
2
3
4
5
6
7
// ISystem is the interface every system driven by the Runtime implements.
type ISystem interface {
	// Init is called once with the owning runtime (see NewRuntime).
	Init(runtime *Runtime)
	// UpdateFrame is called once per frame by Runtime.UpdateFrame.
	UpdateFrame()
	// Filter is offered a component so the system can decide whether to track it.
	Filter(component IComponent)
	IndependentFilter(op int, component IComponent)
	Name() string
}

每个System内部维护着Componentlist,在Init时初始化,在Filter时判断Component是否实现了该System的接口,比如LoginComponent实现了Awake方法,那么LoginComponent就会入AwakeSystem中的component list,在执行的时候,取出每个component,并执行其中的Awake函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// UpdateFrame awakens every component queued since the previous frame.
//
// The queued list and the scratch list are swapped under the lock. The scratch
// list is then drained: each component's Awake is submitted to the runtime's
// worker pool, and the method waits for all of them to finish before returning.
func (this *AwakeSystem) UpdateFrame() {
	// Swap so new components can keep queuing while this batch is processed.
	this.locker.Lock()
	this.temp, this.components = this.components, this.temp
	this.locker.Unlock()

	for {
		elem := this.temp.Front()
		if elem == nil {
			break
		}

		this.wg.Add(1)
		ctx := &Context{
			Runtime: this.runtime,
		}
		awakable := elem.Value.(IAwake)
		this.runtime.workers.Run(func() {
			awakable.Awake(ctx)
			this.wg.Done()
		})

		// Awake runs once per component, so the entry is dropped immediately.
		this.temp.Remove(elem)
	}
	this.wg.Wait()
}

Awake、Start、Destroy都是只执行一次,Update每帧都要执行,所以UpdateSystem的UpdateFrame实现略有不同:需要考虑到新加入和删除的component,同时每次执行后不对component执行remove操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// UpdateFrame applies the pending additions and removals to the component list
// under the lock, then submits every remaining component's Update to the
// runtime's worker pool and waits for all of them to finish.
func (this *UpdateSystem) UpdateFrame() {
	this.locker.Lock()
	// Merge in the components added during the previous frame.
	if this.push.Len() > 0 {
		this.components.PushBackList(this.push)
		this.push = list.New()
	}
	// Then drop the components removed during the previous frame. Additions are
	// applied first, so a component added and removed in the same frame is never updated.
	if this.pop.Len() > 0 {
		for removed := this.pop.Front(); removed != nil; removed = removed.Next() {
			for elem := this.components.Front(); elem != nil; elem = elem.Next() {
				if elem.Value != removed.Value {
					continue
				}
				this.components.Remove(elem)
				// Assuming container/list semantics, a removed element has no
				// successor, so the scan stops at the first match.
				break
			}
		}
		this.pop = list.New()
	}
	this.locker.Unlock()

	it := iter.FromList(this.components)
	for {
		item, err := it.Next()
		if err != nil {
			break
		}
		this.wg.Add(1)
		ctx := &Context{
			Runtime: this.runtime,
		}
		updater := item.(IUpdate)
		this.runtime.workers.Run(func() {
			updater.Update(ctx)
			this.wg.Done()
		})
	}
	this.wg.Wait()
}

component最基本的方法是AddComponent。在AddComponent中通过GetRequire接口判断组件的依赖关系,依赖关系需要用户自己指明。然后通过SystemFilter将component放到对应System的list中,这样就可以在UpdateFrame中得到执行。最后执行init.Initialize()函数。总结:当执行AddComponent时会立即执行其Initialize函数,而Awake、Start、Update、Destroy会在后续由系统统一调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// AddComponent initialises component for this object, checks its declared
// requirements, appends it to o.components, offers it to the runtime's systems,
// and finally calls its Initialize method if it has one. It returns o so calls
// can be chained. Part of the locked section is elided ("...") in this excerpt.
func (o *Object) AddComponent(component IComponent) *Object {
	component.Init(reflect.TypeOf(component), o.Runtime(), o)
	err := o.WithLock(func() error {
		...
		// Every component listed by GetRequire must already exist on its object.
		if require, ok := component.(IRequire); ok {
			for obj, requires := range require.GetRequire() {
				for _, require := range requires {
					if !obj.HasComponent(require) {
						return ErrMissingComponent
					}
				}
			}
		}
		o.components = append(o.components, component)

		// Let the systems pick up the component when a runtime is attached.
		runtime := o._runtime()
		if runtime != nil {
			runtime.SystemFilter(component)
		}
		return nil
	})
	// A failure is only logged; the Initialize step below still runs.
	if err != nil {
		logger.Error(err)
	}
	utils.Try(func() {
		if init, ok := component.(IInit); ok {
			init.Initialize()
		}
	})
	return o
}

actor

actor模型对于整个的架构作用在于所有的远程的调用会以Actor为通信的起始点,只要知道ActorID就可以通过Tell方法通信。 ActorID是对NodeID的扩展,通过ActorID可以得到NodeID

1
2
Target such as "127.0.0.1:8888:XXXXXXX",
means "IP:PORT:LOCALACTORID"

Actor建立服务、初始化服务接口使用AddHandler。每一个服务创建一个ActorComponent,并将ActorComponent(也可以看作是Actor)和ActorService注册到ActorProxyComponent中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Awake registers this.NewRoom as the handler for Service_RoomMgr_NewRoom and
// passes the isService argument. Earlier setup is elided ("...") in this excerpt.
func (this *RoomManagerComponent) Awake(ctx *ecs.Context) {
	...
	// Register the actor message handler.
	this.AddHandler(Service_RoomMgr_NewRoom, this.NewRoom, true)
}

// AddHandler stores handler under service in this.MessageHandler. When an actor
// is available and any isService argument was passed, it also registers the
// service on that actor. Parts of the body are elided ("...") in this excerpt.
func (this *ActorBase) AddHandler(service string, handler func(message *ActorMessageInfo) error, isService ...bool) {
	...
	this.MessageHandler[service] = handler
	actor := this.Actor()
	// Only the presence of isService is checked here, not its value.
	if actor != nil && len(isService) > 0 {
		err := actor.RegisterService(service)
		...
	}
}

// Actor returns the ActorComponent for this base, resolving it on first use:
// it looks for one on the parent object and, if Find fails, creates one of
// this.actorType and adds it to the parent. The prologue is elided ("...") in
// this excerpt.
func (this *ActorBase) Actor() *ActorComponent {
	...
	if this.actor == nil && this.parent != nil {
		err := this.parent.Find(&this.actor)
		if err != nil {
			actor := NewActorComponent(this.actorType)
			this.parent.AddComponent(actor)
			this.actor = actor
		}
	}
	return this.actor
}

一个Actor调用另一个Actor总体分为两种情况:一种是本地Actor调用,另一种是远程Actor调用。gate向room发起请求时,首先将自己提升为ActorServiceCaller,它只是在ActorProxyComponent上封装了一层缓存。Call的过程在没有缓存时首先获取ActorService。在AddHandler的时候每个ActorProxyComponent已经存有了ActorService,所以优先在本地找服务;如果查找不到,就需要到MasterService或者LocationService查找(在rpc模块已经说过了),将查找到的ActorService返回,缓存后调用。这里需要注意的是,本地ActorService和rpc返回的ActorService中的actor不是同一个类型:如果在本地,可以通过proxy直接获取ActorService,这时ActorService中的Actor实际上是ActorComponent;而通过rpc得到的是actorID,再重新初始化成Actor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// CreateRoom handles a TestCreateRoom message: it upgrades the session to an
// actor service caller and calls the room manager's NewRoom service on the
// "room" role with the session id. Error handling is elided ("...") in this
// excerpt.
func (this *TestApi) CreateRoom(sess *network.Session, message *TestCreateRoom) {
	// Upgrade the session to an actor service caller.
	serviceCaller, err := this.Upgrade(sess)
	...
	// Call the create-room service.
	reply, err := serviceCaller.Call("room", Service_RoomMgr_NewRoom, sess.ID)
}

// Call invokes serviceName with args and returns the service's reply data.
//
// A cached service is tried first. If there is none, or the cached call fails,
// the service is looked up again through the proxy using role, cached, and
// called. A service whose call fails is removed from the cache.
func (this *ActorServiceCaller) Call(role string, serviceName string, args ...interface{}) ([]interface{}, error) {
	// Try the cached service first to avoid repeated lookups and stay decentralised.
	if cached, hit := this.services[serviceName]; hit {
		res, err := cached.Call(args...)
		if err == nil {
			return res, err
		}
		delete(this.services, serviceName)
	}

	// No cache entry, or the cached call failed: look the service up again.
	fresh, err := this.proxy.GetActorService(role, serviceName)
	if err != nil {
		return nil, err
	}
	this.services[serviceName] = fresh

	res, err := fresh.Call(args...)
	if err != nil {
		delete(this.services, serviceName)
	}
	return res, err
}

// GetActorService resolves serviceName to an ActorService.
//
// The local service table is tried first. If that fails and role is
// LOCAL_SERVICE, an error is returned. Otherwise a node client for role is
// obtained and asked, via "ActorProxyService.ServiceInquiry", for the actor id
// that provides the service; the result is wrapped in a new ActorService.
func (this *ActorProxyComponent) GetActorService(role string, serviceName string) (*ActorService, error) {
	var (
		local *ActorService
		err   error
	)

	// Prefer a locally registered service.
	if local, err = this.GetLocalActorService(serviceName); err == nil {
		return local, nil
	}

	// A remote lookup needs a real role to locate a node.
	if role == LOCAL_SERVICE {
		return nil, errors.New("role is empty")
	}

	client, err := this.nodeComponent.GetNodeClientByRole(role)
	if err != nil {
		return nil, err
	}

	var remoteID ActorID
	if err = client.Call("ActorProxyService.ServiceInquiry", serviceName, &remoteID); err != nil {
		return nil, err
	}
	return NewActorService(NewActor(remoteID, this), serviceName), nil
}

res, err := service.Call(args...)这里service中actor的实际类型是不确定的:如果是本地的Call,则Actor是ActorComponent;如果是远程Call,则Actor(概念)是Actor(interface)。

1
2
3
4
5
6
7
8
9
// Call wraps args in an ActorMessage for this service, sends it to the service's
// actor with a nil sender, and returns the Data of the reply message. A Tell
// error is returned with a nil result.
func (this *ActorService) Call(args ...interface{}) ([]interface{}, error) {
	request := NewActorMessage(this.Service, args...)
	response := new(ActorMessage)
	// Tell receives the address of the pointer, so it can fill or replace the reply.
	if err := this.actor.Tell(nil, request, &response); err != nil {
		return nil, err
	}
	return response.Data, nil
}

当是ActorComponent时,直接将消息放在本地的队列里。

1
2
3
4
5
6
7
8
9
10
// Tell wraps message and sender in an ActorMessageInfo, pushes it onto this
// actor's receive queue, and returns the info's err field. Reply wiring and any
// waiting are elided ("...") in this excerpt.
func (this *ActorComponent) Tell(sender IActor, message *ActorMessage, reply ...**ActorMessage) error {
	messageInfo := &ActorMessageInfo{
		Sender:  sender,
		Message: message,
	}
	...
	this.queueReceive <- messageInfo
	...
	return messageInfo.err
}

当是Actor时,调用Emit,Emit先判断是否本地,如果本地流程和上面一样,如果不是本地就走rpc。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Tell sends message to this actor through the proxy. The first reply argument,
// when given, is attached to the message info so the answer can be written into
// it. The proxy's Emit result is returned.
func (this *Actor) Tell(sender IActor, message *ActorMessage, reply ...**ActorMessage) error {
	info := &ActorMessageInfo{Sender: sender, Message: message}
	if len(reply) > 0 {
		info.reply = reply[0]
	}
	return this.proxy.Emit(this.ID(), info)
}

// Emit delivers messageInfo to the actor identified by actorID.
//
// When the actor's node is this node the message is delivered with LocalTell.
// Otherwise it is forwarded to the remote node with an
// "ActorProxyService.Tell" rpc call, with messageInfo.reply as the reply target.
func (this *ActorProxyComponent) Emit(actorID ActorID, messageInfo *ActorMessageInfo) error {
	// The sender is optional; log a placeholder when it is missing.
	from := "unknown"
	if messageInfo.Sender != nil {
		from = messageInfo.Sender.ID().String()
	}
	logger.Debug(fmt.Sprintf("actor: [ %s ] send message [ %s ] to actor [ %s ]", from, messageInfo.Message.Service, actorID.String()))

	target := actorID.GetNodeID()

	// Local messages never touch the network.
	if target == this.nodeID {
		return this.LocalTell(actorID, messageInfo)
	}

	// Remote messages go through the target node's rpc client.
	client, err := this.nodeComponent.GetNodeClient(target)
	if err != nil {
		return err
	}

	var senderID ActorID
	if messageInfo.Sender != nil {
		senderID = messageInfo.Sender.ID()
	}
	rpcMessage := &ActorRpcMessageInfo{
		Target:  actorID,
		Sender:  senderID,
		Message: messageInfo.Message,
	}
	err = client.Call("ActorProxyService.Tell", rpcMessage, messageInfo.reply)
	return err
}

这里应该会有疑问,当时Actor已经确定是远程调用了,为什么还要判断是否是本地。原因在于远程调用的函数ActorProxyService.Tell调用的也是Emit,对于本地来说是远端,对于远端是本地,代码直接复用。

1
2
3
4
5
6
7
8
// Tell is the rpc entry point for remote actor messages. It rebuilds the message
// info from args, with the sender wrapped as an Actor bound to the local proxy,
// and passes it to the proxy's Emit.
// NOTE(review): the info carries the address of the local reply parameter;
// confirm replies are written through *reply rather than by replacing the pointer.
func (this *ActorProxyService) Tell(args *ActorRpcMessageInfo, reply *ActorMessage) error {
	incoming := &ActorMessageInfo{
		Sender:  NewActor(args.Sender, this.proxy),
		Message: args.Message,
		reply:   &reply,
	}
	err := this.proxy.Emit(args.Target, incoming)
	return err
}

接收方是ActorComponent中的queueReceive chan *ActorMessageInfo //接收消息队列,在ActorComponent的Initialize中通过go this.dispatch()启动处理循环,接收到消息就会开始处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// dispatch is the actor's message loop. It handles each queued message
// synchronously or in a new goroutine depending on this.ActorType, and returns
// once the receive queue has been closed and drained.
func (this *ActorComponent) dispatch() {
	var messageInfo *ActorMessageInfo
	var ok bool
	for {
		select {
		case <-this.close:
			// Mark the actor inactive and close the queue.
			// NOTE(review): this assumes this.close delivers exactly one value; if
			// the channel were closed instead, this case would fire again and
			// close queueReceive twice. Confirm against the sender.
			atomic.StoreInt32(&this.active, 0)
			close(this.queueReceive)
			// After the close signal the remaining queued messages are still processed.
		case messageInfo, ok = <-this.queueReceive:
			if !ok {
				return
			}
			switch this.ActorType {
			case ACTOR_TYPE_SYNC:
				this.handle(messageInfo)
			case ACTOR_TYPE_ASYNC:
				go this.handle(messageInfo)
			default:

			}
		}
	}
}

这里的处理对所有Component做筛选,所以处理函数都是形如func(message *ActorMessageInfo) error,然后具体执行函数体。执行结束标记this.isDone = true,回包是rpc实现的,所以即使本身节点没有提供服务的Actor,但是可以向其他Actor请求并正确回包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// handle offers messageInfo to every component on the parent object that
// implements IActorMessageHandler. Each component with a handler registered for
// the message's service has that handler run through Catch.
func (this *ActorComponent) handle(messageInfo *ActorMessageInfo) {
	components := this.Parent().AllComponents()
	for {
		item, err := components.Next()
		if err != nil {
			// The iterator reports its end through an error.
			return
		}

		provider, isHandler := item.(IActorMessageHandler)
		if !isHandler {
			continue
		}
		fn, registered := provider.MessageHandlers()[messageInfo.Message.Service]
		if !registered {
			continue
		}
		this.Catch(fn, messageInfo)
	}
}

// Catch runs handler on m and reports the outcome on the message: replyError
// when the handler returns an error, replySuccess otherwise. The prologue is
// elided ("...") in this excerpt.
func (this *ActorComponent) Catch(handler func(message *ActorMessageInfo) error, m *ActorMessageInfo) {
	...
	err := handler(m)
	if err != nil {
		m.replyError(err)
	} else {
		m.replySuccess()
	}
}

// replySuccess signals completion on this.done and marks the message as done.
// It does nothing when there is no done channel or the message is already done.
func (this *ActorMessageInfo) replySuccess() {
	if this.done == nil || this.isDone {
		return
	}
	this.done <- struct{}{}
	this.isDone = true
}

通过reflect调用函数

通过下面两种方式调用都可以:第一种(通过Value取得的方法)不用传调用者本身,第二种(通过Type取得的方法)需要把调用者作为第一个参数传入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Stu is a minimal type used to demonstrate calling methods through reflect.
type Stu struct {
	Name string
}

// Fn returns the sum of p1 and p2.
func (this *Stu) Fn(p1, p2 int) int {
	sum := p1 + p2
	return sum
}

// main shows two ways to call a method through reflect and prints each result.
func main() {
	stu := &Stu{"Hank"}

	// Way 1: a method obtained from the Value is already bound to its receiver,
	// so only the declared parameters are passed.
	bound := reflect.ValueOf(stu).MethodByName("Fn")
	boundArgs := []reflect.Value{reflect.ValueOf(22), reflect.ValueOf(20)}
	fmt.Println(bound.Call(boundArgs)[0].Int()) // 42

	// Way 2: a method obtained from the Type is unbound, so the receiver must be
	// passed as the first argument.
	unboundArgs := []reflect.Value{reflect.ValueOf(stu), reflect.ValueOf(22), reflect.ValueOf(20)}
	stuType := reflect.TypeOf(stu)
	for i, count := 0, stuType.NumMethod(); i < count; i++ {
		m := stuType.Method(i)
		fmt.Println(m.Type, " ", m.Name)
		fmt.Println(m.Func.Call(unboundArgs)[0].Int()) // 42
	}
}