import sys
|
import os
|
import unittest
|
|
sys.path.insert(0, os.getcwd())
|
|
import pybootchartgui.parsing as parsing
|
import pybootchartgui.process_tree as process_tree
|
import pybootchartgui.main as main
|
|
if sys.version_info >= (3, 0):
|
long = int
|
|
class TestProcessTree(unittest.TestCase):
|
|
def setUp(self):
|
self.name = "Process tree unittest"
|
self.rootdir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/')
|
|
parser = main._mk_options_parser()
|
options, args = parser.parse_args(['--q', self.rootdir])
|
writer = main._mk_writer(options)
|
trace = parsing.Trace(writer, args, options)
|
|
parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log'))
|
trace.compile(writer)
|
self.processtree = process_tree.ProcessTree(writer, None, trace.ps_stats, \
|
trace.ps_stats.sample_period, None, options.prune, None, None, False, for_testing = True)
|
|
def mk_fname(self,f):
|
return os.path.join(self.rootdir, f)
|
|
def flatten(self, process_tree):
|
flattened = []
|
for p in process_tree:
|
flattened.append(p)
|
flattened.extend(self.flatten(p.child_list))
|
return flattened
|
|
def checkAgainstJavaExtract(self, filename, process_tree):
|
test_data = open(filename)
|
for expected, actual in zip(test_data, self.flatten(process_tree)):
|
tokens = expected.split('\t')
|
self.assertEqual(int(tokens[0]), actual.pid // 1000)
|
self.assertEqual(tokens[1], actual.cmd)
|
self.assertEqual(long(tokens[2]), 10 * actual.start_time)
|
self.assert_(long(tokens[3]) - 10 * actual.duration < 5, "duration")
|
self.assertEqual(int(tokens[4]), len(actual.child_list))
|
self.assertEqual(int(tokens[5]), len(actual.samples))
|
test_data.close()
|
|
def testBuild(self):
|
process_tree = self.processtree.process_tree
|
self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.1.log'), process_tree)
|
|
def testMergeLogger(self):
|
self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
|
process_tree = self.processtree.process_tree
|
self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.2.log'), process_tree)
|
|
def testPrune(self):
|
self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
|
self.processtree.prune(self.processtree.process_tree, None)
|
process_tree = self.processtree.process_tree
|
self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3b.log'), process_tree)
|
|
def testMergeExploders(self):
|
self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
|
self.processtree.prune(self.processtree.process_tree, None)
|
self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
|
process_tree = self.processtree.process_tree
|
self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3c.log'), process_tree)
|
|
def testMergeSiblings(self):
|
self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
|
self.processtree.prune(self.processtree.process_tree, None)
|
self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
|
self.processtree.merge_siblings(self.processtree.process_tree)
|
process_tree = self.processtree.process_tree
|
self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3d.log'), process_tree)
|
|
def testMergeRuns(self):
|
self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
|
self.processtree.prune(self.processtree.process_tree, None)
|
self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
|
self.processtree.merge_siblings(self.processtree.process_tree)
|
self.processtree.merge_runs(self.processtree.process_tree)
|
process_tree = self.processtree.process_tree
|
self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3e.log'), process_tree)
|
|
if __name__ == '__main__':
|
unittest.main()
|