huangcm
2025-09-01 53d8e046ac1bf2ebe94f671983e3d3be059df91a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
#!/usr/bin/python
#pylint: disable-msg=C0111
 
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import unittest
 
import common
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.afe import rdb_model_extensions
from autotest_lib.scheduler import rdb
from autotest_lib.scheduler import rdb_hosts
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_requests
from autotest_lib.scheduler import rdb_testing_utils
from autotest_lib.server.cros import provision
 
 
class AssignmentValidator(object):
    """Utility class to check that priority inversion doesn't happen. """
 
 
    @staticmethod
    def check_acls_deps(host, request):
        """Check if a host and request match by comparing acls and deps.
 
        @param host: A dictionary representing attributes of the host.
        @param request: A request, as defined in rdb_requests.
 
        @return True if the deps/acls of the request match the host.
        """
        # Unfortunately the hosts labels are labelnames, not ids.
        request_deps = set([l.name for l in
                models.Label.objects.filter(id__in=request.deps)])
        return (set(host['labels']).intersection(request_deps) == request_deps
                and set(host['acls']).intersection(request.acls))
 
 
    @staticmethod
    def find_matching_host_for_request(hosts, request):
        """Find a host from the given list of hosts, matching the request.
 
        @param hosts: A list of dictionaries representing host attributes.
        @param requetst: The unsatisfied request.
 
        @return: A host, if a matching host is found from the input list.
        """
        if not hosts or not request:
            return None
        for host in hosts:
            if AssignmentValidator.check_acls_deps(host, request):
                return host
 
 
    @staticmethod
    def sort_requests(requests):
        """Sort the requests by priority.
 
        @param requests: Unordered requests.
 
        @return: A list of requests ordered by priority.
        """
        return sorted(collections.Counter(requests).items(),
                key=lambda request: request[0].priority, reverse=True)
 
 
    @staticmethod
    def verify_priority(request_queue, result):
        requests = AssignmentValidator.sort_requests(request_queue)
        for request, count in requests:
            hosts = result.get(request)
            # The request was completely satisfied.
            if hosts and len(hosts) == count:
                continue
            # Go through all hosts given to lower priority requests and
            # make sure we couldn't have allocated one of them for this
            # unsatisfied higher priority request.
            lower_requests = requests[requests.index((request,count))+1:]
            for lower_request, count in lower_requests:
                if (lower_request.priority < request.priority and
                    AssignmentValidator.find_matching_host_for_request(
                            result.get(lower_request), request)):
                    raise ValueError('Priority inversion occured between '
                            'priorities %s and %s' %
                            (request.priority, lower_request.priority))
 
 
    @staticmethod
    def priority_checking_response_handler(request_manager):
        """Fake response handler wrapper for any request_manager.
 
        Check that higher priority requests get a response over lower priority
        requests, by re-validating all the hosts assigned to a lower priority
        request against the unsatisfied higher priority ones.
 
        @param request_manager: A request_manager as defined in rdb_lib.
 
        @raises ValueError: If priority inversion is detected.
        """
        # Fist call the rdb to make its decisions, then sort the requests
        # by priority and make sure unsatisfied requests higher up in the list
        # could not have been satisfied by hosts assigned to requests lower
        # down in the list.
        result = request_manager.api_call(request_manager.request_queue)
        if not result:
            raise ValueError('Expected results but got none.')
        AssignmentValidator.verify_priority(
                request_manager.request_queue, result)
        for hosts in result.values():
            for host in hosts:
                yield host
 
 
class BaseRDBTest(rdb_testing_utils.AbstractBaseRDBTester, unittest.TestCase):
    _config_section = 'AUTOTEST_WEB'
 
 
    def testAcquireLeasedHostBasic(self):
        """Test that acquisition of a leased host doesn't happen.
 
        @raises AssertionError: If the one host that satisfies the request
            is acquired.
        """
        job = self.create_job(deps=set(['a']))
        host = self.db_helper.create_host('h1', deps=set(['a']))
        host.leased = 1
        host.save()
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        hosts = list(rdb_lib.acquire_hosts(queue_entries))
        self.assertTrue(len(hosts) == 1 and hosts[0] is None)
 
 
    def testAcquireLeasedHostRace(self):
        """Test behaviour when hosts are leased just before acquisition.
 
        If a fraction of the hosts somehow get leased between finding and
        acquisition, the rdb should just return the remaining hosts for the
        request to use.
 
        @raises AssertionError: If both the requests get a host successfully,
            since one host gets leased before the final attempt to lease both.
        """
        j1 = self.create_job(deps=set(['a']))
        j2 = self.create_job(deps=set(['a']))
        hosts = [self.db_helper.create_host('h1', deps=set(['a'])),
                 self.db_helper.create_host('h2', deps=set(['a']))]
 
        @rdb_hosts.return_rdb_host
        def local_find_hosts(host_query_manger, deps, acls):
            """Return a predetermined list of hosts, one of which is leased."""
            h1 = models.Host.objects.get(hostname='h1')
            h1.leased = 1
            h1.save()
            h2 = models.Host.objects.get(hostname='h2')
            return [h1, h2]
 
        self.god.stub_with(rdb.AvailableHostQueryManager, 'find_hosts',
                           local_find_hosts)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        hosts = list(rdb_lib.acquire_hosts(queue_entries))
        self.assertTrue(len(hosts) == 2 and None in hosts)
        self.check_hosts(iter(hosts))
 
 
    def testHostReleaseStates(self):
        """Test that we will only release an unused host if it is in Ready.
 
        @raises AssertionError: If the host gets released in any other state.
        """
        host = self.db_helper.create_host('h1', deps=set(['x']))
        for state in rdb_model_extensions.AbstractHostModel.Status.names:
            host.status = state
            host.leased = 1
            host.save()
            self._release_unused_hosts()
            host = models.Host.objects.get(hostname='h1')
            self.assertTrue(host.leased == (state != 'Ready'))
 
 
    def testHostReleseHQE(self):
        """Test that we will not release a ready host if it's being used.
 
        @raises AssertionError: If the host is released even though it has
            been assigned to an active hqe.
        """
        # Create a host and lease it out in Ready.
        host = self.db_helper.create_host('h1', deps=set(['x']))
        host.status = 'Ready'
        host.leased = 1
        host.save()
 
        # Create a job and give its hqe the leased host.
        job = self.create_job(deps=set(['x']))
        self.db_helper.add_host_to_job(host, job.id)
        hqe = models.HostQueueEntry.objects.get(job_id=job.id)
 
        # Activate the hqe by setting its state.
        hqe.status = host_queue_entry_states.ACTIVE_STATUSES[0]
        hqe.save()
 
        # Make sure the hqes host isn't released, even if its in ready.
        self._release_unused_hosts()
        host = models.Host.objects.get(hostname='h1')
        self.assertTrue(host.leased == 1)
 
 
    def testBasicDepsAcls(self):
        """Test a basic deps/acls request.
 
        Make sure that a basic request with deps and acls, finds a host from
        the ready pool that has matching labels and is in a matching aclgroups.
 
        @raises AssertionError: If the request doesn't find a host, since the
            we insert a matching host in the ready pool.
        """
        deps = set(['a', 'b'])
        acls = set(['a', 'b'])
        self.db_helper.create_host('h1', deps=deps, acls=acls)
        job = self.create_job(user='autotest_system', deps=deps, acls=acls)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_host  = rdb_lib.acquire_hosts(queue_entries).next()
        self.check_host_assignment(job.id, matching_host.id)
        self.assertTrue(matching_host.leased == 1)
 
 
    def testPreferredDeps(self):
        """Test that perferred deps is respected.
 
        If multiple hosts satisfied a job's deps, the one with preferred
        label will be assigned to the job.
 
        @raises AssertionError: If a host without a preferred label is
                                assigned to the job instead of one with
                                a preferred label.
        """
        lumpy_deps = set(['board:lumpy'])
        stumpy_deps = set(['board:stumpy'])
        stumpy_deps_with_crosversion = set(
                ['board:stumpy', 'cros-version:lumpy-release/R41-6323.0.0'])
 
        acls = set(['a', 'b'])
        # Hosts lumpy1 and lumpy2 are created as a control group,
        # which ensures that if no preferred label is used, the host
        # with a smaller id will be chosen first. We need to make sure
        # stumpy2 was chosen because it has a cros-version label, but not
        # because of other randomness.
        self.db_helper.create_host('lumpy1', deps=lumpy_deps, acls=acls)
        self.db_helper.create_host('lumpy2', deps=lumpy_deps, acls=acls)
        self.db_helper.create_host('stumpy1', deps=stumpy_deps, acls=acls)
        self.db_helper.create_host(
                    'stumpy2', deps=stumpy_deps_with_crosversion , acls=acls)
        job_1 = self.create_job(user='autotest_system',
                              deps=lumpy_deps, acls=acls)
        job_2 = self.create_job(user='autotest_system',
                              deps=stumpy_deps_with_crosversion, acls=acls)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_hosts  = list(rdb_lib.acquire_hosts(queue_entries))
        assignment = {}
        import logging
        for job, host in zip(queue_entries, matching_hosts):
            self.check_host_assignment(job.id, host.id)
            assignment[job.id] = host.hostname
        self.assertEqual(assignment[job_1.id], 'lumpy1')
        self.assertEqual(assignment[job_2.id], 'stumpy2')
 
 
    def testBadDeps(self):
        """Test that we find no hosts when only acls match.
 
        @raises AssertionError: If the request finds a host, since the only
            host in the ready pool will not have matching deps.
        """
        host_labels = set(['a'])
        job_deps = set(['b'])
        acls = set(['a', 'b'])
        self.db_helper.create_host('h1', deps=host_labels, acls=acls)
        job = self.create_job(user='autotest_system', deps=job_deps, acls=acls)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_host  = rdb_lib.acquire_hosts(queue_entries).next()
        self.assert_(not matching_host)
 
 
    def testBadAcls(self):
        """Test that we find no hosts when only deps match.
 
        @raises AssertionError: If the request finds a host, since the only
            host in the ready pool will not have matching acls.
        """
        deps = set(['a'])
        host_acls = set(['a'])
        job_acls = set(['b'])
        self.db_helper.create_host('h1', deps=deps, acls=host_acls)
 
        # Create the job as a new user who is only in the 'b' and 'Everyone'
        # aclgroups. Though there are several hosts in the Everyone group, the
        # 1 host that has the 'a' dep isn't.
        job = self.create_job(user='new_user', deps=deps, acls=job_acls)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_host  = rdb_lib.acquire_hosts(queue_entries).next()
        self.assert_(not matching_host)
 
 
    def testBasicPriority(self):
        """Test that priority inversion doesn't happen.
 
        Schedule 2 jobs with the same deps, acls and user, but different
        priorities, and confirm that the higher priority request gets the host.
        This confirmation happens through the AssignmentValidator.
 
        @raises AssertionError: If the un important request gets host h1 instead
            of the important request.
        """
        deps = set(['a', 'b'])
        acls = set(['a', 'b'])
        self.db_helper.create_host('h1', deps=deps, acls=acls)
        important_job = self.create_job(user='autotest_system',
                deps=deps, acls=acls, priority=2)
        un_important_job = self.create_job(user='autotest_system',
                deps=deps, acls=acls, priority=0)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
 
        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                AssignmentValidator.priority_checking_response_handler)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
    def testPriorityLevels(self):
        """Test that priority inversion doesn't happen.
 
        Increases a job's priority and makes several requests for hosts,
        checking that priority inversion doesn't happen.
 
        @raises AssertionError: If the unimportant job gets h1 while it is
            still unimportant, or doesn't get h1 while after it becomes the
            most important job.
        """
        deps = set(['a', 'b'])
        acls = set(['a', 'b'])
        self.db_helper.create_host('h1', deps=deps, acls=acls)
 
        # Create jobs that will bucket differently and confirm that jobs in an
        # earlier bucket get a host.
        first_job = self.create_job(user='autotest_system', deps=deps, acls=acls)
        important_job = self.create_job(user='autotest_system', deps=deps,
                acls=acls, priority=2)
        deps.pop()
        unimportant_job = self.create_job(user='someother_system', deps=deps,
                acls=acls, priority=1)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
 
        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                AssignmentValidator.priority_checking_response_handler)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
        # Elevate the priority of the unimportant job, so we now have
        # 2 jobs at the same priority.
        self.db_helper.increment_priority(job_id=unimportant_job.id)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        self._release_unused_hosts()
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
        # Prioritize the first job, and confirm that it gets the host over the
        # jobs that got it the last time.
        self.db_helper.increment_priority(job_id=unimportant_job.id)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        self._release_unused_hosts()
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
    def testFrontendJobScheduling(self):
        """Test that basic frontend job scheduling.
 
        @raises AssertionError: If the received and requested host don't match,
            or the mis-matching host is returned instead.
        """
        deps = set(['x', 'y'])
        acls = set(['a', 'b'])
 
        # Create 2 frontend jobs and only one matching host.
        matching_job = self.create_job(acls=acls, deps=deps)
        matching_host = self.db_helper.create_host('h1', acls=acls, deps=deps)
        mis_matching_job = self.create_job(acls=acls, deps=deps)
        mis_matching_host = self.db_helper.create_host(
                'h2', acls=acls, deps=deps.pop())
        self.db_helper.add_host_to_job(matching_host, matching_job.id)
        self.db_helper.add_host_to_job(mis_matching_host, mis_matching_job.id)
 
        # Check that only the matching host is returned, and that we get 'None'
        # for the second request.
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        hosts = list(rdb_lib.acquire_hosts(queue_entries))
        self.assertTrue(len(hosts) == 2 and None in hosts)
        returned_host = [host for host in hosts if host].pop()
        self.assertTrue(matching_host.id == returned_host.id)
 
 
    def testFrontendJobPriority(self):
        """Test that frontend job scheduling doesn't ignore priorities.
 
        @raises ValueError: If the priorities of frontend jobs are ignored.
        """
        board = 'x'
        high_priority = self.create_job(priority=2, deps=set([board]))
        low_priority = self.create_job(priority=1, deps=set([board]))
        host = self.db_helper.create_host('h1', deps=set([board]))
        self.db_helper.add_host_to_job(host, low_priority.id)
        self.db_helper.add_host_to_job(host, high_priority.id)
 
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
 
        def local_response_handler(request_manager):
            """Confirms that a higher priority frontend job gets a host.
 
            @raises ValueError: If priority inversion happens and the job
                with priority 1 gets the host instead.
            """
            result = request_manager.api_call(request_manager.request_queue)
            if not result:
                raise ValueError('Excepted the high priority request to '
                                 'get a host, but the result is empty.')
            for request, hosts in result.iteritems():
                if request.priority == 1:
                    raise ValueError('Priority of frontend job ignored.')
                if len(hosts) > 1:
                    raise ValueError('Multiple hosts returned against one '
                                     'frontend job scheduling request.')
                yield hosts[0]
 
        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                           local_response_handler)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
    def testSuiteOrderedHostAcquisition(self):
        """Test that older suite jobs acquire hosts first.
 
        Make sure older suite jobs get hosts first, but not at the expense of
        higher priority jobs.
 
        @raises ValueError: If unexpected acquisitions occur, eg:
            suite_job_2 acquires the last 2 hosts instead of suite_job_1.
            isolated_important_job doesn't get any hosts.
            Any job acquires more hosts than necessary.
        """
        board = 'x'
 
        # Create 2 suites such that the later suite has an ordering of deps
        # that places it ahead of the earlier suite, if parent_job_id is
        # ignored.
        suite_without_dep = self.create_suite(num=2, priority=0, board=board)
 
        suite_with_dep = self.create_suite(num=1, priority=0, board=board)
        self.db_helper.add_deps_to_job(suite_with_dep[0], dep_names=list('y'))
 
        # Create an important job that should be ahead of the first suite,
        # because priority trumps parent_job_id and time of creation.
        isolated_important_job = self.create_job(priority=3, deps=set([board]))
 
        # Create 3 hosts, all with the deps to satisfy the last suite.
        for i in range(0, 3):
            self.db_helper.create_host('h%s' % i, deps=set([board, 'y']))
 
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
 
        def local_response_handler(request_manager):
            """Reorder requests and check host acquisition.
 
            @raises ValueError: If unexpected/no acquisitions occur.
            """
            if any([request for request in request_manager.request_queue
                    if request.parent_job_id is None]):
                raise ValueError('Parent_job_id can never be None.')
 
            # This will result in the ordering:
            # [suite_2_1, suite_1_*, suite_1_*, isolated_important_job]
            # The priority scheduling order should be:
            # [isolated_important_job, suite_1_*, suite_1_*, suite_2_1]
            # Since:
            #   a. the isolated_important_job is the most important.
            #   b. suite_1 was created before suite_2, regardless of deps
            disorderly_queue = sorted(request_manager.request_queue,
                    key=lambda r: -r.parent_job_id)
            request_manager.request_queue = disorderly_queue
            result = request_manager.api_call(request_manager.request_queue)
            if not result:
                raise ValueError('Expected results but got none.')
 
            # Verify that the isolated_important_job got a host, and that the
            # first suite got both remaining free hosts.
            for request, hosts in result.iteritems():
                if request.parent_job_id == 0:
                    if len(hosts) > 1:
                        raise ValueError('First job acquired more hosts than '
                                'necessary. Response map: %s' % result)
                    continue
                if request.parent_job_id == 1:
                    if len(hosts) < 2:
                        raise ValueError('First suite job requests were not '
                                'satisfied. Response_map: %s' % result)
                    continue
                # The second suite job got hosts instead of one of
                # the others. Eitherway this is a failure.
                raise ValueError('Unexpected host acquisition '
                        'Response map: %s' % result)
            yield None
 
        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                           local_response_handler)
        list(rdb_lib.acquire_hosts(queue_entries))
 
 
    def testConfigurations(self):
        """Test that configurations don't matter.
        @raises AssertionError: If the request doesn't find a host,
                 this will happen if configurations are not stripped out.
        """
        self.god.stub_with(provision.Cleanup,
                           '_actions',
                           {'action': 'fakeTest'})
        job_labels = set(['action', 'a'])
        host_deps = set(['a'])
        db_host = self.db_helper.create_host('h1', deps=host_deps)
        self.create_job(user='autotest_system', deps=job_labels)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_host = rdb_lib.acquire_hosts(queue_entries).next()
        self.assert_(matching_host.id == db_host.id)
 
 
class RDBMinDutTest(
        rdb_testing_utils.AbstractBaseRDBTester, unittest.TestCase):
    """Test AvailableHostRequestHandler"""
 
    _config_section = 'AUTOTEST_WEB'
 
 
    def min_dut_test_helper(self, num_hosts, suite_settings):
        """A helper function to test min_dut logic.
 
        @param num_hosts: Total number of hosts to create.
        @param suite_settings: A dictionary specify how suites would be created
                               and verified.
                E.g.  {'priority': 10, 'num_jobs': 3,
                       'min_duts':2, 'expected_aquired': 1}
                       With this setting, will create a suite that has 3
                       child jobs, with priority 10 and min_duts 2.
                       The suite is expected to get 1 dut.
        """
        acls = set(['fake_acl'])
        hosts = []
        for i in range (0, num_hosts):
            hosts.append(self.db_helper.create_host(
                'h%d' % i, deps=set(['board:lumpy']), acls=acls))
        suites = {}
        suite_min_duts = {}
        for setting in suite_settings:
            s = self.create_suite(num=setting['num_jobs'],
                                  priority=setting['priority'],
                                  board='board:lumpy', acls=acls)
            # Empty list will be used to store acquired hosts.
            suites[s['parent_job'].id] = (setting, [])
            suite_min_duts[s['parent_job'].id] = setting['min_duts']
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_hosts = rdb_lib.acquire_hosts(queue_entries, suite_min_duts)
        for host, queue_entry in zip(matching_hosts, queue_entries):
            if host:
                suites[queue_entry.job.parent_job_id][1].append(host)
 
        for setting, hosts in suites.itervalues():
            self.assertEqual(len(hosts),setting['expected_aquired'])
 
 
    def testHighPriorityTakeAll(self):
        """Min duts not satisfied."""
        num_hosts = 1
        suite1 = {'priority':20, 'num_jobs': 3, 'min_duts': 2,
                  'expected_aquired': 1}
        suite2 = {'priority':10, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 0}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])
 
 
    def testHighPriorityMinSatisfied(self):
        """High priority min duts satisfied."""
        num_hosts = 4
        suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 2}
        suite2 = {'priority':10, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 2}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])
 
 
    def testAllPrioritiesMinSatisfied(self):
        """Min duts satisfied."""
        num_hosts = 7
        suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 2}
        suite2 = {'priority':10, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 5}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])
 
 
    def testHighPrioritySatisfied(self):
        """Min duts satisfied, high priority suite satisfied."""
        num_hosts = 10
        suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 4}
        suite2 = {'priority':10, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 6}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])
 
 
    def testEqualPriorityFirstSuiteMinSatisfied(self):
        """Equal priority, earlier suite got min duts."""
        num_hosts = 4
        suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 2}
        suite2 = {'priority':20, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 2}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])
 
 
    def testEqualPriorityAllSuitesMinSatisfied(self):
        """Equal priority, all suites got min duts."""
        num_hosts = 7
        suite1 = {'priority':20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 2}
        suite2 = {'priority':20, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 5}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])
 
 
if __name__ == '__main__':
    unittest.main()