skynet

skynet源码阅读及实践总结

skynet是一个云风大大开源的一款基于c+lua的框架,架子由c语言编写完成,逻辑上用lua来实现的一个框架

如果不关心框架的话,至少需要知道这框架里面有些什么东西,知道他的来龙去脉,由于工作原由断断续续的深入了解这个框架。由于日常开发没太在意底层实现原理,有时看下底层代码后当时知道了原理,之后再来看时基本忘记得差不多了。所以由此需要做一些笔记和深入的思考


1. 两个服务之间通信就必须知道消息发送方和消息接收方以及消息和消息的长度,skynet里面对发送一条消息的参数注释如下:

1
2
3
4
5
6
7
uint32_t address
string address
uint32_t type
uint32_t session
string message
lightuserdata message_ptr
uint32_t len
  • address 指目标接收方的服务,服务用一个uint32的值来标识,也可以为一个服务取一个string类型的名称来唯一
  • type 指发送的消息的类型
  • session 指消息发送方对此次发送这条消息的会话标识,默认传0,传空底层会为其生成一个
  • message 指消息内容,消息内容有可能也是一个message_ptr
1
2
#define PTYPE_TAG_DONTCOPY 0x10000 //用 type | PTYPE_TAG_DONTCOPY 来指定消息是否需要copy一份
#define PTYPE_TAG_ALLOCSESSION 0x20000 //用 type | PTYPE_TAG_ALLOCSESSION 来指定是否需要创建session

具体可看skynet\lualib-src\lua-skynet.c ——> _send(...)函数实现

服务与服务之间实际传递的消息结构体如下:

1
2
3
4
5
6
7
8
9
struct skynet_message {
uint32_t source; //来源哪个服务
int session; //发送方此次会话的标识,非阻塞值为0,阻塞调用为发送方递增的值
void * data; //发送的消息体
size_t sz; //发送的消息大小
};
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)

tips:

消息类型保存在 sz |= (size_t)type << MESSAGE_TYPE_SHIFT
拿回类型用 sz >> MESSAGE_TYPE_SHIFT 长度用sz & MESSAGE_TYPE_MASK

type的值包括以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
local skynet = {
-- read skynet.h
PTYPE_TEXT = 0,
PTYPE_RESPONSE = 1,
PTYPE_MULTICAST = 2,
PTYPE_CLIENT = 3,
PTYPE_SYSTEM = 4,
PTYPE_HARBOR = 5,
PTYPE_SOCKET = 6,
PTYPE_ERROR = 7,
PTYPE_QUEUE = 8, -- used in deprecated mqueue, use skynet.queue instead
PTYPE_DEBUG = 9,
PTYPE_LUA = 10,
PTYPE_SNAX = 11,
}

一直在想type的值的设置和取值为什么这样设计,也查阅了一些位操作方面的知识,还是不太知道,后问了下一老司机同事,他翻开云风大大博客对这样的设计已经给出了答案,阅读和思考后最终还是清楚了。

因为type的值是一个byte,取值范围也就是0-255,一个在服务内传递的消息的在16M(24个bit 214*210)以内,所以把type的值放在sz的最高位来保存。所以在取type值的时候sz的二进制左移MESSAGE_TYPE_SHIFT即可,
而sz的值的话只要取除去最高位再sz的二进制右移高位即MESSAGE_TYPE_MASK。

2. skynet里面是由各种lua服务来组成的,必然服务之间需要能信来往,当然也就包含阻塞发消息和非阻塞的方式

服务之间的消息流如下:

1
2
3
Service A->Service B: Hello B,我发了一条消息给你嘞!
Note right of Service B: Service B 可能会阻塞调用其它服务哦!
Service B-->Service A: Yes,我收到了,这是我回你的包!

虚线代表有些是不需要B回包的,此次调用可能是阻塞或非阻塞

  • 非阻塞调用

    1
    2
    3
    4
    function skynet.send(addr, typename, ...)
    local p = proto[typename]
    return c.send(addr, p.id, 0 , p.pack(...))
    end
  • 阻塞调用

    1
    2
    3
    4
    5
    6
    7
    8
    function skynet.call(addr, typename, ...)
    local p = proto[typename]
    local session = c.send(addr, p.id , nil , p.pack(...))
    if session == nil then
    error("call to invalid address " .. skynet.address(addr))
    end
    return p.unpack(yield_call(addr, session))
    end
  • 阻塞与非阻塞的send函数的第三个参数分别为nil和0,nil意思就是session是由skynet分配来分配
  • 阻塞调用,如果收到阻塞调用的服务再阻塞去调其它服务,所有阻塞调用的服务的当前coroutine都会进入睡眠状态,等待回包被唤醒,进入睡眠并不影响lua_Stat进行其它的任何工作。
  • 如果服务的dispatch函数没有返回值则不能用call来调用此服务,否则call的coroutine会一直阻塞中

代码实现在skynet\lualib\skynet.lua ——> raw_dispatch_message(…)函数里面,实际此函数也是每个服务收到消息后的回调函数

消息的内存管理是由发送方申请,消息接收端在回调成功后释放掉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >> MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
if (ctx->logfile) {
skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
}
if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) {
skynet_free(msg->data); //内存释放
}
CHECKCALLING_END(ctx)
}
//代码文件 `skynet\skynet-src\skynet_server.c`

3. 进程启动过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void
skynet_start(struct skynet_config * config) {
if (config->daemon) {
if (daemon_init(config->daemon)) {
exit(1);
}
}
skynet_harbor_init(config->harbor); //用于集群id初始化
skynet_handle_init(config->harbor); //上面所说的进程内所有service的handle及保存service的名称
skynet_mq_init(); //全局队列初始化
skynet_module_init(config->module_path); //加载service-src下面的模块 logger harbor snlua
skynet_timer_init(); //定时器初始化
skynet_socket_init(); //套接字初始化
struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
if (ctx == NULL) {
fprintf(stderr, "Can't launch %s service\n", config->logservice);
exit(1);
}
//lua脚本加载
bootstrap(ctx, config->bootstrap);
//启动线程及监听相关
start(config->thread);
// harbor_exit may call socket send, so it should exit before socket_free
skynet_harbor_exit();
skynet_socket_free();
if (config->daemon) {
daemon_exit(config->daemon);
}
}

skynet启动进程主要与服务相关代码如上面

每个lua服务都有自己的一个队列,并且创建后就会放到全局队列里面,有工作线程会去从全局队列里面去拿服务队列并处理完一条消息后若全局队列不为空再放到全局队列里去返回下个需要处理的部队。为空则返回自己服务的队列

  1. 回调函数调用
    在加载lua代码的时候skynet.start(...)函数里面注册了回调函数
1
2
3
4
5
6
function skynet.start(start_func)
c.callback(skynet.dispatch_message)
skynet.timeout(0, function()
skynet.init_service(start_func)
end)
end

4. skynet服务

4.1 服务标识
  • skynet为了方便针记住某个服务,可以为每个独立的服务起一个名字,名字格式.name

一个服务的名字可以重复设置吗?(可以)
两个服务的名字可以相同吗?(不可以,已经有了这样一个名字直接返回空)


4.1 服务名字存储及handle的存储结构和服务相关的命令映射

带着这些疑问就去翻阅代码去解决脑海里的疑点,首先从skynet.name('.myservice')的代码跟到他调用的c函数,所有处理针对服务的一些函数都在skynet_server.c文件里面,最后按参数找到最终处理名字的逻辑skynet_handle.c,大体看了下简单的实现,代码说明按自己的理解整理下。

  • 数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct handle_name {
char * name;
uint32_t handle;
};
struct handle_storage {
struct rwlock lock;
uint32_t harbor;
uint32_t handle_index;
int slot_size;
struct skynet_context ** slot;
int name_cap;
int name_count;
struct handle_name *name;
};
//handle_storage所存一个skynet进程下面所有lua服务的handle和所有为服务设置了名称的handle_name
  • 命令映入结构
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    static struct command_func cmd_funcs[] = {
    { "TIMEOUT", cmd_timeout },
    { "REG", cmd_reg },
    { "QUERY", cmd_query },
    { "NAME", cmd_name },
    { "EXIT", cmd_exit },
    { "KILL", cmd_kill },
    { "LAUNCH", cmd_launch },
    { "GETENV", cmd_getenv },
    { "SETENV", cmd_setenv },
    { "STARTTIME", cmd_starttime },
    { "ENDLESS", cmd_endless },
    { "ABORT", cmd_abort },
    { "MONITOR", cmd_monitor },
    { "MQLEN", cmd_mqlen },
    { "LOGON", cmd_logon },
    { "LOGOFF", cmd_logoff },
    { "SIGNAL", cmd_signal },
    { NULL, NULL },
    };
4.2 服务名字代码说明下服务名set和get
  • 服务名set
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static const char *
_insert_name(struct handle_storage *s, const char * name, uint32_t handle) {
int begin = 0;
int end = s->name_count - 1;
while (begin<=end) {
int mid = (begin+end)/2;
struct handle_name *n = &s->name[mid];
int c = strcmp(n->name, name);
if (c==0) {
return NULL;
}
if (c<0) {
begin = mid + 1;
} else {
end = mid - 1;
}
}
char * result = skynet_strdup(name);
_insert_name_before(s, result, handle, begin);
return result;
}
//用二分法遍历已经注册了名称的数组,如果找到同名的服务直接return(并不会报错,lua层只会得到nil最好用assert来处理),否则按字符串的大小插入合适的位置
  • 服务名get

通过服务名来找handle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
uint32_t
skynet_handle_findname(const char * name) {
struct handle_storage *s = H;
rwlock_rlock(&s->lock);
uint32_t handle = 0;
int begin = 0;
int end = s->name_count - 1;
while (begin<=end) {
int mid = (begin+end)/2;
struct handle_name *n = &s->name[mid];
int c = strcmp(n->name, name);
if (c==0) {
handle = n->handle;
break;
}
if (c<0) {
begin = mid + 1;
} else {
end = mid - 1;
}
}
rwlock_runlock(&s->lock);
return handle;
}
//同样也是二分法来查找已知的服务名

服务的handle是怎么够用的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
uint32_t
skynet_handle_register(struct skynet_context *ctx) {
struct handle_storage *s = H;
rwlock_wlock(&s->lock);
for (;;) {
int i;
for (i=0;i<s->slot_size;i++) {
uint32_t handle = (i+s->handle_index) & HANDLE_MASK;
int hash = handle & (s->slot_size-1);
if (s->slot[hash] == NULL) {
s->slot[hash] = ctx;
s->handle_index = handle + 1;
rwlock_wunlock(&s->lock);
handle |= s->harbor;
return handle;
}
}
assert((s->slot_size*2 - 1) <= HANDLE_MASK);
struct skynet_context ** new_slot = skynet_malloc(s->slot_size * 2 * sizeof(struct skynet_context *));
memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *));
for (i=0;i<s->slot_size;i++) {
f type then
MSG[type](...)
end
end
}
poll

    pipe命令读取
                _________
                |   L      Listen继续处理poll事件
    1           |   S      退出poll事件,返回SOCKET_OPEN
                |   B      
                |   O

    socket事件处理

    2           events
                    accepte
                    connect
                    data

poll后再返回给指定的service去处理消息
  • 接着是start这个fd,实际是记录到socket_server这个里面去

这两步就是创建一个listen的fd了,接下来只需要accepte客户端连接了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/*
The first byte is TYPE
S Start socket
B Bind socket
L Listen socket
K Close socket
O Connect to (Open)
X Exit
D Send package (high)
P Send package (low)
A Send UDP package
T Set opt
U Create UDP socket
C set udp address
*/
struct request_package {
uint8_t header[8]; // 6 bytes dummy
union {
char buffer[256];
struct request_open open;
struct request_send send;
struct request_send_udp send_udp;
struct request_close close;
struct request_listen listen;
struct request_bind bind;
struct request_start start;
struct request_setopt setopt;
struct request_udp udp;
struct request_setudp set_udp;
} u;
uint8_t dummy[256];
};
/*
header[6] header[7] 两个字节+len(union)结构体
write(fd, &header[6], len+2)
*/
struct socket_server {
int recvctrl_fd;
int sendctrl_fd;
int checkctrl;
poll_fd event_fd;
int alloc_id;
int event_n;
int event_index;
struct socket_object_interface soi;
struct event ev[MAX_EVENT];
struct socket slot[MAX_SOCKET];
char buffer[MAX_INFO];
uint8_t udpbuffer[MAX_UDP_PACKAGE];
fd_set rfds;
};

tcp包处理

收到包如何通知gateserver?
包大小?一个没没读玩又来了另一个包怎么处理?

tcp网络上实际传输的是字节流,所以我们在处理自定义的网络数据包的时候一般都是 len+data来作为一个数字包在tcp上传送,skynet框架是两个字节的长度,

skynet io是读指定长度的包再抛到上面来,处理包的完整性在lua-netpack.c,socket的消息描述如下:

参考socket_server.c : function forward_message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct skynet_socket_message {
int type;
int id;
int ud;
char * buffer;
};
/*
Each package is uint16 + data , uint16 (serialized in big-endian) is the number of bytes comprising the data .
*/
struct netpack {
int id; // 收包的fd
int size; // 包的大小
void * buffer; // 包的具体内容
};
struct uncomplete {
struct netpack pack; // 已经从fd上读取的消息
struct uncomplete * next; // 下一条消息
int read; // 不完整的包已经读取的长度
int header; // 用来存取第一个包只有一个字节的时候用
};
struct queue {
int cap; // 队列的容量
int head; // 队列头从0开始pop后++
int tail; // 队列尾从0开始push后++
struct uncomplete * hash[HASHSIZE]; // 不完整包根据fd然后hash存取
struct netpack queue[QUEUESIZE]; // 完整包的数组
};

放到skyknet里面的service回调函数处理这个包,其它的不多说了,直接看lua-netpack.c : lfilter(...)里面的源码

7. sample架子理解

把下面的关系弄清楚

client

login

gateserver

agent

client 是客户端发起登录的对象
login 是处理client登录请求,成功后告知gateserver
gateserver 类似agent的一房门。主要负责管理网络这块的事件传递到agent
agent 是消息实际处理者,也就是操作fd连接的