huangcm
2025-02-24 69ed55dec4b2116a19e4cca4393cbc014fce5fb2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
#
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
 
"""A tool for running puffin tests in a corpus of deflate compressed files."""
 
import argparse
import filecmp
import logging
import os
import subprocess
import sys
import tempfile
 
_PUFFHUFF = 'puffhuff'
_PUFFDIFF = 'puffdiff'
TESTS = (_PUFFHUFF, _PUFFDIFF)
 
 
class Error(Exception):
  """Puffin general processing error."""
 
 
def ParseArguments(argv):
  """Parses and Validates command line arguments.
 
  Args:
    argv: command line arguments to parse.
 
  Returns:
    The arguments list.
  """
  parser = argparse.ArgumentParser()
 
  parser.add_argument('corpus', metavar='CORPUS',
                      help='A corpus directory containing compressed files')
  parser.add_argument('-d', '--disabled_tests', default=(), metavar='',
                      nargs='*',
                      help=('Space separated list of tests to disable. '
                            'Allowed options include: ' + ', '.join(TESTS)),
                      choices=TESTS)
  parser.add_argument('--cache_size', type=int, metavar='SIZE',
                      help='The size (in bytes) of the cache for puffpatch '
                      'operations.')
  parser.add_argument('--debug', action='store_true',
                      help='Turns on verbosity.')
 
  # Parse command-line arguments.
  args = parser.parse_args(argv)
 
  if not os.path.isdir(args.corpus):
    raise Error('Corpus directory {} is non-existent or inaccesible'
                .format(args.corpus))
  return args
 
 
def main(argv):
  """The main function."""
  args = ParseArguments(argv[1:])
 
  if args.debug:
    logging.getLogger().setLevel(logging.DEBUG)
 
  # Construct list of appropriate files.
  files = list(filter(os.path.isfile, [os.path.join(args.corpus, f)
                                       for f in os.listdir(args.corpus)]))
 
  # For each file in corpus run puffhuff.
  if _PUFFHUFF not in args.disabled_tests:
    for src in files:
      with tempfile.NamedTemporaryFile() as tgt_file:
 
        operation = 'puffhuff'
        logging.debug('Running %s on %s', operation, src)
        cmd = ['puffin',
               '--operation={}'.format(operation),
               '--src_file={}'.format(src),
               '--dst_file={}'.format(tgt_file.name)]
        if subprocess.call(cmd) != 0:
          raise Error('Puffin failed to do {} command: {}'
                      .format(operation, cmd))
 
        if not filecmp.cmp(src, tgt_file.name):
          raise Error('The generated file {} is not equivalent to the original '
                      'file {} after {} operation.'
                      .format(tgt_file.name, src, operation))
 
  if _PUFFDIFF not in args.disabled_tests:
    # Run puffdiff and puffpatch for each pairs of files in the corpus.
    for src in files:
      for tgt in files:
        with tempfile.NamedTemporaryFile() as patch, \
             tempfile.NamedTemporaryFile() as new_tgt:
 
          operation = 'puffdiff'
          logging.debug('Running %s on %s (%d) and %s (%d)',
                        operation,
                        os.path.basename(src), os.stat(src).st_size,
                        os.path.basename(tgt), os.stat(tgt).st_size)
          cmd = ['puffin',
                 '--operation={}'.format(operation),
                 '--src_file={}'.format(src),
                 '--dst_file={}'.format(tgt),
                 '--patch_file={}'.format(patch.name)]
 
          # Running the puffdiff operation
          if subprocess.call(cmd) != 0:
            raise Error('Puffin failed to do {} command: {}'
                        .format(operation, cmd))
 
          logging.debug('Patch size is: %d', os.stat(patch.name).st_size)
 
          operation = 'puffpatch'
          logging.debug('Running %s on src file %s and patch %s',
                        operation, os.path.basename(src), patch.name)
          cmd = ['puffin',
                 '--operation={}'.format(operation),
                 '--src_file={}'.format(src),
                 '--dst_file={}'.format(new_tgt.name),
                 '--patch_file={}'.format(patch.name)]
          if args.cache_size:
            cmd += ['--cache_size={}'.format(args.cache_size)]
 
          # Running the puffpatch operation
          if subprocess.call(cmd) != 0:
            raise Error('Puffin failed to do {} command: {}'
                        .format(operation, cmd))
 
          if not filecmp.cmp(tgt, new_tgt.name):
            raise Error('The generated file {} is not equivalent to the '
                        'original file {} after puffpatch operation.'
                        .format(new_tgt.name, tgt))
 
  return 0
 
 
if __name__ == '__main__':
  sys.exit(main(sys.argv))