diff options
author | van Hauser <vh@thc.org> | 2023-06-06 17:36:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-06 17:36:04 +0300 |
commit | 4deb45f3b3e9f53880596d21432069b05553bcb3 (patch) | |
tree | 2dcf56dd0b540a4387f050c32ba5f50e7f42d666 /custom_mutators/aflpp_tritondse/aflpp_tritondse.py | |
parent | 8de7f6131d48e27d53e894b65bd11e0dc3817639 (diff) | |
parent | 2f6b54e4410738d92c4981a700541f15e4fbe938 (diff) | |
download | afl++-4deb45f3b3e9f53880596d21432069b05553bcb3.tar.gz |
Merge pull request #1759 from AFLplusplus/dev
Dev
Diffstat (limited to 'custom_mutators/aflpp_tritondse/aflpp_tritondse.py')
-rw-r--r-- | custom_mutators/aflpp_tritondse/aflpp_tritondse.py | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/custom_mutators/aflpp_tritondse/aflpp_tritondse.py b/custom_mutators/aflpp_tritondse/aflpp_tritondse.py new file mode 100644 index 00000000..58b506b6 --- /dev/null +++ b/custom_mutators/aflpp_tritondse/aflpp_tritondse.py @@ -0,0 +1,220 @@ +import sys +import os +import logging +import hashlib + +from tritondse import CleLoader +from tritondse import CompositeData +from tritondse import Config +from tritondse import CoverageStrategy +from tritondse import ProcessState +from tritondse import Program +from tritondse import Seed +from tritondse import SeedFormat +from tritondse import SymbolicExecutor +from tritondse import SymbolicExplorator + +is_debug = False +out_path = "" +input_file = None +prog = None +config = None +dse = None +cycle = 0 +count = 0 +finding = 0 +hashes = set() +format = SeedFormat.RAW + +def pre_exec_hook(se: SymbolicExecutor, state: ProcessState): + global count + global hashes + global finding + if se.seed.hash not in hashes: + hashes.add(se.seed.hash) + finding = 1 + filename = out_path + "/id:" + f"{count:06}" + "," + se.seed.hash + if not os.path.exists(filename): + if is_debug: + print('Creating queue input ' + filename) + with open(filename, 'wb') as file: + if input_file: + file.write(se.seed.content.files[input_file]) + else: + file.write(se.seed.content) + count += 1 + #if input_file: + # if is_debug: + # print('Writing to ' + input_file + ' the content: ' + str(se.seed.content)) + # with open(input_file, 'wb') as file: + # file.write(se.seed.content) + + +#def rtn_open(se: SymbolicExecutor, pstate: ProcessState, pc): +# """ +# The open behavior. +# """ +# logging.debug('open hooked') +# +# # Get arguments +# arg0 = pstate.get_argument_value(0) # const char *pathname +# flags = pstate.get_argument_value(1) # int flags +# mode = pstate.get_argument_value(2) # int mode +# arg0s = pstate.memory.read_string(arg0) +# +# # Concretize the whole path name +# pstate.concretize_memory_bytes(arg0, len(arg0s)+1) # Concretize the whole string + \0 +# +# # We use flags as concrete value +# pstate.concretize_argument(1) +# +# # Use the flags to open the file in the write mode. +# mode = "" +# if (flags & 0xFF) == 0x00: # O_RDONLY +# mode = "r" +# elif (flags & 0xFF) == 0x01: # O_WRONLY +# mode = "w" +# elif (flags & 0xFF) == 0x02: # O_RDWR +# mode = "r+" +# +# if (flags & 0x0100): # O_CREAT +# mode += "x" +# if (flags & 0x0200): # O_APPEND +# mode = "a" # replace completely value +# +# if se.seed.is_file_defined(arg0s) and "r" in mode: # input file and opened in reading +# logging.info(f"opening an input file: {arg0s}") +# # Program is opening an input +# data = se.seed.get_file_input(arg0s) +# filedesc = pstate.create_file_descriptor(arg0s, io.BytesIO(data)) +# fd = filedesc.id +# else: +# # Try to open it as a regular file +# try: +# fd = open(arg0s, mode) # use the mode here +# filedesc = pstate.create_file_descriptor(arg0s, fd) +# fd = filedesc.id +# except Exception as e: +# logging.debug(f"Failed to open {arg0s} {e}") +# fd = pstate.minus_one +# +# pstate.write_register("rax", fd) # write the return value +# pstate.cpu.program_counter = pstate.pop_stack_value() # pop the return value +# se.skip_instruction() # skip the current instruction so that the engine go straight fetching the next instruction + + +def init(seed): + global config + global dse + global format + global input_file + global is_debug + global out_path + global prog + # Load the program (LIEF-based program loader). + prog = CleLoader(os.environ['AFL_CUSTOM_INFO_PROGRAM']) + # Process other configuration environment variables. + argv = None + try: + foo = os.environ['AFL_DEBUG'] + is_debug = True + except KeyError: + pass + if is_debug: + logging.basicConfig(level=logging.WARNING) + else: + logging.basicConfig(level=logging.CRITICAL) + try: + foo = os.environ['AFL_CUSTOM_INFO_OUT'] + out_path = foo + '/../tritondse/queue' + except KeyError: + pass + try: + foo = os.environ['AFL_CUSTOM_INFO_PROGRAM_INPUT'] + input_file = foo + except KeyError: + pass + try: + argv_list = os.environ['AFL_CUSTOM_INFO_PROGRAM_ARGV'] + argv_tmp = [ os.environ['AFL_CUSTOM_INFO_PROGRAM'] ] + argv_tmp += argv_list.split() + argv = [] + # now check for @@ + for item in argv_tmp: + if "@@" in item: + input_file = out_path + '/../.input' + argv.append(input_file) + else: + argv.append(item) + except KeyError: + pass + # Create the output directory + os.makedirs(out_path, exist_ok=True) + # Debug + if is_debug: + print('DEBUG target: ' + os.environ['AFL_CUSTOM_INFO_PROGRAM']) + if argv: + print('DEBUG argv: ') + print(argv) + if input_file: + print('DEBUG input_file: ' + input_file) + print('DEBUG out_path: ' + out_path) + print('') + if input_file: + format = SeedFormat.COMPOSITE + # Now set up TritonDSE + config = Config(coverage_strategy = CoverageStrategy.PATH, + debug = is_debug, + pipe_stdout = is_debug, + pipe_stderr = is_debug, + execution_timeout = 1, + program_argv = argv, + smt_timeout= 50, + seed_format = format) + # Create an instance of the Symbolic Explorator + dse = SymbolicExplorator(config, prog) + # Add callbacks. + dse.callback_manager.register_pre_execution_callback(pre_exec_hook) + #dse.callback_manager.register_function_callback("open", rtn_open) + + +def fuzz(buf, add_buf, max_size): + global finding + finding = 1 + while finding == 1: + finding = 0 + dse.step() + return b"" + + +def queue_new_entry(filename_new_queue, filename_orig_queue): + global cycle + global dse + # Add seed to the worklist. + with open(filename_new_queue, "rb") as file: + data = file.read() + hash = hashlib.md5(data).hexdigest() + if hash not in hashes: + hashes.add(hash) + if is_debug: + print("NEW FILE " + filename_new_queue + " hash " + hash + " count " + str(cycle)) + cycle += 1 + if input_file: + seed = Seed(CompositeData(files={"stdin": b"", # nothing on stdin + input_file: data})) + else: + seed = Seed(data) + dse.add_input_seed(seed) + # Start exploration! + #dse.step() + #dse.explore() + pass + + +# we simulate just doing one single fuzz in the custom mutator +def fuzz_count(buf): + return 1 + + +def splice_optout(): + pass |