ronnie
2022-10-14 1504bb53e29d3d46222c0b3ea994fc494b48e153
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
#!/usr/bin/python
 
"""Tests for site_sysinfo."""
 
__author__ = 'dshi@google.com (Dan Shi)'
 
import cPickle as pickle
import filecmp
import os
import random
import shutil
import tempfile
import unittest
 
import common
from autotest_lib.client.bin import site_sysinfo
from autotest_lib.client.common_lib import autotemp
 
 
class diffable_logdir_test(unittest.TestCase):
    """Tests for methods in class diffable_logdir."""


    def setUp(self):
        """Initialize a temp directory with test files.

        Layout created under the temp directory:
          src/existing_file_0
          src/sub/existing_file_1
          src/sub/sub2/existing_file_2
          src/sub/sub2/existing_fifo   (a FIFO, not a regular file)

        The paths of files that the test adds later are precomputed in
        self.new_files_path but not created here.
        """
        self.tempdir = autotemp.tempdir(unique_id='diffable_logdir')
        # Register the cleanup immediately: unittest skips tearDown when
        # setUp raises, but registered cleanups are still run.
        self.addCleanup(self.tempdir.clean)

        self.src_dir = os.path.join(self.tempdir.name, 'src')
        self.dest_dir = os.path.join(self.tempdir.name, 'dest')

        self.existing_files = ['existing_file_'+str(i) for i in range(3)]
        self.existing_files_folder = ['', 'sub', 'sub/sub2']
        self.existing_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.existing_files,
                                     self.existing_files_folder)]
        self.new_files = ['new_file_'+str(i) for i in range(2)]
        self.new_files_folder = ['sub', 'sub/sub3']
        self.new_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.new_files, self.new_files_folder)]

        # Create some file with random data in source directory.
        for p in self.existing_files_path:
            self.append_text_to_file(str(random.random()), p)

        # sub/sub2 already exists at this point because existing_file_2 was
        # just written into it.
        self.existing_fifo_path = os.path.join(
            self.src_dir, 'sub/sub2/existing_fifo')
        os.mkfifo(self.existing_fifo_path)


    def _dest_path(self, src_path):
        """Map a path under src_dir to the matching path under dest_dir.

        Only the leading src_dir prefix is swapped. A plain
        str.replace('src', 'dest') would also rewrite any other 'src'
        substring in the absolute path (e.g. inside the temp directory name),
        which would make the test fail spuriously.

        @param src_path: a path located under self.src_dir.

        @return: the path with the same relative location under self.dest_dir.

        """
        return os.path.join(self.dest_dir,
                            os.path.relpath(src_path, self.src_dir))


    def append_text_to_file(self, text, file_path):
        """Append text to the end of a file, create the file if not existed.

        Missing parent directories are created as well.

        @param text: text to be appended to a file.
        @param file_path: path to the file.

        """
        dir_name = os.path.dirname(file_path)
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
        with open(file_path, 'a') as f:
            f.write(text)


    def assert_trees_equal(self, dir1, dir2, ignore=None):
        """Assert two directory trees contain the same files.

        Walks both trees level by level; at every level the directory
        listings must match and no common file may differ.

        @param dir1: the left comparison directory.
        @param dir2: the right comparison directory.
        @param ignore: filenames to ignore (in any directory).

        """
        pending = [filecmp.dircmp(dir1, dir2, ignore)]
        while pending:
            dcmp = pending.pop()
            self.assertEqual(dcmp.left_list, dcmp.right_list)
            self.assertEqual([], dcmp.diff_files)
            # Queue the comparisons of the common subdirectories.
            pending.extend(dcmp.subdirs.values())


    def test_diffable_logdir_success(self):
        """Test the diff function to save new data from a directory."""
        info = site_sysinfo.diffable_logdir(self.src_dir,
                                            keep_file_hierarchy=False,
                                            append_diff_in_name=False)
        # Run the first time to collect file status.
        info.run(log_dir=None, collect_init_status=True)

        # Add new files to the test directory.
        for file_name, file_path in zip(self.new_files,
                                        self.new_files_path):
            self.append_text_to_file(file_name, file_path)

        # Temp file for existing_file_2, used to hold on the inode. If the
        # file is deleted and recreated, its inode might not change.
        existing_file_2 = self.existing_files_path[2]
        existing_file_2_tmp = existing_file_2 + '_tmp'
        os.rename(existing_file_2, existing_file_2_tmp)

        # Append data to existing file. existing_file_2 was renamed away
        # above, so it is recreated here containing only its own name.
        for file_name, file_path in zip(self.existing_files,
                                        self.existing_files_path):
            self.append_text_to_file(file_name, file_path)

        # Remove the tmp file.
        os.remove(existing_file_2_tmp)

        # Add a new FIFO
        new_fifo_path = os.path.join(self.src_dir, 'sub/sub2/new_fifo')
        os.mkfifo(new_fifo_path)

        # Run the second time to do diff.
        info.run(self.dest_dir, collect_init_status=False, collect_all=True)

        # Validate files in dest_dir: each diff must hold exactly the text
        # that was written after the initial status collection, which is the
        # file's own name.
        for file_name, file_path in zip(
                self.existing_files + self.new_files,
                self.existing_files_path + self.new_files_path):
            with open(self._dest_path(file_path), 'r') as f:
                self.assertEqual(file_name, f.read())

        # Assert that FIFOs are not in the diff.
        self.assertFalse(
                os.path.exists(self._dest_path(self.existing_fifo_path)),
                msg='Existing FIFO present in diff sysinfo')
        self.assertFalse(
                os.path.exists(self._dest_path(new_fifo_path)),
                msg='New FIFO present in diff sysinfo')

        # With collect_all=True, full sysinfo should also be present. It is
        # expected at dest_dir followed by the absolute source path.
        full_sysinfo_path = self.dest_dir + self.src_dir
        self.assertTrue(os.path.exists(full_sysinfo_path),
                        msg='Full sysinfo not present')

        # Assert that the full sysinfo is present.
        self.assertNotEqual(self.src_dir, full_sysinfo_path)
        self.assert_trees_equal(self.src_dir, full_sysinfo_path)
 
 
class LogdirTestCase(unittest.TestCase):
    """Tests logdir.run"""

    def setUp(self):
        """Create empty 'from' and 'to' directories inside a fresh temp dir."""
        # Canonicalize the temp dir. The tests below expect logdir.run to
        # resolve symlinks in the source root, so expected destination paths
        # must be built from a symlink-free path; otherwise they cannot match
        # on systems where the default temp directory sits behind a symlink.
        self.tempdir = os.path.realpath(tempfile.mkdtemp())
        self.addCleanup(shutil.rmtree, self.tempdir)

        self.from_dir = os.path.join(self.tempdir, 'from')
        self.to_dir = os.path.join(self.tempdir, 'to')
        os.mkdir(self.from_dir)
        os.mkdir(self.to_dir)

    def _destination_path(self, relative_path, from_dir=None):
        """The expected destination path for a subdir of the source directory

        @param relative_path: path relative to the source directory.
        @param from_dir: source directory; defaults to self.from_dir.

        @return: to_dir immediately followed by the absolute source path, i.e.
                the source hierarchy recreated underneath to_dir.

        """
        if from_dir is None:
            from_dir = self.from_dir
        # Plain string concatenation is intentional: from_dir is absolute, so
        # os.path.join(self.to_dir, ...) would discard to_dir.
        return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path))

    def test_run_recreates_absolute_source_path(self):
        """When copying files, the absolute path from the source is recreated
        in the destination folder.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_skips_symlinks(self):
        """Symlinks inside the source directory are not copied."""
        os.mkdir(os.path.join(self.from_dir, 'real'))
        os.symlink(os.path.join(self.from_dir, 'real'),
                   os.path.join(self.from_dir, 'symlink'))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        destination_path_real = self._destination_path('real')
        self.assertTrue(os.path.exists(destination_path_real),
                        msg='real directory was not copied to %s' %
                        destination_path_real)
        destination_path_link = self._destination_path('symlink')
        self.assertFalse(
                os.path.exists(destination_path_link),
                msg='symlink was copied to %s' % destination_path_link)

    def test_run_resolves_symlinks_to_source_root(self):
        """run tries hard to get to the source directory before copying.

        Within the source folder, we skip any symlinks, but we first try to
        resolve symlinks to the source root itself.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        from_symlink = os.path.join(self.tempdir, 'from_symlink')
        os.symlink(self.from_dir, from_symlink)

        logdir = site_sysinfo.logdir(from_symlink)
        logdir.run(self.to_dir)

        # The expectation is built from the real from_dir, not the symlink.
        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_excludes_common_patterns(self):
        """Entries matching the default exclude patterns are not copied."""
        os.mkdir(os.path.join(self.from_dir, 'autoserv2344'))
        # Create empty file.
        open(os.path.join(self.from_dir, 'system.journal'), 'w').close()
        deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix')
        os.makedirs(os.path.join(self.from_dir, deeper_subdir))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        for banned in ('autoserv2344', deeper_subdir, 'system.journal'):
            destination_path = self._destination_path(banned)
            self.assertFalse(os.path.exists(destination_path),
                             msg='Copied banned file to %s' % destination_path)

    def test_run_ignores_exclude_patterns_in_leading_dirs(self):
        """Exclude patterns should only be applied to path suffixes within
        from_dir, not to the root directory itself.
        """
        exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344')
        os.makedirs(os.path.join(exclude_pattern_dir, 'fubar'))
        logdir = site_sysinfo.logdir(exclude_pattern_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar',
                                                  from_dir=exclude_pattern_dir)
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_pickle_unpickle_equal(self):
        """Sanity check pickle-unpickle round-trip."""
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        unpickled_logdir = pickle.loads(logdir_pickle)

        self.assertEqual(unpickled_logdir, logdir)

    def test_pickle_enforce_required_attributes(self):
        """Some attributes from this object should never be dropped.

        When running a client test against an older build, we pickle the objects
        of this class from newer version of the class and unpickle to older
        versions of the class. The older versions require some attributes to be
        present.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(logdir.additional_exclude, 'a')

    def test_pickle_enforce_required_attributes_default(self):
        """Some attributes from this object should never be dropped.

        Same as test_pickle_enforce_required_attributes, but for an object
        built with the default excludes: additional_exclude must still be
        present after the round-trip, with value None.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertIsNone(logdir.additional_exclude)

    def test_unpickle_handle_missing__excludes(self):
        """Sanely handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        Here the object is built with the default excludes, so the unpickled
        object is expected to end up with DEFAULT_EXCLUDES.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        # Simulate a pickle produced by code that predates _excludes.
        delattr(logdir, '_excludes')
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(logdir._excludes,
                         site_sysinfo.logdir.DEFAULT_EXCLUDES)

    def test_unpickle_handle_missing__excludes_default(self):
        """Sanely handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        Here the object is built with one extra exclude ('a'), which is
        expected to be present in _excludes again after unpickling.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # Simulate a pickle produced by code that predates _excludes.
        delattr(logdir, '_excludes')
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(
                logdir._excludes,
                (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
 
 
# Run every test case defined in this module when the file is executed
# directly as a script.
if __name__ == '__main__':
    unittest.main()