hc
2023-11-06 e3e12f52b214121840b44c91de5b3e5af5d3eb84
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import pexpect
 
import infra
 
 
class Emulator(object):
 
    def __init__(self, builddir, downloaddir, logtofile, timeout_multiplier):
        self.qemu = None
        self.downloaddir = downloaddir
        self.logfile = infra.open_log_file(builddir, "run", logtofile)
        # We use elastic runners on the cloud to runs our tests. Those runners
        # can take a long time to run the emulator. Use a timeout multiplier
        # when running the tests to avoid sporadic failures.
        self.timeout_multiplier = timeout_multiplier
 
    # Start Qemu to boot the system
    #
    # arch: Qemu architecture to use
    #
    # kernel: path to the kernel image, or the special string
    # 'builtin'. 'builtin' means a pre-built kernel image will be
    # downloaded from ARTEFACTS_URL and suitable options are
    # automatically passed to qemu and added to the kernel cmdline. So
    # far only armv5, armv7 and i386 builtin kernels are available.
    # If None, then no kernel is used, and we assume a bootable device
    # will be specified.
    #
    # kernel_cmdline: array of kernel arguments to pass to Qemu -append option
    #
    # options: array of command line options to pass to Qemu
    #
    def boot(self, arch, kernel=None, kernel_cmdline=None, options=None):
        if arch in ["armv7", "armv5"]:
            qemu_arch = "arm"
        else:
            qemu_arch = arch
 
        qemu_cmd = ["qemu-system-{}".format(qemu_arch),
                    "-serial", "stdio",
                    "-display", "none"]
 
        if options:
            qemu_cmd += options
 
        if kernel_cmdline is None:
            kernel_cmdline = []
 
        if kernel:
            if kernel == "builtin":
                if arch in ["armv7", "armv5"]:
                    kernel_cmdline.append("console=ttyAMA0")
 
                if arch == "armv7":
                    kernel = infra.download(self.downloaddir,
                                            "kernel-vexpress")
                    dtb = infra.download(self.downloaddir,
                                         "vexpress-v2p-ca9.dtb")
                    qemu_cmd += ["-dtb", dtb]
                    qemu_cmd += ["-M", "vexpress-a9"]
                elif arch == "armv5":
                    kernel = infra.download(self.downloaddir,
                                            "kernel-versatile")
                    qemu_cmd += ["-M", "versatilepb"]
 
            qemu_cmd += ["-kernel", kernel]
 
        if kernel_cmdline:
            qemu_cmd += ["-append", " ".join(kernel_cmdline)]
 
        self.logfile.write("> starting qemu with '%s'\n" % " ".join(qemu_cmd))
        self.qemu = pexpect.spawn(qemu_cmd[0], qemu_cmd[1:],
                                  timeout=5 * self.timeout_multiplier,
                                  env={"QEMU_AUDIO_DRV": "none"})
        # We want only stdout into the log to avoid double echo
        self.qemu.logfile_read = self.logfile
 
    # Wait for the login prompt to appear, and then login as root with
    # the provided password, or no password if not specified.
    def login(self, password=None):
        # The login prompt can take some time to appear when running multiple
        # instances in parallel, so set the timeout to a large value
        index = self.qemu.expect(["buildroot login:", pexpect.TIMEOUT],
                                 timeout=60 * self.timeout_multiplier)
        if index != 0:
            self.logfile.write("==> System does not boot")
            raise SystemError("System does not boot")
 
        self.qemu.sendline("root")
        if password:
            self.qemu.expect("Password:")
            self.qemu.sendline(password)
        index = self.qemu.expect(["# ", pexpect.TIMEOUT])
        if index != 0:
            raise SystemError("Cannot login")
        self.run("dmesg -n 1")
 
    # Run the given 'cmd' with a 'timeout' on the target
    # return a tuple (output, exit_code)
    def run(self, cmd, timeout=-1):
        self.qemu.sendline(cmd)
        if timeout != -1:
            timeout *= self.timeout_multiplier
        self.qemu.expect("# ", timeout=timeout)
        # Remove double carriage return from qemu stdout so str.splitlines()
        # works as expected.
        output = self.qemu.before.replace("\r\r", "\r").splitlines()[1:]
 
        self.qemu.sendline("echo $?")
        self.qemu.expect("# ")
        exit_code = self.qemu.before.splitlines()[2]
        exit_code = int(exit_code)
 
        return output, exit_code
 
    def stop(self):
        if self.qemu is None:
            return
        self.qemu.terminate(force=True)