0%

Sysdig Source Code Analysis II

一、前言

之前记录了sysdig驱动部分的源码理解,还缺少用户态的捕获解析过程的分析,这边补上。因为真实的项目中没有用到sysdig的CLI解析引擎,而是自研的,所以关于libsinsp部分针对具体事件的解析过滤逻辑相关的源码分析将不会去进行(或者是等后续需要的时候)。

二、架构

虽然上篇已经放过了,这里再贴一下:

image-20210704222658894

上篇主要讲的是驱动这块的整体实现逻辑,包括数据如何采集、如何放入ringbuffer、ringbuffer如何进行状态管理等;现在数据已经入了ringbuffer了,问题的关键变成了如何从ringbuffer中取出数据,如何对取出数据后的ringbuffer做状态管理等。

三、分析

以sysdig源码中的userspace/libscap/examples/01-open/test.c测试程序为例进行分析:

主要逻辑很短,有两个部分需要注意,分别是scap_open_live、scap_next,scap的主要逻辑都包含在这两个函数中;先贴下源码:

main函数的开头部分就是一个异常处理回调的注册逻辑,在程序中断时会触发signal_callback函数中的逻辑获取scap handle的状态并输出,主要包括用户态接收到的事件数、内核态写入ringbuffer的事件数、因ringbuffer写满丢弃的事件数、因无效内存访问丢弃的事件数、因cpu抢占丢弃的事件数、因内核bug丢弃的事件数等等;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <stdio.h>
#include <signal.h>
#include <scap.h>

uint64_t g_nevts = 0;
scap_t* g_h = NULL;

// SIGINT handler: print the capture statistics collected by libscap, then exit.
// Reads the globals g_nevts (events consumed by this process) and g_h (the
// open scap handle); scap_get_stats() fills the scap_stats struct.
static void signal_callback(int signal)
{
scap_stats s;
// Events this process actually pulled out via scap_next().
printf("events captured: %" PRIu64 "\n", g_nevts);
// NOTE(review): the return value of scap_get_stats() is not checked; if it
// failed, `s` would be read uninitialized below — confirm against scap.h.
scap_get_stats(g_h, &s);
printf("seen by driver: %" PRIu64 "\n", s.n_evts);
printf("Number of dropped events: %" PRIu64 "\n", s.n_drops);
printf("Number of dropped events caused by full buffer: %" PRIu64 "\n", s.n_drops_buffer);
printf("Number of dropped events caused by invalid memory access: %" PRIu64 "\n", s.n_drops_pf);
printf("Number of dropped events caused by an invalid condition in the kernel instrumentation: %" PRIu64 "\n", s.n_drops_bug);
printf("Number of preemptions: %" PRIu64 "\n", s.n_preemptions);
printf("Number of events skipped due to the tid being in a set of suppressed tids: %" PRIu64 "\n", s.n_suppressed);
printf("Number of threads currently being suppressed: %" PRIu64 "\n", s.n_tids_suppressed);
exit(0);
}

// Minimal libscap consumer: open a live capture and count events until
// interrupted (SIGINT prints stats and exits via signal_callback).
int main(int argc, char** argv)
{
char error[SCAP_LASTERR_SIZE];
int32_t res;
scap_evt* ev;
uint16_t cpuid;

// Install the SIGINT handler first so Ctrl+C always produces the summary.
if(signal(SIGINT, signal_callback) == SIG_ERR)
{
fprintf(stderr, "An error occurred while setting SIGINT signal handler.\n");
return -1;
}

// Open a live capture; on failure `error` holds the message, `res` the code.
g_h = scap_open_live(error, &res);
if(g_h == NULL)
{
fprintf(stderr, "%s (%d)\n", error, res);
return -1;
}

// Consume events forever; the loop only exits on error (or through the
// SIGINT handler, which calls exit()).
while(1)
{
res = scap_next(g_h, &ev, &cpuid);

// res > 0 is treated as failure — assumes SCAP_SUCCESS/SCAP_TIMEOUT
// are <= 0; confirm against the SCAP_* codes in scap.h.
if(res > 0)
{
fprintf(stderr, "%s\n", scap_getlasterr(g_h));
scap_close(g_h);
return -1;
}

// Count only real events, not timeouts.
if(res != SCAP_TIMEOUT)
{
g_nevts++;
}
}

// NOTE(review): unreachable — the loop above never breaks.
scap_close(g_h);
return 0;
}

之后是scap_open_live函数,可以发现它调用了scap_open_live_int函数,它的流程基本如下:

首先创建一个scap类型的handle并分配空间,这个scap的结构体非常复杂,内部也有其他结构体嵌套,其中m_devs成员较为重要,保存了handle所打开设备的所有描述信息,包括所属设备的ringbuffer地址、本次需要从ringbuffer读取的size总大小、下一次需要从ringbuffer中取出的事件地址、下一次需要从ringbuffer中取出事件的长度等;后面读、取事件相关的操作也基本靠m_devs这个结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
scap_t* scap_open_live_int(char *error, int32_t *rc,
proc_entry_callback proc_callback,
void* proc_callback_context,
bool import_users,
const char *bpf_probe,
const char **suppressed_comms)
{
uint32_t j;
char filename[SCAP_MAX_PATH_SIZE];
scap_t* handle = NULL;
uint32_t ndevs;

//
// Allocate the handle
//
handle = (scap_t*) calloc(sizeof(scap_t), 1);
if(!handle)
{
snprintf(error, SCAP_LASTERR_SIZE, "error allocating the scap_t structure");
*rc = SCAP_FAILURE;
return NULL;
}

其次,获取当前主机所有cpu核数和可用cpu核数,并根据当前可用cpu核数创建指定数量的scap_device结构体数组,保存到handle的m_devs成员中;之后遍历m_devs数组将m_buffer等ringbuffer元数据初始化为MAP_FAILED:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
handle->m_ncpus = sysconf(_SC_NPROCESSORS_CONF);
if(handle->m_ncpus == -1)
{
scap_close(handle);
snprintf(error, SCAP_LASTERR_SIZE, "_SC_NPROCESSORS_CONF: %s", scap_strerror(handle, errno));
*rc = SCAP_FAILURE;
return NULL;
}

//
// Find out how many devices we have to open, which equals to the number of CPUs
//
ndevs = sysconf(_SC_NPROCESSORS_ONLN);
if(ndevs == -1)
{
scap_close(handle);
snprintf(error, SCAP_LASTERR_SIZE, "_SC_NPROCESSORS_ONLN: %s", scap_strerror(handle, errno));
*rc = SCAP_FAILURE;
return NULL;
}

handle->m_devs = (scap_device*) calloc(sizeof(scap_device), ndevs);
if(!handle->m_devs)
{
scap_close(handle);
snprintf(error, SCAP_LASTERR_SIZE, "error allocating the device handles");
*rc = SCAP_FAILURE;
return NULL;
}
for(j = 0; j < ndevs; j++)
{
handle->m_devs[j].m_buffer = (char*)MAP_FAILED;
if(!handle->m_bpf)
{
handle->m_devs[j].m_bufinfo = (struct ppm_ring_buffer_info*)MAP_FAILED;
handle->m_devs[j].m_bufstatus = (struct udig_ring_buffer_status*)MAP_FAILED;
}
}

handle->m_ndevs = ndevs;

之后比较重要的是这边做内存映射的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
else
{
int len;
uint32_t all_scanned_devs;

//
// Allocate the device descriptors.
//
len = RING_BUF_SIZE * 2;

for(j = 0, all_scanned_devs = 0; j < handle->m_ndevs && all_scanned_devs < handle->m_ncpus; ++all_scanned_devs)
{
//
// Open the device
//
snprintf(filename, sizeof(filename), "%s/dev/" PROBE_DEVICE_NAME "%d", scap_get_host_root(), all_scanned_devs);

if((handle->m_devs[j].m_fd = open(filename, O_RDWR | O_SYNC)) < 0)
{
if(errno == ENODEV)
{
//
// This CPU is offline, so we just skip it
//
continue;
}
else if(errno == EBUSY)
{
uint32_t curr_max_consumers = get_max_consumers();
snprintf(error, SCAP_LASTERR_SIZE, "Too many sysdig instances attached to device %s. Current value for /sys/module/" PROBE_DEVICE_NAME "_probe/parameters/max_consumers is '%"PRIu32"'.", filename, curr_max_consumers);
}
else
{
snprintf(error, SCAP_LASTERR_SIZE, "error opening device %s. Make sure you have root credentials and that the " PROBE_NAME " module is loaded.", filename);
}

scap_close(handle);
*rc = SCAP_FAILURE;
return NULL;
}

// Set close-on-exec for the fd
if (fcntl(handle->m_devs[j].m_fd, F_SETFD, FD_CLOEXEC) == -1) {
snprintf(error, SCAP_LASTERR_SIZE, "Can not set close-on-exec flag for fd for device %s (%s)", filename, scap_strerror(handle, errno));
scap_close(handle);
*rc = SCAP_FAILURE;
return NULL;
}

//
// Map the ring buffer
//
handle->m_devs[j].m_buffer = (char*)mmap(0,
len,
PROT_READ,
MAP_SHARED,
handle->m_devs[j].m_fd,
0);

if(handle->m_devs[j].m_buffer == MAP_FAILED)
{
// we cleanup this fd and then we let scap_close() take care of the other ones
close(handle->m_devs[j].m_fd);

scap_close(handle);
snprintf(error, SCAP_LASTERR_SIZE, "error mapping the ring buffer for device %s", filename);
*rc = SCAP_FAILURE;
return NULL;
}

//
// Map the ppm_ring_buffer_info that contains the buffer pointers
//
handle->m_devs[j].m_bufinfo = (struct ppm_ring_buffer_info*)mmap(0,
sizeof(struct ppm_ring_buffer_info),
PROT_READ | PROT_WRITE,
MAP_SHARED,
handle->m_devs[j].m_fd,
0);

if(handle->m_devs[j].m_bufinfo == MAP_FAILED)
{
// we cleanup this fd and then we let scap_close() take care of the other ones
munmap(handle->m_devs[j].m_buffer, len);
close(handle->m_devs[j].m_fd);

scap_close(handle);

snprintf(error, SCAP_LASTERR_SIZE, "error mapping the ring buffer info for device %s", filename);
*rc = SCAP_FAILURE;
return NULL;
}

++j;
}
}
for(j = 0; j < handle->m_ndevs; ++j)
{
//
// Additional initializations
//
handle->m_devs[j].m_lastreadsize = 0;
handle->m_devs[j].m_sn_len = 0;
scap_stop_dropping_mode(handle);
}

接着开启抓取,主要是通过ioctl给每个设备发PPM_IOCTL_ENABLE_CAPTURE信号进行的,driver相关的设备ioctl信号处理函数之前应该已经记录过了:

1
2
3
4
5
6
7
if((*rc = scap_start_capture(handle)) != SCAP_SUCCESS)
{
scap_close(handle);
return NULL;
}

return handle;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Enable event collection on a live handle. BPF captures delegate to
// scap_bpf_start_capture(), udig to udig_start_capture(); otherwise every
// per-CPU device gets the PPM_IOCTL_ENABLE_CAPTURE ioctl.
// Returns SCAP_SUCCESS or SCAP_FAILURE (error text in handle->m_lasterr).
int32_t scap_start_capture(scap_t* handle)
{
#if !defined(HAS_CAPTURE) || defined(CYGWING_AGENT)
// Live capture is compiled out on this platform.
snprintf(handle->m_lasterr, SCAP_LASTERR_SIZE, "live capture not supported on %s", PLATFORM_NAME);
return SCAP_FAILURE;
#else
uint32_t j;

//
// Not supported for files
//
if(handle->m_mode == SCAP_MODE_LIVE)
{
//
// Enable capture on all the rings
//
if(handle->m_bpf)
{
return scap_bpf_start_capture(handle);
}
else if(handle->m_udig)
{
// NOTE(review): udig_start_capture()'s result is ignored here,
// unlike the BPF and ioctl paths — confirm it cannot fail.
udig_start_capture(handle);
}
else
{
// One device per CPU; a nonzero ioctl return aborts the whole start.
for(j = 0; j < handle->m_ndevs; j++)
{
if(ioctl(handle->m_devs[j].m_fd, PPM_IOCTL_ENABLE_CAPTURE))
{
snprintf(handle->m_lasterr, SCAP_LASTERR_SIZE, "scap_start_capture failed for device %" PRIu32, j);
ASSERT(false);
return SCAP_FAILURE;
}
}
}
}
else
{
snprintf(handle->m_lasterr, SCAP_LASTERR_SIZE, "cannot start offline live captures");
ASSERT(false);
return SCAP_FAILURE;
}

return SCAP_SUCCESS;
#endif // HAS_CAPTURE
}

scap_open_live的逻辑结束之后,主要就是scap_next这边的逻辑:首先对每一个ringbuffer设备中剩余需要读取的事件size大小(m_sn_len)做判断,如果等于0,且本次读取的总大小(m_lastreadsize)大于0,说明本次读取已经完成,那么就需要进行tail的重新调整计算(tail在用户态控制读),并同步到内核态,这个计算和同步的过程在scap_advance_tail中实现;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
while(1)
{
res = scap_next(g_h, &ev, &cpuid);

if(res > 0)
{
fprintf(stderr, "%s\n", scap_getlasterr(g_h));
scap_close(g_h);
return -1;
}

if(res != SCAP_TIMEOUT)
{
g_nevts++;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static inline int32_t scap_next_live(scap_t* handle, OUT scap_evt** pevent, OUT uint16_t* pcpuid)
#else
static int32_t scap_next_live(scap_t* handle, OUT scap_evt** pevent, OUT uint16_t* pcpuid)
#endif
{
#if !defined(HAS_CAPTURE) || defined(CYGWING_AGENT)
//
// this should be prevented at open time
//
ASSERT(false);
return SCAP_FAILURE;
#else
uint32_t j;
uint64_t max_ts = 0xffffffffffffffffLL;
scap_evt* pe = NULL;
uint32_t ndevs = handle->m_ndevs;

*pcpuid = 65535;

for(j = 0; j < ndevs; j++)
{
scap_device* dev = &(handle->m_devs[j]);

if(dev->m_sn_len == 0)
{
//
// If we don't have data from this ring, but we are
// still occupying, free the resources for the
// producer rather than sitting on them.
//
if(dev->m_lastreadsize > 0)
{
scap_advance_tail(handle, j);
}

continue;
}

之后针对每一个ringbuffer设备,sysdig会计算事件发生的时间戳,获取同一次取事件流程中所有ringbuffer设备中最早发生的那个事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if(handle->m_bpf)
{
pe = scap_bpf_evt_from_perf_sample(dev->m_sn_next_event);
}
else
{
pe = (scap_evt *) dev->m_sn_next_event;
}

//
// We want to consume the event with the lowest timestamp
//
if(pe->ts < max_ts)
{
if(pe->len > dev->m_sn_len)
{
snprintf(handle->m_lasterr, SCAP_LASTERR_SIZE, "scap_next buffer corruption");

//
// if you get the following assertion, first recompile the driver and libscap
//
ASSERT(false);
return SCAP_FAILURE;
}

*pevent = pe;
*pcpuid = j;
max_ts = pe->ts;
}
}

获取被选中的那个同一批次中最早事件所在的cpuid号并通过&handle->m_devs[*pcpuid]定位到对应的ringbuffer设备,更新相关元数据,如需要读取的剩余事件大小(m_sn_len)和对应ringbuffer中下一次需要读取的事件地址(m_sn_next_event):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if(*pcpuid != 65535)
{
struct scap_device *dev = &handle->m_devs[*pcpuid];

//
// Update the pointers.
//
if(handle->m_bpf)
{
scap_bpf_advance_to_evt(handle, *pcpuid, true,
dev->m_sn_next_event,
&dev->m_sn_next_event,
&dev->m_sn_len);
}
else
{
ASSERT(dev->m_sn_len >= (*pevent)->len);
dev->m_sn_len -= (*pevent)->len;
dev->m_sn_next_event += (*pevent)->len;
}

return SCAP_SUCCESS;
}

检查是否所有ringbuffer中的数据已经被userspace程序消费完毕,如果是,则需要重新去计算下一次需要读取的事件大小,并判断是否需要进行等待(等待内核态的生产者产生事件):

1
2
3
4
5
6
7
8
else
{
//
// All the buffers have been consumed. Check if there's enough data to keep going or
// if we should wait.
//
return refill_read_buffers(handle);
}

首先判断ringbuffer是否为空,用了are_buffers_empty这个函数,若为空,说明事件在用户态消费得太快(正常情况下),需要进行usleep等待内核态重新填充足量的事件数据到ringbuffer;are_buffers_empty函数调用了buf_size_used来判断用户态可以读的size,具体实现在get_buf_pointers中,其内部对ringbuffer中读写偏移tail、head的位置进行了判断,获取在此时head、tail偏移所处位置时用户态可读的size大小并与BUFFER_EMPTY_THRESHOLD_B宏定义对比,若小于这个宏(20000),则认为ringbuffer中事件量太少,暂时不可读,需等待指定时间后才被允许读取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int32_t refill_read_buffers(scap_t* handle)
{
uint32_t j;
uint32_t ndevs = handle->m_ndevs;

if(are_buffers_empty(handle))
{
usleep(handle->m_buffer_empty_wait_time_us);
handle->m_buffer_empty_wait_time_us = MIN(handle->m_buffer_empty_wait_time_us * 2,
BUFFER_EMPTY_WAIT_TIME_US_MAX);
}
else
{
handle->m_buffer_empty_wait_time_us = BUFFER_EMPTY_WAIT_TIME_US_START;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Return true only when no device ring holds more than
// BUFFER_EMPTY_THRESHOLD_B bytes of unread data, i.e. the consumer has
// (nearly) drained every per-CPU buffer.
static bool are_buffers_empty(scap_t* handle)
{
uint32_t dev;
bool empty = true;

for(dev = 0; dev < handle->m_ndevs && empty; dev++)
{
empty = (buf_size_used(handle, dev) <= BUFFER_EMPTY_THRESHOLD_B);
}

return empty;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Read the ring's head/tail offsets from the kernel-shared ppm_ring_buffer_info
// and compute how many bytes are currently readable by the consumer.
// NOTE(review): this excerpt omits the #if that matches the #endif below —
// the conditional surrounds the signature in the full source.
void get_buf_pointers(struct ppm_ring_buffer_info* bufinfo, uint32_t* phead, uint32_t* ptail, uint64_t* pread_size)
#endif
{
*phead = bufinfo->head;
*ptail = bufinfo->tail;

// tail > head: the readable region wraps past the end of the ring buffer.
if(*ptail > *phead)
{
*pread_size = RING_BUF_SIZE - *ptail + *phead;
}
else
{
*pread_size = *phead - *ptail;
}
}

最后计算本次需要读取的总事件size大小,通过get_buf_pointers获取当前ringbuffer设备的head、tail及可读size(pread_size),将此可读size赋给设备m_lastreadsize及m_sn_len成员,并更新下一次读取的ringbuffer首地址(m_sn_next_event):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
	for(j = 0; j < ndevs; j++)
{
struct scap_device *dev = &(handle->m_devs[j]);

int32_t res = scap_readbuf(handle,
j,
&dev->m_sn_next_event,
&dev->m_sn_len);

if(res != SCAP_SUCCESS)
{
return res;
}
}

//
// Note: we might return a spurious timeout here in case the previous loop extracted valid data to parse.
// It's ok, since this is rare and the caller will just call us again after receiving a
// SCAP_TIMEOUT.
//
return SCAP_TIMEOUT;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Hand the caller a pointer into cpuid's ring buffer plus the number of
// readable bytes, recording that amount in m_lastreadsize so the tail can be
// advanced once the caller has consumed the data. BPF rings are delegated to
// scap_bpf_readbuf(). Always returns SCAP_SUCCESS on the non-BPF path.
int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_t* len)
{
uint32_t head;
uint32_t tail;
uint64_t avail;

// BPF captures use their own per-CPU perf-buffer reader.
if(handle->m_bpf)
{
return scap_bpf_readbuf(handle, cpuid, buf, len);
}

// Snapshot the shared head/tail offsets and the readable byte count.
get_buf_pointers(handle->m_devs[cpuid].m_bufinfo,
&head,
&tail,
&avail);

// Remember how much we handed out; scap_advance_tail() uses it later.
handle->m_devs[cpuid].m_lastreadsize = avail;

// The readable region starts at the consumer's tail offset.
*len = avail;
*buf = handle->m_devs[cpuid].m_buffer + tail;

return SCAP_SUCCESS;
}

四、总结

时间紧迫,写得比较粗,libscap的主要行为就是从ringbuffer中取事件,核心动作有开启捕获与循环取事件,又可继续拆分为设备创建、内存映射、ioctl通信、ringbuffer状态判断与更新等流程,不再赘述。