liyujie
2025-08-28 b3810562527858a3b3d98ffa6e9c9c5b0f4a9a8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/python
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
import unittest
 
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.scheduler import rdb
from autotest_lib.scheduler import rdb_cache_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_testing_utils as test_utils
from autotest_lib.scheduler import rdb_utils
 
 
def get_line_with_labels(required_labels, cache_lines):
    """Get the cache line with the hosts that match given labels.
 
    Confirm that all hosts have matching labels within a line,
    then return the lines with the requested labels. There can
    be more than one, since we use acls in the cache key.
 
    @param labels: A list of label names.
    @cache_lines: A list of cache lines to look through.
 
    @return: A list of the cache lines with the requested labels.
    """
    label_lines = []
    for line in cache_lines:
        if not line:
            continue
        labels = list(line)[0].labels.get_label_names()
        if any(host.labels.get_label_names() != labels for host in line):
            raise AssertionError('Mismatch in deps within a cache line')
        if required_labels == labels:
            label_lines.append(line)
    return label_lines
 
 
def get_hosts_for_request(
        response_map, deps=test_utils.DEFAULT_DEPS,
        acls=test_utils.DEFAULT_ACLS, priority=0, parent_job_id=0, **kwargs):
    """Get the hosts for a request matching kwargs from the response map.
 
    @param response_map: A response map from an rdb request_handler.
    """
    return response_map[
            test_utils.AbstractBaseRDBTester.get_request(
                    deps, acls, priority, parent_job_id)]
 
 
class RDBCacheTest(test_utils.AbstractBaseRDBTester, unittest.TestCase):
    """Unittests for RDBHost objects."""
 
 
    def testCachingBasic(self):
        """Test that different requests will hit the database."""
 
        # r1 should cache h2 and use h1; r2 should cach [] and use h2
        # at the end the cache should contain one stale line, with
        # h2 in it, and one empty line since r2 acquired h2.
        default_params = test_utils.get_default_job_params()
        self.create_job(**default_params)
        default_params['deps'] = default_params['deps'][0]
        self.create_job(**default_params)
        for i in range(0, 2):
            self.db_helper.create_host(
                    'h%s'%i, **test_utils.get_default_host_params())
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
 
        def local_get_response(self):
            """ Local rdb.get_response handler."""
            requests = self.response_map.keys()
            if not (self.cache.hits == 0 and self.cache.misses == 2):
                raise AssertionError('Neither request should have hit the '
                        'cache, but both should have inserted into it.')
 
            lines = get_line_with_labels(
                    test_utils.DEFAULT_DEPS,
                    self.cache._cache_backend._cache.values())
            if len(lines) > 1:
                raise AssertionError('Caching was too agressive, '
                        'the second request should not have cached anything '
                        'because it used the one free host.')
 
            cached_host = lines[0].pop()
            default_params = test_utils.get_default_job_params()
            job1_host = get_hosts_for_request(
                    self.response_map, **default_params)[0]
            default_params['deps'] = default_params['deps'][0]
            job2_host = get_hosts_for_request(
                    self.response_map, **default_params)[0]
            if (job2_host.hostname == job1_host.hostname or
                cached_host.hostname not in
                [job2_host.hostname, job1_host.hostname]):
                raise AssertionError('Wrong host cached %s. The first job '
                        'should have cached the host used by the second.' %
                        cached_host.hostname)
 
            # Shouldn't be able to lease this host since r2 used it.
            try:
                cached_host.lease()
            except rdb_utils.RDBException:
                pass
            else:
                raise AssertionError('Was able to lease a stale host. The '
                        'second request should have leased it.')
            return test_utils.wire_format_response_map(self.response_map)
 
        self.god.stub_with(rdb.AvailableHostRequestHandler,
                           'get_response', local_get_response)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
    def testCachingPriority(self):
        """Test requests with the same deps but different priorities."""
        # All 3 jobs should find hosts, and there should be one host left
        # behind in the cache. The first job will take one host and cache 3,
        # the second will take one and cache 2, while the last will take one.
        # The remaining host in the cache should not be stale.
        default_job_params = test_utils.get_default_job_params()
        for i in range(0, 3):
            default_job_params['priority'] = i
            job = self.create_job(**default_job_params)
 
        default_host_params = test_utils.get_default_host_params()
        for i in range(0, 4):
            self.db_helper.create_host('h%s'%i, **default_host_params)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
 
        def local_get_response(self):
            """ Local rdb.get_response handler."""
            if not (self.cache.hits == 2 and self.cache.misses ==1):
                raise AssertionError('The first request should have populated '
                        'the cache for the others.')
 
            default_job_params = test_utils.get_default_job_params()
            lines = get_line_with_labels(
                    default_job_params['deps'],
                    self.cache._cache_backend._cache.values())
            if len(lines) > 1:
                raise AssertionError('Should only be one cache line left.')
 
            # Make sure that all the jobs got different hosts, and that
            # the host cached isn't being used by a job.
            cached_host = lines[0].pop()
            cached_host.lease()
 
            job_hosts = []
            default_job_params = test_utils.get_default_job_params()
            for i in range(0, 3):
                default_job_params['priority'] = i
                hosts = get_hosts_for_request(self.response_map,
                                              **default_job_params)
                assert(len(hosts) == 1)
                host = hosts[0]
                assert(host.id not in job_hosts and cached_host.id != host.id)
            return test_utils.wire_format_response_map(self.response_map)
 
        self.god.stub_with(rdb.AvailableHostRequestHandler,
                           'get_response', local_get_response)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
    def testCachingEmptyList(self):
        """Test that the 'no available hosts' condition isn't a cache miss."""
        default_params = test_utils.get_default_job_params()
        for i in range(0 ,3):
            default_params['parent_job_id'] = i
            self.create_job(**default_params)
 
        default_host_params = test_utils.get_default_host_params()
        self.db_helper.create_host('h1', **default_host_params)
 
        def local_get_response(self):
            """ Local rdb.get_response handler."""
            if not (self.cache.misses == 1 and self.cache.hits == 2):
                raise AssertionError('The first request should have taken h1 '
                        'while the other 2 should have hit the cache.')
 
            request = test_utils.AbstractBaseRDBTester.get_request(
                    test_utils.DEFAULT_DEPS, test_utils.DEFAULT_ACLS)
            key = self.cache.get_key(deps=request.deps, acls=request.acls)
            if self.cache._cache_backend.get(key) != []:
                raise AssertionError('A request with no hosts does not get '
                        'cached corrrectly.')
            return test_utils.wire_format_response_map(self.response_map)
 
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        self.god.stub_with(rdb.AvailableHostRequestHandler,
                           'get_response', local_get_response)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
    def testStaleCacheLine(self):
        """Test that a stale cache line doesn't satisfy a request."""
 
        # Create 3 jobs, all of which can use the same hosts. The first
        # will cache the only remaining host after taking one, the second
        # will also take a host, but not cache anything, while the third
        # will try to use the host cached by the first job but fail because
        # it is already leased.
        default_params = test_utils.get_default_job_params()
        default_params['priority'] = 2
        self.create_job(**default_params)
        default_params['priority'] = 1
        default_params['deps'] = default_params['deps'][0]
        self.create_job(**default_params)
        default_params['priority'] = 0
        default_params['deps'] = test_utils.DEFAULT_DEPS
        self.create_job(**default_params)
 
        host_1 = self.db_helper.create_host(
                'h1', **test_utils.get_default_host_params())
        host_2 = self.db_helper.create_host(
                'h2', **test_utils.get_default_host_params())
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
 
        def local_get_response(self):
            """ Local rdb.get_response handler."""
            default_job_params = test_utils.get_default_job_params()
 
            # Confirm that even though the third job hit the cache, it wasn't
            # able to use the cached host because it was already leased, and
            # that it doesn't add it back to the cache.
            assert(self.cache.misses == 2 and self.cache.hits == 1)
            lines = get_line_with_labels(
                        default_job_params['deps'],
                        self.cache._cache_backend._cache.values())
            assert(len(lines) == 0)
            assert(int(self.cache.mean_staleness()) == 100)
            return test_utils.wire_format_response_map(self.response_map)
 
        self.god.stub_with(rdb.AvailableHostRequestHandler,
                           'get_response', local_get_response)
        acquired_hosts = list(rdb_lib.acquire_hosts(queue_entries))
        self.assertTrue(acquired_hosts[0].id == host_1.id and
                        acquired_hosts[1].id == host_2.id and
                        acquired_hosts[2] is None)
 
 
    def testCacheAPI(self):
        """Test the cache managers api."""
        cache = rdb_cache_manager.RDBHostCacheManager()
        key = cache.get_key(
                deps=test_utils.DEFAULT_DEPS, acls=test_utils.DEFAULT_ACLS)
 
        # Cannot set None, it's reserved for cache expiration.
        self.assertRaises(rdb_utils.RDBException, cache.set_line, *(key, None))
 
        # Setting an empty list indicates a query with no results.
        cache.set_line(key, [])
        self.assertTrue(cache.get_line(key) == [])
 
        # Getting a value will delete the key, leading to a miss on subsequent
        # gets before a set.
        self.assertRaises(rdb_utils.CacheMiss, cache.get_line, *(key,))
 
        # Caching a leased host is just a waste of cache space.
        host = test_utils.FakeHost(
                'h1', 1, labels=test_utils.DEFAULT_DEPS,
                acls=test_utils.DEFAULT_ACLS, leased=1)
        cache.set_line(key, [host])
        self.assertRaises(
                rdb_utils.CacheMiss, cache.get_line, *(key,))
 
        # Retrieving an unleased cached host shouldn't mutate it, even if the
        # key is reconstructed.
        host.leased=0
        cache.set_line(cache.get_key(host.labels, host.acls), [host])
        self.assertTrue(
                cache.get_line(cache.get_key(host.labels, host.acls)) == [host])
 
        # Caching different hosts under the same key isn't allowed.
        different_host = test_utils.FakeHost(
                'h2', 2, labels=[test_utils.DEFAULT_DEPS[0]],
                acls=test_utils.DEFAULT_ACLS, leased=0)
        cache.set_line(key, [host, different_host])
        self.assertRaises(
                rdb_utils.CacheMiss, cache.get_line, *(key,))
 
        # Caching hosts with the same deps but different acls under the
        # same key is allowed, as long as the acls match the key.
        different_host = test_utils.FakeHost(
                'h2', 2, labels=test_utils.DEFAULT_DEPS,
                acls=[test_utils.DEFAULT_ACLS[1]], leased=0)
        cache.set_line(key, [host, different_host])
        self.assertTrue(set(cache.get_line(key)) == set([host, different_host]))
 
        # Make sure we don't divide by zero while calculating hit ratio
        cache.misses = 0
        cache.hits = 0
        self.assertTrue(cache.hit_ratio() == 0)
        cache.hits = 1
        hit_ratio = cache.hit_ratio()
        self.assertTrue(type(hit_ratio) == float and hit_ratio == 100)
 
 
    def testDummyCache(self):
        """Test that the dummy cache doesn't save hosts."""
 
        # Create 2 jobs and 3 hosts. Both the jobs should not hit the cache,
        # nor should they cache anything, but both jobs should acquire hosts.
        default_params = test_utils.get_default_job_params()
        default_host_params = test_utils.get_default_host_params()
        for i in range(0, 2):
            default_params['parent_job_id'] = i
            self.create_job(**default_params)
            self.db_helper.create_host('h%s'%i, **default_host_params)
        self.db_helper.create_host('h2', **default_host_params)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        self.god.stub_with(
                rdb_cache_manager.RDBHostCacheManager, 'use_cache', False)
 
        def local_get_response(self):
            """ Local rdb.get_response handler."""
            requests = self.response_map.keys()
            if not (self.cache.hits == 0 and self.cache.misses == 2):
                raise AssertionError('Neither request should have hit the '
                        'cache, but both should have inserted into it.')
 
            # Make sure both requests actually found a host
            default_params = test_utils.get_default_job_params()
            job1_host = get_hosts_for_request(
                    self.response_map, **default_params)[0]
            default_params['parent_job_id'] = 1
            job2_host = get_hosts_for_request(
                    self.response_map, **default_params)[0]
            if (not job1_host or not job2_host or
                job2_host.hostname == job1_host.hostname):
                raise AssertionError('Excected acquisitions did not occur.')
 
            assert(hasattr(self.cache._cache_backend, '_cache') == False)
            return test_utils.wire_format_response_map(self.response_map)
 
        self.god.stub_with(rdb.AvailableHostRequestHandler,
                           'get_response', local_get_response)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
if __name__ == '__main__':
    unittest.main()