linux-io

Talk less; read more code, think more, write more code.

I/O is an essential part of any program: it can be reads and writes on block devices, on files, or over the network. Here we compare these two multiplexing mechanisms by reading the kernel source for select and epoll. A typical server has to handle I/O events on many concurrent client connections (over TCP).

Reads and writes can be blocking or non-blocking. For a TCP connection they actually operate on the socket's receive and send buffers: with a blocking fd, a read blocks when the receive buffer is empty, and a write blocks when the send buffer is full or lacks room for the data. Below is a blocking read from standard input, followed by a non-blocking read from a file.

Blocking:

#include <stdio.h>
#include <unistd.h>

int main(void) {
    char buf[11];               /* one spare byte for the '\0' terminator */
    while (1) {
        /* read() blocks here until data arrives on stdin. */
        int len = read(STDIN_FILENO, buf, sizeof(buf) - 1);
        if (len <= 0)           /* EOF or error */
            break;
        buf[len] = '\0';
        printf("output len %d, %s\n", len, buf);
        fflush(stdout);         /* fflush() takes a FILE*, not an fd */
    }
    return 0;
}

Non-blocking:

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>

int main(void) {
    /* O_NONBLOCK: read() returns immediately instead of blocking. */
    int fd = open("test.txt", O_CREAT | O_RDONLY | O_NONBLOCK, 0666);
    if (fd < 0) {
        perror("open");
        return 1;
    }
    char buf[6];                 /* 5 data bytes + '\0' */
    while (1) {
        memset(buf, 0, sizeof(buf));
        int len = read(fd, buf, sizeof(buf) - 1);
        if (len <= 0) {          /* nothing to read yet: poll again later */
            sleep(1);
            continue;
        }
        printf("read %d %s\n", len, buf);
    }
}

While the program runs, append data from another terminal:

echo "123456" >> test.txt
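For an fd you did not open yourself, such as an accepted TCP socket, the usual way to switch to non-blocking mode after the fact is fcntl. A minimal sketch; the helper name set_nonblocking is ours, for illustration only:

#include <fcntl.h>

/* Hypothetical helper: mark an already-open fd as non-blocking.
 * Returns 0 on success, -1 on error. */
static int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}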

Without select or epoll, a server handling many client connections has two bad options: with blocking fds, one stalled connection blocks the whole program, so blocking streams don't scale; with non-blocking fds, the program has to poll every fd in a loop, wasting CPU time. With the kernel's select and epoll interfaces, user code no longer has to discover by itself which fds need attention: a call to select or epoll hands back the fds that are ready for processing. Let's look at the kernel source for each.

select

Related interfaces:

FD_ZERO(fd_set *fdset)          // clear the fd_set
FD_SET(int fd, fd_set *fdset)   // add fd to the set
FD_CLR(int fd, fd_set *fdset)   // remove fd from the set
FD_ISSET(int fd, fd_set *fdset) // test whether fd in fdset is ready; nonzero means ready
int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
/*
 * nfds      : highest fd value + 1 (the +1 is mandatory)
 * readfds   : set of fds to monitor for readability
 * writefds  : for writability
 * exceptfds : for exceptional conditions
 * timeout   : a zero timeval returns immediately; NULL blocks until some fd is ready
 * Returns the number of ready fds.
 */
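To make the calling pattern concrete, here is a minimal sketch that watches only stdin with a five-second timeout (the fd choice and timeout value are our own, for illustration):

#include <stdio.h>
#include <sys/select.h>
#include <unistd.h>

int main(void) {
    fd_set readfds;
    struct timeval tv = { .tv_sec = 5, .tv_usec = 0 };

    FD_ZERO(&readfds);                 /* clear the set */
    FD_SET(STDIN_FILENO, &readfds);    /* watch stdin for readability */

    /* nfds is the highest fd in any set, plus one. */
    int n = select(STDIN_FILENO + 1, &readfds, NULL, NULL, &tv);
    if (n < 0)
        perror("select");
    else if (n == 0)
        printf("timed out, nothing to read\n");
    else if (FD_ISSET(STDIN_FILENO, &readfds))
        printf("stdin is readable\n");
    return 0;
}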

Kernel code:

int do_select(int n, fd_set_bits *fds, long *timeout)
{
    struct poll_wqueues table;
    poll_table *wait;
    int retval, i;
    long __timeout = *timeout;

    spin_lock(&current->files->file_lock);
    retval = max_select_fd(n, fds);
    spin_unlock(&current->files->file_lock);

    if (retval < 0)
        return retval;
    n = retval;

    poll_initwait(&table);
    wait = &table.pt;
    if (!__timeout)
        wait = NULL;
    retval = 0;
    for (;;) {
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;

        set_current_state(TASK_INTERRUPTIBLE);

        inp = fds->in; outp = fds->out; exp = fds->ex;
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
            struct file_operations *f_op = NULL;
            struct file *file = NULL;

            in = *inp++; out = *outp++; ex = *exp++;
            all_bits = in | out | ex;
            if (all_bits == 0) {
                i += __NFDBITS;
                continue;
            }

            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
                if (i >= n)
                    break;
                if (!(bit & all_bits))
                    continue;
                file = fget(i);
                if (file) {
                    f_op = file->f_op;
                    mask = DEFAULT_POLLMASK;
                    if (f_op && f_op->poll)
                        mask = (*f_op->poll)(file, retval ? NULL : wait);
                    fput(file);
                    if ((mask & POLLIN_SET) && (in & bit)) {
                        res_in |= bit;
                        retval++;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) {
                        res_out |= bit;
                        retval++;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {
                        res_ex |= bit;
                        retval++;
                    }
                }
                cond_resched();
            }
            if (res_in)
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
        }
        wait = NULL;
        if (retval || !__timeout || signal_pending(current))
            break;
        if (table.error) {
            retval = table.error;
            break;
        }
        __timeout = schedule_timeout(__timeout);
    }
    __set_current_state(TASK_RUNNING);

    poll_freewait(&table);

    /*
     * Up-to-date the caller timeout.
     */
    *timeout = __timeout;
    return retval;
}

In short, this function walks over the read, write, and exception fd sets passed in and returns the number of ready fds. After the kernel returns, user space still has to walk the sets again itself (with FD_ISSET) to find out which fds those are; the return value is only a count.
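That second scan is the per-call O(n) cost of select. Below is a sketch of the typical server loop; listen_fd, client_fds, nclients and handle_client are hypothetical scaffolding, not from the original:

#include <sys/select.h>

/* Hypothetical scaffolding, for illustration only. */
extern int listen_fd;
extern int client_fds[];
extern int nclients;
void handle_client(int fd);

void select_loop_once(void) {
    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(listen_fd, &readfds);
    int maxfd = listen_fd;
    for (int i = 0; i < nclients; i++) {
        FD_SET(client_fds[i], &readfds);
        if (client_fds[i] > maxfd)
            maxfd = client_fds[i];
    }

    int nready = select(maxfd + 1, &readfds, NULL, NULL, NULL);

    /* select() reports only how many fds are ready; user space must
     * re-test every registered fd to find which ones they are. */
    for (int i = 0; i < nclients && nready > 0; i++) {
        if (FD_ISSET(client_fds[i], &readfds)) {
            handle_client(client_fds[i]);
            nready--;
        }
    }
}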

epoll

Related interfaces:

int epoll_create(int size); // ask the kernel for an epoll handle; the size hint is ignored since 2.6.8
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/*
 * EPOLL_CTL_ADD : add an fd to the epoll instance
 * EPOLL_CTL_DEL : remove an fd from the epoll instance
 * EPOLL_CTL_MOD : change the events associated with an fd
 */
int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout);

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

struct epoll_event {
    uint32_t events;   /* Epoll events */
    epoll_data_t data; /* User data variable */
};
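A minimal sketch of the calling pattern; listen_fd and handle_event are hypothetical scaffolding, for illustration only:

#include <stdio.h>
#include <sys/epoll.h>

extern int listen_fd;                       /* hypothetical listening socket */
void handle_event(struct epoll_event *ev);  /* hypothetical handler */

#define MAX_EVENTS 64

int epoll_loop(void) {
    int epfd = epoll_create(1);    /* size is ignored since 2.6.8, but must be > 0 */
    if (epfd < 0) {
        perror("epoll_create");
        return -1;
    }

    struct epoll_event ev = { .events = EPOLLIN, .data.fd = listen_fd };
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) {
        perror("epoll_ctl");
        return -1;
    }

    struct epoll_event events[MAX_EVENTS];
    for (;;) {
        /* Blocks until at least one registered fd is ready (-1 = no timeout). */
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        /* Unlike select(), events[0..n-1] are exactly the ready fds:
         * there is no rescan of everything we registered. */
        for (int i = 0; i < n; i++)
            handle_event(&events[i]);
    }
}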

Kernel code:

/*
 * This structure is stored inside the "private_data" member of the file
 * structure and rapresent the main data sructure for the eventpoll
 * interface.
 */
struct eventpoll {
    /* Protect the this structure access */
    rwlock_t lock;

    /*
     * This semaphore is used to ensure that files are not removed
     * while epoll is using them. This is read-held during the event
     * collection loop and it is write-held during the file cleanup
     * path, the epoll file exit code and the ctl operations.
     */
    struct rw_semaphore sem;

    /* Wait queue used by sys_epoll_wait() */
    wait_queue_head_t wq;

    /* Wait queue used by file->poll() */
    wait_queue_head_t poll_wait;

    /* List of ready file descriptors */
    struct list_head rdllist;

    /* RB-Tree root used to store monitored fd structs */
    struct rb_root rbr;
};

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;

    /*
     * Calculate the timeout by checking for the "infinite" value ( -1 )
     * and the overflow condition. The passed timeout is in milliseconds,
     * that why (t * HZ) / 1000.
     */
    jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
        MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
    write_lock_irqsave(&ep->lock, flags);

    res = 0;
    if (list_empty(&ep->rdllist)) {
        /*
         * We don't have any available event to return to the caller.
         * We need to sleep here, and we will be wake up by
         * ep_poll_callback() when events will become available.
         */
        init_waitqueue_entry(&wait, current);
        add_wait_queue(&ep->wq, &wait);

        for (;;) {
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
            set_current_state(TASK_INTERRUPTIBLE);
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            write_unlock_irqrestore(&ep->lock, flags);
            jtimeout = schedule_timeout(jtimeout);
            write_lock_irqsave(&ep->lock, flags);
        }
        remove_wait_queue(&ep->wq, &wait);

        set_current_state(TASK_RUNNING);
    }

    /* Is it worth to try to dig for events ? */
    eavail = !list_empty(&ep->rdllist);

    write_unlock_irqrestore(&ep->lock, flags);

    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
    if (!res && eavail &&
        !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
        goto retry;

    return res;
}

When an fd becomes ready, the kernel's callback ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) puts it on the ready list (rdllist); epoll_wait then copies the ready list into its events argument, back to user space.

Compared with select, epoll's multiplexing is not limited by the fixed size of the fd_set array. Ready fds are placed on rdllist by a callback (registered when the fd is added via epoll_ctl) and handed straight back to user space, so user code never has to sweep the whole fd array checking readiness, as select requires.