about summary refs log tree commit diff
path: root/custom_mutators/aflpp_tritondse/aflpp_tritondse.py
diff options
context:
space:
mode:
Diffstat (limited to 'custom_mutators/aflpp_tritondse/aflpp_tritondse.py')
-rw-r--r--custom_mutators/aflpp_tritondse/aflpp_tritondse.py148
1 files changed, 148 insertions, 0 deletions
diff --git a/custom_mutators/aflpp_tritondse/aflpp_tritondse.py b/custom_mutators/aflpp_tritondse/aflpp_tritondse.py
new file mode 100644
index 00000000..e0219f0b
--- /dev/null
+++ b/custom_mutators/aflpp_tritondse/aflpp_tritondse.py
@@ -0,0 +1,148 @@
+import sys
+import os
+import logging
+import hashlib
+
+from tritondse import CleLoader
+from tritondse import CompositeData
+from tritondse import Config
+from tritondse import CoverageStrategy
+from tritondse import ProcessState
+from tritondse import Program
+from tritondse import Seed
+from tritondse import SeedFormat
+from tritondse import SymbolicExecutor
+from tritondse import SymbolicExplorator
+
+is_debug = False
+out_path = ""
+input_file = None
+prog = None
+config = None
+dse = None
+cycle = 0
+count = 0
+hashes = set()
+format = SeedFormat.RAW
+
+def pre_exec_hook(se: SymbolicExecutor, state: ProcessState):
+    global count
+    global hashes
+    if se.seed.hash not in hashes:
+        hashes.add(se.seed.hash)
+        filename = out_path + "/id:" + f"{count:06}" + "," + se.seed.hash
+        if not os.path.exists(filename):
+            if is_debug:
+                print('Creating queue input ' + filename)
+            with open(filename, 'wb') as file:
+                if input_file:
+                    file.write(se.seed.content.files[input_file])
+                else:
+                    file.write(se.seed.content)
+                count += 1
+    #if input_file:
+    #    if is_debug:
+    #        print('Writing to ' + input_file + ' the content: ' + str(se.seed.content))
+    #    with open(input_file, 'wb') as file:
+    #        file.write(se.seed.content)
+
+
+def init(seed):
+    global config
+    global dse
+    global format
+    global input_file
+    global is_debug
+    global out_path
+    global prog
+    # Load the program (LIEF-based program loader).
+    prog = CleLoader(os.environ['AFL_CUSTOM_INFO_PROGRAM'])
+    # Process other configuration environment variables.
+    argv = None
+    try:
+        foo = os.environ['AFL_DEBUG']
+        is_debug = True
+    except KeyError:
+        pass
+    try:
+        foo = os.environ['AFL_CUSTOM_INFO_OUT']
+        out_path = foo + '/../tritondse/queue'
+    except KeyError:
+        pass
+    try:
+        foo = os.environ['AFL_CUSTOM_INFO_PROGRAM_INPUT']
+        input_file = foo
+    except KeyError:
+        pass
+    try:
+        argv_list = os.environ['AFL_CUSTOM_INFO_PROGRAM_ARGV']
+        argv_tmp = [ os.environ['AFL_CUSTOM_INFO_PROGRAM'] ]
+        argv_tmp += argv_list.split()
+        argv = []
+        # now check for @@
+        for item in argv_tmp:
+            if "@@" in item:
+                input_file = out_path + '/../.input'
+                argv.append(input_file)
+            else:
+                argv.append(item)
+    except KeyError:
+        pass
+    # Create the output directory
+    os.makedirs(out_path, exist_ok=True)
+    # Debug
+    if is_debug:
+        print('DEBUG target: ' + os.environ['AFL_CUSTOM_INFO_PROGRAM'])
+        if argv:
+            print('DEBUG argv: ')
+            print(argv)
+        if input_file:
+            print('DEBUG input_file: ' + input_file)
+        print('DEBUG out_path: ' + out_path)
+        print('')
+    if input_file:
+        format = SeedFormat.COMPOSITE
+    # Now set up TritonDSE
+    config = Config(coverage_strategy = CoverageStrategy.PATH,
+                    debug = is_debug,
+                    pipe_stdout = is_debug,
+                    pipe_stderr = is_debug,
+                    execution_timeout = 1,
+                    program_argv = argv,
+                    smt_timeout= 50,
+                    seed_format = format)
+    # Create an instance of the Symbolic Explorator
+    dse = SymbolicExplorator(config, prog)
+    # Add callbacks.
+    dse.callback_manager.register_pre_execution_callback(pre_exec_hook)
+
+
+#def fuzz(buf, add_buf, max_size):
+#    return b""
+
+
+def queue_new_entry(filename_new_queue, filename_orig_queue):
+    global cycle
+    global dse
+    # Add seed to the worklist.
+    with open(filename_new_queue, "rb") as file:
+        data = file.read()
+    hash = hashlib.md5(data).hexdigest()
+    if hash not in hashes:
+        hashes.add(hash)
+        if is_debug:
+            print("NEW FILE " + filename_new_queue + " hash " + hash + " count " + str(cycle))
+            cycle += 1
+        if input_file:
+            seed = Seed(CompositeData(files={"stdin": b"", # nothing on stdin
+                                  input_file: data}))
+        else:
+            seed = Seed(data)
+        dse.add_input_seed(seed)
+        # Start exploration!
+        #dse.step()
+        dse.explore()
+    pass
+
+def splice_optout():
+    pass