Diffstat (limited to 'unicorn_mode/helper_scripts')
-rw-r--r--   unicorn_mode/helper_scripts/template_test_harness.py    104
-rw-r--r--   unicorn_mode/helper_scripts/unicorn_dumper_gdb.py       190
-rw-r--r--   unicorn_mode/helper_scripts/unicorn_dumper_ida.py       209
-rw-r--r--   unicorn_mode/helper_scripts/unicorn_dumper_lldb.py      299
-rw-r--r--   unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py    224
-rw-r--r--   unicorn_mode/helper_scripts/unicorn_loader.py           560
6 files changed, 1586 insertions, 0 deletions
diff --git a/unicorn_mode/helper_scripts/template_test_harness.py b/unicorn_mode/helper_scripts/template_test_harness.py
new file mode 100644
index 00000000..93c526cc
--- /dev/null
+++ b/unicorn_mode/helper_scripts/template_test_harness.py
@@ -0,0 +1,104 @@
+"""
+    template_test_harness.py
+
+    Template which loads the context of a process into a Unicorn Engine
+    instance, loads a custom (mutated) input, and executes the
+    desired code. Designed to be used in conjunction with one of the
+    Unicorn Context Dumper scripts.
+
+    Author:
+        Nathan Voss <njvoss299@gmail.com>
+"""
+
+import argparse
+
+from unicorn import *
+from unicorn.x86_const import *  # TODO: Set correct architecture here as necessary
+
+import unicorn_loader
+
+# Simple stand-in heap to prevent OS/kernel issues
+unicorn_heap = None
+
+# Start and end address of emulation
+START_ADDRESS = 0x0  # TODO: Set start address here
+END_ADDRESS   = 0x0  # TODO: Set end address here
+
+"""
+    Implement target-specific hooks in here.
+    Stub out, skip past, and re-implement necessary functionality as appropriate.
+"""
+def unicorn_hook_instruction(uc, address, size, user_data):
+
+    # TODO: Setup hooks and handle anything you need to here
+    #    - For example, hook malloc/free/etc. and handle it internally
+    pass
+
+#------------------------
+#---- Main test function
+
+def main():
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('context_dir', type=str, help="Directory containing process context")
+    parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input content")
+    parser.add_argument('-d', '--debug', default=False, action="store_true", help="Dump trace info")
+    args = parser.parse_args()
+
+    print("Loading context from {}".format(args.context_dir))
+    uc = unicorn_loader.AflUnicornEngine(args.context_dir, enable_trace=args.debug, debug_print=False)
+
+    # Instantiate the heap and register the instruction hook to avoid emulation errors
+    global unicorn_heap
+    unicorn_heap = unicorn_loader.UnicornSimpleHeap(uc, debug_print=True)
+    uc.hook_add(UC_HOOK_CODE, unicorn_hook_instruction)
+
+    # Execute 1 instruction just to start up the forkserver
+    # NOTE: This instruction will be executed again later, so be sure that
+    #       there are no negative consequences to the overall execution state.
+    #       If there are, change the later call to emu_start to not re-execute
+    #       the first instruction.
+    print("Starting the forkserver by executing 1 instruction")
+    try:
+        uc.emu_start(START_ADDRESS, 0, 0, count=1)
+    except UcError as e:
+        print("ERROR: Failed to execute a single instruction (error: {})!".format(e))
+        return
+
+    # Allocate a buffer, load the mutated input, and put it into the right spot
+    if args.input_file:
+        print("Loading input content from {}".format(args.input_file))
+        input_file = open(args.input_file, 'rb')
+        input_content = input_file.read()
+        input_file.close()
+
+        # TODO: Apply constraints to the mutated input here
+        raise NotImplementedError('No constraints on the mutated inputs have been set!')
+
+        # Allocate a new buffer and put the input into it
+        buf_addr = unicorn_heap.malloc(len(input_content))
+        uc.mem_write(buf_addr, input_content)
+        print("Allocated mutated input buffer @ 0x{0:016x}".format(buf_addr))
+
+        # TODO: Set the input into the state so it will be handled
+        raise NotImplementedError('The mutated input was not loaded into the Unicorn state!')
+
+    # Run the test
+    print("Executing from 0x{0:016x} to 0x{1:016x}".format(START_ADDRESS, END_ADDRESS))
+    try:
+        result = uc.emu_start(START_ADDRESS, END_ADDRESS, timeout=0, count=0)
+    except UcError as e:
+        # If something went wrong during emulation a signal is raised to force this
+        # script to crash in a way that AFL can detect ('uc.force_crash()' should be
+        # called for any condition that you want AFL to treat as a crash).
+        print("Execution failed with error: {}".format(e))
+        uc.dump_regs()
+        uc.force_crash(e)
+
+    print("Final register state:")
+    uc.dump_regs()
+
+    print("Done.")
+
+if __name__ == "__main__":
+    main()
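The unicorn_hook_instruction TODO above is where target-specific fixups go. As a rough illustration, and not part of this commit, here is a hedged sketch of intercepting a call to malloc at a known address and servicing it from the UnicornSimpleHeap instead; MALLOC_ADDR, the SysV x86-64 calling convention, and the return-address handling are all assumptions that must be adapted to the real target (it also needs "import struct" alongside the other imports).

    # Hypothetical sketch: when emulation reaches malloc's entry point, read
    # the size argument, allocate from the stand-in heap, and return directly
    # to the caller instead of emulating the real allocator.
    MALLOC_ADDR = 0x400560  # assumption: resolved from the context dump

    def unicorn_hook_instruction(uc, address, size, user_data):
        if address == MALLOC_ADDR:
            alloc_size = uc.reg_read(UC_X86_REG_RDI)            # SysV x86-64: 1st arg in RDI
            uc.reg_write(UC_X86_REG_RAX, unicorn_heap.malloc(alloc_size))
            # Pop the return address pushed by the 'call' and resume there
            rsp = uc.reg_read(UC_X86_REG_RSP)
            ret_addr = struct.unpack("<Q", uc.mem_read(rsp, 8))[0]
            uc.reg_write(UC_X86_REG_RSP, rsp + 8)
            uc.reg_write(UC_X86_REG_RIP, ret_addr)

Note that redirecting execution by writing the program counter from inside a UC_HOOK_CODE callback works on the Unicorn versions afl-unicorn targets, but it is worth verifying against the Unicorn build in use.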
+ print("Starting the forkserver by executing 1 instruction") + try: + uc.emu_start(START_ADDRESS, 0, 0, count=1) + except UcError as e: + print("ERROR: Failed to execute a single instruction (error: {})!".format(e)) + return + + # Allocate a buffer and load a mutated input and put it into the right spot + if args.input_file: + print("Loading input content from {}".format(args.input_file)) + input_file = open(args.input_file, 'rb') + input_content = input_file.read() + input_file.close() + + # TODO: Apply constraints to mutated input here + raise exceptions.NotImplementedError('No constraints on the mutated inputs have been set!') + + # Allocate a new buffer and put the input into it + buf_addr = unicorn_heap.malloc(len(input_content)) + uc.mem_write(buf_addr, input_content) + print("Allocated mutated input buffer @ 0x{0:016x}".format(buf_addr)) + + # TODO: Set the input into the state so it will be handled + raise exceptions.NotImplementedError('The mutated input was not loaded into the Unicorn state!') + + # Run the test + print("Executing from 0x{0:016x} to 0x{1:016x}".format(START_ADDRESS, END_ADDRESS)) + try: + result = uc.emu_start(START_ADDRESS, END_ADDRESS, timeout=0, count=0) + except UcError as e: + # If something went wrong during emulation a signal is raised to force this + # script to crash in a way that AFL can detect ('uc.force_crash()' should be + # called for any condition that you want AFL to treat as a crash). + print("Execution failed with error: {}".format(e)) + uc.dump_regs() + uc.force_crash(e) + + print("Final register state:") + uc.dump_regs() + + print("Done.") + +if __name__ == "__main__": + main() diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py b/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py new file mode 100644 index 00000000..22b9fd47 --- /dev/null +++ b/unicorn_mode/helper_scripts/unicorn_dumper_gdb.py @@ -0,0 +1,190 @@ +""" + unicorn_dumper_gdb.py + + When run with GDB sitting at a debug breakpoint, this + dumps the current state (registers/memory/etc) of + the process to a directory consisting of an index + file with register and segment information and + sub-files containing all actual process memory. + + The output of this script is expected to be used + to initialize context for Unicorn emulation. + + ----------- + + In order to run this script, GEF needs to be running in the GDB session (gef.py) + # HELPERS from: https://github.com/hugsy/gef/blob/master/gef.py + It can be loaded with: + source <path_to_gef>/gef.py + + Call this function when at a breakpoint in your process with: + source unicorn_dumper_gdb.py + + ----------- + + +""" + +import datetime +import hashlib +import json +import os +import sys +import time +import zlib + +# GDB Python SDK +import gdb + +# Maximum segment size that we'll store +# Yep, this could break stuff pretty quickly if we +# omit something that's used during emulation. 
+MAX_SEG_SIZE = 128 * 1024 * 1024 + +# Name of the index file +INDEX_FILE_NAME = "_index.json" + +#---------------------- +#---- Helper Functions + +def map_arch(): + arch = get_arch() # from GEF + if 'x86_64' in arch or 'x86-64' in arch: + return "x64" + elif 'x86' in arch or 'i386' in arch: + return "x86" + elif 'aarch64' in arch or 'arm64' in arch: + return "arm64le" + elif 'aarch64_be' in arch: + return "arm64be" + elif 'armeb' in arch: + # check for THUMB mode + cpsr = get_register('cpsr') + if (cpsr & (1 << 5)): + return "armbethumb" + else: + return "armbe" + elif 'arm' in arch: + # check for THUMB mode + cpsr = get_register('cpsr') + if (cpsr & (1 << 5)): + return "armlethumb" + else: + return "armle" + else: + return "" + + +#----------------------- +#---- Dumping functions + +def dump_arch_info(): + arch_info = {} + arch_info["arch"] = map_arch() + return arch_info + + +def dump_regs(): + reg_state = {} + for reg in current_arch.all_registers: + reg_val = get_register(reg) + # current dumper script looks for register values to be hex strings +# reg_str = "0x{:08x}".format(reg_val) +# if "64" in get_arch(): +# reg_str = "0x{:016x}".format(reg_val) +# reg_state[reg.strip().strip('$')] = reg_str + reg_state[reg.strip().strip('$')] = reg_val + return reg_state + + +def dump_process_memory(output_dir): + # Segment information dictionary + final_segment_list = [] + + # GEF: + vmmap = get_process_maps() + if not vmmap: + print("No address mapping information found") + return final_segment_list + + for entry in vmmap: + if entry.page_start == entry.page_end: + continue + + seg_info = {'start': entry.page_start, 'end': entry.page_end, 'name': entry.path, 'permissions': { + "r": entry.is_readable() > 0, + "w": entry.is_writable() > 0, + "x": entry.is_executable() > 0 + }, 'content_file': ''} + + # "(deleted)" may or may not be valid, but don't push it. + if entry.is_readable() and not '(deleted)' in entry.path: + try: + # Compress and dump the content to a file + seg_content = read_memory(entry.page_start, entry.size) + if(seg_content == None): + print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(entry.page_start, entry.path)) + else: + print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(entry.page_start, len(seg_content), entry.path, repr(seg_info['permissions']))) + compressed_seg_content = zlib.compress(seg_content) + md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin" + seg_info["content_file"] = md5_sum + + # Write the compressed contents to disk + out_file = open(os.path.join(output_dir, md5_sum), 'wb') + out_file.write(compressed_seg_content) + out_file.close() + + except: + print("Exception reading segment ({}): {}".format(entry.path, sys.exc_info()[0])) + else: + print("Skipping segment {0}@0x{1:016x}".format(entry.path, entry.page_start)) + + # Add the segment to the list + final_segment_list.append(seg_info) + + + return final_segment_list + +#---------- +#---- Main + +def main(): + print("----- Unicorn Context Dumper -----") + print("You must be actively debugging before running this!") + print("If it fails, double check that you are actively debugging before running.") + try: + GEF_TEST = set_arch() + except Exception as e: + print("!!! GEF not running in GDB. 
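All of the dumpers in this change write the same on-disk format, so the resulting _index.json can be inspected offline. A minimal sketch, assuming a previously created UnicornContext_* directory (the directory name here is a placeholder):

    import json, os, zlib

    context_dir = "UnicornContext_20180101_120000"  # assumption: an existing dump
    with open(os.path.join(context_dir, "_index.json")) as f:
        context = json.load(f)

    print(context["arch"]["arch"])                  # e.g. "x64"
    for seg in context["segments"]:
        size = seg["end"] - seg["start"]
        print("0x{0:016x} +0x{1:x} {2}".format(seg["start"], size, seg["name"]))
        if seg["content_file"]:
            # Segment contents are zlib-compressed, named by MD5 of the compressed data
            with open(os.path.join(context_dir, seg["content_file"]), "rb") as c:
                data = zlib.decompress(c.read())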
diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_ida.py b/unicorn_mode/helper_scripts/unicorn_dumper_ida.py
new file mode 100644
index 00000000..6cf9f30f
--- /dev/null
+++ b/unicorn_mode/helper_scripts/unicorn_dumper_ida.py
@@ -0,0 +1,209 @@
+"""
+    unicorn_dumper_ida.py
+
+    When run with IDA (<v7) sitting at a debug breakpoint,
+    dumps the current state (registers/memory/etc.) of
+    the process to a directory consisting of an index
+    file with register and segment information and
+    sub-files containing all actual process memory.
+
+    The output of this script is expected to be used
+    to initialize context for Unicorn emulation.
+"""
+
+import datetime
+import hashlib
+import json
+import os
+import sys
+import time
+import zlib
+
+# IDA Python SDK
+from idaapi import *
+from idc import *
+
+# Maximum segment size that we'll store
+# Yep, this could break stuff pretty quickly if we
+# omit something that's used during emulation.
+MAX_SEG_SIZE = 128 * 1024 * 1024
+
+# Name of the index file
+INDEX_FILE_NAME = "_index.json"
+
+#----------------------
+#---- Helper Functions
+
+def get_arch():
+    if ph.id == PLFM_386 and ph.flag & PR_USE64:
+        return "x64"
+    elif ph.id == PLFM_386 and ph.flag & PR_USE32:
+        return "x86"
+    elif ph.id == PLFM_ARM and ph.flag & PR_USE64:
+        if cvar.inf.is_be():
+            return "arm64be"
+        else:
+            return "arm64le"
+    elif ph.id == PLFM_ARM and ph.flag & PR_USE32:
+        if cvar.inf.is_be():
+            return "armbe"
+        else:
+            return "armle"
+    else:
+        return ""
+
+def get_register_list(arch):
+    if arch == "arm64le" or arch == "arm64be":
+        arch = "arm64"
+    elif arch == "armle" or arch == "armbe":
+        arch = "arm"
+
+    registers = {
+        "x64" : [
+            "rax", "rbx", "rcx", "rdx", "rsi", "rdi", "rbp", "rsp",
+            "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15",
+            "rip", "rsp", "efl",
+            "cs", "ds", "es", "fs", "gs", "ss",
+        ],
+        "x86" : [
+            "eax", "ebx", "ecx", "edx", "esi", "edi", "ebp", "esp",
+            "eip", "esp", "efl",
+            "cs", "ds", "es", "fs", "gs", "ss",
+        ],
+        "arm" : [
+            "R0", "R1", "R2", "R3", "R4", "R5", "R6", "R7",
+            "R8", "R9", "R10", "R11", "R12", "PC", "SP", "LR",
+            "PSR",
+        ],
+        "arm64" : [
+            "X0", "X1", "X2", "X3", "X4", "X5", "X6", "X7",
+            "X8", "X9", "X10", "X11", "X12", "X13", "X14",
+            "X15", "X16", "X17", "X18", "X19", "X20", "X21",
+            "X22", "X23", "X24", "X25", "X26", "X27", "X28",
+            "PC", "SP", "FP", "LR", "CPSR"
+            # "NZCV",
+        ]
+    }
+    return registers[arch]
+
+#-----------------------
+#---- Dumping functions
+
+def dump_arch_info():
+    arch_info = {}
+    arch_info["arch"] = get_arch()
+    return arch_info
+
+def dump_regs():
+    reg_state = {}
+    for reg in get_register_list(get_arch()):
+        reg_state[reg] = GetRegValue(reg)
+    return reg_state
+
+def dump_process_memory(output_dir):
+    # Segment information dictionary
+    segment_list = []
+
+    # Loop over the segments, fill in the info dictionary
+    for seg_ea in Segments():
+        seg_start = SegStart(seg_ea)
+        seg_end = SegEnd(seg_ea)
+        seg_size = seg_end - seg_start
+
+        seg_info = {}
+        seg_info["name"]  = SegName(seg_ea)
+        seg_info["start"] = seg_start
+        seg_info["end"]   = seg_end
+
+        perms = getseg(seg_ea).perm
+        seg_info["permissions"] = {
+            "r": False if (perms & SEGPERM_READ) == 0 else True,
+            "w": False if (perms & SEGPERM_WRITE) == 0 else True,
+            "x": False if (perms & SEGPERM_EXEC) == 0 else True,
+        }
+
+        if (perms & SEGPERM_READ) and seg_size <= MAX_SEG_SIZE and isLoaded(seg_start):
+            try:
+                # Compress and dump the content to a file
+                seg_content = get_many_bytes(seg_start, seg_end - seg_start)
+                if(seg_content == None):
+                    print("Segment empty: {0}@0x{1:016x} (size:UNKNOWN)".format(SegName(seg_ea), seg_ea))
+                    seg_info["content_file"] = ""
+                else:
+                    print("Dumping segment {0}@0x{1:016x} (size:{2})".format(SegName(seg_ea), seg_ea, len(seg_content)))
+                    compressed_seg_content = zlib.compress(seg_content)
+                    md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
+                    seg_info["content_file"] = md5_sum
+
+                    # Write the compressed contents to disk
+                    out_file = open(os.path.join(output_dir, md5_sum), 'wb')
+                    out_file.write(compressed_seg_content)
+                    out_file.close()
+            except:
+                print("Exception reading segment: {}".format(sys.exc_info()[0]))
+                seg_info["content_file"] = ""
+        else:
+            print("Skipping segment {0}@0x{1:016x}".format(SegName(seg_ea), seg_ea))
+            seg_info["content_file"] = ""
+
+        # Add the segment to the list
+        segment_list.append(seg_info)
+
+    return segment_list
+
+"""
+    TODO: FINISH IMPORT DUMPING
+def import_callback(ea, name, ord):
+    if not name:
+    else:
+
+    # True -> Continue enumeration
+    # False -> End enumeration
+    return True
+
+def dump_imports():
+    import_dict = {}
+
+    for i in xrange(0, number_of_import_modules):
+        enum_import_names(i, import_callback)
+
+    return import_dict
+"""
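The import dumping above is explicitly unfinished. One hedged way it might be completed with the pre-7.0 IDAPython API is sketched below; get_import_module_qty() and enum_import_names() are real IDAPython calls, but the exact callback semantics should be verified against the IDA version in use, and this is not part of the commit.

    imports = {}

    def import_callback(ea, name, ordinal):
        if name:
            imports[name] = ea
        # True -> continue enumeration, False -> stop
        return True

    def dump_imports():
        # One enumeration pass per import module in the IDB
        for i in xrange(0, get_import_module_qty()):
            enum_import_names(i, import_callback)
        return imports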
+#----------
+#---- Main
+
+def main():
+
+    try:
+        print("----- Unicorn Context Dumper -----")
+        print("You must be actively debugging before running this!")
+        print("If it fails, double check that you are actively debugging before running.")
+
+        # Create the output directory
+        timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S')
+        output_path = os.path.dirname(os.path.abspath(GetIdbPath()))
+        output_path = os.path.join(output_path, "UnicornContext_" + timestamp)
+        if not os.path.exists(output_path):
+            os.makedirs(output_path)
+        print("Process context will be output to {}".format(output_path))
+
+        # Get the context
+        context = {
+            "arch": dump_arch_info(),
+            "regs": dump_regs(),
+            "segments": dump_process_memory(output_path),
+            #"imports": dump_imports(),
+        }
+
+        # Write the index file
+        index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w')
+        index_file.write(json.dumps(context, indent=4))
+        index_file.close()
+        print("Done.")
+
+    except Exception, e:
+        print("!!! ERROR:\n\t{}".format(str(e)))
+
+if __name__ == "__main__":
+    main()
diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py b/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py
new file mode 100644
index 00000000..3c019d77
--- /dev/null
+++ b/unicorn_mode/helper_scripts/unicorn_dumper_lldb.py
@@ -0,0 +1,299 @@
+"""
+    unicorn_dumper_lldb.py
+
+    When run with LLDB sitting at a debug breakpoint, this
+    dumps the current state (registers/memory/etc.) of
+    the process to a directory consisting of an index
+    file with register and segment information and
+    sub-files containing all actual process memory.
+
+    The output of this script is expected to be used
+    to initialize context for Unicorn emulation.
+
+    -----------
+
+    Call this function when at a breakpoint in your process with:
+      command script import -r unicorn_dumper_lldb
+
+    If there is trouble with "split on a NoneType", issue the following command:
+      script lldb.target.triple
+
+    and try to import the script again.
+
+    -----------
+"""
+
+from copy import deepcopy
+import datetime
+import hashlib
+import json
+import os
+import sys
+import time
+import zlib
+
+# LLDB Python SDK
+import lldb
+
+# Maximum segment size that we'll store
+# Yep, this could break stuff pretty quickly if we
+# omit something that's used during emulation.
+MAX_SEG_SIZE = 128 * 1024 * 1024
+
+# Name of the index file
+INDEX_FILE_NAME = "_index.json"
+DEBUG_MEM_FILE_NAME = "_memory.json"
+
+# Page size required by Unicorn
+UNICORN_PAGE_SIZE = 0x1000
+
+# Alignment functions to align all memory segments to Unicorn page boundaries (4KB pages only)
+ALIGN_PAGE_DOWN = lambda x: x & ~(UNICORN_PAGE_SIZE - 1)
+ALIGN_PAGE_UP   = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE-1)
+
+#----------------------
+#---- Helper Functions
+
+def overlap_alignments(segments, memory):
+    final_list = []
+    curr_seg_idx = 0
+    curr_end_addr = 0
+    curr_node = None
+    current_segment = None
+    sorted_segments = sorted(segments, key=lambda k: (k['start'], k['end']))
+    if curr_seg_idx < len(sorted_segments):
+        current_segment = sorted_segments[curr_seg_idx]
+    for mem in sorted(memory, key=lambda k: (k['start'], -k['end'])):
+        if curr_node is None:
+            if current_segment is not None and current_segment['start'] == mem['start']:
+                curr_node = deepcopy(current_segment)
+                curr_node['permissions'] = mem['permissions']
+            else:
+                curr_node = deepcopy(mem)
+
+            curr_end_addr = curr_node['end']
+
+        while curr_end_addr <= mem['end']:
+            if curr_node['end'] == mem['end']:
+                if current_segment is not None and current_segment['start'] > curr_node['start'] and current_segment['start'] < curr_node['end']:
+                    curr_node['end'] = current_segment['start']
+                    if(curr_node['end'] > curr_node['start']):
+                        final_list.append(curr_node)
+                    curr_node = deepcopy(current_segment)
+                    curr_node['permissions'] = mem['permissions']
+                    curr_end_addr = curr_node['end']
+                else:
+                    if(curr_node['end'] > curr_node['start']):
+                        final_list.append(curr_node)
+                    # if curr_node is a segment
+                    if current_segment is not None and current_segment['end'] == mem['end']:
+                        curr_seg_idx += 1
+                        if curr_seg_idx < len(sorted_segments):
+                            current_segment = sorted_segments[curr_seg_idx]
+                        else:
+                            current_segment = None
+
+                    curr_node = None
+                    break
+            # could only be a segment
+            else:
+                if curr_node['end'] < mem['end']:
+                    # check for remaining segments and valid segments
+                    if(curr_node['end'] > curr_node['start']):
+                        final_list.append(curr_node)
+
+                    curr_seg_idx += 1
+                    if curr_seg_idx < len(sorted_segments):
+                        current_segment = sorted_segments[curr_seg_idx]
+                    else:
+                        current_segment = None
+
+                    if current_segment is not None and current_segment['start'] <= curr_end_addr and current_segment['start'] < mem['end']:
+                        curr_node = deepcopy(current_segment)
+                        curr_node['permissions'] = mem['permissions']
+                    else:
+                        # no more segments
+                        curr_node = deepcopy(mem)
+
+                    curr_node['start'] = curr_end_addr
+                    curr_end_addr = curr_node['end']
+
+    return final_list
+
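overlap_alignments() is the trickiest part of this dumper: it reconciles the page-aligned section list with the OS memory map so that every emitted node carries the map's permissions, with unnamed gaps kept as anonymous regions. A toy, self-contained illustration of the intended behavior (the dictionaries mimic the shapes built in dump_process_memory below; names are made up):

    segments = [{'start': 0x1000, 'end': 0x2000, 'name': 'libfoo.__TEXT'}]
    memory   = [{'start': 0x1000, 'end': 0x3000, 'name': 'UNKNOWN',
                 'permissions': {'r': True, 'w': False, 'x': True}}]
    for node in overlap_alignments(segments, memory):
        print("0x{0:x}-0x{1:x} {2}".format(node['start'], node['end'], node['name']))
    # 0x1000-0x2000 libfoo.__TEXT   <- named section, inherits r-x permissions
    # 0x2000-0x3000 UNKNOWN         <- remainder of the mapped region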
+# https://github.com/llvm-mirror/llvm/blob/master/include/llvm/ADT/Triple.h
+def get_arch():
+    arch, arch_vendor, arch_os = lldb.target.GetTriple().split('-')
+    if arch == 'x86_64':
+        return "x64"
+    elif arch == 'x86' or arch == 'i386':
+        return "x86"
+    elif arch == 'aarch64' or arch == 'arm64':
+        return "arm64le"
+    elif arch == 'aarch64_be':
+        return "arm64be"
+    elif arch == 'armeb':
+        return "armbe"
+    elif arch == 'arm':
+        return "armle"
+    else:
+        return ""
+
+
+#-----------------------
+#---- Dumping functions
+
+def dump_arch_info():
+    arch_info = {}
+    arch_info["arch"] = get_arch()
+    return arch_info
+
+
+def dump_regs():
+    reg_state = {}
+    for reg_list in lldb.frame.GetRegisters():
+        if 'general purpose registers' in reg_list.GetName().lower():
+            for reg in reg_list:
+                reg_state[reg.GetName()] = int(reg.GetValue(), 16)
+    return reg_state
+
+def get_section_info(sec):
+    name = sec.name if sec.name is not None else ''
+    if sec.GetParent().name is not None:
+        name = sec.GetParent().name + '.' + sec.name
+
+    module_name = sec.addr.module.file.GetFilename()
+    module_name = module_name if module_name is not None else ''
+    long_name = module_name + '.' + name
+
+    return sec.addr.load_addr, (sec.addr.load_addr + sec.size), sec.size, long_name
+
+
+def dump_process_memory(output_dir):
+    # Segment information dictionary
+    raw_segment_list = []
+    raw_memory_list = []
+
+    # 1st pass:
+    # Loop over the segments, fill in the segment info dictionary
+    for module in lldb.target.module_iter():
+        for seg_ea in module.section_iter():
+            seg_info = {'module': module.file.GetFilename() }
+            seg_info['start'], seg_info['end'], seg_size, seg_info['name'] = get_section_info(seg_ea)
+            # TODO: Ugly hack for -1 LONG address on 32-bit
+            if seg_info['start'] >= sys.maxint or seg_size <= 0:
+                print("Throwing away page: {}".format(seg_info['name']))
+                continue
+
+            # Page-align segment
+            seg_info['start'] = ALIGN_PAGE_DOWN(seg_info['start'])
+            seg_info['end'] = ALIGN_PAGE_UP(seg_info['end'])
+            print("Appending: {}".format(seg_info['name']))
+            raw_segment_list.append(seg_info)
+
+    # Add the stack memory region (just hardcode 0x1000 around the current SP)
+    sp = lldb.frame.GetSP()
+    start_sp = ALIGN_PAGE_DOWN(sp)
+    raw_segment_list.append({'start': start_sp, 'end': start_sp + 0x1000, 'name': 'STACK'})
+
+    # Write the original memory to file for debugging
+    index_file = open(os.path.join(output_dir, DEBUG_MEM_FILE_NAME), 'w')
+    index_file.write(json.dumps(raw_segment_list, indent=4))
+    index_file.close()
+
+    # Loop over raw memory regions
+    mem_info = lldb.SBMemoryRegionInfo()
+    start_addr = -1
+    next_region_addr = 0
+    while next_region_addr > start_addr:
+        err = lldb.process.GetMemoryRegionInfo(next_region_addr, mem_info)
+        # If the region query failed, there is nothing more we can enumerate
+        if not err.success:
+            break
+        next_region_addr = mem_info.GetRegionEnd()
+        if next_region_addr >= sys.maxsize:
+            break
+
+        start_addr = mem_info.GetRegionBase()
+        end_addr = mem_info.GetRegionEnd()
+
+        # Unknown region name
+        region_name = 'UNKNOWN'
+
+        # Ignore regions that aren't even mapped
+        if mem_info.IsMapped() and mem_info.IsReadable():
+            mem_info_obj = {'start': start_addr, 'end': end_addr, 'name': region_name, 'permissions': {
+                "r": mem_info.IsReadable(),
+                "w": mem_info.IsWritable(),
+                "x": mem_info.IsExecutable()
+            }}
+
+            raw_memory_list.append(mem_info_obj)
+
+    final_segment_list = overlap_alignments(raw_segment_list, raw_memory_list)
+
+    for seg_info in final_segment_list:
+        try:
+            seg_info['content_file'] = ''
+            start_addr = seg_info['start']
+            end_addr = seg_info['end']
+            region_name = seg_info['name']
+            # Compress and dump the content to a file
+            err = lldb.SBError()
+            seg_content = lldb.process.ReadMemory(start_addr, end_addr - start_addr, err)
+            if(seg_content == None):
+                print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(start_addr, region_name))
+                seg_info['content_file'] = ''
+            else:
+                print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(start_addr, len(seg_content), region_name, repr(seg_info['permissions'])))
+                compressed_seg_content = zlib.compress(seg_content)
+                md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
+                seg_info['content_file'] = md5_sum
+
+                # Write the compressed contents to disk
+                out_file = open(os.path.join(output_dir, md5_sum), 'wb')
+                out_file.write(compressed_seg_content)
+                out_file.close()
+
+        except:
+            print("Exception reading segment ({}): {}".format(region_name, sys.exc_info()[0]))
+
+    return final_segment_list
+
+#----------
+#---- Main
+
+def main():
+
+    try:
+        print("----- Unicorn Context Dumper -----")
+        print("You must be actively debugging before running this!")
+        print("If it fails, double check that you are actively debugging before running.")
+
+        # Create the output directory
+        timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S')
+        output_path = "UnicornContext_" + timestamp
+        if not os.path.exists(output_path):
+            os.makedirs(output_path)
+        print("Process context will be output to {}".format(output_path))
+
+        # Get the context
+        context = {
+            "arch": dump_arch_info(),
+            "regs": dump_regs(),
+            "segments": dump_process_memory(output_path),
+        }
+
+        # Write the index file
+        index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w')
+        index_file.write(json.dumps(context, indent=4))
+        index_file.close()
+        print("Done.")
+
+    except Exception, e:
+        print("!!! ERROR:\n\t{}".format(repr(e)))
+
+if __name__ == "__main__":
+    main()
+elif lldb.debugger:
+    main()
diff --git a/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py b/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py
new file mode 100644
index 00000000..bf2367cf
--- /dev/null
+++ b/unicorn_mode/helper_scripts/unicorn_dumper_pwndbg.py
@@ -0,0 +1,224 @@
+"""
+    unicorn_dumper_pwndbg.py
+
+    When run with GDB sitting at a debug breakpoint, this
+    dumps the current state (registers/memory/etc.) of
+    the process to a directory consisting of an index
+    file with register and segment information and
+    sub-files containing all actual process memory.
+
+    The output of this script is expected to be used
+    to initialize context for Unicorn emulation.
+
+    -----------
+
+    In order to run this script, pwndbg needs to be running in the GDB session (gdbinit.py).
+    # HELPERS from: https://github.com/pwndbg/pwndbg
+    It can be loaded with:
+      source <path_to_pwndbg>/gdbinit.py
+
+    Call this function when at a breakpoint in your process with:
+      source unicorn_dumper_pwndbg.py
+
+    -----------
+"""
+
+import datetime
+import hashlib
+import json
+import os
+import sys
+import time
+import zlib
+
+# GDB Python SDK
+import gdb
+
+pwndbg_loaded = False
+
+try:
+    import pwndbg.arch
+    import pwndbg.regs
+    import pwndbg.vmmap
+    import pwndbg.memory
+
+    pwndbg_loaded = True
+
+except ImportError:
+    print("!!! PWNDBG not running in GDB. Please run gdbinit.py by executing:")
+    print('\tpython execfile ("<path_to_pwndbg>/gdbinit.py")')
+
+# Maximum segment size that we'll store
+# Yep, this could break stuff pretty quickly if we
+# omit something that's used during emulation.
+MAX_SEG_SIZE = 128 * 1024 * 1024
+
+# Name of the index file
+INDEX_FILE_NAME = "_index.json"
+
+#----------------------
+#---- Helper Functions
+
+def map_arch():
+    arch = pwndbg.arch.current  # from pwndbg
+    if 'x86_64' in arch or 'x86-64' in arch:
+        return "x64"
+    elif 'x86' in arch or 'i386' in arch:
+        return "x86"
+    elif 'aarch64' in arch or 'arm64' in arch:
+        return "arm64le"
+    elif 'aarch64_be' in arch:
+        return "arm64be"
+    elif 'arm' in arch:
+        cpsr = pwndbg.regs['cpsr']
+        # check endianness
+        if pwndbg.arch.endian == 'big':
+            # check for THUMB mode
+            if (cpsr & (1 << 5)):
+                return "armbethumb"
+            else:
+                return "armbe"
+        else:
+            # check for THUMB mode
+            if (cpsr & (1 << 5)):
+                return "armlethumb"
+            else:
+                return "armle"
+    elif 'mips' in arch:
+        if pwndbg.arch.endian == 'little':
+            return 'mipsel'
+        else:
+            return 'mips'
+    else:
+        return ""
+
+
+#-----------------------
+#---- Dumping functions
+
+def dump_arch_info():
+    arch_info = {}
+    arch_info["arch"] = map_arch()
+    return arch_info
+
+
+def dump_regs():
+    reg_state = {}
+    for reg in pwndbg.regs.all:
+        reg_val = pwndbg.regs[reg]
+        # current dumper script looks for register values to be hex strings
+#        reg_str = "0x{:08x}".format(reg_val)
+#        if "64" in get_arch():
+#            reg_str = "0x{:016x}".format(reg_val)
+#        reg_state[reg.strip().strip('$')] = reg_str
+        reg_state[reg.strip().strip('$')] = reg_val
+    return reg_state
+
+
+def dump_process_memory(output_dir):
+    # Segment information dictionary
+    final_segment_list = []
+
+    # PWNDBG:
+    vmmap = pwndbg.vmmap.get()
+
+    # Pointer to end of last dumped memory segment
+    segment_last_addr = 0x0
+
+    start = None
+    end = None
+
+    if not vmmap:
+        print("No address mapping information found")
+        return final_segment_list
+
+    # Assume segment entries are sorted by start address
+    for entry in vmmap:
+        if entry.start == entry.end:
+            continue
+
+        start = entry.start
+        end = entry.end
+
+        if (segment_last_addr > entry.start): # indicates overlap
+            if (segment_last_addr > entry.end): # indicates complete overlap, so we skip the segment entirely
+                continue
+            else:
+                start = segment_last_addr
+
+        seg_info = {'start': start, 'end': end, 'name': entry.objfile, 'permissions': {
+            "r": entry.read,
+            "w": entry.write,
+            "x": entry.execute
+        }, 'content_file': ''}
+
+        # "(deleted)" may or may not be valid, but don't push it.
+        if entry.read and not '(deleted)' in entry.objfile:
+            try:
+                # Compress and dump the content to a file
+                seg_content = pwndbg.memory.read(start, end - start)
+                if(seg_content == None):
+                    print("Segment empty: @0x{0:016x} (size:UNKNOWN) {1}".format(entry.start, entry.objfile))
+                else:
+                    print("Dumping segment @0x{0:016x} (size:0x{1:x}): {2} [{3}]".format(entry.start, len(seg_content), entry.objfile, repr(seg_info['permissions'])))
+                    compressed_seg_content = zlib.compress(seg_content)
+                    md5_sum = hashlib.md5(compressed_seg_content).hexdigest() + ".bin"
+                    seg_info["content_file"] = md5_sum
+
+                    # Write the compressed contents to disk
+                    out_file = open(os.path.join(output_dir, md5_sum), 'wb')
+                    out_file.write(compressed_seg_content)
+                    out_file.close()
+
+            except:
+                print("Exception reading segment ({}): {}".format(entry.objfile, sys.exc_info()[0]))
+        else:
+            print("Skipping segment {0}@0x{1:016x}".format(entry.objfile, entry.start))
+
+        segment_last_addr = end
+
+        # Add the segment to the list
+        final_segment_list.append(seg_info)
+
+    return final_segment_list
+
+#----------
+#---- Main
+
+def main():
+    print("----- Unicorn Context Dumper -----")
+    print("You must be actively debugging before running this!")
+    print("If it fails, double check that you are actively debugging before running.")
+
+    try:
+
+        # Create the output directory
+        timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S')
+        output_path = "UnicornContext_" + timestamp
+        if not os.path.exists(output_path):
+            os.makedirs(output_path)
+        print("Process context will be output to {}".format(output_path))
+
+        # Get the context
+        context = {
+            "arch": dump_arch_info(),
+            "regs": dump_regs(),
+            "segments": dump_process_memory(output_path),
+        }
+
+        # Write the index file
+        index_file = open(os.path.join(output_path, INDEX_FILE_NAME), 'w')
+        index_file.write(json.dumps(context, indent=4))
+        index_file.close()
+        print("Done.")
+
+    except Exception as e:
+        print("!!! ERROR:\n\t{}".format(repr(e)))
+
+if __name__ == "__main__" and pwndbg_loaded:
+    main()
+
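dump_process_memory() above trims any vmmap entry that overlaps what has already been dumped, since page-aligned entries can overlap. The trimming rule in isolation, as a runnable toy with entries reduced to (start, end) tuples:

    def trim(entries):
        # Mirrors the segment_last_addr logic: drop fully-overlapped entries,
        # shrink partially-overlapped ones to start where the last dump ended.
        last = 0
        out = []
        for start, end in entries:
            if last > start:
                if last > end:
                    continue        # complete overlap: skip entirely
                start = last        # partial overlap: trim the front
            out.append((start, end))
            last = end
        return out

    for start, end in trim([(0x1000, 0x3000), (0x2000, 0x4000), (0x2800, 0x3800)]):
        print("0x{0:x}-0x{1:x}".format(start, end))
    # 0x1000-0x3000
    # 0x3000-0x4000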
diff --git a/unicorn_mode/helper_scripts/unicorn_loader.py b/unicorn_mode/helper_scripts/unicorn_loader.py
new file mode 100644
index 00000000..adf21b64
--- /dev/null
+++ b/unicorn_mode/helper_scripts/unicorn_loader.py
@@ -0,0 +1,560 @@
+"""
+    unicorn_loader.py
+
+    Loads a process context dump created using a
+    Unicorn Context Dumper script into a Unicorn Engine
+    instance. Once this is performed emulation can be
+    started.
+"""
+
+import argparse
+import binascii
+from collections import namedtuple
+import datetime
+import hashlib
+import json
+import os
+import signal
+import struct
+import time
+import zlib
+
+# Unicorn imports
+from unicorn import *
+from unicorn.arm_const import *
+from unicorn.arm64_const import *
+from unicorn.x86_const import *
+from unicorn.mips_const import *
+
+# Name of the index file
+INDEX_FILE_NAME = "_index.json"
+
+# Page size required by Unicorn
+UNICORN_PAGE_SIZE = 0x1000
+
+# Max allowable segment size (1G)
+MAX_ALLOWABLE_SEG_SIZE = 1024 * 1024 * 1024
+
+# Alignment functions to align all memory segments to Unicorn page boundaries (4KB pages only)
+ALIGN_PAGE_DOWN = lambda x: x & ~(UNICORN_PAGE_SIZE - 1)
+ALIGN_PAGE_UP   = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE-1)
+
+#---------------------------------------
+#---- Unicorn-based heap implementation
+
+class UnicornSimpleHeap(object):
+    """ Use this class to provide a simple heap implementation. This should
+        be used if malloc/free calls break things during emulation. This heap also
+        implements basic guard-page capabilities which enable immediate notice of
+        heap overflows and underflows.
+    """
+
+    # Helper data-container used to track chunks
+    class HeapChunk(object):
+        def __init__(self, actual_addr, total_size, data_size):
+            self.total_size = total_size                        # Total size of the chunk (including padding and guard page)
+            self.actual_addr = actual_addr                      # Actual start address of the chunk
+            self.data_size = data_size                          # Size requested by the caller of actual malloc call
+            self.data_addr = actual_addr + UNICORN_PAGE_SIZE    # Address where data actually starts
+
+        # Returns true if the specified buffer is completely within the chunk, else false
+        def is_buffer_in_chunk(self, addr, size):
+            if addr >= self.data_addr and ((addr + size) <= (self.data_addr + self.data_size)):
+                return True
+            else:
+                return False
+
+    # Skip the zero-page to avoid weird potential issues with segment registers
+    HEAP_MIN_ADDR = 0x00002000
+    HEAP_MAX_ADDR = 0xFFFFFFFF
+
+    _uc = None              # Unicorn engine instance to interact with
+    _chunks = []            # List of all known chunks
+    _debug_print = False    # True to print debug information
+
+    def __init__(self, uc, debug_print=False):
+        self._uc = uc
+        self._debug_print = debug_print
+
+        # Add the watchpoint hook that will be used to implement pseudo-guard page support
+        self._uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, self.__check_mem_access)
+
+    def malloc(self, size):
+        # Figure out the overall size to be allocated/mapped
+        #    - Allocate at least 1 4k page of memory to make Unicorn happy
+        #    - Add guard pages at the start and end of the region
+        total_chunk_size = UNICORN_PAGE_SIZE + ALIGN_PAGE_UP(size) + UNICORN_PAGE_SIZE
+        # Gross but efficient way to find space for the chunk:
+        chunk = None
+        for addr in xrange(self.HEAP_MIN_ADDR, self.HEAP_MAX_ADDR, UNICORN_PAGE_SIZE):
+            try:
+                self._uc.mem_map(addr, total_chunk_size, UC_PROT_READ | UC_PROT_WRITE)
+                chunk = self.HeapChunk(addr, total_chunk_size, size)
+                if self._debug_print:
+                    print("Allocating 0x{0:x}-byte chunk @ 0x{1:016x}".format(chunk.data_size, chunk.data_addr))
+                break
+            except UcError as e:
+                continue
+        # Something went very wrong
+        if chunk == None:
+            return 0
+        self._chunks.append(chunk)
+        return chunk.data_addr
+
+    def calloc(self, size, count):
+        # Simple wrapper around malloc with calloc() args
+        return self.malloc(size*count)
+
+    def realloc(self, ptr, new_size):
+        # Wrapper around malloc(new_size) / memcpy(new, old, old_size) / free(old)
+        if self._debug_print:
+            print("Reallocating chunk @ 0x{0:016x} to be 0x{1:x} bytes".format(ptr, new_size))
+        old_chunk = None
+        for chunk in self._chunks:
+            if chunk.data_addr == ptr:
+                old_chunk = chunk
+        new_chunk_addr = self.malloc(new_size)
+        if old_chunk != None:
+            self._uc.mem_write(new_chunk_addr, str(self._uc.mem_read(old_chunk.data_addr, old_chunk.data_size)))
+            self.free(old_chunk.data_addr)
+        return new_chunk_addr
+
+    def free(self, addr):
+        for chunk in self._chunks:
+            if chunk.is_buffer_in_chunk(addr, 1):
+                if self._debug_print:
+                    print("Freeing 0x{0:x}-byte chunk @ 0x{1:016x}".format(chunk.data_size, chunk.data_addr))
+                self._uc.mem_unmap(chunk.actual_addr, chunk.total_size)
+                self._chunks.remove(chunk)
+                return True
+        return False
+
+    # Implements basic guard-page functionality
+    def __check_mem_access(self, uc, access, address, size, value, user_data):
+        for chunk in self._chunks:
+            if address >= chunk.actual_addr and ((address + size) <= (chunk.actual_addr + chunk.total_size)):
+                if chunk.is_buffer_in_chunk(address, size) == False:
+                    if self._debug_print:
+                        print("Heap over/underflow attempting to {0} 0x{1:x} bytes @ {2:016x}".format( \
+                            "write" if access == UC_MEM_WRITE else "read", size, address))
+                    # Force a memory-based crash
+                    uc.force_crash(UcError(UC_ERR_READ_PROT))
+
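A minimal usage sketch of UnicornSimpleHeap on a bare engine (Python 2, matching the xrange use above). Note that the guard-page hook calls uc.force_crash(), which only exists on AflUnicornEngine, so real harnesses attach the heap to that class as template_test_harness.py does; it also only fires for accesses made by emulated code, not host-side mem_read/mem_write:

    from unicorn import Uc, UC_ARCH_X86, UC_MODE_64

    uc = Uc(UC_ARCH_X86, UC_MODE_64)
    heap = UnicornSimpleHeap(uc, debug_print=True)

    buf = heap.malloc(32)            # data starts one guard page into the mapping
    uc.mem_write(buf, b"A" * 32)     # in-bounds write
    print(uc.mem_read(buf, 32))
    heap.free(buf)                   # unmaps the chunk together with its guard pages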
+#---------------------------
+#---- Loading function
+
+class AflUnicornEngine(Uc):
+
+    def __init__(self, context_directory, enable_trace=False, debug_print=False):
+        """
+        Initializes an AflUnicornEngine instance, which extends the standard Unicorn Engine
+        with a bunch of helper routines that are useful for creating afl-unicorn test harnesses.
+
+        Parameters:
+          - context_directory: Path to the directory generated by one of the context dumper scripts
+          - enable_trace: If True trace information will be printed to STDOUT
+          - debug_print: If True debugging information will be printed while loading the context
+        """
+
+        # Make sure the index file exists and load it
+        index_file_path = os.path.join(context_directory, INDEX_FILE_NAME)
+        if not os.path.isfile(index_file_path):
+            raise Exception("Index file not found. Expected it to be at {}".format(index_file_path))
+
+        # Load the process context from the index file
+        if debug_print:
+            print("Loading process context index from {}".format(index_file_path))
+        index_file = open(index_file_path, 'r')
+        context = json.load(index_file)
+        index_file.close()
+
+        # Check the context to make sure we have the basic essential components
+        if 'arch' not in context:
+            raise Exception("Couldn't find architecture information in index file")
+        if 'regs' not in context:
+            raise Exception("Couldn't find register information in index file")
+        if 'segments' not in context:
+            raise Exception("Couldn't find segment/memory information in index file")
+
+        # Set the UnicornEngine instance's architecture and mode
+        self._arch_str = context['arch']['arch']
+        arch, mode = self.__get_arch_and_mode(self._arch_str)
+        Uc.__init__(self, arch, mode)
+
+        # Load the registers
+        regs = context['regs']
+        reg_map = self.__get_register_map(self._arch_str)
+        for register, value in regs.iteritems():
+            if debug_print:
+                print("Reg {0} = {1}".format(register, value))
+            if not reg_map.has_key(register.lower()):
+                if debug_print:
+                    print("Skipping Reg: {}".format(register))
+            else:
+                reg_write_retry = True
+                try:
+                    self.reg_write(reg_map[register.lower()], value)
+                    reg_write_retry = False
+                except Exception as e:
+                    if debug_print:
+                        print("ERROR writing register: {}, value: {} -- {}".format(register, value, repr(e)))
+
+                if reg_write_retry:
+                    if debug_print:
+                        print("Trying to parse value ({}) as hex string".format(value))
+                    try:
+                        self.reg_write(reg_map[register.lower()], int(value, 16))
+                    except Exception as e:
+                        if debug_print:
+                            print("ERROR writing hex string register: {}, value: {} -- {}".format(register, value, repr(e)))
+
+        # Setup the memory map and load memory content
+        self.__map_segments(context['segments'], context_directory, debug_print)
+
+        if enable_trace:
+            self.hook_add(UC_HOOK_BLOCK, self.__trace_block)
+            self.hook_add(UC_HOOK_CODE, self.__trace_instruction)
+            self.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, self.__trace_mem_access)
+            self.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_READ_INVALID, self.__trace_mem_invalid_access)
+
+        if debug_print:
+            print("Done loading context.")
+
+    def get_arch(self):
+        return self._arch
+
+    def get_mode(self):
+        return self._mode
+
+    def get_arch_str(self):
+        return self._arch_str
+
+    def force_crash(self, uc_error):
+        """ This function should be called to indicate to AFL that a crash occurred during emulation.
+            You can pass the exception received from Uc.emu_start
+        """
+        mem_errors = [
+            UC_ERR_READ_UNMAPPED, UC_ERR_READ_PROT, UC_ERR_READ_UNALIGNED,
+            UC_ERR_WRITE_UNMAPPED, UC_ERR_WRITE_PROT, UC_ERR_WRITE_UNALIGNED,
+            UC_ERR_FETCH_UNMAPPED, UC_ERR_FETCH_PROT, UC_ERR_FETCH_UNALIGNED,
+        ]
+        if uc_error.errno in mem_errors:
+            # Memory error - throw SIGSEGV
+            os.kill(os.getpid(), signal.SIGSEGV)
+        elif uc_error.errno == UC_ERR_INSN_INVALID:
+            # Invalid instruction - throw SIGILL
+            os.kill(os.getpid(), signal.SIGILL)
+        else:
+            # Not sure what happened - throw SIGABRT
+            os.kill(os.getpid(), signal.SIGABRT)
+
+    def dump_regs(self):
+        """ Dumps the contents of all the registers to STDOUT """
+        for reg in sorted(self.__get_register_map(self._arch_str).items(), key=lambda reg: reg[0]):
+            print(">>> {0:>4}: 0x{1:016x}".format(reg[0], self.reg_read(reg[1])))
+
+    # TODO: Make this dynamically get the stack pointer register and pointer width for the current architecture
+    """
+    def dump_stack(self, window=10):
+        print(">>> Stack:")
+        stack_ptr_addr = self.reg_read(UC_X86_REG_RSP)
+        for i in xrange(-window, window + 1):
+            addr = stack_ptr_addr + (i*8)
+            print("{0}0x{1:016x}: 0x{2:016x}".format( \
+                'SP->' if i == 0 else '    ', addr, \
+                struct.unpack('<Q', self.mem_read(addr, 8))[0]))
+    """
+
+    #-----------------------------
+    #---- Loader Helper Functions
+
+    def __map_segment(self, name, address, size, perms, debug_print=False):
+        # - size is unsigned and must be != 0
+        # - starting address must be aligned to 4KB
+        # - map size must be multiple of the page size (4KB)
+        mem_start = address
+        mem_end = address + size
+        mem_start_aligned = ALIGN_PAGE_DOWN(mem_start)
+        mem_end_aligned = ALIGN_PAGE_UP(mem_end)
+        if debug_print:
+            if mem_start_aligned != mem_start or mem_end_aligned != mem_end:
+                print("Aligning segment to page boundary:")
+                print("  name:  {}".format(name))
+                print("  start: {0:016x} -> {1:016x}".format(mem_start, mem_start_aligned))
+                print("  end:   {0:016x} -> {1:016x}".format(mem_end, mem_end_aligned))
+            print("Mapping segment from {0:016x} - {1:016x} with perm={2}: {3}".format(mem_start_aligned, mem_end_aligned, perms, name))
+        if(mem_start_aligned < mem_end_aligned):
+            self.mem_map(mem_start_aligned, mem_end_aligned - mem_start_aligned, perms)
+
+    def __map_segments(self, segment_list, context_directory, debug_print=False):
+        for segment in segment_list:
+
+            # Get the segment information from the index
+            name = segment['name']
+            seg_start = segment['start']
+            seg_end = segment['end']
+            perms = \
+                (UC_PROT_READ  if segment['permissions']['r'] == True else 0) | \
+                (UC_PROT_WRITE if segment['permissions']['w'] == True else 0) | \
+                (UC_PROT_EXEC  if segment['permissions']['x'] == True else 0)
+
+            if debug_print:
+                print("Handling segment {}".format(name))
+
+            # Check for any overlap with existing segments. If there is, it must
+            # be consolidated and merged together before mapping since Unicorn
+            # doesn't allow overlapping segments.
+            found = False
+            overlap_start = False
+            overlap_end = False
+            tmp = 0
+            for (mem_start, mem_end, mem_perm) in self.mem_regions():
+                mem_end = mem_end + 1
+                if seg_start >= mem_start and seg_end < mem_end:
+                    found = True
+                    break
+                if seg_start >= mem_start and seg_start < mem_end:
+                    overlap_start = True
+                    tmp = mem_end
+                    break
+                if seg_end >= mem_start and seg_end < mem_end:
+                    overlap_end = True
+                    tmp = mem_start
+                    break
+
+            # Map memory into the address space if it is of an acceptable size.
+            if (seg_end - seg_start) > MAX_ALLOWABLE_SEG_SIZE:
+                if debug_print:
+                    print("Skipping segment (LARGER THAN {0}) from {1:016x} - {2:016x} with perm={3}: {4}".format(MAX_ALLOWABLE_SEG_SIZE, seg_start, seg_end, perms, name))
+                continue
+            elif not found:           # Make sure it's not already mapped
+                if overlap_start:     # Partial overlap (start)
+                    self.__map_segment(name, tmp, seg_end - tmp, perms, debug_print)
+                elif overlap_end:     # Partial overlap (end)
+                    self.__map_segment(name, seg_start, tmp - seg_start, perms, debug_print)
+                else:                 # Not found
+                    self.__map_segment(name, seg_start, seg_end - seg_start, perms, debug_print)
+            else:
+                if debug_print:
+                    print("Segment {} already mapped. Moving on.".format(name))
+
+            # Load the content (if available)
+            if 'content_file' in segment and len(segment['content_file']) > 0:
+                content_file_path = os.path.join(context_directory, segment['content_file'])
+                if not os.path.isfile(content_file_path):
+                    raise Exception("Unable to find segment content file. Expected it to be at {}".format(content_file_path))
+                #if debug_print:
+                #    print("Loading content for segment {} from {}".format(name, segment['content_file']))
+                content_file = open(content_file_path, 'rb')
+                compressed_content = content_file.read()
+                content_file.close()
+                self.mem_write(seg_start, zlib.decompress(compressed_content))
+
+            else:
+                if debug_print:
+                    print("No content found for segment {0} @ {1:016x}".format(name, seg_start))
+                self.mem_write(seg_start, '\x00' * (seg_end - seg_start))
+
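__map_segment() widens every region to 4 KB boundaries before calling mem_map(), since Unicorn rejects unaligned mappings. The alignment helpers in isolation (constants repeated here so the snippet stands alone):

    UNICORN_PAGE_SIZE = 0x1000
    ALIGN_PAGE_DOWN = lambda x: x & ~(UNICORN_PAGE_SIZE - 1)
    ALIGN_PAGE_UP   = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE - 1)

    assert ALIGN_PAGE_DOWN(0x401234) == 0x401000   # start rounds down
    assert ALIGN_PAGE_UP(0x403456)   == 0x404000   # end rounds up
    assert ALIGN_PAGE_UP(0x404000)   == 0x404000   # already aligned: unchanged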
+    def __get_arch_and_mode(self, arch_str):
+        arch_map = {
+            "x64"       : [ UC_X86_REG_RIP,    UC_ARCH_X86,    UC_MODE_64 ],
+            "x86"       : [ UC_X86_REG_EIP,    UC_ARCH_X86,    UC_MODE_32 ],
+            "arm64be"   : [ UC_ARM64_REG_PC,   UC_ARCH_ARM64,  UC_MODE_ARM | UC_MODE_BIG_ENDIAN ],
+            "arm64le"   : [ UC_ARM64_REG_PC,   UC_ARCH_ARM64,  UC_MODE_ARM | UC_MODE_LITTLE_ENDIAN ],
+            "armbe"     : [ UC_ARM_REG_PC,     UC_ARCH_ARM,    UC_MODE_ARM | UC_MODE_BIG_ENDIAN ],
+            "armle"     : [ UC_ARM_REG_PC,     UC_ARCH_ARM,    UC_MODE_ARM | UC_MODE_LITTLE_ENDIAN ],
+            "armbethumb": [ UC_ARM_REG_PC,     UC_ARCH_ARM,    UC_MODE_THUMB | UC_MODE_BIG_ENDIAN ],
+            "armlethumb": [ UC_ARM_REG_PC,     UC_ARCH_ARM,    UC_MODE_THUMB | UC_MODE_LITTLE_ENDIAN ],
+            "mips"      : [ UC_MIPS_REG_PC,    UC_ARCH_MIPS,   UC_MODE_MIPS32 | UC_MODE_BIG_ENDIAN ],
+            "mipsel"    : [ UC_MIPS_REG_PC,    UC_ARCH_MIPS,   UC_MODE_MIPS32 | UC_MODE_LITTLE_ENDIAN ],
+        }
+        return (arch_map[arch_str][1], arch_map[arch_str][2])
+
+    def __get_register_map(self, arch):
+        if arch == "arm64le" or arch == "arm64be":
+            arch = "arm64"
+        elif arch == "armle" or arch == "armbe" or "thumb" in arch:
+            arch = "arm"
+        elif arch == "mipsel":
+            arch = "mips"
+
+        registers = {
+            "x64" : {
+                "rax": UC_X86_REG_RAX,
+                "rbx": UC_X86_REG_RBX,
+                "rcx": UC_X86_REG_RCX,
+                "rdx": UC_X86_REG_RDX,
+                "rsi": UC_X86_REG_RSI,
+                "rdi": UC_X86_REG_RDI,
+                "rbp": UC_X86_REG_RBP,
+                "rsp": UC_X86_REG_RSP,
+                "r8": UC_X86_REG_R8,
+                "r9": UC_X86_REG_R9,
+                "r10": UC_X86_REG_R10,
+                "r11": UC_X86_REG_R11,
+                "r12": UC_X86_REG_R12,
+                "r13": UC_X86_REG_R13,
+                "r14": UC_X86_REG_R14,
+                "r15": UC_X86_REG_R15,
+                "rip": UC_X86_REG_RIP,
+                "rsp": UC_X86_REG_RSP,
+                "efl": UC_X86_REG_EFLAGS,
+                "cs": UC_X86_REG_CS,
+                "ds": UC_X86_REG_DS,
+                "es": UC_X86_REG_ES,
+                "fs": UC_X86_REG_FS,
+                "gs": UC_X86_REG_GS,
+                "ss": UC_X86_REG_SS,
+            },
+            "x86" : {
+                "eax": UC_X86_REG_EAX,
+                "ebx": UC_X86_REG_EBX,
+                "ecx": UC_X86_REG_ECX,
+                "edx": UC_X86_REG_EDX,
+                "esi": UC_X86_REG_ESI,
+                "edi": UC_X86_REG_EDI,
+                "ebp": UC_X86_REG_EBP,
+                "esp": UC_X86_REG_ESP,
+                "eip": UC_X86_REG_EIP,
+                "esp": UC_X86_REG_ESP,
+                "efl": UC_X86_REG_EFLAGS,
+                # Segment registers removed...
+                # They caused segfaults (from unicorn?) when they were here
+            },
+            "arm" : {
+                "r0": UC_ARM_REG_R0,
+                "r1": UC_ARM_REG_R1,
+                "r2": UC_ARM_REG_R2,
+                "r3": UC_ARM_REG_R3,
+                "r4": UC_ARM_REG_R4,
+                "r5": UC_ARM_REG_R5,
+                "r6": UC_ARM_REG_R6,
+                "r7": UC_ARM_REG_R7,
+                "r8": UC_ARM_REG_R8,
+                "r9": UC_ARM_REG_R9,
+                "r10": UC_ARM_REG_R10,
+                "r11": UC_ARM_REG_R11,
+                "r12": UC_ARM_REG_R12,
+                "pc": UC_ARM_REG_PC,
+                "sp": UC_ARM_REG_SP,
+                "lr": UC_ARM_REG_LR,
+                "cpsr": UC_ARM_REG_CPSR
+            },
+            "arm64" : {
+                "x0": UC_ARM64_REG_X0,
+                "x1": UC_ARM64_REG_X1,
+                "x2": UC_ARM64_REG_X2,
+                "x3": UC_ARM64_REG_X3,
+                "x4": UC_ARM64_REG_X4,
+                "x5": UC_ARM64_REG_X5,
+                "x6": UC_ARM64_REG_X6,
+                "x7": UC_ARM64_REG_X7,
+                "x8": UC_ARM64_REG_X8,
+                "x9": UC_ARM64_REG_X9,
+                "x10": UC_ARM64_REG_X10,
+                "x11": UC_ARM64_REG_X11,
+                "x12": UC_ARM64_REG_X12,
+                "x13": UC_ARM64_REG_X13,
+                "x14": UC_ARM64_REG_X14,
+                "x15": UC_ARM64_REG_X15,
+                "x16": UC_ARM64_REG_X16,
+                "x17": UC_ARM64_REG_X17,
+                "x18": UC_ARM64_REG_X18,
+                "x19": UC_ARM64_REG_X19,
+                "x20": UC_ARM64_REG_X20,
+                "x21": UC_ARM64_REG_X21,
+                "x22": UC_ARM64_REG_X22,
+                "x23": UC_ARM64_REG_X23,
+                "x24": UC_ARM64_REG_X24,
+                "x25": UC_ARM64_REG_X25,
+                "x26": UC_ARM64_REG_X26,
+                "x27": UC_ARM64_REG_X27,
+                "x28": UC_ARM64_REG_X28,
+                "pc": UC_ARM64_REG_PC,
+                "sp": UC_ARM64_REG_SP,
+                "fp": UC_ARM64_REG_FP,
+                "lr": UC_ARM64_REG_LR,
+                "nzcv": UC_ARM64_REG_NZCV,
+                "cpsr": UC_ARM_REG_CPSR,
+            },
+            "mips" : {
+                "0" : UC_MIPS_REG_ZERO,
+                "at": UC_MIPS_REG_AT,
+                "v0": UC_MIPS_REG_V0,
+                "v1": UC_MIPS_REG_V1,
+                "a0": UC_MIPS_REG_A0,
+                "a1": UC_MIPS_REG_A1,
+                "a2": UC_MIPS_REG_A2,
+                "a3": UC_MIPS_REG_A3,
+                "t0": UC_MIPS_REG_T0,
+                "t1": UC_MIPS_REG_T1,
+                "t2": UC_MIPS_REG_T2,
+                "t3": UC_MIPS_REG_T3,
+                "t4": UC_MIPS_REG_T4,
+                "t5": UC_MIPS_REG_T5,
+                "t6": UC_MIPS_REG_T6,
+                "t7": UC_MIPS_REG_T7,
+                "t8": UC_MIPS_REG_T8,
+                "t9": UC_MIPS_REG_T9,
+                "s0": UC_MIPS_REG_S0,
+                "s1": UC_MIPS_REG_S1,
+                "s2": UC_MIPS_REG_S2,
+                "s3": UC_MIPS_REG_S3,
+                "s4": UC_MIPS_REG_S4,
+                "s5": UC_MIPS_REG_S5,
+                "s6": UC_MIPS_REG_S6,
+                "s7": UC_MIPS_REG_S7,
+                "s8": UC_MIPS_REG_S8,
+                "k0": UC_MIPS_REG_K0,
+                "k1": UC_MIPS_REG_K1,
+                "gp": UC_MIPS_REG_GP,
+                "pc": UC_MIPS_REG_PC,
+                "sp": UC_MIPS_REG_SP,
+                "fp": UC_MIPS_REG_FP,
+                "ra": UC_MIPS_REG_RA,
+                "hi": UC_MIPS_REG_HI,
+                "lo": UC_MIPS_REG_LO
+            }
+        }
+        return registers[arch]
+
+    #---------------------------
+    # Callbacks for tracing
+
+    # TODO: Make integer-printing fixed widths dependent on bitness of architecture
+    #       (i.e. only show 4 bytes for 32-bit, 8 bytes for 64-bit)
+
+    # TODO: Figure out how best to determine the capstone mode and architecture here
+    """
+    try:
+        # If Capstone is installed then we'll dump disassembly, otherwise just dump the binary.
+        from capstone import *
+        cs = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN)
+        def __trace_instruction(self, uc, address, size, user_data):
+            mem = uc.mem_read(address, size)
+            for (cs_address, cs_size, cs_mnemonic, cs_opstr) in cs.disasm_lite(bytes(mem), size):
+                print("    Instr: {:#016x}:\t{}\t{}".format(address, cs_mnemonic, cs_opstr))
+    except ImportError:
+        def __trace_instruction(self, uc, address, size, user_data):
+            print("    Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+    """
+
+    def __trace_instruction(self, uc, address, size, user_data):
+        print("    Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+
+    def __trace_block(self, uc, address, size, user_data):
+        print("Basic Block: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))
+
+    def __trace_mem_access(self, uc, access, address, size, value, user_data):
+        if access == UC_MEM_WRITE:
+            print("    >>> Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+        else:
+            print("    >>> Read:  addr=0x{0:016x} size={1}".format(address, size))
+
+    def __trace_mem_invalid_access(self, uc, access, address, size, value, user_data):
+        if access == UC_MEM_WRITE_UNMAPPED:
+            print("    >>> INVALID Write: addr=0x{0:016x} size={1} data=0x{2:016x}".format(address, size, value))
+        else:
+            print("    >>> INVALID Read:  addr=0x{0:016x} size={1}".format(address, size))
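The commented-out block above leaves the Capstone arch/mode selection as a TODO. Since the loader already knows its arch string, one hedged way it could be resolved is sketched below; Cs() and disasm_lite() are Capstone's real Python API, but the mapping table is an assumption mirroring __get_arch_and_mode, and a real implementation would cache the Cs instance instead of rebuilding it per instruction:

    # Hypothetical replacement for __trace_instruction inside AflUnicornEngine,
    # deriving the Capstone mode from self._arch_str. Falls back to the plain
    # printout when Capstone is not installed.
    def __trace_instruction(self, uc, address, size, user_data):
        try:
            from capstone import Cs, CS_ARCH_X86, CS_ARCH_ARM, CS_ARCH_ARM64, \
                CS_ARCH_MIPS, CS_MODE_32, CS_MODE_64, CS_MODE_ARM, CS_MODE_THUMB, \
                CS_MODE_MIPS32, CS_MODE_BIG_ENDIAN, CS_MODE_LITTLE_ENDIAN
            cs_map = {
                "x64":        (CS_ARCH_X86,   CS_MODE_64),
                "x86":        (CS_ARCH_X86,   CS_MODE_32),
                "arm64le":    (CS_ARCH_ARM64, CS_MODE_ARM | CS_MODE_LITTLE_ENDIAN),
                "arm64be":    (CS_ARCH_ARM64, CS_MODE_ARM | CS_MODE_BIG_ENDIAN),
                "armle":      (CS_ARCH_ARM,   CS_MODE_ARM | CS_MODE_LITTLE_ENDIAN),
                "armbe":      (CS_ARCH_ARM,   CS_MODE_ARM | CS_MODE_BIG_ENDIAN),
                "armlethumb": (CS_ARCH_ARM,   CS_MODE_THUMB | CS_MODE_LITTLE_ENDIAN),
                "armbethumb": (CS_ARCH_ARM,   CS_MODE_THUMB | CS_MODE_BIG_ENDIAN),
                "mips":       (CS_ARCH_MIPS,  CS_MODE_MIPS32 | CS_MODE_BIG_ENDIAN),
                "mipsel":     (CS_ARCH_MIPS,  CS_MODE_MIPS32 | CS_MODE_LITTLE_ENDIAN),
            }
            cs_arch, cs_mode = cs_map[self._arch_str]
            md = Cs(cs_arch, cs_mode)
            mem = uc.mem_read(address, size)
            for (_, _, mnemonic, opstr) in md.disasm_lite(bytes(mem), address):
                print("    Instr: {0:#016x}:\t{1}\t{2}".format(address, mnemonic, opstr))
        except ImportError:
            print("    Instr: addr=0x{0:016x}, size=0x{1:016x}".format(address, size))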