about summary refs log tree commit diff
path: root/custom_mutators/gramatron/preprocess
diff options
context:
space:
mode:
authorvan Hauser <vh@thc.org>2021-07-20 08:57:37 +0200
committerGitHub <noreply@github.com>2021-07-20 08:57:37 +0200
commitfff8c49f7c73a1531166ad52fc50306dbd01775f (patch)
tree217b85dfd5b6ccf62a8fa4ac59a65d615a08143f /custom_mutators/gramatron/preprocess
parentb3fe3b8877931f7ba7c4150fcc24e8cd18835d86 (diff)
parent5bcbb2f59affc411a1e8bb7ccaabaa5ba63e6596 (diff)
downloadafl++-fff8c49f7c73a1531166ad52fc50306dbd01775f.tar.gz
Merge pull request #1034 from AFLplusplus/grammatron
Grammatron
Diffstat (limited to 'custom_mutators/gramatron/preprocess')
-rw-r--r--custom_mutators/gramatron/preprocess/construct_automata.py275
-rw-r--r--custom_mutators/gramatron/preprocess/gnf_converter.py289
-rwxr-xr-xcustom_mutators/gramatron/preprocess/prep_automaton.sh38
3 files changed, 602 insertions, 0 deletions
diff --git a/custom_mutators/gramatron/preprocess/construct_automata.py b/custom_mutators/gramatron/preprocess/construct_automata.py
new file mode 100644
index 00000000..b9e84aa8
--- /dev/null
+++ b/custom_mutators/gramatron/preprocess/construct_automata.py
@@ -0,0 +1,275 @@
+import sys
+import json
+import re
+from collections import defaultdict
+# import pygraphviz as pgv
+
+gram_data = None
+state_count = 1
+pda = []
+worklist = []
+state_stacks = {} 
+
+# === If user provides upper bound on the stack size during FSA creation ===
+# Specifies the upper bound to which the stack is allowed to grow
+# If for any generated state, the stack size is >= stack_limit then this
+# state is not expanded further.
+stack_limit = None 
+# Holds the set of unexpanded rules owing to the user-passed stack constraint limit
+unexpanded_rules = set()
+
+def main(grammar, limit):
+    global worklist, gram_data, stack_limit
+    current = '0'
+    stack_limit = limit
+    if stack_limit:
+        print ('[X] Operating in bounded stack mode')
+
+    with open(grammar, 'r') as fd:
+        gram_data = json.load(fd)
+    start_symbol = gram_data["Start"][0]
+    worklist.append([current, [start_symbol]])
+    # print (grammar)
+    filename = (grammar.split('/')[-1]).split('.')[0]
+    
+
+    while worklist:
+        # Take an element from the worklist
+        # print ('================')
+        # print ('Worklist:', worklist)
+        element = worklist.pop(0)
+        prep_transitions(element)
+    
+    pda_file = filename + '_transition.json'
+    graph_file = filename + '.png'
+    # print ('XXXXXXXXXXXXXXXX')
+    # print ('PDA file:%s Png graph file:%s' % (pda_file, graph_file))
+    # XXX Commented out because visualization of current version of PHP causes segfault
+    # Create the graph and dump the transitions to a file
+    # create_graph(filename)
+    transformed = postprocess()
+    with open(filename + '_automata.json', 'w+') as fd:
+        json.dump(transformed, fd)
+    with open(filename + '_transition.json', 'w+') as fd:
+        json.dump(pda, fd)
+    if not unexpanded_rules:
+        print ('[X] No unexpanded rules, absolute FSA formed')
+        exit(0)
+    else:
+        print ('[X] Certain rules were not expanded due to stack size limit. Inexact approximation has been created and the disallowed rules have been put in {}_disallowed.json'.format(filename))
+        print ('[X] Number of unexpanded rules:', len(unexpanded_rules))
+        with open(filename + '_disallowed.json', 'w+') as fd:
+            json.dump(list(unexpanded_rules), fd)
+
+def create_graph(filename):
+    '''
+    Creates a DOT representation of the PDA
+    '''
+    global pda
+    G = pgv.AGraph(strict = False, directed = True)
+    for transition in pda:
+        print ('Transition:', transition)
+        G.add_edge(transition['source'], transition['dest'], 
+                label = 'Term:{}'.format(transition['terminal']))
+    G.layout(prog = 'dot')
+    print ('Do it up 2')
+    G.draw(filename + '.png')
+
+def prep_transitions(element):
+    '''
+    Generates transitions
+    '''
+    global gram_data, state_count, pda, worklist, state_stacks, stack_limit, unexpanded_rules
+    state = element[0]
+    try:
+        nonterminal = element[1][0] 
+    except IndexError:
+        # Final state was encountered, pop from worklist without doing anything
+        return
+    rules = gram_data[nonterminal]
+    count = 1
+    for rule in rules:
+        isRecursive  = False
+        # print ('Current state:', state)
+        terminal, ss, termIsRegex = tokenize(rule)
+        transition = get_template()
+        transition['trigger'] = '_'.join([state, str(count)])
+        transition['source'] = state
+        transition['dest'] = str(state_count) 
+        transition['ss'] = ss 
+        transition['terminal'] = terminal
+        transition['rule'] = "{} -> {}".format(nonterminal, rule )
+        if termIsRegex:
+            transition['termIsRegex'] = True
+        
+        # Creating a state stack for the new state
+        try:
+            state_stack = state_stacks[state][:]
+        except:
+            state_stack = []
+        if len(state_stack):
+            state_stack.pop(0)
+        if ss:
+            for symbol in ss[::-1]:
+                state_stack.insert(0, symbol)
+        transition['stack'] = state_stack 
+
+        # Check if a recursive transition state being created, if so make a backward
+        # edge and don't add anything to the worklist
+        # print (state_stacks)
+        if state_stacks:
+            for state_element, stack in state_stacks.items():
+                # print ('Stack:', sorted(stack))
+                # print ('State stack:', sorted(state_stack))
+                if sorted(stack) == sorted(state_stack):
+                    transition['dest'] = state_element
+                    # print ('Recursive:', transition)
+                    pda.append(transition)
+                    count += 1
+                    isRecursive = True
+                    break 
+        # If a recursive transition exercised don't add the same transition as a new
+        # edge, continue onto the next transitions
+        if isRecursive:
+            continue
+            
+        # If the generated state has a stack size > stack_limit then that state is abandoned
+        # and not added to the FSA or the worklist for further expansion
+        if stack_limit:
+            if (len(transition['stack']) > stack_limit):
+                unexpanded_rules.add(transition['rule'])
+                continue
+
+        # Create transitions for the non-recursive relations and add to the worklist
+        # print ('Normal:', transition)
+        # print ('State2:', state)
+        pda.append(transition)
+        worklist.append([transition['dest'], transition['stack']])
+        state_stacks[transition['dest']] = state_stack
+        state_count += 1
+        count += 1
+
+def tokenize(rule):
+    '''
+    Gets the terminal and the corresponding stack symbols from a rule in GNF form
+    '''
+    pattern = re.compile("([r])*\'([\s\S]+)\'([\s\S]*)")
+    terminal = None
+    ss = None
+    termIsRegex = False
+    match = pattern.match(rule)
+    if match.group(1):
+        termIsRegex = True
+    if match.group(2):
+        terminal = match.group(2)
+    else:
+        raise AssertionError("Rule is not in GNF form")
+
+    if match.group(3):
+        ss = (match.group(3)).split()
+
+    return terminal, ss, termIsRegex
+
+def get_template():
+    transition_template = {
+            'trigger':None,
+            'source': None,
+            'dest': None,
+            'termIsRegex': False,
+            'terminal' : None,
+            'stack': []
+            }
+    return transition_template
+
+def postprocess():
+    '''
+    Creates a representation to be passed on to the C-module
+    '''
+    global pda
+    final_struct = {}
+    memoized = defaultdict(list)
+    # Supporting data structures for if stack limit is imposed
+    culled_pda = []
+    culled_final = []
+    num_transitions = 0 # Keep track of number of transitions
+
+
+    states, final, initial = _get_states()
+
+    print (initial)
+    assert len(initial) == 1, 'More than one init state found'
+
+    # Cull transitions to states which were not expanded owing to the stack limit
+    if stack_limit:
+
+        blocklist = []
+        for final_state in final:
+            for transition in pda:
+                if (transition["dest"] == final_state) and (len(transition["stack"]) > 0):
+                    blocklist.append(transition["dest"])
+                    continue
+                else:
+                    culled_pda.append(transition)
+        
+        culled_final = [state for state in final if state not in blocklist]
+
+        assert len(culled_final) == 1, 'More than one final state found'
+
+        for transition in culled_pda:
+            state = transition["source"]
+            if transition["dest"] in blocklist:
+                    continue 
+            num_transitions += 1
+            memoized[state].append([transition["trigger"], transition["dest"], 
+                transition["terminal"]])
+        final_struct["init_state"] = initial
+        final_struct["final_state"] = culled_final[0]
+        # The reason we do this is because when states are culled, the indexing is
+        # still relative to the actual number of states hence we keep numstates recorded
+        # as the original number of states
+        print ('[X] Actual Number of states:', len(memoized.keys()))
+        print ('[X] Number of transitions:', num_transitions)
+        print ('[X] Original Number of states:', len(states))
+        final_struct["numstates"] = len(states) 
+        final_struct["pda"] = memoized
+        return final_struct
+    
+    # Running FSA construction in exact approximation mode and postprocessing it like so
+    for transition in pda:
+       state = transition["source"]
+       memoized[state].append([transition["trigger"], transition["dest"], 
+           transition["terminal"]])
+
+    final_struct["init_state"] = initial
+    final_struct["final_state"] = final[0]
+    print ('[X] Actual Number of states:', len(memoized.keys()))
+    final_struct["numstates"] = len(memoized.keys()) 
+    final_struct["pda"] = memoized
+    return final_struct
+
+
+def _get_states():
+    source = set()
+    dest = set()
+    global pda
+    for transition in pda:
+        source.add(transition["source"])
+        dest.add(transition["dest"])
+    source_copy = source.copy()
+    source_copy.update(dest)
+    return list(source_copy), list(dest.difference(source)), str(''.join(list(source.difference(dest))))
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser(description = 'Script to convert GNF grammar to PDA')
+    parser.add_argument(
+            '--gf',
+            type = str,
+            help = 'Location of GNF grammar')
+    parser.add_argument(
+            '--limit',
+            type = int,
+            default = None,
+            help = 'Specify the upper bound for the stack size')
+    args = parser.parse_args()
+    main(args.gf, args.limit)
diff --git a/custom_mutators/gramatron/preprocess/gnf_converter.py b/custom_mutators/gramatron/preprocess/gnf_converter.py
new file mode 100644
index 00000000..1e7c8b6c
--- /dev/null
+++ b/custom_mutators/gramatron/preprocess/gnf_converter.py
@@ -0,0 +1,289 @@
+import sys
+import re
+import copy
+import json
+from string import ascii_uppercase
+from itertools import combinations
+from collections import defaultdict
+
+NONTERMINALSET = []
+COUNT = 1
+
+def main(grammar_file, out, start):
+    grammar = None
+    # If grammar file is a preprocessed NT file, then skip preprocessing
+    if '.json' in grammar_file:
+        with open(grammar_file, 'r') as fd:
+            grammar = json.load(fd)
+    elif '.g4' in grammar_file:
+        with open(grammar_file, 'r') as fd:
+            data = fd.readlines()
+        grammar = preprocess(data)
+    else:
+        raise('Unknwown file format passed. Accepts (.g4/.json)')
+
+    with open('debug_preprocess.json', 'w+') as fd:
+        json.dump(grammar, fd)
+    grammar = remove_unit(grammar) # eliminates unit productions
+    with open('debug_unit.json', 'w+') as fd:
+        json.dump(grammar, fd)
+    grammar = remove_mixed(grammar) # eliminate terminals existing with non-terminals
+    with open('debug_mixed.json', 'w+') as fd:
+        json.dump(grammar, fd)
+    grammar = break_rules(grammar) # eliminate rules with more than two non-terminals
+    with open('debug_break.json', 'w+') as fd:
+        json.dump(grammar, fd)
+    grammar = gnf(grammar)
+
+    # Dump GNF form of the grammar with only reachable rules 
+    # reachable_grammar = get_reachable(grammar, start)
+    # with open('debug_gnf_reachable.json', 'w+') as fd:
+    #     json.dump(reachable_grammar, fd)
+    with open('debug_gnf.json', 'w+') as fd:
+        json.dump(grammar, fd)
+
+    grammar["Start"] = [start]
+    with open(out, 'w+') as fd:
+        json.dump(grammar, fd)
+
+def get_reachable(grammar, start):
+    '''
+    Returns a grammar without dead rules
+    '''
+    reachable_nt = set()
+    worklist = list()
+    processed = set()
+    reachable_grammar = dict()
+    worklist.append(start)
+
+    while worklist:
+        nt = worklist.pop(0)
+        processed.add(nt)
+        reachable_grammar[nt] = grammar[nt]
+        rules = grammar[nt]
+        for rule in rules:
+            tokens = gettokens(rule)
+            for token in tokens:
+                if not isTerminal(token):
+                    if token not in processed:
+                        worklist.append(token)
+    return reachable_grammar
+
+
+def gettokens(rule):
+    pattern = re.compile("([^\s\"\']+)|\"([^\"]*)\"|\'([^\']*)\'")
+    return [matched.group(0) for matched in pattern.finditer(rule)]
+
+def gnf(grammar):
+    old_grammar = copy.deepcopy(grammar)
+    new_grammar = defaultdict(list)
+    isgnf = False
+    while not isgnf:
+        for lhs, rules in old_grammar.items():
+            for rule in rules:
+                tokens = gettokens(rule) 
+                if len(tokens) == 1 and isTerminal(rule):
+                    new_grammar[lhs].append(rule)
+                    continue
+                startoken = tokens[0]
+                endrule = tokens[1:]
+                if not isTerminal(startoken):
+                    newrules = []
+                    extendrules = old_grammar[startoken]
+                    for extension in extendrules:
+                        temprule = endrule[:]
+                        temprule.insert(0, extension)
+                        newrules.append(temprule)
+                    for newnew in newrules:
+                        new_grammar[lhs].append(' '.join(newnew))
+                else:
+                    new_grammar[lhs].append(rule)
+        isgnf = True
+        for lhs, rules in new_grammar.items():
+            for rule in rules:
+                # if "\' \'" or isTerminal(rule):
+                tokens = gettokens(rule)
+                if len(tokens) == 1 and isTerminal(rule):
+                    continue
+                startoken = tokens[0]
+                if not isTerminal(startoken):
+                    isgnf = False
+                    break
+        if not isgnf:
+            old_grammar = copy.deepcopy(new_grammar)
+            new_grammar = defaultdict(list)
+    return new_grammar
+                
+
+def preprocess(data):
+    productions = []
+    production = []
+    for line in data:
+        if line != '\n': 
+            production.append(line)
+        else:
+            productions.append(production)
+            production = []
+    final_rule_set = {}
+    for production in productions:
+        rules = []
+        init = production[0]
+        nonterminal = init.split(':')[0]
+        rules.append(strip_chars(init.split(':')[1]).strip('| '))
+        for production_rule in production[1:]:
+            rules.append(strip_chars(production_rule.split('|')[0]))
+        final_rule_set[nonterminal] = rules
+    # for line in data:
+    #     if line != '\n':
+    #         production.append(line)
+    return final_rule_set
+
+def remove_unit(grammar):
+    nounitproductions = False 
+    old_grammar = copy.deepcopy(grammar)
+    new_grammar = defaultdict(list)
+    while not nounitproductions:
+        for lhs, rules in old_grammar.items():
+            for rhs in rules:
+                # Checking if the rule is a unit production rule
+                if len(gettokens(rhs)) == 1:
+                    if not isTerminal(rhs):
+                        new_grammar[lhs].extend([rule for rule in old_grammar[rhs]])
+                    else:
+                        new_grammar[lhs].append(rhs)
+                else:
+                    new_grammar[lhs].append(rhs)
+        # Checking there are no unit productions left in the grammar 
+        nounitproductions = True
+        for lhs, rules in new_grammar.items():
+            for rhs in rules:
+                if len(gettokens(rhs)) == 1:
+                    if not isTerminal(rhs):
+                        nounitproductions = False
+                        break
+            if not nounitproductions:
+                break
+        # Unit productions are still there in the grammar -- repeat the process
+        if not nounitproductions:
+            old_grammar = copy.deepcopy(new_grammar)
+            new_grammar = defaultdict(list)
+    return new_grammar
+
+def isTerminal(rule):
+    # pattern = re.compile("([r]*\'[\s\S]+\')")
+    pattern = re.compile("\'(.*?)\'")
+    match = pattern.match(rule)
+    if match:
+        return True
+    else:
+        return False
+
+def remove_mixed(grammar):
+    '''
+    Remove rules where there are terminals mixed in with non-terminals
+    '''
+    new_grammar = defaultdict(list)
+    for lhs, rules in grammar.items():
+        for rhs in rules:
+            # tokens = rhs.split(' ')
+            regen_rule = []
+            tokens = gettokens(rhs)
+            if len(gettokens(rhs)) == 1:
+                new_grammar[lhs].append(rhs)
+                continue
+            for token in tokens:
+                # Identify if there is a terminal in the RHS
+                if isTerminal(token):
+                    # Check if a corresponding nonterminal already exists
+                    nonterminal = terminal_exist(token, new_grammar)
+                    if nonterminal:
+                        regen_rule.append(nonterminal)
+                    else:
+                        new_nonterm = get_nonterminal()
+                        new_grammar[new_nonterm].append(token)
+                        regen_rule.append(new_nonterm)
+                else:
+                    regen_rule.append(token)
+            new_grammar[lhs].append(' '.join(regen_rule))
+    return new_grammar
+
+def break_rules(grammar):
+    new_grammar = defaultdict(list)
+    old_grammar = copy.deepcopy(grammar)
+    nomulti = False
+    while not nomulti:
+        for lhs, rules in old_grammar.items():
+            for rhs in rules:
+                tokens = gettokens(rhs)
+                if len(tokens) > 2 and (not isTerminal(rhs)):
+                    split = tokens[:-1] 
+                    nonterminal = terminal_exist(' '.join(split), new_grammar)
+                    if nonterminal:
+                        newrule = ' '.join([nonterminal, tokens[-1]])
+                        new_grammar[lhs].append(newrule)
+                    else:
+                        nonterminal = get_nonterminal()
+                        new_grammar[nonterminal].append(' '.join(split))
+                        newrule = ' '.join([nonterminal, tokens[-1]])
+                        new_grammar[lhs].append(newrule)
+                else:
+                    new_grammar[lhs].append(rhs)
+        nomulti = True
+        for lhs, rules in new_grammar.items():
+            for rhs in rules:
+                # tokens = rhs.split(' ')
+                tokens = gettokens(rhs)
+                if len(tokens) > 2 and (not isTerminal(rhs)):
+                    nomulti = False
+                    break
+        if not nomulti:
+            old_grammar = copy.deepcopy(new_grammar)
+            new_grammar = defaultdict(list)
+    return new_grammar
+
+def strip_chars(rule):
+    return rule.strip('\n\t ')
+
+def get_nonterminal():
+    global NONTERMINALSET
+    if NONTERMINALSET:
+        return NONTERMINALSET.pop(0)
+    else:
+        _repopulate()
+        return NONTERMINALSET.pop(0)
+
+def _repopulate():
+    global COUNT
+    global NONTERMINALSET
+    NONTERMINALSET = [''.join(x) for x in list(combinations(ascii_uppercase, COUNT))]
+    COUNT += 1
+
+def terminal_exist(token, grammar):
+    for nonterminal, rules in grammar.items():
+        if token in rules:
+            return nonterminal
+    return None
+
+
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser(description = 'Script to convert grammar to GNF form')
+    parser.add_argument(
+            '--gf',
+            type = str,
+            required = True,
+            help = 'Location of grammar file')
+    parser.add_argument(
+            '--out',
+            type = str,
+            required = True,
+            help = 'Location of output file')
+    parser.add_argument(
+            '--start',
+            type = str,
+            required = True,
+            help = 'Start token')
+    args = parser.parse_args()
+
+    main(args.gf, args.out, args.start)
diff --git a/custom_mutators/gramatron/preprocess/prep_automaton.sh b/custom_mutators/gramatron/preprocess/prep_automaton.sh
new file mode 100755
index 00000000..28d99fb0
--- /dev/null
+++ b/custom_mutators/gramatron/preprocess/prep_automaton.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# This script creates a FSA describing the input grammar *.g4
+
+if [ ! "$#" -lt 4 ]; then
+  echo "Usage: ./prep_pda.sh <grammar_file> <start> [stack_limit]"         
+  exit 1
+fi
+
+GRAMMAR_FILE=$1
+GRAMMAR_DIR="$(dirname $GRAMMAR_FILE)"
+START="$2"
+STACK_LIMIT="$3"
+
+# Get filename
+FILE=$(basename -- "$GRAMMAR_FILE")
+echo "File:$FILE"
+FILENAME="${FILE%.*}"
+echo "Name:$FILENAME"
+
+
+# Create the GNF form of the grammar
+CMD="python gnf_converter.py --gf $GRAMMAR_FILE --out ${FILENAME}.json --start $START"
+$CMD
+
+# Generate grammar automaton 
+# Check if user provided a stack limit
+if [ -z "${STACK_LIMIT}" ]; then
+CMD="python3 construct_automata.py --gf ${FILENAME}.json" 
+else
+CMD="python construct_automata.py --gf ${FILENAME}.json --limit ${STACK_LIMIT}" 
+fi
+echo $CMD
+$CMD
+
+# Move PDA to the source dir of the grammar
+echo "Copying ${FILENAME}_automata.json to $GRAMMAR_DIR"
+mv "${FILENAME}_automata.json" $GRAMMAR_DIR/