From 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5 Mon Sep 17 00:00:00 2001 From: hc <hc@nodka.com> Date: Tue, 22 Oct 2024 10:36:11 +0000 Subject: [PATCH] 修改4g拨号为QMI,需要在系统里后台执行quectel-CM --- kernel/tools/testing/selftests/tc-testing/tdc.py | 237 +++++++++++++++++++++++++++++++++++++++++++--------------- 1 files changed, 174 insertions(+), 63 deletions(-) diff --git a/kernel/tools/testing/selftests/tc-testing/tdc.py b/kernel/tools/testing/selftests/tc-testing/tdc.py index 7607ba3..a3e4318 100755 --- a/kernel/tools/testing/selftests/tc-testing/tdc.py +++ b/kernel/tools/testing/selftests/tc-testing/tdc.py @@ -23,7 +23,11 @@ from tdc_helper import * import TdcPlugin +from TdcResults import * +class PluginDependencyException(Exception): + def __init__(self, missing_pg): + self.missing_pg = missing_pg class PluginMgrTestFail(Exception): def __init__(self, stage, output, message): @@ -36,7 +40,7 @@ super().__init__() self.plugins = {} self.plugin_instances = [] - self.args = [] + self.failed_plugins = {} self.argparser = argparser # TODO, put plugins in order @@ -52,6 +56,64 @@ self.plugins[mn] = foo self.plugin_instances.append(foo.SubPlugin()) + def load_plugin(self, pgdir, pgname): + pgname = pgname[0:-3] + foo = importlib.import_module('{}.{}'.format(pgdir, pgname)) + self.plugins[pgname] = foo + self.plugin_instances.append(foo.SubPlugin()) + self.plugin_instances[-1].check_args(self.args, None) + + def get_required_plugins(self, testlist): + ''' + Get all required plugins from the list of test cases and return + all unique items. + ''' + reqs = [] + for t in testlist: + try: + if 'requires' in t['plugins']: + if isinstance(t['plugins']['requires'], list): + reqs.extend(t['plugins']['requires']) + else: + reqs.append(t['plugins']['requires']) + except KeyError: + continue + reqs = get_unique_item(reqs) + return reqs + + def load_required_plugins(self, reqs, parser, args, remaining): + ''' + Get all required plugins from the list of test cases and load any plugin + that is not already enabled. + ''' + pgd = ['plugin-lib', 'plugin-lib-custom'] + pnf = [] + + for r in reqs: + if r not in self.plugins: + fname = '{}.py'.format(r) + source_path = [] + for d in pgd: + pgpath = '{}/{}'.format(d, fname) + if os.path.isfile(pgpath): + source_path.append(pgpath) + if len(source_path) == 0: + print('ERROR: unable to find required plugin {}'.format(r)) + pnf.append(fname) + continue + elif len(source_path) > 1: + print('WARNING: multiple copies of plugin {} found, using version found') + print('at {}'.format(source_path[0])) + pgdir = source_path[0] + pgdir = pgdir.split('/')[0] + self.load_plugin(pgdir, fname) + if len(pnf) > 0: + raise PluginDependencyException(pnf) + + parser = self.call_add_args(parser) + (args, remaining) = parser.parse_known_args(args=remaining, namespace=args) + return args + def call_pre_suite(self, testcount, testidlist): for pgn_inst in self.plugin_instances: pgn_inst.pre_suite(testcount, testidlist) @@ -60,15 +122,15 @@ for pgn_inst in reversed(self.plugin_instances): pgn_inst.post_suite(index) - def call_pre_case(self, test_ordinal, testid): + def call_pre_case(self, caseinfo, *, test_skip=False): for pgn_inst in self.plugin_instances: try: - pgn_inst.pre_case(test_ordinal, testid) + pgn_inst.pre_case(caseinfo, test_skip) except Exception as ee: print('exception {} in call to pre_case for {} plugin'. format(ee, pgn_inst.__class__)) print('test_ordinal is {}'.format(test_ordinal)) - print('testid is {}'.format(testid)) + print('testid is {}'.format(caseinfo['id'])) raise def call_post_case(self): @@ -97,11 +159,13 @@ command = pgn_inst.adjust_command(stage, command) return command + def set_args(self, args): + self.args = args + @staticmethod def _make_argparser(args): self.argparser = argparse.ArgumentParser( description='Linux TC unit tests') - def replace_keywords(cmd): """ @@ -131,12 +195,16 @@ stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=ENVIR) - (rawout, serr) = proc.communicate() - if proc.returncode != 0 and len(serr) > 0: - foutput = serr.decode("utf-8", errors="ignore") - else: - foutput = rawout.decode("utf-8", errors="ignore") + try: + (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT']) + if proc.returncode != 0 and len(serr) > 0: + foutput = serr.decode("utf-8", errors="ignore") + else: + foutput = rawout.decode("utf-8", errors="ignore") + except subprocess.TimeoutExpired: + foutput = "Command \"{}\" timed out\n".format(command) + proc.returncode = 255 proc.stdout.close() proc.stderr.close() @@ -183,14 +251,24 @@ result = True tresult = "" tap = "" + res = TestResult(tidx['id'], tidx['name']) if args.verbose > 0: print("\t====================\n=====> ", end="") print("Test " + tidx["id"] + ": " + tidx["name"]) + if 'skip' in tidx: + if tidx['skip'] == 'yes': + res = TestResult(tidx['id'], tidx['name']) + res.set_result(ResultState.skip) + res.set_errormsg('Test case designated as skipped.') + pm.call_pre_case(tidx, test_skip=True) + pm.call_post_execute() + return res + # populate NAMES with TESTID for this test NAMES['TESTID'] = tidx['id'] - pm.call_pre_case(index, tidx['id']) + pm.call_pre_case(tidx) prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"]) if (args.verbose > 0): @@ -205,10 +283,11 @@ pm.call_post_execute() if (exit_code is None or exit_code != int(tidx["expExitCode"])): - result = False print("exit: {!r}".format(exit_code)) print("exit: {}".format(int(tidx["expExitCode"]))) #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"]))) + res.set_result(ResultState.fail) + res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout)) print(procout) else: if args.verbose > 0: @@ -219,20 +298,15 @@ if procout: match_index = re.findall(match_pattern, procout) if len(match_index) != int(tidx["matchCount"]): - result = False + res.set_result(ResultState.fail) + res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout)) + else: + res.set_result(ResultState.success) elif int(tidx["matchCount"]) != 0: - result = False - - if not result: - tresult += 'not ' - tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name']) - tap += tresult - - if result == False: - if procout: - tap += procout + res.set_result(ResultState.fail) + res.set_failmsg('No output generated by verify command.') else: - tap += 'No output!\n' + res.set_result(ResultState.success) prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout) pm.call_post_case() @@ -241,7 +315,7 @@ # remove TESTID from NAMES del(NAMES['TESTID']) - return tap + return res def test_runner(pm, args, filtered_tests): """ @@ -261,25 +335,15 @@ emergency_exit = False emergency_exit_message = '' - if args.notap: - if args.verbose: - tap = 'notap requested: omitting test plan\n' - else: - tap = str(index) + ".." + str(tcount) + "\n" + tsr = TestSuiteReport() + try: pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist]) except Exception as ee: ex_type, ex, ex_tb = sys.exc_info() print('Exception {} {} (caught in pre_suite).'. format(ex_type, ex)) - # when the extra print statements are uncommented, - # the traceback does not appear between them - # (it appears way earlier in the tdc.py output) - # so don't bother ... - # print('--------------------(') - # print('traceback') traceback.print_tb(ex_tb) - # print('--------------------)') emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex) emergency_exit = True stage = 'pre-SUITE' @@ -292,18 +356,31 @@ time.sleep(2) for tidx in testlist: if "flower" in tidx["category"] and args.device == None: + errmsg = "Tests using the DEV2 variable must define the name of a " + errmsg += "physical NIC with the -d option when running tdc.\n" + errmsg += "Test has been skipped." if args.verbose > 1: - print('Not executing test {} {} because DEV2 not defined'. - format(tidx['id'], tidx['name'])) + print(errmsg) + res = TestResult(tidx['id'], tidx['name']) + res.set_result(ResultState.skip) + res.set_errormsg(errmsg) + tsr.add_resultdata(res) continue try: badtest = tidx # in case it goes bad - tap += run_one_test(pm, args, index, tidx) + res = run_one_test(pm, args, index, tidx) + tsr.add_resultdata(res) except PluginMgrTestFail as pmtf: ex_type, ex, ex_tb = sys.exc_info() stage = pmtf.stage message = pmtf.message output = pmtf.output + res = TestResult(tidx['id'], tidx['name']) + res.set_result(ResultState.skip) + res.set_errormsg(pmtf.message) + res.set_failmsg(pmtf.output) + tsr.add_resultdata(res) + index += 1 print(message) print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'. format(ex_type, ex, index, tidx['id'], tidx['name'], stage)) @@ -322,16 +399,16 @@ # if we failed in setup or teardown, # fill in the remaining tests with ok-skipped count = index - if not args.notap: - tap += 'about to flush the tap output if tests need to be skipped\n' - if tcount + 1 != index: - for tidx in testlist[index - 1:]: - msg = 'skipped - previous {} failed'.format(stage) - tap += 'ok {} - {} # {} {} {}\n'.format( - count, tidx['id'], msg, index, badtest.get('id', '--Unknown--')) - count += 1 - tap += 'done flushing skipped test tap output\n' + if tcount + 1 != count: + for tidx in testlist[count - 1:]: + res = TestResult(tidx['id'], tidx['name']) + res.set_result(ResultState.skip) + msg = 'skipped - previous {} failed {} {}'.format(stage, + index, badtest.get('id', '--Unknown--')) + res.set_errormsg(msg) + tsr.add_resultdata(res) + count += 1 if args.pause: print('Want to pause\nPress enter to continue ...') @@ -340,7 +417,7 @@ pm.call_post_suite(index) - return tap + return tsr def has_blank_ids(idlist): """ @@ -381,6 +458,10 @@ Set the command line arguments for tdc. """ parser.add_argument( + '--outfile', type=str, + help='Path to the file in which results should be saved. ' + + 'Default target is the current directory.') + parser.add_argument( '-p', '--path', type=str, help='The full path to the tc executable to use') sg = parser.add_argument_group( @@ -416,10 +497,13 @@ '-v', '--verbose', action='count', default=0, help='Show the commands that are being run') parser.add_argument( - '-N', '--notap', action='store_true', - help='Suppress tap results for command under test') + '--format', default='tap', const='tap', nargs='?', + choices=['none', 'xunit', 'tap'], + help='Specify the format for test results. (Default: TAP)') parser.add_argument('-d', '--device', - help='Execute the test case in flower category') + help='Execute test cases that use a physical device, ' + + 'where DEVICE is its name. (If not defined, tests ' + + 'that require a physical device will be skipped)') parser.add_argument( '-P', '--pause', action='store_true', help='Pause execution just before post-suite stage') @@ -438,6 +522,8 @@ NAMES['TC'] = args.path if args.device != None: NAMES['DEV2'] = args.device + if 'TIMEOUT' not in NAMES: + NAMES['TIMEOUT'] = None if not os.path.isfile(NAMES['TC']): print("The specified tc path " + NAMES['TC'] + " does not exist.") exit(1) @@ -532,6 +618,7 @@ return answer + def get_test_cases(args): """ If a test case file is specified, retrieve tests from that file. @@ -593,7 +680,7 @@ return allcatlist, allidlist, testcases_by_cats, alltestcases -def set_operation_mode(pm, args): +def set_operation_mode(pm, parser, args, remaining): """ Load the test case data and process remaining arguments to determine what the script should do for this run, and call the appropriate @@ -626,18 +713,41 @@ exit(0) if args.list: - if args.list: - list_test_cases(alltests) - exit(0) + list_test_cases(alltests) + exit(0) if len(alltests): + req_plugins = pm.get_required_plugins(alltests) + try: + args = pm.load_required_plugins(req_plugins, parser, args, remaining) + except PluginDependencyException as pde: + print('The following plugins were not found:') + print('{}'.format(pde.missing_pg)) catresults = test_runner(pm, args, alltests) + if args.format == 'none': + print('Test results output suppression requested\n') + else: + print('\nAll test results: \n') + if args.format == 'xunit': + suffix = 'xml' + res = catresults.format_xunit() + elif args.format == 'tap': + suffix = 'tap' + res = catresults.format_tap() + print(res) + print('\n\n') + if not args.outfile: + fname = 'test-results.{}'.format(suffix) + else: + fname = args.outfile + with open(fname, 'w') as fh: + fh.write(res) + fh.close() + if os.getenv('SUDO_UID') is not None: + os.chown(fname, uid=int(os.getenv('SUDO_UID')), + gid=int(os.getenv('SUDO_GID'))) else: - catresults = 'No tests found\n' - if args.notap: - print('Tap output suppression requested\n') - else: - print('All test results: \n\n{}'.format(catresults)) + print('No tests found\n') def main(): """ @@ -650,11 +760,12 @@ parser = pm.call_add_args(parser) (args, remaining) = parser.parse_known_args() args.NAMES = NAMES + pm.set_args(args) check_default_settings(args, remaining, pm) if args.verbose > 2: print('args is {}'.format(args)) - set_operation_mode(pm, args) + set_operation_mode(pm, parser, args, remaining) exit(0) -- Gitblit v1.6.2