Diffstat (limited to 'unicorn_mode')
-rw-r--r--             unicorn_mode/README.md                                    |  55
-rw-r--r--             unicorn_mode/UNICORNAFL_VERSION                           |   2
-rwxr-xr-x             unicorn_mode/build_unicorn_support.sh                     |   6
-rw-r--r--             unicorn_mode/helper_scripts/ida_context_loader.py         | 197
-rw-r--r--             unicorn_mode/helper_scripts/unicorn_dumper_gdb.py         | 108
-rw-r--r--             unicorn_mode/helper_scripts/unicorn_dumper_ida.py         | 207
-rw-r--r--             unicorn_mode/helper_scripts/unicorn_dumper_lldb.py        | 241
-rw-r--r--             unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py      | 143
-rw-r--r--             unicorn_mode/samples/c/COMPILE.md                         |   2
-rw-r--r--             unicorn_mode/samples/compcov_x64/compcov_test_harness.py  |  76
-rw-r--r--             unicorn_mode/samples/simple/simple_test_harness.py        |  79
-rw-r--r--             unicorn_mode/samples/simple/simple_test_harness_alt.py    | 100
-rwxr-xr-x[-rw-r--r--] unicorn_mode/samples/speedtest/get_offsets.py             |   0
-rw-r--r--             unicorn_mode/samples/speedtest/python/harness.py          |   6
m---------             unicorn_mode/unicornafl                                   |   0
15 files changed, 880 insertions, 342 deletions
diff --git a/unicorn_mode/README.md b/unicorn_mode/README.md
index f6bd4d12..b3df44fa 100644
--- a/unicorn_mode/README.md
+++ b/unicorn_mode/README.md
@@ -8,19 +8,19 @@ The CompareCoverage and NeverZero counters features are by Andrea Fioraldi <andr
 
 ## 1) Introduction
 
-The code in ./unicorn_mode allows you to build a standalone feature that
-leverages the Unicorn Engine and allows callers to obtain instrumentation
+The code in ./unicorn_mode allows you to build the [Unicorn Engine](https://github.com/unicorn-engine/unicorn) with AFL support.
+This means you can run anything that can be emulated in unicorn and obtain instrumentation
 output for black-box, closed-source binary code snippets. This mechanism
 can be then used by afl-fuzz to stress-test targets that couldn't be built
-with afl-gcc or used in QEMU mode, or with other extensions such as
-TriforceAFL.
+with afl-cc or used in QEMU mode.
 
 There is a significant performance penalty compared to native AFL, but at
 least we're able to use AFL++ on these binaries, right?
 
 ## 2) How to use
 
-Requirements: you need an installed python environment.
+First, you will need a working harness for your target in unicorn, using Python, C, or Rust.
+For some pointers for more advanced emulation, take a look at [BaseSAFE](https://github.com/fgsect/BaseSAFE) and [Qiling](https://github.com/qilingframework/qiling).
 
 ### Building AFL++'s Unicorn Mode
 
@@ -34,23 +34,23 @@ cd unicorn_mode
 ```
 
 NOTE: This script checks out a Unicorn Engine fork as submodule that has been tested
-and is stable-ish, based on the unicorn engine master.
+and is stable-ish, based on the unicorn engine `next` branch.
 
 Building Unicorn will take a little bit (~5-10 minutes). Once it completes
 it automatically compiles a sample application and verifies that it works.
 
 ### Fuzzing with Unicorn Mode
 
-To really use unicorn-mode effectively you need to prepare the following:
+To use unicorn-mode effectively you need to prepare the following:
 
  * Relevant binary code to be fuzzed
 * Knowledge of the memory map and good starting state
 * Folder containing sample inputs to start fuzzing with
   + Same ideas as any other AFL inputs
-  + Quality/speed of results will depend greatly on quality of starting
+  + Quality/speed of results will depend greatly on the quality of starting
     samples
   + See AFL's guidance on how to create a sample corpus
- * Unicornafl-based test harness which:
+ * Unicornafl-based test harness in Rust, C, or Python, which:
   + Adds memory map regions
   + Loads binary code into memory
   + Calls uc.afl_fuzz() / uc.afl_start_forkserver
@@ -59,13 +59,13 @@ To really use unicorn-mode effectively you need to prepare the following:
     the test harness
   + Presumably the data to be fuzzed is at a fixed buffer address
   + If input constraints (size, invalid bytes, etc.) are known they
-    should be checked after the file is loaded. If a constraint
-    fails, just exit the test harness. AFL will treat the input as
+    should be checked in the place_input handler. If a constraint
+    fails, just return false from the handler. AFL will treat the input as
     'uninteresting' and move on.
   + Sets up registers and memory state for beginning of test
-  + Emulates the interested code from beginning to end
+  + Emulates the interesting code from beginning to end
   + If a crash is detected, the test harness must 'crash' by
-    throwing a signal (SIGSEGV, SIGKILL, SIGABORT, etc.)
+    throwing a signal (SIGSEGV, SIGKILL, SIGABRT, etc.), or indicate a crash in the crash validation callback.
 
 Once you have all those things ready to go you just need to run afl-fuzz in
 'unicorn-mode' by passing in the '-U' flag:
@@ -79,11 +79,12 @@ AFL's main documentation for more info about how to use afl-fuzz effectively.
 
 For a much clearer vision of what all of this looks like, please refer to the
 sample provided in the 'unicorn_mode/samples' directory. There is also a blog
-post that goes over the basics at:
+post that uses slightly older concepts, but describes the general ideas, at:
 
 [https://medium.com/@njvoss299/afl-unicorn-fuzzing-arbitrary-binary-code-563ca28936bf](https://medium.com/@njvoss299/afl-unicorn-fuzzing-arbitrary-binary-code-563ca28936bf)
 
-The 'helper_scripts' directory also contains several helper scripts that allow you
+
+The ['helper_scripts'](./helper_scripts) directory also contains several helper scripts that allow you
 to dump context from a running process, load it, and hook heap allocations. For details
 on how to use this check out the follow-up blog post to the one linked above.
 
@@ -92,10 +93,10 @@ A example use of AFL-Unicorn mode is discussed in the paper Unicorefuzz:
 
 ## 3) Options
 
-As for the QEMU-based instrumentation, the afl-unicorn twist of afl++
-comes with a sub-instruction based instrumentation similar in purpose to laf-intel.
+As for the QEMU-based instrumentation, unicornafl comes with a sub-instruction based
+instrumentation similar in purpose to laf-intel.
 
 The options that enable Unicorn CompareCoverage are the same used for QEMU.
+This will split up each multi-byte compare to give feedback for each correct byte.
 AFL_COMPCOV_LEVEL=1 is to instrument comparisons with only immediate values.
 AFL_COMPCOV_LEVEL=2 instruments all comparison instructions.
 
@@ -119,6 +120,20 @@ unicornafl.monkeypatch()
 
 This will replace all unicorn imports with unicornafl imports.
 
-Refer to the [samples/arm_example/arm_tester.c](samples/arm_example/arm_tester.c) for an example
-of how to do this properly! If you don't get this right, AFL will not
-load any mutated inputs and your fuzzing will be useless!
+## 5) Examples
+
+Apart from reading the documentation in `afl.c` and the Python bindings of unicornafl, the best documentation are the [samples/](./samples).
+The following examples exist at the time of writing:
+
+- c: A simple example of how to use the C bindings
+- compcov_x64: A Python example that uses compcov to traverse hard-to-reach blocks
+- persistent: A C example using persistent mode for maximum speed, resetting the target state between each iteration
+- simple: A simple Python example
+- speedtest/c: The C harness for an example target, used to compare the C, Python, and Rust bindings and fix speed issues
+- speedtest/python: Fuzzing the same target in Python
+- speedtest/rust: Fuzzing the same target using a Rust harness
+
+Usually, the place to look at is the `harness` in each folder. The source code in each harness is pretty well documented.
+Most harnesses also have the `afl-fuzz` commandline, or even offer a `make fuzz` Makefile target.
+Targets in these folders, if x86, can usually be built using `make target` in each folder, or they are shipped pre-built (plus their source).
+Especially take a look at the [speedtest documentation](./samples/speedtest/README.md) to see how the languages compare.
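To make the harness requirements above concrete, here is a minimal sketch of the constraint-check pattern in a place_input callback, mirroring the Python sample harnesses shown later in this diff (the buffer address and size limit are the sample harnesses' values; a real target will use its own):

```python
DATA_ADDRESS = 0x00300000   # address where the harness mapped the input buffer
DATA_SIZE_MAX = 0x00010000  # maximum allowable size of a mutated input

def place_input_callback(uc, input, persistent_round, data):
    # Returning False rejects the input; AFL treats it as 'uninteresting'
    # and moves on, exactly as described in the README above.
    if len(input) > DATA_SIZE_MAX:
        return False
    # Write the mutated input into the emulated target's data buffer.
    uc.mem_write(DATA_ADDRESS, input)
```

A typical run that combines the '-U' flag with CompareCoverage might then look like this (input/output paths and the harness name are hypothetical):

```
AFL_COMPCOV_LEVEL=2 afl-fuzz -U -m none -i ./sample_inputs -o ./output -- python3 ./harness.py @@
```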
\ No newline at end of file diff --git a/unicorn_mode/UNICORNAFL_VERSION b/unicorn_mode/UNICORNAFL_VERSION index 4d8a03b2..d9ae5590 100644 --- a/unicorn_mode/UNICORNAFL_VERSION +++ b/unicorn_mode/UNICORNAFL_VERSION @@ -1 +1 @@ -80d31ef3 +fb2fc9f2 diff --git a/unicorn_mode/build_unicorn_support.sh b/unicorn_mode/build_unicorn_support.sh index f1d028f8..6c376f8d 100755 --- a/unicorn_mode/build_unicorn_support.sh +++ b/unicorn_mode/build_unicorn_support.sh @@ -117,19 +117,19 @@ done # some python version should be available now PYTHONS="`command -v python3` `command -v python` `command -v python2`" -EASY_INSTALL_FOUND=0 +SETUPTOOLS_FOUND=0 for PYTHON in $PYTHONS ; do if $PYTHON -c "import setuptools" ; then - EASY_INSTALL_FOUND=1 + SETUPTOOLS_FOUND=1 PYTHONBIN=$PYTHON break fi done -if [ "0" = $EASY_INSTALL_FOUND ]; then +if [ "0" = $SETUPTOOLS_FOUND ]; then echo "[-] Error: Python setup-tools not found. Run 'sudo apt-get install python-setuptools', or install python3-setuptools, or run '$PYTHONBIN -m ensurepip', or create a virtualenv, or ..." PREREQ_NOTFOUND=1 diff --git a/unicorn_mode/helper_scripts/ida_context_loader.py b/unicorn_mode/helper_scripts/ida_context_loader.py new file mode 100644 index 00000000..31d47a90 --- /dev/null +++ b/unicorn_mode/helper_scripts/ida_context_loader.py @@ -0,0 +1,197 @@ +# Copyright (c) 2021 Brandon Miller (zznop) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +"""IDA script for loading state that was dumped from a running process using unicorn AFL's GDB +plugin (unicorn_dumper_gdb.py). 
The dumper script can be found in the AFL++ repository at: +https://github.com/AFLplusplus/AFLplusplus/blob/stable/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py +""" + +import json +from pathlib import Path, PurePath +import zlib +import idaapi +import ida_bytes +import ida_kernwin +import ida_nalt +import ida_segment + + +class ContextLoaderError(Exception): + """Base "catch all" exception for this script + """ + + +class ArchNotSupportedError(ContextLoaderError): + """Exception raised if the input file CPU architecture isn't supported fully + """ + + +def parse_mapping_index(filepath: str): + """Open and unmarshal the _index.json file + + :param filepath: Path to the JSON file + :return: Dict representing index file contents + """ + + if filepath is None: + raise ContextLoaderError('_index.json file was not selected') + + try: + with open(filepath, 'rb') as _file: + return json.load(_file) + except Exception as ex: + raise ContextLoaderError('Failed to parse json file {}'.format(filepath)) from ex + +def get_input_name(): + """Get the name of the input file + + :retrun: Name of the input file + """ + + input_filepath = ida_nalt.get_input_file_path() + return Path(input_filepath).name + +def write_segment_bytes(start: int, filepath: str): + """"Read data from context file and write it to the IDA segment + + :param start: Start address + :param filepath: Path to context file + """ + + with open(filepath, 'rb') as _file: + data = _file.read() + + decompressed_data = zlib.decompress(data) + ida_bytes.put_bytes(start, decompressed_data) + +def create_segment(context_dir: str, segment: dict, is_be: bool): + """Create segment in IDA and map in the data from the file + + :param context_dir: Parent directory of the context files + :param segment: Segment information from _index.json + :param is_be: True if processor is big endian, otherwise False + """ + + input_name = get_input_name() + if Path(segment['name']).name != input_name: + ida_seg = idaapi.segment_t() + ida_seg.start_ea = segment['start'] + ida_seg.end_ea = segment['end'] + ida_seg.bitness = 1 if is_be else 0 + if segment['permissions']['r']: + ida_seg.perm |= ida_segment.SEGPERM_READ + if segment['permissions']['w']: + ida_seg.perm |= ida_segment.SEGPERM_WRITE + if segment['permissions']['x']: + ida_seg.perm |= ida_segment.SEGPERM_EXEC + idaapi.add_segm_ex(ida_seg, Path(segment['name']).name, 'CODE', idaapi.ADDSEG_OR_DIE) + else: + idaapi.add_segm_ex(ida_seg, Path(segment['name']).name, 'DATA', idaapi.ADDSEG_OR_DIE) + + if segment['content_file']: + write_segment_bytes(segment['start'], PurePath(context_dir, segment['content_file'])) + +def create_segments(index: dict, context_dir: str): + """Iterate segments in index JSON, create the segment in IDA, and map in the data from the file + + :param index: _index.json JSON data + :param context_dir: Parent directory of the context files + """ + + info = idaapi.get_inf_structure() + is_be = info.is_be() + for segment in index['segments']: + create_segment(context_dir, segment, is_be) + +def rebase_program(index: dict): + """Rebase the program to the offset specified in the context _index.json + + :param index: _index.json JSON data + """ + + input_name = get_input_name() + new_base = None + for segment in index['segments']: + if not segment['name']: + continue + + segment_name = Path(segment['name']).name + if input_name == segment_name: + new_base = segment['start'] + break + + if not new_base: + raise ContextLoaderError('Input file is not in _index.json') + + current_base = 
idaapi.get_imagebase() + ida_segment.rebase_program(new_base-current_base, 8) + +def get_pc_by_arch(index: dict) -> int: + """Queries the input file CPU architecture and attempts to lookup the address of the program + counter in the _index.json by register name + + :param index: _index.json JSON data + :return: Program counter value or None + """ + + progctr = None + info = idaapi.get_inf_structure() + if info.procname == 'metapc': + if info.is_64bit(): + progctr = index['regs']['rax'] + elif info.is_32bit(): + progctr = index['regs']['eax'] + return progctr + +def write_reg_info(index: dict): + """Write register info as line comment at instruction pointed to by the program counter and + change focus to that location + + :param index: _index.json JSON data + """ + + cmt = '' + for reg, val in index['regs'].items(): + cmt += f"{reg.ljust(6)} : {hex(val)}\n" + + progctr = get_pc_by_arch(index) + if progctr is None: + raise ArchNotSupportedError( + 'Architecture not fully supported, skipping register status comment') + ida_bytes.set_cmt(progctr, cmt, 0) + ida_kernwin.jumpto(progctr) + +def main(filepath): + """Main - parse _index.json input and map context files into the database + + :param filepath: Path to the _index.json file + """ + + try: + index = parse_mapping_index(filepath) + context_dir = Path(filepath).parent + rebase_program(index) + create_segments(index, context_dir) + write_reg_info(index) + except ContextLoaderError as ex: + print(ex) + +if __name__ == '__main__': + main(ida_kernwin.ask_file(1, '*.json', 'Import file name')) diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py b/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py index 8c8f9641..1ac4c9f3 100644 --- a/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py +++ b/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py @@ -45,30 +45,31 @@ MAX_SEG_SIZE = 128 * 1024 * 1024 INDEX_FILE_NAME = "_index.json" -#---------------------- -#---- Helper Functions +# ---------------------- +# ---- Helper Functions + def map_arch(): - arch = get_arch() # from GEF - if 'x86_64' in arch or 'x86-64' in arch: + arch = get_arch() # from GEF + if "x86_64" in arch or "x86-64" in arch: return "x64" - elif 'x86' in arch or 'i386' in arch: + elif "x86" in arch or "i386" in arch: return "x86" - elif 'aarch64' in arch or 'arm64' in arch: + elif "aarch64" in arch or "arm64" in arch: return "arm64le" - elif 'aarch64_be' in arch: + elif "aarch64_be" in arch: return "arm64be" - elif 'armeb' in arch: + elif "armeb" in arch: # check for THUMB mode - cpsr = get_register('$cpsr') - if (cpsr & (1 << 5)): + cpsr = get_register("$cpsr") + if cpsr & (1 << 5): return "armbethumb" else: return "armbe" - elif 'arm' in arch: + elif "arm" in arch: # check for THUMB mode - cpsr = get_register('$cpsr') - if (cpsr & (1 << 5)): + cpsr = get_register("$cpsr") + if cpsr & (1 << 5): return "armlethumb" else: return "armle" @@ -76,8 +77,9 @@ def map_arch(): return "" -#----------------------- -#---- Dumping functions +# ----------------------- +# ---- Dumping functions + def dump_arch_info(): arch_info = {} @@ -89,7 +91,7 @@ def dump_regs(): reg_state = {} for reg in current_arch.all_registers: reg_val = get_register(reg) - reg_state[reg.strip().strip('$')] = reg_val + reg_state[reg.strip().strip("$")] = reg_val return reg_state @@ -108,47 +110,76 @@ def dump_process_memory(output_dir): if entry.page_start == entry.page_end: continue - seg_info = {'start': entry.page_start, 'end': entry.page_end, 'name': entry.path, 'permissions': { - "r": entry.is_readable() > 0, - 
"w": entry.is_writable() > 0, - "x": entry.is_executable() > 0 - }, 'content_file': ''} + seg_info = { + "start": entry.page_start, + "end": entry.page_end, + "name": entry.path, + "permissions": { + "r": entry.is_readable() > 0, + "w": entry.is_writable() > 0, + "x": entry.is_executable() > 0, + }, + "content_file": "", + } # "(deleted)" may or may not be valid, but don't push it. - if entry.is_readable() and not '(deleted)' in entry.path: + if entry.is_readable() and not "(deleted)" in entry.path: try: # Compress and dump the content to a file seg_content = read_memory(entry.page_start, entry.size) - if(seg_content == None): - print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(entry.page_start, entry.path)) + if seg_content == None: + print( + "Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format( + entry.page_start, entry.path + ) + ) else: - print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(entry.page_start, len(seg_content), entry.path, repr(seg_info['permissions']))) + print( + "Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format( + entry.page_start, + len(seg_content), + entry.path, + repr(seg_info["permissions"]), + ) + ) compressed_seg_content = zlib.compress(seg_content) md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin" seg_info["content_file"] = md5_sum # Write the compressed contents to disk - out_file = open(os.path.join(output_dir, md5_sum), 'wb') + out_file = open(os.path.join(output_dir, md5_sum), "wb") out_file.write(compressed_seg_content) out_file.close() except: - print("Exception reading segment ({}): {}".format(entry.path, sys.exc_info()[0])) + print( + "Exception reading segment ({}): {}".format( + entry.path, sys.exc_info()[0] + ) + ) else: - print("Skipping segment {0}@0x{1:016x}".format(entry.path, entry.page_start)) + print( + "Skipping segment {0}@0x{1:016x}".format(entry.path, entry.page_start) + ) # Add the segment to the list final_segment_list.append(seg_info) - return final_segment_list -#--------------------------------------------- -#---- ARM Extention (dump floating point regs) + +# --------------------------------------------- +# ---- ARM Extention (dump floating point regs) + def dump_float(rge=32): reg_convert = "" - if map_arch() == "armbe" or map_arch() == "armle" or map_arch() == "armbethumb" or map_arch() == "armbethumb": + if ( + map_arch() == "armbe" + or map_arch() == "armle" + or map_arch() == "armbethumb" + or map_arch() == "armbethumb" + ): reg_state = {} for reg_num in range(32): value = gdb.selected_frame().read_register("d" + str(reg_num)) @@ -158,8 +189,10 @@ def dump_float(rge=32): return reg_state -#---------- -#---- Main + +# ---------- +# ---- Main + def main(): print("----- Unicorn Context Dumper -----") @@ -175,7 +208,9 @@ def main(): try: # Create the output directory - timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S') + timestamp = datetime.datetime.fromtimestamp(time.time()).strftime( + "%Y%m%d_%H%M%S" + ) output_path = "UnicornContext_" + timestamp if not os.path.exists(output_path): os.makedirs(output_path) @@ -190,7 +225,7 @@ def main(): } # Write the index file - index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w') + index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w") index_file.write(json.dumps(context, indent=4)) index_file.close() print("Done.") @@ -198,5 +233,6 @@ def main(): except Exception as e: print("!!! 
ERROR:\n\t{}".format(repr(e))) + if __name__ == "__main__": main() diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_ida.py b/unicorn_mode/helper_scripts/unicorn_dumper_ida.py index 3f955a5c..fa29fb90 100644 --- a/unicorn_mode/helper_scripts/unicorn_dumper_ida.py +++ b/unicorn_mode/helper_scripts/unicorn_dumper_ida.py @@ -31,8 +31,9 @@ MAX_SEG_SIZE = 128 * 1024 * 1024 # Name of the index file INDEX_FILE_NAME = "_index.json" -#---------------------- -#---- Helper Functions +# ---------------------- +# ---- Helper Functions + def get_arch(): if ph.id == PLFM_386 and ph.flag & PR_USE64: @@ -52,6 +53,7 @@ def get_arch(): else: return "" + def get_register_list(arch): if arch == "arm64le" or arch == "arm64be": arch = "arm64" @@ -59,84 +61,174 @@ def get_register_list(arch): arch = "arm" registers = { - "x64" : [ - "rax", "rbx", "rcx", "rdx", "rsi", "rdi", "rbp", "rsp", - "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", - "rip", "rsp", "efl", - "cs", "ds", "es", "fs", "gs", "ss", + "x64": [ + "rax", + "rbx", + "rcx", + "rdx", + "rsi", + "rdi", + "rbp", + "rsp", + "r8", + "r9", + "r10", + "r11", + "r12", + "r13", + "r14", + "r15", + "rip", + "rsp", + "efl", + "cs", + "ds", + "es", + "fs", + "gs", + "ss", + ], + "x86": [ + "eax", + "ebx", + "ecx", + "edx", + "esi", + "edi", + "ebp", + "esp", + "eip", + "esp", + "efl", + "cs", + "ds", + "es", + "fs", + "gs", + "ss", ], - "x86" : [ - "eax", "ebx", "ecx", "edx", "esi", "edi", "ebp", "esp", - "eip", "esp", "efl", - "cs", "ds", "es", "fs", "gs", "ss", - ], - "arm" : [ - "R0", "R1", "R2", "R3", "R4", "R5", "R6", "R7", - "R8", "R9", "R10", "R11", "R12", "PC", "SP", "LR", + "arm": [ + "R0", + "R1", + "R2", + "R3", + "R4", + "R5", + "R6", + "R7", + "R8", + "R9", + "R10", + "R11", + "R12", + "PC", + "SP", + "LR", "PSR", ], - "arm64" : [ - "X0", "X1", "X2", "X3", "X4", "X5", "X6", "X7", - "X8", "X9", "X10", "X11", "X12", "X13", "X14", - "X15", "X16", "X17", "X18", "X19", "X20", "X21", - "X22", "X23", "X24", "X25", "X26", "X27", "X28", - "PC", "SP", "FP", "LR", "CPSR" + "arm64": [ + "X0", + "X1", + "X2", + "X3", + "X4", + "X5", + "X6", + "X7", + "X8", + "X9", + "X10", + "X11", + "X12", + "X13", + "X14", + "X15", + "X16", + "X17", + "X18", + "X19", + "X20", + "X21", + "X22", + "X23", + "X24", + "X25", + "X26", + "X27", + "X28", + "PC", + "SP", + "FP", + "LR", + "CPSR" # "NZCV", - ] + ], } - return registers[arch] + return registers[arch] + + +# ----------------------- +# ---- Dumping functions -#----------------------- -#---- Dumping functions def dump_arch_info(): arch_info = {} arch_info["arch"] = get_arch() return arch_info + def dump_regs(): reg_state = {} for reg in get_register_list(get_arch()): reg_state[reg] = GetRegValue(reg) return reg_state + def dump_process_memory(output_dir): # Segment information dictionary segment_list = [] - + # Loop over the segments, fill in the info dictionary for seg_ea in Segments(): seg_start = SegStart(seg_ea) seg_end = SegEnd(seg_ea) seg_size = seg_end - seg_start - + seg_info = {} - seg_info["name"] = SegName(seg_ea) + seg_info["name"] = SegName(seg_ea) seg_info["start"] = seg_start - seg_info["end"] = seg_end - + seg_info["end"] = seg_end + perms = getseg(seg_ea).perm seg_info["permissions"] = { - "r": False if (perms & SEGPERM_READ) == 0 else True, + "r": False if (perms & SEGPERM_READ) == 0 else True, "w": False if (perms & SEGPERM_WRITE) == 0 else True, - "x": False if (perms & SEGPERM_EXEC) == 0 else True, + "x": False if (perms & SEGPERM_EXEC) == 0 else True, } if (perms & SEGPERM_READ) and seg_size 
<= MAX_SEG_SIZE and isLoaded(seg_start): try: # Compress and dump the content to a file seg_content = get_many_bytes(seg_start, seg_end - seg_start) - if(seg_content == None): - print("Segment empty: {0}@0x{1:016x} (size:UNKNOWN)".format(SegName(seg_ea), seg_ea)) + if seg_content == None: + print( + "Segment empty: {0}@0x{1:016x} (size:UNKNOWN)".format( + SegName(seg_ea), seg_ea + ) + ) seg_info["content_file"] = "" else: - print("Dumping segment {0}@0x{1:016x} (size:{2})".format(SegName(seg_ea), seg_ea, len(seg_content))) + print( + "Dumping segment {0}@0x{1:016x} (size:{2})".format( + SegName(seg_ea), seg_ea, len(seg_content) + ) + ) compressed_seg_content = zlib.compress(seg_content) md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin" seg_info["content_file"] = md5_sum - + # Write the compressed contents to disk - out_file = open(os.path.join(output_dir, md5_sum), 'wb') + out_file = open(os.path.join(output_dir, md5_sum), "wb") out_file.write(compressed_seg_content) out_file.close() except: @@ -145,12 +237,13 @@ def dump_process_memory(output_dir): else: print("Skipping segment {0}@0x{1:016x}".format(SegName(seg_ea), seg_ea)) seg_info["content_file"] = "" - + # Add the segment to the list - segment_list.append(seg_info) - + segment_list.append(seg_info) + return segment_list + """ TODO: FINISH IMPORT DUMPING def import_callback(ea, name, ord): @@ -169,41 +262,47 @@ def dump_imports(): return import_dict """ - -#---------- -#---- Main - + +# ---------- +# ---- Main + + def main(): try: print("----- Unicorn Context Dumper -----") print("You must be actively debugging before running this!") - print("If it fails, double check that you are actively debugging before running.") + print( + "If it fails, double check that you are actively debugging before running." + ) # Create the output directory - timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S') + timestamp = datetime.datetime.fromtimestamp(time.time()).strftime( + "%Y%m%d_%H%M%S" + ) output_path = os.path.dirname(os.path.abspath(GetIdbPath())) output_path = os.path.join(output_path, "UnicornContext_" + timestamp) if not os.path.exists(output_path): os.makedirs(output_path) print("Process context will be output to {}".format(output_path)) - + # Get the context context = { "arch": dump_arch_info(), - "regs": dump_regs(), + "regs": dump_regs(), "segments": dump_process_memory(output_path), - #"imports": dump_imports(), + # "imports": dump_imports(), } # Write the index file - index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w') + index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w") index_file.write(json.dumps(context, indent=4)) - index_file.close() + index_file.close() print("Done.") - + except Exception, e: print("!!! 
ERROR:\n\t{}".format(str(e))) - + + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py b/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py index 3c019d77..179d062a 100644 --- a/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py +++ b/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py @@ -50,10 +50,11 @@ UNICORN_PAGE_SIZE = 0x1000 # Alignment functions to align all memory segments to Unicorn page boundaries (4KB pages only) ALIGN_PAGE_DOWN = lambda x: x & ~(UNICORN_PAGE_SIZE - 1) -ALIGN_PAGE_UP = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE-1) +ALIGN_PAGE_UP = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE - 1) + +# ---------------------- +# ---- Helper Functions -#---------------------- -#---- Helper Functions def overlap_alignments(segments, memory): final_list = [] @@ -61,33 +62,40 @@ def overlap_alignments(segments, memory): curr_end_addr = 0 curr_node = None current_segment = None - sorted_segments = sorted(segments, key=lambda k: (k['start'], k['end'])) + sorted_segments = sorted(segments, key=lambda k: (k["start"], k["end"])) if curr_seg_idx < len(sorted_segments): current_segment = sorted_segments[curr_seg_idx] - for mem in sorted(memory, key=lambda k: (k['start'], -k['end'])): + for mem in sorted(memory, key=lambda k: (k["start"], -k["end"])): if curr_node is None: - if current_segment is not None and current_segment['start'] == mem['start']: + if current_segment is not None and current_segment["start"] == mem["start"]: curr_node = deepcopy(current_segment) - curr_node['permissions'] = mem['permissions'] + curr_node["permissions"] = mem["permissions"] else: curr_node = deepcopy(mem) - curr_end_addr = curr_node['end'] - - while curr_end_addr <= mem['end']: - if curr_node['end'] == mem['end']: - if current_segment is not None and current_segment['start'] > curr_node['start'] and current_segment['start'] < curr_node['end']: - curr_node['end'] = current_segment['start'] - if(curr_node['end'] > curr_node['start']): + curr_end_addr = curr_node["end"] + + while curr_end_addr <= mem["end"]: + if curr_node["end"] == mem["end"]: + if ( + current_segment is not None + and current_segment["start"] > curr_node["start"] + and current_segment["start"] < curr_node["end"] + ): + curr_node["end"] = current_segment["start"] + if curr_node["end"] > curr_node["start"]: final_list.append(curr_node) curr_node = deepcopy(current_segment) - curr_node['permissions'] = mem['permissions'] - curr_end_addr = curr_node['end'] + curr_node["permissions"] = mem["permissions"] + curr_end_addr = curr_node["end"] else: - if(curr_node['end'] > curr_node['start']): + if curr_node["end"] > curr_node["start"]: final_list.append(curr_node) # if curr_node is a segment - if current_segment is not None and current_segment['end'] == mem['end']: + if ( + current_segment is not None + and current_segment["end"] == mem["end"] + ): curr_seg_idx += 1 if curr_seg_idx < len(sorted_segments): current_segment = sorted_segments[curr_seg_idx] @@ -98,50 +106,56 @@ def overlap_alignments(segments, memory): break # could only be a segment else: - if curr_node['end'] < mem['end']: + if curr_node["end"] < mem["end"]: # check for remaining segments and valid segments - if(curr_node['end'] > curr_node['start']): + if curr_node["end"] > curr_node["start"]: final_list.append(curr_node) - + curr_seg_idx += 1 if curr_seg_idx < len(sorted_segments): current_segment = sorted_segments[curr_seg_idx] else: current_segment = None - - if 
current_segment is not None and current_segment['start'] <= curr_end_addr and current_segment['start'] < mem['end']: + + if ( + current_segment is not None + and current_segment["start"] <= curr_end_addr + and current_segment["start"] < mem["end"] + ): curr_node = deepcopy(current_segment) - curr_node['permissions'] = mem['permissions'] + curr_node["permissions"] = mem["permissions"] else: # no more segments curr_node = deepcopy(mem) - - curr_node['start'] = curr_end_addr - curr_end_addr = curr_node['end'] - return final_list + curr_node["start"] = curr_end_addr + curr_end_addr = curr_node["end"] + + return final_list + # https://github.com/llvm-mirror/llvm/blob/master/include/llvm/ADT/Triple.h def get_arch(): - arch, arch_vendor, arch_os = lldb.target.GetTriple().split('-') - if arch == 'x86_64': + arch, arch_vendor, arch_os = lldb.target.GetTriple().split("-") + if arch == "x86_64": return "x64" - elif arch == 'x86' or arch == 'i386': + elif arch == "x86" or arch == "i386": return "x86" - elif arch == 'aarch64' or arch == 'arm64': + elif arch == "aarch64" or arch == "arm64": return "arm64le" - elif arch == 'aarch64_be': + elif arch == "aarch64_be": return "arm64be" - elif arch == 'armeb': + elif arch == "armeb": return "armbe" - elif arch == 'arm': + elif arch == "arm": return "armle" else: return "" -#----------------------- -#---- Dumping functions +# ----------------------- +# ---- Dumping functions + def dump_arch_info(): arch_info = {} @@ -152,56 +166,64 @@ def dump_arch_info(): def dump_regs(): reg_state = {} for reg_list in lldb.frame.GetRegisters(): - if 'general purpose registers' in reg_list.GetName().lower(): + if "general purpose registers" in reg_list.GetName().lower(): for reg in reg_list: reg_state[reg.GetName()] = int(reg.GetValue(), 16) return reg_state + def get_section_info(sec): - name = sec.name if sec.name is not None else '' + name = sec.name if sec.name is not None else "" if sec.GetParent().name is not None: - name = sec.GetParent().name + '.' + sec.name + name = sec.GetParent().name + "." + sec.name module_name = sec.addr.module.file.GetFilename() - module_name = module_name if module_name is not None else '' - long_name = module_name + '.' + name - + module_name = module_name if module_name is not None else "" + long_name = module_name + "." 
+ name + return sec.addr.load_addr, (sec.addr.load_addr + sec.size), sec.size, long_name - + def dump_process_memory(output_dir): # Segment information dictionary raw_segment_list = [] raw_memory_list = [] - + # 1st pass: # Loop over the segments, fill in the segment info dictionary for module in lldb.target.module_iter(): for seg_ea in module.section_iter(): - seg_info = {'module': module.file.GetFilename() } - seg_info['start'], seg_info['end'], seg_size, seg_info['name'] = get_section_info(seg_ea) + seg_info = {"module": module.file.GetFilename()} + ( + seg_info["start"], + seg_info["end"], + seg_size, + seg_info["name"], + ) = get_section_info(seg_ea) # TODO: Ugly hack for -1 LONG address on 32-bit - if seg_info['start'] >= sys.maxint or seg_size <= 0: - print "Throwing away page: {}".format(seg_info['name']) + if seg_info["start"] >= sys.maxint or seg_size <= 0: + print "Throwing away page: {}".format(seg_info["name"]) continue # Page-align segment - seg_info['start'] = ALIGN_PAGE_DOWN(seg_info['start']) - seg_info['end'] = ALIGN_PAGE_UP(seg_info['end']) - print("Appending: {}".format(seg_info['name'])) + seg_info["start"] = ALIGN_PAGE_DOWN(seg_info["start"]) + seg_info["end"] = ALIGN_PAGE_UP(seg_info["end"]) + print ("Appending: {}".format(seg_info["name"])) raw_segment_list.append(seg_info) # Add the stack memory region (just hardcode 0x1000 around the current SP) sp = lldb.frame.GetSP() start_sp = ALIGN_PAGE_DOWN(sp) - raw_segment_list.append({'start': start_sp, 'end': start_sp + 0x1000, 'name': 'STACK'}) + raw_segment_list.append( + {"start": start_sp, "end": start_sp + 0x1000, "name": "STACK"} + ) # Write the original memory to file for debugging - index_file = open(os.path.join(output_dir, DEBUG_MEM_FILE_NAME), 'w') + index_file = open(os.path.join(output_dir, DEBUG_MEM_FILE_NAME), "w") index_file.write(json.dumps(raw_segment_list, indent=4)) - index_file.close() + index_file.close() - # Loop over raw memory regions + # Loop over raw memory regions mem_info = lldb.SBMemoryRegionInfo() start_addr = -1 next_region_addr = 0 @@ -218,15 +240,20 @@ def dump_process_memory(output_dir): end_addr = mem_info.GetRegionEnd() # Unknown region name - region_name = 'UNKNOWN' + region_name = "UNKNOWN" # Ignore regions that aren't even mapped if mem_info.IsMapped() and mem_info.IsReadable(): - mem_info_obj = {'start': start_addr, 'end': end_addr, 'name': region_name, 'permissions': { - "r": mem_info.IsReadable(), - "w": mem_info.IsWritable(), - "x": mem_info.IsExecutable() - }} + mem_info_obj = { + "start": start_addr, + "end": end_addr, + "name": region_name, + "permissions": { + "r": mem_info.IsReadable(), + "w": mem_info.IsWritable(), + "x": mem_info.IsExecutable(), + }, + } raw_memory_list.append(mem_info_obj) @@ -234,65 +261,89 @@ def dump_process_memory(output_dir): for seg_info in final_segment_list: try: - seg_info['content_file'] = '' - start_addr = seg_info['start'] - end_addr = seg_info['end'] - region_name = seg_info['name'] + seg_info["content_file"] = "" + start_addr = seg_info["start"] + end_addr = seg_info["end"] + region_name = seg_info["name"] # Compress and dump the content to a file err = lldb.SBError() - seg_content = lldb.process.ReadMemory(start_addr, end_addr - start_addr, err) - if(seg_content == None): - print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(start_addr, region_name)) - seg_info['content_file'] = '' + seg_content = lldb.process.ReadMemory( + start_addr, end_addr - start_addr, err + ) + if seg_content == None: + print ( + "Segment empty: @0x{0:016x} 
(size:UNKNOWN) {1}".format( + start_addr, region_name + ) + ) + seg_info["content_file"] = "" else: - print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(start_addr, len(seg_content), region_name, repr(seg_info['permissions']))) + print ( + "Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format( + start_addr, + len(seg_content), + region_name, + repr(seg_info["permissions"]), + ) + ) compressed_seg_content = zlib.compress(seg_content) md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin" - seg_info['content_file'] = md5_sum - + seg_info["content_file"] = md5_sum + # Write the compressed contents to disk - out_file = open(os.path.join(output_dir, md5_sum), 'wb') + out_file = open(os.path.join(output_dir, md5_sum), "wb") out_file.write(compressed_seg_content) out_file.close() - + except: - print("Exception reading segment ({}): {}".format(region_name, sys.exc_info()[0])) - + print ( + "Exception reading segment ({}): {}".format( + region_name, sys.exc_info()[0] + ) + ) + return final_segment_list -#---------- -#---- Main - + +# ---------- +# ---- Main + + def main(): try: - print("----- Unicorn Context Dumper -----") - print("You must be actively debugging before running this!") - print("If it fails, double check that you are actively debugging before running.") - + print ("----- Unicorn Context Dumper -----") + print ("You must be actively debugging before running this!") + print ( + "If it fails, double check that you are actively debugging before running." + ) + # Create the output directory - timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S') + timestamp = datetime.datetime.fromtimestamp(time.time()).strftime( + "%Y%m%d_%H%M%S" + ) output_path = "UnicornContext_" + timestamp if not os.path.exists(output_path): os.makedirs(output_path) - print("Process context will be output to {}".format(output_path)) - + print ("Process context will be output to {}".format(output_path)) + # Get the context context = { "arch": dump_arch_info(), - "regs": dump_regs(), + "regs": dump_regs(), "segments": dump_process_memory(output_path), } - + # Write the index file - index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w') + index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w") index_file.write(json.dumps(context, indent=4)) - index_file.close() - print("Done.") - + index_file.close() + print ("Done.") + except Exception, e: - print("!!! ERROR:\n\t{}".format(repr(e))) - + print ("!!! 
ERROR:\n\t{}".format(repr(e))) + + if __name__ == "__main__": main() elif lldb.debugger: diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py b/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py index dc56b2aa..eccbc8bf 100644 --- a/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py +++ b/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py @@ -59,45 +59,47 @@ MAX_SEG_SIZE = 128 * 1024 * 1024 # Name of the index file INDEX_FILE_NAME = "_index.json" -#---------------------- -#---- Helper Functions +# ---------------------- +# ---- Helper Functions + def map_arch(): - arch = pwndbg.arch.current # from PWNDBG - if 'x86_64' in arch or 'x86-64' in arch: + arch = pwndbg.arch.current # from PWNDBG + if "x86_64" in arch or "x86-64" in arch: return "x64" - elif 'x86' in arch or 'i386' in arch: + elif "x86" in arch or "i386" in arch: return "x86" - elif 'aarch64' in arch or 'arm64' in arch: + elif "aarch64" in arch or "arm64" in arch: return "arm64le" - elif 'aarch64_be' in arch: + elif "aarch64_be" in arch: return "arm64be" - elif 'arm' in arch: - cpsr = pwndbg.regs['cpsr'] - # check endianess - if pwndbg.arch.endian == 'big': + elif "arm" in arch: + cpsr = pwndbg.regs["cpsr"] + # check endianess + if pwndbg.arch.endian == "big": # check for THUMB mode - if (cpsr & (1 << 5)): + if cpsr & (1 << 5): return "armbethumb" else: return "armbe" else: # check for THUMB mode - if (cpsr & (1 << 5)): + if cpsr & (1 << 5): return "armlethumb" else: return "armle" - elif 'mips' in arch: - if pwndbg.arch.endian == 'little': - return 'mipsel' + elif "mips" in arch: + if pwndbg.arch.endian == "little": + return "mipsel" else: - return 'mips' + return "mips" else: return "" -#----------------------- -#---- Dumping functions +# ----------------------- +# ---- Dumping functions + def dump_arch_info(): arch_info = {} @@ -110,26 +112,26 @@ def dump_regs(): for reg in pwndbg.regs.all: reg_val = pwndbg.regs[reg] # current dumper script looks for register values to be hex strings -# reg_str = "0x{:08x}".format(reg_val) -# if "64" in get_arch(): -# reg_str = "0x{:016x}".format(reg_val) -# reg_state[reg.strip().strip('$')] = reg_str - reg_state[reg.strip().strip('$')] = reg_val + # reg_str = "0x{:08x}".format(reg_val) + # if "64" in get_arch(): + # reg_str = "0x{:016x}".format(reg_val) + # reg_state[reg.strip().strip('$')] = reg_str + reg_state[reg.strip().strip("$")] = reg_val return reg_state def dump_process_memory(output_dir): # Segment information dictionary final_segment_list = [] - + # PWNDBG: vmmap = pwndbg.vmmap.get() - + # Pointer to end of last dumped memory segment - segment_last_addr = 0x0; + segment_last_addr = 0x0 start = None - end = None + end = None if not vmmap: print("No address mapping information found") @@ -141,86 +143,107 @@ def dump_process_memory(output_dir): continue start = entry.start - end = entry.end + end = entry.end - if (segment_last_addr > entry.start): # indicates overlap - if (segment_last_addr > entry.end): # indicates complete overlap, so we skip the segment entirely + if segment_last_addr > entry.start: # indicates overlap + if ( + segment_last_addr > entry.end + ): # indicates complete overlap, so we skip the segment entirely continue - else: + else: start = segment_last_addr - - - seg_info = {'start': start, 'end': end, 'name': entry.objfile, 'permissions': { - "r": entry.read, - "w": entry.write, - "x": entry.execute - }, 'content_file': ''} + + seg_info = { + "start": start, + "end": end, + "name": entry.objfile, + "permissions": {"r": entry.read, "w": 
entry.write, "x": entry.execute}, + "content_file": "", + } # "(deleted)" may or may not be valid, but don't push it. - if entry.read and not '(deleted)' in entry.objfile: + if entry.read and not "(deleted)" in entry.objfile: try: # Compress and dump the content to a file seg_content = pwndbg.memory.read(start, end - start) - if(seg_content == None): - print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(entry.start, entry.objfile)) + if seg_content == None: + print( + "Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format( + entry.start, entry.objfile + ) + ) else: - print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(entry.start, len(seg_content), entry.objfile, repr(seg_info['permissions']))) + print( + "Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format( + entry.start, + len(seg_content), + entry.objfile, + repr(seg_info["permissions"]), + ) + ) compressed_seg_content = zlib.compress(str(seg_content)) md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin" seg_info["content_file"] = md5_sum - + # Write the compressed contents to disk - out_file = open(os.path.join(output_dir, md5_sum), 'wb') + out_file = open(os.path.join(output_dir, md5_sum), "wb") out_file.write(compressed_seg_content) out_file.close() except Exception as e: traceback.print_exc() - print("Exception reading segment ({}): {}".format(entry.objfile, sys.exc_info()[0])) + print( + "Exception reading segment ({}): {}".format( + entry.objfile, sys.exc_info()[0] + ) + ) else: print("Skipping segment {0}@0x{1:016x}".format(entry.objfile, entry.start)) - + segment_last_addr = end # Add the segment to the list final_segment_list.append(seg_info) - return final_segment_list -#---------- -#---- Main - + +# ---------- +# ---- Main + + def main(): print("----- Unicorn Context Dumper -----") print("You must be actively debugging before running this!") print("If it fails, double check that you are actively debugging before running.") - + try: # Create the output directory - timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S') + timestamp = datetime.datetime.fromtimestamp(time.time()).strftime( + "%Y%m%d_%H%M%S" + ) output_path = "UnicornContext_" + timestamp if not os.path.exists(output_path): os.makedirs(output_path) print("Process context will be output to {}".format(output_path)) - + # Get the context context = { "arch": dump_arch_info(), - "regs": dump_regs(), + "regs": dump_regs(), "segments": dump_process_memory(output_path), } # Write the index file - index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w') + index_file = open(os.path.join(output_path, INDEX_FILE_NAME), "w") index_file.write(json.dumps(context, indent=4)) - index_file.close() + index_file.close() print("Done.") - + except Exception as e: print("!!! ERROR:\n\t{}".format(repr(e))) - + + if __name__ == "__main__" and pwndbg_loaded: main() - diff --git a/unicorn_mode/samples/c/COMPILE.md b/unicorn_mode/samples/c/COMPILE.md index 7857e5bf..7da140f7 100644 --- a/unicorn_mode/samples/c/COMPILE.md +++ b/unicorn_mode/samples/c/COMPILE.md @@ -17,6 +17,6 @@ You shouldn't need to compile simple_target.c since a X86_64 binary version is pre-built and shipped in this sample folder. This file documents how the binary was built in case you want to rebuild it or recompile it for any reason. -The pre-built binary (simple_target_x86_64.bin) was built using -g -O0 in gcc. +The pre-built binary (persistent_target_x86_64) was built using -g -O0 in gcc. 
We then load the binary and execute the main function directly. diff --git a/unicorn_mode/samples/compcov_x64/compcov_test_harness.py b/unicorn_mode/samples/compcov_x64/compcov_test_harness.py index b9ebb61d..f0749d1b 100644 --- a/unicorn_mode/samples/compcov_x64/compcov_test_harness.py +++ b/unicorn_mode/samples/compcov_x64/compcov_test_harness.py @@ -22,48 +22,81 @@ from unicornafl import * from unicornafl.x86_const import * # Path to the file containing the binary to emulate -BINARY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'compcov_target.bin') +BINARY_FILE = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "compcov_target.bin" +) # Memory map for the code to be tested -CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded +CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded CODE_SIZE_MAX = 0x00010000 # Max size for the code (64kb) STACK_ADDRESS = 0x00200000 # Address of the stack (arbitrarily chosen) -STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen) -DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed +STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen) +DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed DATA_SIZE_MAX = 0x00010000 # Maximum allowable size of mutated data try: # If Capstone is installed then we'll dump disassembly, otherwise just dump the binary. from capstone import * + cs = Cs(CS_ARCH_X86, CS_MODE_64) + def unicorn_debug_instruction(uc, address, size, user_data): mem = uc.mem_read(address, size) - for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size): + for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite( + bytes(mem), size + ): print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr)) + + except ImportError: + def unicorn_debug_instruction(uc, address, size, user_data): print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) + def unicorn_debug_block(uc, address, size, user_data): print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) + def unicorn_debug_mem_access(uc, access, address, size, value, user_data): if access == UC_MEM_WRITE: - print(" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) + print( + " >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format( + address, size, value + ) + ) else: print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size)) + def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data): if access == UC_MEM_WRITE_UNMAPPED: - print(" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) + print( + " >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format( + address, size, value + ) + ) else: - print(" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)) + print( + " >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size) + ) + def main(): parser = argparse.ArgumentParser(description="Test harness for compcov_target.bin") - parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input to load") - parser.add_argument('-t', '--trace', default=False, action="store_true", help="Enables debug tracing") + parser.add_argument( + "input_file", + type=str, + help="Path to the file containing the mutated input to load", + ) + parser.add_argument( + "-t", + "--trace", + default=False, + action="store_true", + help="Enables 
debug tracing", + ) args = parser.parse_args() # Instantiate a MIPS32 big endian Unicorn Engine instance @@ -73,13 +106,16 @@ def main(): uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block) uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction) uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access) - uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, unicorn_debug_mem_invalid_access) + uc.hook_add( + UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, + unicorn_debug_mem_invalid_access, + ) - #--------------------------------------------------- + # --------------------------------------------------- # Load the binary to emulate and map it into memory print("Loading data input from {}".format(args.input_file)) - binary_file = open(BINARY_FILE, 'rb') + binary_file = open(BINARY_FILE, "rb") binary_code = binary_file.read() binary_file.close() @@ -93,11 +129,11 @@ def main(): uc.mem_write(CODE_ADDRESS, binary_code) # Set the program counter to the start of the code - start_address = CODE_ADDRESS # Address of entry point of main() - end_address = CODE_ADDRESS + 0x55 # Address of last instruction in main() + start_address = CODE_ADDRESS # Address of entry point of main() + end_address = CODE_ADDRESS + 0x55 # Address of last instruction in main() uc.reg_write(UC_X86_REG_RIP, start_address) - #----------------- + # ----------------- # Setup the stack uc.mem_map(STACK_ADDRESS, STACK_SIZE) @@ -106,8 +142,7 @@ def main(): # Mapping a location to write our buffer to uc.mem_map(DATA_ADDRESS, DATA_SIZE_MAX) - - #----------------------------------------------- + # ----------------------------------------------- # Load the mutated input and map it into memory def place_input_callback(uc, input, _, data): @@ -121,7 +156,7 @@ def main(): # Write the mutated command into the data buffer uc.mem_write(DATA_ADDRESS, input) - #------------------------------------------------------------ + # ------------------------------------------------------------ # Emulate the code, allowing it to process the mutated input print("Starting the AFL fuzz") @@ -129,8 +164,9 @@ def main(): input_file=args.input_file, place_input_callback=place_input_callback, exits=[end_address], - persistent_iters=1 + persistent_iters=1, ) + if __name__ == "__main__": main() diff --git a/unicorn_mode/samples/simple/simple_test_harness.py b/unicorn_mode/samples/simple/simple_test_harness.py index 4a673daf..cd04ad3a 100644 --- a/unicorn_mode/samples/simple/simple_test_harness.py +++ b/unicorn_mode/samples/simple/simple_test_harness.py @@ -22,48 +22,81 @@ from unicornafl import * from unicornafl.mips_const import * # Path to the file containing the binary to emulate -BINARY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'simple_target.bin') +BINARY_FILE = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "simple_target.bin" +) # Memory map for the code to be tested -CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded +CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded CODE_SIZE_MAX = 0x00010000 # Max size for the code (64kb) STACK_ADDRESS = 0x00200000 # Address of the stack (arbitrarily chosen) -STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen) -DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed +STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen) +DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed DATA_SIZE_MAX = 0x00010000 # Maximum allowable size of mutated data 
 try:
     # If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
     from capstone import *
+
     cs = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN)
+
     def unicorn_debug_instruction(uc, address, size, user_data):
         mem = uc.mem_read(address, size)
-        for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size):
+        for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(
+            bytes(mem), size
+        ):
             print("    Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr))
+
+
 except ImportError:
+
     def unicorn_debug_instruction(uc, address, size, user_data):
-        print("    Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+        print("    Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+

 def unicorn_debug_block(uc, address, size, user_data):
     print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
-
+
+
 def unicorn_debug_mem_access(uc, access, address, size, value, user_data):
     if access == UC_MEM_WRITE:
-        print("        >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+        print(
+            "        >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
+                address, size, value
+            )
+        )
     else:
-        print("        >>> Read: addr=0x{0:016x} size={1}".format(address, size))
+        print("        >>> Read: addr=0x{0:016x} size={1}".format(address, size))
+

 def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data):
     if access == UC_MEM_WRITE_UNMAPPED:
-        print("        >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+        print(
+            "        >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(
+                address, size, value
+            )
+        )
     else:
-        print("        >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size))
+        print(
+            "        >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)
+        )
+

 def main():

     parser = argparse.ArgumentParser(description="Test harness for simple_target.bin")
-    parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input to load")
-    parser.add_argument('-t', '--trace', default=False, action="store_true", help="Enables debug tracing")
+    parser.add_argument(
+        "input_file",
+        type=str,
+        help="Path to the file containing the mutated input to load",
+    )
+    parser.add_argument(
+        "-t",
+        "--trace",
+        default=False,
+        action="store_true",
+        help="Enables debug tracing",
+    )
     args = parser.parse_args()

     # Instantiate a MIPS32 big endian Unicorn Engine instance
@@ -73,13 +106,16 @@ def main():
         uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block)
         uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction)
         uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access)
-        uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, unicorn_debug_mem_invalid_access)
+        uc.hook_add(
+            UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID,
+            unicorn_debug_mem_invalid_access,
+        )

-    #---------------------------------------------------
+    # ---------------------------------------------------
     # Load the binary to emulate and map it into memory

     print("Loading data input from {}".format(args.input_file))
-    binary_file = open(BINARY_FILE, 'rb')
+    binary_file = open(BINARY_FILE, "rb")
     binary_code = binary_file.read()
     binary_file.close()

@@ -93,11 +129,11 @@ def main():
     uc.mem_write(CODE_ADDRESS, binary_code)

     # Set the program counter to the start of the code
-    start_address = CODE_ADDRESS # Address of entry point of main()
-    end_address = CODE_ADDRESS + 0xf4 # Address of last instruction in main()
+    start_address = CODE_ADDRESS  # Address of entry point of main()
+    end_address = CODE_ADDRESS + 0xF4  # Address of last instruction in main()
     uc.reg_write(UC_MIPS_REG_PC, start_address)

-    #-----------------
+    # -----------------
     # Setup the stack

     uc.mem_map(STACK_ADDRESS, STACK_SIZE)
@@ -106,14 +142,14 @@ def main():
     # reserve some space for data
     uc.mem_map(DATA_ADDRESS, DATA_SIZE_MAX)

-    #-----------------------------------------------------
+    # -----------------------------------------------------
     # Set up a callback to place input data (do little work here, it's called for every single iteration)
     # We did not pass in any data and don't use persistent mode, so we can ignore these params.
     # Be sure to check out the docstrings for the uc.afl_* functions.
     def place_input_callback(uc, input, persistent_round, data):
         # Apply constraints to the mutated input
         if len(input) > DATA_SIZE_MAX:
-            #print("Test input is too long (> {} bytes)")
+            # print("Test input is too long (> {} bytes)")
             return False

         # Write the mutated command into the data buffer
@@ -122,5 +158,6 @@ def main():

     # Start the fuzzer.
     uc.afl_fuzz(args.input_file, place_input_callback, [end_address])

+
 if __name__ == "__main__":
     main()
diff --git a/unicorn_mode/samples/simple/simple_test_harness_alt.py b/unicorn_mode/samples/simple/simple_test_harness_alt.py
index 9c3dbc93..3249b13d 100644
--- a/unicorn_mode/samples/simple/simple_test_harness_alt.py
+++ b/unicorn_mode/samples/simple/simple_test_harness_alt.py
@@ -25,50 +25,79 @@ from unicornafl import *
 from unicornafl.mips_const import *

 # Path to the file containing the binary to emulate
-BINARY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'simple_target.bin')
+BINARY_FILE = os.path.join(
+    os.path.dirname(os.path.abspath(__file__)), "simple_target.bin"
+)

 # Memory map for the code to be tested
-CODE_ADDRESS = 0x00100000 # Arbitrary address where code to test will be loaded
+CODE_ADDRESS = 0x00100000  # Arbitrary address where code to test will be loaded
 CODE_SIZE_MAX = 0x00010000 # Max size for the code (64kb)
 STACK_ADDRESS = 0x00200000 # Address of the stack (arbitrarily chosen)
-STACK_SIZE = 0x00010000 # Size of the stack (arbitrarily chosen)
-DATA_ADDRESS = 0x00300000 # Address where mutated data will be placed
+STACK_SIZE = 0x00010000  # Size of the stack (arbitrarily chosen)
+DATA_ADDRESS = 0x00300000  # Address where mutated data will be placed
 DATA_SIZE_MAX = 0x00010000 # Maximum allowable size of mutated data

 try:
     # If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
from capstone import * + cs = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN) + def unicorn_debug_instruction(uc, address, size, user_data): mem = uc.mem_read(address, size) - for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size): + for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite( + bytes(mem), size + ): print(" Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr)) + + except ImportError: + def unicorn_debug_instruction(uc, address, size, user_data): - print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) + print(" Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) + def unicorn_debug_block(uc, address, size, user_data): print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size)) - + + def unicorn_debug_mem_access(uc, access, address, size, value, user_data): if access == UC_MEM_WRITE: - print(" >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) + print( + " >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format( + address, size, value + ) + ) else: - print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size)) + print(" >>> Read: addr=0x{0:016x} size={1}".format(address, size)) + def unicorn_debug_mem_invalid_access(uc, access, address, size, value, user_data): if access == UC_MEM_WRITE_UNMAPPED: - print(" >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value)) + print( + " >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format( + address, size, value + ) + ) else: - print(" >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size)) + print( + " >>> INVALID Read: addr=0x{0:016x} size={1}".format(address, size) + ) + def force_crash(uc_error): # This function should be called to indicate to AFL that a crash occurred during emulation. 
# Pass in the exception received from Uc.emu_start() mem_errors = [ - UC_ERR_READ_UNMAPPED, UC_ERR_READ_PROT, UC_ERR_READ_UNALIGNED, - UC_ERR_WRITE_UNMAPPED, UC_ERR_WRITE_PROT, UC_ERR_WRITE_UNALIGNED, - UC_ERR_FETCH_UNMAPPED, UC_ERR_FETCH_PROT, UC_ERR_FETCH_UNALIGNED, + UC_ERR_READ_UNMAPPED, + UC_ERR_READ_PROT, + UC_ERR_READ_UNALIGNED, + UC_ERR_WRITE_UNMAPPED, + UC_ERR_WRITE_PROT, + UC_ERR_WRITE_UNALIGNED, + UC_ERR_FETCH_UNMAPPED, + UC_ERR_FETCH_PROT, + UC_ERR_FETCH_UNALIGNED, ] if uc_error.errno in mem_errors: # Memory error - throw SIGSEGV @@ -80,11 +109,22 @@ def force_crash(uc_error): # Not sure what happened - throw SIGABRT os.kill(os.getpid(), signal.SIGABRT) + def main(): parser = argparse.ArgumentParser(description="Test harness for simple_target.bin") - parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input to load") - parser.add_argument('-d', '--debug', default=False, action="store_true", help="Enables debug tracing") + parser.add_argument( + "input_file", + type=str, + help="Path to the file containing the mutated input to load", + ) + parser.add_argument( + "-d", + "--debug", + default=False, + action="store_true", + help="Enables debug tracing", + ) args = parser.parse_args() # Instantiate a MIPS32 big endian Unicorn Engine instance @@ -94,13 +134,16 @@ def main(): uc.hook_add(UC_HOOK_BLOCK, unicorn_debug_block) uc.hook_add(UC_HOOK_CODE, unicorn_debug_instruction) uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, unicorn_debug_mem_access) - uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, unicorn_debug_mem_invalid_access) + uc.hook_add( + UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, + unicorn_debug_mem_invalid_access, + ) - #--------------------------------------------------- + # --------------------------------------------------- # Load the binary to emulate and map it into memory print("Loading data input from {}".format(args.input_file)) - binary_file = open(BINARY_FILE, 'rb') + binary_file = open(BINARY_FILE, "rb") binary_code = binary_file.read() binary_file.close() @@ -114,11 +157,11 @@ def main(): uc.mem_write(CODE_ADDRESS, binary_code) # Set the program counter to the start of the code - start_address = CODE_ADDRESS # Address of entry point of main() - end_address = CODE_ADDRESS + 0xf4 # Address of last instruction in main() + start_address = CODE_ADDRESS # Address of entry point of main() + end_address = CODE_ADDRESS + 0xF4 # Address of last instruction in main() uc.reg_write(UC_MIPS_REG_PC, start_address) - #----------------- + # ----------------- # Setup the stack uc.mem_map(STACK_ADDRESS, STACK_SIZE) @@ -127,10 +170,10 @@ def main(): # reserve some space for data uc.mem_map(DATA_ADDRESS, DATA_SIZE_MAX) - #----------------------------------------------------- + # ----------------------------------------------------- # Kick off AFL's fork server - # THIS MUST BE DONE BEFORE LOADING USER DATA! - # If this isn't done every single run, the AFL fork server + # THIS MUST BE DONE BEFORE LOADING USER DATA! + # If this isn't done every single run, the AFL fork server # will not be started appropriately and you'll get erratic results! 
    print("Starting the AFL forkserver")
@@ -142,12 +185,12 @@ def main():
     else:
         out = lambda x, y: print(x.format(y))

-    #-----------------------------------------------
+    # -----------------------------------------------
     # Load the mutated input and map it into memory

     # Load the mutated input from disk
     out("Loading data input from {}", args.input_file)
-    input_file = open(args.input_file, 'rb')
+    input_file = open(args.input_file, "rb")
     input = input_file.read()
     input_file.close()
@@ -159,7 +202,7 @@ def main():
     # Write the mutated command into the data buffer
     uc.mem_write(DATA_ADDRESS, input)

-    #------------------------------------------------------------
+    # ------------------------------------------------------------
     # Emulate the code, allowing it to process the mutated input

     out("Executing until a crash or execution reaches 0x{0:016x}", end_address)
@@ -175,5 +218,6 @@ def main():
     # UC_AFL_RET_FINISHED = 3
     out("Done. AFL Mode is {}", afl_mode)

+
 if __name__ == "__main__":
     main()
diff --git a/unicorn_mode/samples/speedtest/get_offsets.py b/unicorn_mode/samples/speedtest/get_offsets.py
index c9dc76df..c9dc76df 100644..100755
--- a/unicorn_mode/samples/speedtest/get_offsets.py
+++ b/unicorn_mode/samples/speedtest/get_offsets.py
diff --git a/unicorn_mode/samples/speedtest/python/harness.py b/unicorn_mode/samples/speedtest/python/harness.py
index f72eb32b..801ef4d1 100644
--- a/unicorn_mode/samples/speedtest/python/harness.py
+++ b/unicorn_mode/samples/speedtest/python/harness.py
@@ -256,17 +256,17 @@ def main():
         input_len = len(input) # global input_len
         if input_len > INPUT_MAX:
-            #print("Test input is too long (> {} bytes)")
+            # print("Test input is too long (> {} bytes)")
             return False

         # print(f"Placing input: {input} in round {persistent_round}")

         # Make sure the string is always 0-terminated (as it would be "in the wild")
-        input[-1] = b'\0'
+        input[-1] = b"\0"

         # Write the mutated command into the data buffer
         uc.mem_write(INPUT_ADDRESS, input)
-        #uc.reg_write(UC_X86_REG_RIP, main_offset)
+        # uc.reg_write(UC_X86_REG_RIP, main_offset)

     print(f"Starting to fuzz. Running from addr {main_offset} to one of {main_ends}")
     # Start the fuzzer.
diff --git a/unicorn_mode/unicornafl b/unicorn_mode/unicornafl
-Subproject 80d31ef367f7a1a75fc48e08e129d10f2ffa049
+Subproject fb2fc9f25df32f17f6b6b859e4dbd70f9a857e0
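The harness changes in this commit are mechanical style cleanups (black formatting: double quotes, exploded call arguments, `# `-prefixed comments, upper-case hex literals such as `0xf4` → `0xF4`); the fuzzing logic is untouched. Since the structure all of these samples share is small, a condensed sketch may help when reading the diffs. This is a hypothetical minimal harness, not code from this commit: the file name `target.bin`, the addresses, and the `0xF4` end offset are illustrative placeholders, while the calls themselves (`Uc`, `mem_map`, `mem_write`, `reg_write`, `afl_fuzz`, and the `place_input_callback` protocol) are the ones the samples use:

```python
import argparse

from unicornafl import *
from unicornafl.mips_const import *

# Placeholder memory layout -- adjust to the code under test.
CODE_ADDRESS = 0x00100000   # where the target blob gets mapped
CODE_SIZE_MAX = 0x00010000  # size of the code region
DATA_ADDRESS = 0x00300000   # buffer that receives each mutated input
DATA_SIZE_MAX = 0x00010000  # input size constraint


def main():
    parser = argparse.ArgumentParser(description="Minimal unicornafl harness sketch")
    parser.add_argument("input_file", type=str, help="File containing the mutated input")
    args = parser.parse_args()

    # MIPS32 big endian, matching the simple_target.bin samples.
    uc = Uc(UC_ARCH_MIPS, UC_MODE_MIPS32 + UC_MODE_BIG_ENDIAN)

    # Map the code region, load the binary, and point PC at the entry.
    with open("target.bin", "rb") as f:  # "target.bin" is a placeholder path
        binary_code = f.read()
    uc.mem_map(CODE_ADDRESS, CODE_SIZE_MAX)
    uc.mem_write(CODE_ADDRESS, binary_code)
    uc.reg_write(UC_MIPS_REG_PC, CODE_ADDRESS)

    # Reserve scratch space for the fuzzer-provided input.
    uc.mem_map(DATA_ADDRESS, DATA_SIZE_MAX)

    # Runs once per iteration, so keep it cheap. Returning False rejects
    # the input: AFL++ treats it as uninteresting and moves on.
    def place_input_callback(uc, input, persistent_round, data):
        if len(input) > DATA_SIZE_MAX:
            return False
        uc.mem_write(DATA_ADDRESS, input)

    # Hand control to AFL++; emulation stops at any address in the exits list.
    # 0xF4 stands in for the offset of the target's last instruction.
    uc.afl_fuzz(args.input_file, place_input_callback, [CODE_ADDRESS + 0xF4])


if __name__ == "__main__":
    main()
```

The alternative harness (`simple_test_harness_alt.py`) shows the other pattern visible in the diff: it starts the AFL fork server explicitly before loading user data, runs the emulation itself, and on an emulation error converts the caught exception into a signal via `os.kill()` (its `force_crash()` helper) so that AFL++ registers the crash.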