skynet

skynet学习

Posted by Yaoxh6 on August 7, 2024

启动流程

入口在skynet_main.c文件的main函数, 在必要的初始化之后调用skynet_start,这里为所有线程以及模块加载的入口。

skynet_start

在这个函数里主要做了两件事情,第一初始化了logger模块,第二个启动bootstrap,第三是创建了多线程用于整个游戏的服务。

logger

logger模块一般是最先启动的模块,通过skynet_context_new启动,其他的模块也是同样的启动方式。首先是skynet_module_query,这里的主要作用是通过名字找到对应的so文件,并且绑定对应的函数到module中。

1
2
3
4
5
6
7
8
struct skynet_module {
	const char * name;
	void * module;
	skynet_dl_create create;
	skynet_dl_init init;
	skynet_dl_release release;
	skynet_dl_signal signal;
};

logger为例,则会找到logger.so,并且将service_logger.c中的logger_init绑定为module中的init函数。init函数主要区分有无文件名字参数,并且绑定回调函数。启动skynet之后,所见到的日志即为logger_cb的调用。那logger_cb是如何调用的?skynet为actor模型,logger服务的调用来源是从消息队列取出并且执行。从头开始可追溯到skynet_error函数。首先通过skynet_handle_findname取得logger服务,在skynet中,name,handle以及具体的server是互相绑定的。经过对具体log的一些封装,最后调用skynet_context_push,放到消息队列中。skynet每个服务对应的消息队列是循环队列,通过有单向链表构成的全局队列。具体的message先入本地队列再将本地队列放入到全局队列中。至此,消息入队列。

消息从队列中到被执行,从头追溯可到thread_workerskynet_context_message_dispatch,不断从全局队列中取出消息并且执行,首先skynet_globalmq_pop,然后通过skynet_mq_handle得到handle,得到handle就可以拿到具体的服务。不停的通过skynet_mq_pop将消息取出队列,并且判断ctx->cb是否存在并且执行,所以最后会执行到logger_cb

bootstrap

bootstrap默认参数使用config中的snlua bootstrap,同样使用skynet_context_new新建一个server, server为snlua,参数为bootstrap,直接跳转到snlua_init。首先skynet_callback(ctx, l , launch_cb);,设置自己的回调函数为launch_cb,然后调用const char * self = skynet_command(ctx, "REG", NULL);通过skynet_command调用到cmd_reg,并且拿回现在的handle_id,再加一为自己的handle_id,最后skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);给自己发送消息,skynet_send内部也会调用skynet_context_push,走的就是消息发送和接受那套流程。所以会调到launch_cb,这里清空回调函数,并且调用init_cb,传的参数是bootstrap.init_cb里面设置一些变量,最后调用loader.lua,参数是bootstrap,启动bootstrap.lua作为第一个服务。优先启动skynet.launch("snlua","launcher")launcher服务,并命名为.launcher,随后通过skynet.newservice启动cmastercslavedatacenterdservice_mgrmain。在main.lua中,再启动protoloaderconsoledebug_consolesimpledbwatchdog

有关bootstrap中的start函数调用流程。相当于调用skynet.lua中的函数.

1
2
3
4
5
6
7
function skynet.start(start_func)
	c.callback(skynet.dispatch_message)
	init_thread = skynet.timeout(0, function()
		skynet.init_service(start_func)
		init_thread = nil
	end)
end

首先将回调函数设置为dispatch_message,然后通过skynet.timeout触发,local session = cintcommand("TIMEOUT", timeout)发送TIMEOUT信号到队列,同时创建协程(有就复用)co_create_for_timeout->co_create,保存session和co,session_id_coroutine[session] = co。回调函数触发raw_dispatch_message,取出co,恢复执行。local co = session_id_coroutine[session]suspend(co, coroutine_resume(co, true, msg, sz, session)).

co_create有两种情况, 从协程池中取出或者新建,一开始一定是新建,通过回调触发后会让出执行权f = coroutine_yield "SUSPEND",并且保存在协程池中,下一次如果可以从协程池中取出,执行coroutine_resume(co, f),那么f = coroutine_yield "SUSPEND"中的f就为想要resume的f,此时f还不会立刻执行,因为要等到消息回调触发,所以f(coroutine_yield())再次让出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
local function co_create(f)
	local co = tremove(coroutine_pool)
	if co == nil then
		co = coroutine_create(function(...)
			f(...)
			while true do
				local session = session_coroutine_id[co]
				if session and session ~= 0 then
					local source = debug.getinfo(f,"S")
					skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
						session,
						skynet.address(session_coroutine_address[co]),
						source.source, source.linedefined))
				end
				-- coroutine exit
				local tag = session_coroutine_tracetag[co]
				if tag ~= nil then
					if tag then c.trace(tag, "end")	end
					session_coroutine_tracetag[co] = nil
				end
				local address = session_coroutine_address[co]
				if address then
					session_coroutine_id[co] = nil
					session_coroutine_address[co] = nil
				end

				-- recycle co into pool
				f = nil
				coroutine_pool[#coroutine_pool+1] = co
				-- recv new main function f
				f = coroutine_yield "SUSPEND"
				f(coroutine_yield())
			end
		end)
	else
		-- pass the main function f to coroutine, and restore running thread
		local running = running_thread
		coroutine_resume(co, f)
		running_thread = running
	end
	return co
end

skynet.send,skynet.call和skynet.ret

skynet.send

skynet.send相对简单,调用的是c.send, c又是skynet.core

1
2
3
4
5
6
local c = require "skynet.core"
...
function skynet.send(addr, typename, ...)
	local p = proto[typename]
	return c.send(addr, p.id, 0 , p.pack(...))
end

所以直接找到luaopen_skynet_core,连同send还有其他一些函数。send->lsend->send_message->skynet_send | skynet_sendname->skynet_send,最后的入口就是skynet_send.

skynet.call和skynet.ret

skynet.call相对复杂,顺带一提skynet.newservice是对skynet.call的简单封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function skynet.newservice(name, ...)
	return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
end

function skynet.call(addr, typename, ...)
	local tag = session_coroutine_tracetag[running_thread]
	if tag then
		c.trace(tag, "call", 2)
		c.send(addr, skynet.PTYPE_TRACE, 0, tag)
	end

	local p = proto[typename]
	local session = auxsend(addr, p.id , p.pack(...))
	if session == nil then
		error("call to invalid address " .. skynet.address(addr))
	end
	return p.unpack(yield_call(addr, session))
end

skynet.call主要包含两步,一步是c.send,一步是p.unpack(yield_call(addr, session))即返回回包。yield_call所得的结果来自local succ, msg, sz = coroutine_yield "SUSPEND",之前说过回包是通过协程。回包使用的是skynet.ret,也是对c.send的封装,local ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, msg, sz),消息类型是PTYPE_RESPONSE,所以消息还是来自于raw_dispatch_message中的suspend(co, coroutine_resume(co, true, msg, sz, session))这里的msg,sz即为返回的值。所以不论是callback还是call都是通过协程返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
local function raw_dispatch_message(prototype, msg, sz, session, source)
	-- skynet.PTYPE_RESPONSE = 1, read skynet.h
	if prototype == 1 then
		local co = session_id_coroutine[session]
		if co == "BREAK" then
			session_id_coroutine[session] = nil
		elseif co == nil then
			unknown_response(session, source, msg, sz)
		else
			local tag = session_coroutine_tracetag[co]
			if tag then c.trace(tag, "resume") end
			session_id_coroutine[session] = nil
			suspend(co, coroutine_resume(co, true, msg, sz, session))
		end
	else
		local p = proto[prototype]
		if p == nil then
			if prototype == skynet.PTYPE_TRACE then
				-- trace next request
				trace_source[source] = c.tostring(msg,sz)
			elseif session ~= 0 then
				c.send(source, skynet.PTYPE_ERROR, session, "")
			else
				unknown_request(session, source, msg, sz, prototype)
			end
			return
		end

		local f = p.dispatch
		if f then
			local co = co_create(f)
			session_coroutine_id[co] = session
			session_coroutine_address[co] = source
			local traceflag = p.trace
			if traceflag == false then
				-- force off
				trace_source[source] = nil
				session_coroutine_tracetag[co] = false
			else
				local tag = trace_source[source]
				if tag then
					trace_source[source] = nil
					c.trace(tag, "request")
					session_coroutine_tracetag[co] = tag
				elseif traceflag then
					-- set running_thread for trace
					running_thread = co
					skynet.trace()
				end
			end
			suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
		else
			trace_source[source] = nil
			if session ~= 0 then
				c.send(source, skynet.PTYPE_ERROR, session, "")
			else
				unknown_request(session, source, msg, sz, proto[prototype].name)
			end
		end
	end
end

skynet network

一开始skynet没有包含这部分,socket相关是后续补充而来。 network相关的操作大部分require "skynet.socketdriver"所以入口在 luaopen_skynet_socketdriver.

网络连接流程

socket.listen

llisten->skynet_socket_listen->socket_server_listen->send_request->write,按照如此流程写入到管道.request_package包含headerdata. 关于socket消息的读取,最早可追溯到thread_socketskynet_socket_poll是无限循环,用来处理有关网络的数据,分为两步has_cmd(ss)第一用select单独处理recvctrl_fd(也用sp_add加入到了epoll),第二用ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);封装了epoll_wait.对于listen事件,最终走ctrl_cmd->listen_socket.skynet_socket_poll处理完数据后,再通过forward_message->skynet_context_push给队列发送消息.

socket.start和socket.connect

1
2
3
4
function socket.start(id, func)
	driver.start(id)
	return connect(id, func)
end

driverl.start的流程

driver.start->lstart->skynet_socket_start->socket_server_start->send_request(ss, &request, 'R', sizeof(request.u.resumepause));->resume_socket->forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);

connect的流程`

connect->suspend->skynet.wait->suspend_sleep->coroutine_yield "SUSPEND" 所以connect的流程会在用协程让出控制权,直到start流程中的消息被处理,唤醒协程.

这里的协程不同于之前的PTYPE_RESPONSE,走PTYPE_SOCKET对应的分支local f = p.dispatchsuspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))).

1
2
3
4
5
6
7
8
skynet.register_protocol {
	name = "socket",
	id = skynet.PTYPE_SOCKET,	-- PTYPE_SOCKET = 6
	unpack = driver.unpack,
	dispatch = function (_, _, t, ...)
		socket_message[t](...)
	end
}

先创建了dispatch的协程,根据协议SKYNET_SOCKET_TYPE_CONNECT,执行下面函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- SKYNET_SOCKET_TYPE_CONNECT = 2
socket_message[2] = function(id, ud , addr)
	local s = socket_pool[id]
	if s == nil then
		return
	end
	-- log remote addr
	if not s.connected then	-- resume may also post connect message
		if s.listen then
			s.addr = addr
			s.port = ud
		end
		s.connected = true
		wakeup(s)
	end
end

设置s.connected = true,最后wakeup将协程放入到wakeup队列,最后再通过skynet.lua中的suspend唤醒.