351 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
			
		
		
	
	
			351 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| """
 | |
| This is a generic fuzz testing tool, see --help for more information.
 | |
| """
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import random
 | |
| import subprocess
 | |
| import itertools
 | |
| 
 | |
| class TestGenerator:
 | |
|     def __init__(self, inputs, delete, insert, replace,
 | |
|                  insert_strings, pick_input):
 | |
|         self.inputs = [(s, open(s).read()) for s in inputs]
 | |
| 
 | |
|         self.delete = bool(delete)
 | |
|         self.insert = bool(insert)
 | |
|         self.replace = bool(replace)
 | |
|         self.pick_input = bool(pick_input)
 | |
|         self.insert_strings = list(insert_strings)
 | |
| 
 | |
|         self.num_positions = sum([len(d) for _,d in self.inputs])
 | |
|         self.num_insert_strings = len(insert_strings)
 | |
|         self.num_tests = ((delete + (insert + replace)*self.num_insert_strings)
 | |
|                           * self.num_positions)
 | |
|         self.num_tests += 1
 | |
| 
 | |
|         if self.pick_input:
 | |
|             self.num_tests *= self.num_positions
 | |
| 
 | |
|     def position_to_source_index(self, position):
 | |
|         for i,(s,d) in enumerate(self.inputs):
 | |
|             n = len(d)
 | |
|             if position < n:
 | |
|                 return (i,position)
 | |
|             position -= n
 | |
|         raise ValueError,'Invalid position.'
 | |
| 
 | |
|     def get_test(self, index):
 | |
|         assert 0 <= index < self.num_tests
 | |
| 
 | |
|         picked_position = None
 | |
|         if self.pick_input:
 | |
|             index,picked_position = divmod(index, self.num_positions)
 | |
|             picked_position = self.position_to_source_index(picked_position)
 | |
| 
 | |
|         if index == 0:
 | |
|             return ('nothing', None, None, picked_position)
 | |
| 
 | |
|         index -= 1
 | |
|         index,position = divmod(index, self.num_positions)
 | |
|         position = self.position_to_source_index(position)
 | |
|         if self.delete:
 | |
|             if index == 0:
 | |
|                 return ('delete', position, None, picked_position)
 | |
|             index -= 1
 | |
| 
 | |
|         index,insert_index = divmod(index, self.num_insert_strings)
 | |
|         insert_str = self.insert_strings[insert_index]
 | |
|         if self.insert:
 | |
|             if index == 0:
 | |
|                 return ('insert', position, insert_str, picked_position)
 | |
|             index -= 1
 | |
| 
 | |
|         assert self.replace
 | |
|         assert index == 0
 | |
|         return ('replace', position, insert_str, picked_position)
 | |
| 
 | |
| class TestApplication:
 | |
|     def __init__(self, tg, test):
 | |
|         self.tg = tg
 | |
|         self.test = test
 | |
| 
 | |
|     def apply(self):
 | |
|         if self.test[0] == 'nothing':
 | |
|             pass
 | |
|         else:
 | |
|             i,j = self.test[1]
 | |
|             name,data = self.tg.inputs[i]
 | |
|             if self.test[0] == 'delete':
 | |
|                 data = data[:j] + data[j+1:]
 | |
|             elif self.test[0] == 'insert':
 | |
|                 data = data[:j] + self.test[2] + data[j:]
 | |
|             elif self.test[0] == 'replace':
 | |
|                 data = data[:j] + self.test[2] + data[j+1:]
 | |
|             else:
 | |
|                 raise ValueError,'Invalid test %r' % self.test
 | |
|             open(name,'wb').write(data)
 | |
| 
 | |
|     def revert(self):
 | |
|         if self.test[0] != 'nothing':
 | |
|             i,j = self.test[1]
 | |
|             name,data = self.tg.inputs[i]
 | |
|             open(name,'wb').write(data)
 | |
| 
 | |
| def quote(str):
 | |
|     return '"' + str + '"'
 | |
|         
 | |
| def run_one_test(test_application, index, input_files, args):
 | |
|     test = test_application.test
 | |
| 
 | |
|     # Interpolate arguments.
 | |
|     options = { 'index' : index,
 | |
|                 'inputs' : ' '.join(quote(f) for f in input_files) }
 | |
| 
 | |
|     # Add picked input interpolation arguments, if used.
 | |
|     if test[3] is not None:
 | |
|         pos = test[3][1]
 | |
|         options['picked_input'] = input_files[test[3][0]]
 | |
|         options['picked_input_pos'] = pos
 | |
|         # Compute the line and column.
 | |
|         file_data = test_application.tg.inputs[test[3][0]][1]
 | |
|         line = column = 1
 | |
|         for i in range(pos):
 | |
|             c = file_data[i]
 | |
|             if c == '\n':
 | |
|                 line += 1
 | |
|                 column = 1
 | |
|             else:
 | |
|                 column += 1
 | |
|         options['picked_input_line'] = line
 | |
|         options['picked_input_col'] = column
 | |
|         
 | |
|     test_args = [a % options for a in args]
 | |
|     if opts.verbose:
 | |
|         print '%s: note: executing %r' % (sys.argv[0], test_args)
 | |
| 
 | |
|     stdout = None
 | |
|     stderr = None
 | |
|     if opts.log_dir:
 | |
|         stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index)
 | |
|         stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index)
 | |
|         stdout = open(stdout_log_path, 'wb')
 | |
|         stderr = open(stderr_log_path, 'wb')
 | |
|     else:
 | |
|         sys.stdout.flush()
 | |
|     p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr)
 | |
|     p.communicate()
 | |
|     exit_code = p.wait()
 | |
| 
 | |
|     test_result = (exit_code == opts.expected_exit_code or
 | |
|                    exit_code in opts.extra_exit_codes)
 | |
| 
 | |
|     if stdout is not None:
 | |
|         stdout.close()
 | |
|         stderr.close()
 | |
| 
 | |
|         # Remove the logs for passes, unless logging all results.
 | |
|         if not opts.log_all and test_result:
 | |
|             os.remove(stdout_log_path)
 | |
|             os.remove(stderr_log_path)
 | |
| 
 | |
|     if not test_result:
 | |
|         print 'FAIL: %d' % index
 | |
|     elif not opts.succinct:
 | |
|         print 'PASS: %d' % index
 | |
|     return test_result
 | |
| 
 | |
| def main():
 | |
|     global opts
 | |
|     from optparse import OptionParser, OptionGroup
 | |
|     parser = OptionParser("""%prog [options] ... test command args ...
 | |
| 
 | |
| %prog is a tool for fuzzing inputs and testing them.
 | |
| 
 | |
| The most basic usage is something like:
 | |
| 
 | |
|   $ %prog --file foo.txt ./test.sh
 | |
| 
 | |
| which will run a default list of fuzzing strategies on the input. For each
 | |
| fuzzed input, it will overwrite the input files (in place), run the test script,
 | |
| then restore the files back to their original contents.
 | |
| 
 | |
| NOTE: You should make sure you have a backup copy of your inputs, in case
 | |
| something goes wrong!!!
 | |
| 
 | |
| You can cause the fuzzing to not restore the original files with
 | |
| '--no-revert'. Generally this is used with '--test <index>' to run one failing
 | |
| test and then leave the fuzzed inputs in place to examine the failure.
 | |
| 
 | |
| For each fuzzed input, %prog will run the test command given on the command
 | |
| line. Each argument in the command is subject to string interpolation before
 | |
| being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard
 | |
| printf format, and VARIABLE is one of:
 | |
| 
 | |
|   'index' - the test index being run
 | |
|   'inputs' - the full list of test inputs
 | |
|   'picked_input'      - (with --pick-input) the selected input file
 | |
|   'picked_input_pos'  - (with --pick-input) the selected input position
 | |
|   'picked_input_line' - (with --pick-input) the selected input line
 | |
|   'picked_input_col'  - (with --pick-input) the selected input column
 | |
| 
 | |
| By default, the script will run forever continually picking new tests to
 | |
| run. You can limit the number of tests that are run with '--max-tests <number>',
 | |
| and you can run a particular test with '--test <index>'.
 | |
| 
 | |
| You can specify '--stop-on-fail' to stop the script on the first failure
 | |
| without reverting the changes.
 | |
| 
 | |
| """)
 | |
|     parser.add_option("-v", "--verbose", help="Show more output",
 | |
|                       action='store_true', dest="verbose", default=False)
 | |
|     parser.add_option("-s", "--succinct",  help="Reduce amount of output",
 | |
|                       action="store_true", dest="succinct", default=False)
 | |
| 
 | |
|     group = OptionGroup(parser, "Test Execution")
 | |
|     group.add_option("", "--expected-exit-code", help="Set expected exit code",
 | |
|                      type=int, dest="expected_exit_code",
 | |
|                      default=0)
 | |
|     group.add_option("", "--extra-exit-code",
 | |
|                      help="Set additional expected exit code",
 | |
|                      type=int, action="append", dest="extra_exit_codes",
 | |
|                      default=[])
 | |
|     group.add_option("", "--log-dir",
 | |
|                      help="Capture test logs to an output directory",
 | |
|                      type=str, dest="log_dir",
 | |
|                      default=None)
 | |
|     group.add_option("", "--log-all",
 | |
|                      help="Log all outputs (not just failures)",
 | |
|                      action="store_true", dest="log_all", default=False)
 | |
|     parser.add_option_group(group)
 | |
| 
 | |
|     group = OptionGroup(parser, "Input Files")
 | |
|     group.add_option("", "--file", metavar="PATH",
 | |
|                      help="Add an input file to fuzz",
 | |
|                      type=str, action="append", dest="input_files", default=[])
 | |
|     group.add_option("", "--filelist", metavar="LIST",
 | |
|                      help="Add a list of inputs files to fuzz (one per line)",
 | |
|                      type=str, action="append", dest="filelists", default=[])
 | |
|     parser.add_option_group(group)
 | |
| 
 | |
|     group = OptionGroup(parser, "Fuzz Options")
 | |
|     group.add_option("", "--replacement-chars", dest="replacement_chars",
 | |
|                      help="Characters to insert/replace",
 | |
|                      default="0{}[]<>\;@#$^%& ")
 | |
|     group.add_option("", "--replacement-string", dest="replacement_strings",
 | |
|                      action="append", help="Add a replacement string to use",
 | |
|                      default=[])
 | |
|     group.add_option("", "--replacement-list", dest="replacement_lists",
 | |
|                      help="Add a list of replacement strings (one per line)",
 | |
|                      action="append", default=[])
 | |
|     group.add_option("", "--no-delete", help="Don't delete characters",
 | |
|                      action='store_false', dest="enable_delete", default=True)
 | |
|     group.add_option("", "--no-insert", help="Don't insert strings",
 | |
|                      action='store_false', dest="enable_insert", default=True)
 | |
|     group.add_option("", "--no-replace", help="Don't replace strings",
 | |
|                      action='store_false', dest="enable_replace", default=True)
 | |
|     group.add_option("", "--no-revert", help="Don't revert changes",
 | |
|                      action='store_false', dest="revert", default=True)
 | |
|     group.add_option("", "--stop-on-fail", help="Stop on first failure",
 | |
|                      action='store_true', dest="stop_on_fail", default=False)
 | |
|     parser.add_option_group(group)
 | |
| 
 | |
|     group = OptionGroup(parser, "Test Selection")
 | |
|     group.add_option("", "--test", help="Run a particular test",
 | |
|                      type=int, dest="test", default=None, metavar="INDEX")
 | |
|     group.add_option("", "--max-tests", help="Maximum number of tests",
 | |
|                      type=int, dest="max_tests", default=None, metavar="COUNT")
 | |
|     group.add_option("", "--pick-input",
 | |
|                      help="Randomly select an input byte as well as fuzzing",
 | |
|                      action='store_true', dest="pick_input", default=False)
 | |
|     parser.add_option_group(group)
 | |
| 
 | |
|     parser.disable_interspersed_args()
 | |
| 
 | |
|     (opts, args) = parser.parse_args()
 | |
| 
 | |
|     if not args:
 | |
|         parser.error("Invalid number of arguments")
 | |
| 
 | |
|     # Collect the list of inputs.
 | |
|     input_files = list(opts.input_files)
 | |
|     for filelist in opts.filelists:
 | |
|         f = open(filelist)
 | |
|         try:
 | |
|             for ln in f:
 | |
|                 ln = ln.strip()
 | |
|                 if ln:
 | |
|                     input_files.append(ln)
 | |
|         finally:
 | |
|             f.close()
 | |
|     input_files.sort()
 | |
| 
 | |
|     if not input_files:
 | |
|         parser.error("No input files!")
 | |
| 
 | |
|     print '%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files))
 | |
| 
 | |
|     # Make sure the log directory exists if used.
 | |
|     if opts.log_dir:
 | |
|         if not os.path.exists(opts.log_dir):
 | |
|             try:
 | |
|                 os.mkdir(opts.log_dir)
 | |
|             except OSError:
 | |
|                 print "%s: error: log directory couldn't be created!" % (
 | |
|                     sys.argv[0],)
 | |
|                 raise SystemExit,1
 | |
| 
 | |
|     # Get the list if insert/replacement strings.
 | |
|     replacements = list(opts.replacement_chars)
 | |
|     replacements.extend(opts.replacement_strings)
 | |
|     for replacement_list in opts.replacement_lists:
 | |
|         f = open(replacement_list)
 | |
|         try:
 | |
|             for ln in f:
 | |
|                 ln = ln[:-1]
 | |
|                 if ln:
 | |
|                     replacements.append(ln)
 | |
|         finally:
 | |
|             f.close()
 | |
| 
 | |
|     # Unique and order the replacement list.
 | |
|     replacements = list(set(replacements))
 | |
|     replacements.sort()
 | |
| 
 | |
|     # Create the test generator.
 | |
|     tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert,
 | |
|                        opts.enable_replace, replacements, opts.pick_input)
 | |
| 
 | |
|     print '%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions)
 | |
|     print '%s: note: %d total tests.' % (sys.argv[0], tg.num_tests)
 | |
|     if opts.test is not None:
 | |
|         it = [opts.test]
 | |
|     elif opts.max_tests is not None:
 | |
|         it = itertools.imap(random.randrange,
 | |
|                             itertools.repeat(tg.num_tests, opts.max_tests))
 | |
|     else:
 | |
|         it = itertools.imap(random.randrange, itertools.repeat(tg.num_tests))
 | |
|     for test in it:
 | |
|         t = tg.get_test(test)
 | |
| 
 | |
|         if opts.verbose:
 | |
|             print '%s: note: running test %d: %r' % (sys.argv[0], test, t)
 | |
|         ta = TestApplication(tg, t)
 | |
|         try:
 | |
|             ta.apply()
 | |
|             test_result = run_one_test(ta, test, input_files, args)
 | |
|             if not test_result and opts.stop_on_fail:
 | |
|                 opts.revert = False
 | |
|                 sys.exit(1)
 | |
|         finally:
 | |
|             if opts.revert:
 | |
|                 ta.revert()
 | |
| 
 | |
|         sys.stdout.flush()
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     main()
 |