ronnie
2022-10-14 1504bb53e29d3d46222c0b3ea994fc494b48e153
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")
 
from bcc import BPF
import ctypes as ct
import random
import time
import subprocess
from bcc.utils import get_online_cpus
from unittest import main, TestCase
 
class TestArray(TestCase):
    """Tests for BPF_ARRAY tables and BPF_PERF_OUTPUT buffers via bcc.

    Every test builds its own BPF module and registers ``b.cleanup`` with
    ``addCleanup`` so the module (and any attached kprobe) is released even
    when an assertion fails part-way through the test.
    """

    def test_simple(self):
        """Store two values using ctypes keys and read them back via items()."""
        b = BPF(text="""BPF_ARRAY(table1, u64, 128);""")
        self.addCleanup(b.cleanup)
        t1 = b["table1"]
        t1[ct.c_int(0)] = ct.c_ulonglong(100)
        t1[ct.c_int(127)] = ct.c_ulonglong(1000)
        # Index the iterated pairs by key so that a slot missing from items()
        # fails the test instead of silently skipping the value check.
        seen = {k.value: v.value for k, v in t1.items()}
        self.assertEqual(seen.get(0), 100)
        self.assertEqual(seen.get(127), 1000)
        self.assertEqual(len(t1), 128)

    def test_native_type(self):
        """Store values using plain Python ints, including negative indices."""
        b = BPF(text="""BPF_ARRAY(table1, u64, 128);""")
        self.addCleanup(b.cleanup)
        t1 = b["table1"]
        t1[0] = ct.c_ulonglong(100)
        t1[-2] = ct.c_ulonglong(37)
        t1[127] = ct.c_ulonglong(1000)
        # Same keyed lookup as test_simple: no vacuous pass when a key is
        # absent from the iteration.
        seen = {k.value: v.value for k, v in t1.items()}
        self.assertEqual(seen.get(0), 100)
        self.assertEqual(seen.get(127), 1000)
        self.assertEqual(len(t1), 128)
        # Negative indices are expected to address slots from the end.
        self.assertEqual(t1[-2].value, 37)
        self.assertEqual(t1[-1].value, t1[127].value)

    def test_perf_buffer(self):
        """Submit an event from a nanosleep kprobe and receive it in Python."""
        self.counter = 0
        sizes = []
        lost_counts = []

        class Data(ct.Structure):
            _fields_ = [("ts", ct.c_ulonglong)]

        # The callbacks only record what they observe; the checks run in the
        # test body after polling.
        # NOTE(review): bcc presumably invokes these through ctypes, where a
        # raised exception is reported but not propagated to the caller --
        # asserting here would then never fail the test directly.
        def cb(cpu, data, size):
            sizes.append(size)
            self.counter += 1

        def lost_cb(lost):
            lost_counts.append(lost)

        text = """
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 ts;
    } data = {bpf_ktime_get_ns()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = BPF(text=text)
        # Registered before attaching so the kprobe is detached on any failure.
        self.addCleanup(b.cleanup)
        b.attach_kprobe(event=b.get_syscall_fnname("nanosleep"),
                        fn_name="do_sys_nanosleep")
        b["events"].open_perf_buffer(cb, lost_cb=lost_cb)
        # Trigger at least one nanosleep syscall for the probe to observe.
        subprocess.call(['sleep', '0.1'])
        b.perf_buffer_poll()
        self.assertGreater(self.counter, 0)
        # Same strict bound the original callback asserted; presumably the
        # reported size includes bytes beyond the struct itself -- confirm.
        for size in sizes:
            self.assertGreater(size, ct.sizeof(Data))
        for lost in lost_counts:
            self.assertGreater(lost, 0)

    def test_perf_buffer_for_each_cpu(self):
        """Pin a sleep to every online CPU and expect one event per CPU."""
        self.events = []
        sizes = []
        lost_counts = []

        class Data(ct.Structure):
            _fields_ = [("cpu", ct.c_ulonglong)]

        def cb(cpu, data, size):
            sizes.append(size)
            event = ct.cast(data, ct.POINTER(Data)).contents
            # Copy the scalar out now: `.contents` is a view onto `data`, which
            # should not be assumed valid once the callback has returned.
            self.events.append(event.cpu)

        def lost_cb(lost):
            lost_counts.append(lost)

        text = """
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 cpu;
    } data = {bpf_get_smp_processor_id()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = BPF(text=text)
        self.addCleanup(b.cleanup)
        b.attach_kprobe(event=b.get_syscall_fnname("nanosleep"),
                        fn_name="do_sys_nanosleep")
        b["events"].open_perf_buffer(cb, lost_cb=lost_cb)
        online_cpus = get_online_cpus()
        # Run one short sleep pinned to each CPU so every CPU submits an event.
        for cpu in online_cpus:
            subprocess.call(['taskset', '-c', str(cpu), 'sleep', '0.1'])
        b.perf_buffer_poll()
        self.assertGreaterEqual(len(self.events), len(online_cpus), 'Received only {}/{} events'.format(len(self.events), len(online_cpus)))
        for size in sizes:
            self.assertGreater(size, ct.sizeof(Data))
        for lost in lost_counts:
            self.assertGreater(lost, 0)
 
# Hand control to unittest's `main` entry point, but only when this file is
# executed directly as a script rather than imported as a module.
if __name__ == "__main__":
    main()