hc
2024-01-03 2f7c68cb55ecb7331f2381deb497c27155f32faf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import sys, os, re, struct, operator, math
from collections import defaultdict
import unittest
 
sys.path.insert(0, os.getcwd())
 
import pybootchartgui.parsing as parsing
import pybootchartgui.main as main
 
debug = False
 
def floatEq(f1, f2):
   return math.fabs(f1-f2) < 0.00001
 
bootchart_dir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/')
parser = main._mk_options_parser()
options, args = parser.parse_args(['--q', bootchart_dir])
writer = main._mk_writer(options)
 
class TestBCParser(unittest.TestCase):
    
   def setUp(self):
       self.name = "My first unittest"
       self.rootdir = bootchart_dir
 
   def mk_fname(self,f):
       return os.path.join(self.rootdir, f)
 
   def testParseHeader(self):
       trace = parsing.Trace(writer, args, options)
       state = parsing.parse_file(writer, trace, self.mk_fname('header'))
       self.assertEqual(6, len(state.headers))
       self.assertEqual(2, parsing.get_num_cpus(state.headers))
 
   def test_parseTimedBlocks(self):
       trace = parsing.Trace(writer, args, options)
       state = parsing.parse_file(writer, trace, self.mk_fname('proc_diskstats.log'))
       self.assertEqual(141, len(state.disk_stats))        
 
   def testParseProcPsLog(self):
       trace = parsing.Trace(writer, args, options)
       state = parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log'))
       samples = state.ps_stats
       processes = samples.process_map
       sorted_processes = [processes[k] for k in sorted(processes.keys())]
 
       ps_data = open(self.mk_fname('extract2.proc_ps.log'))
       for index, line in enumerate(ps_data):
           tokens = line.split();
           process = sorted_processes[index]
           if debug:
               print(tokens[0:4])
               print(process.pid / 1000, process.cmd, process.ppid, len(process.samples))
               print('-------------------')
 
           self.assertEqual(tokens[0], str(process.pid // 1000))
           self.assertEqual(tokens[1], str(process.cmd))
           self.assertEqual(tokens[2], str(process.ppid // 1000))
           self.assertEqual(tokens[3], str(len(process.samples)))
       ps_data.close()
 
   def testparseProcDiskStatLog(self):
       trace = parsing.Trace(writer, args, options)
       state_with_headers = parsing.parse_file(writer, trace, self.mk_fname('header'))
       state_with_headers.headers['system.cpu'] = 'xxx (2)'
       samples = parsing.parse_file(writer, state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats
       self.assertEqual(141, len(samples))
 
       diskstats_data = open(self.mk_fname('extract.proc_diskstats.log'))
       for index, line in enumerate(diskstats_data):
           tokens = line.split('\t')
           sample = samples[index]
           if debug:        
               print(line.rstrip())
               print(sample)
               print('-------------------')
           
           self.assertEqual(tokens[0], str(sample.time))
           self.assert_(floatEq(float(tokens[1]), sample.read))
           self.assert_(floatEq(float(tokens[2]), sample.write))
           self.assert_(floatEq(float(tokens[3]), sample.util))
       diskstats_data.close()
   
   def testparseProcStatLog(self):
       trace = parsing.Trace(writer, args, options)
       samples = parsing.parse_file(writer, trace, self.mk_fname('proc_stat.log')).cpu_stats
       self.assertEqual(141, len(samples))
 
       stat_data = open(self.mk_fname('extract.proc_stat.log'))
       for index, line in enumerate(stat_data):
           tokens = line.split('\t')
           sample = samples[index]
           if debug:
               print(line.rstrip())
               print(sample)
               print('-------------------')
           self.assert_(floatEq(float(tokens[0]), sample.time))
           self.assert_(floatEq(float(tokens[1]), sample.user))
           self.assert_(floatEq(float(tokens[2]), sample.sys))
           self.assert_(floatEq(float(tokens[3]), sample.io))
       stat_data.close()
 
if __name__ == '__main__':
    unittest.main()