Diffstat (limited to 'custom_mutators/gramatron/preprocess')
-rw-r--r--  custom_mutators/gramatron/preprocess/construct_automata.py  275
-rw-r--r--  custom_mutators/gramatron/preprocess/gnf_converter.py       289
-rwxr-xr-x  custom_mutators/gramatron/preprocess/prep_automaton.sh       38
3 files changed, 602 insertions, 0 deletions
diff --git a/custom_mutators/gramatron/preprocess/construct_automata.py b/custom_mutators/gramatron/preprocess/construct_automata.py
new file mode 100644
index 00000000..b9e84aa8
--- /dev/null
+++ b/custom_mutators/gramatron/preprocess/construct_automata.py
@@ -0,0 +1,275 @@
+import sys
+import json
+import re
+from collections import defaultdict
+# import pygraphviz as pgv
+
+gram_data = None
+state_count = 1
+pda = []
+worklist = []
+state_stacks = {}
+
+# === Optional upper bound on the stack size during FSA creation ===
+# Specifies the upper bound to which the stack is allowed to grow.
+# If the stack of any generated state grows beyond stack_limit, that
+# state is not expanded further.
+stack_limit = None
+# Holds the set of unexpanded rules owing to the user-passed stack constraint limit
+unexpanded_rules = set()
+
+def main(grammar, limit):
+ global worklist, gram_data, stack_limit
+ current = '0'
+ stack_limit = limit
+ if stack_limit:
+ print ('[X] Operating in bounded stack mode')
+
+ with open(grammar, 'r') as fd:
+ gram_data = json.load(fd)
+ start_symbol = gram_data["Start"][0]
+ worklist.append([current, [start_symbol]])
+ # print (grammar)
+ filename = (grammar.split('/')[-1]).split('.')[0]
+
+
+ while worklist:
+ # Take an element from the worklist
+ # print ('================')
+ # print ('Worklist:', worklist)
+ element = worklist.pop(0)
+ prep_transitions(element)
+
+ pda_file = filename + '_transition.json'
+ graph_file = filename + '.png'
+ # print ('XXXXXXXXXXXXXXXX')
+ # print ('PDA file:%s Png graph file:%s' % (pda_file, graph_file))
+    # XXX Commented out because visualizing the FSA for the current PHP grammar causes a segfault
+ # Create the graph and dump the transitions to a file
+ # create_graph(filename)
+ transformed = postprocess()
+ with open(filename + '_automata.json', 'w+') as fd:
+ json.dump(transformed, fd)
+ with open(filename + '_transition.json', 'w+') as fd:
+ json.dump(pda, fd)
+ if not unexpanded_rules:
+ print ('[X] No unexpanded rules, absolute FSA formed')
+ exit(0)
+ else:
+ print ('[X] Certain rules were not expanded due to stack size limit. Inexact approximation has been created and the disallowed rules have been put in {}_disallowed.json'.format(filename))
+ print ('[X] Number of unexpanded rules:', len(unexpanded_rules))
+ with open(filename + '_disallowed.json', 'w+') as fd:
+ json.dump(list(unexpanded_rules), fd)
+
+def create_graph(filename):
+ '''
+ Creates a DOT representation of the PDA
+ '''
+ global pda
+ G = pgv.AGraph(strict = False, directed = True)
+ for transition in pda:
+ print ('Transition:', transition)
+ G.add_edge(transition['source'], transition['dest'],
+ label = 'Term:{}'.format(transition['terminal']))
+ G.layout(prog = 'dot')
+ print ('Do it up 2')
+ G.draw(filename + '.png')
+
+def prep_transitions(element):
+ '''
+ Generates transitions
+ '''
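+    # For the state taken from the worklist, expand every rule of its leading
+    # nonterminal: each rule yields a transition that emits the rule's terminal
+    # and pushes the rule's stack symbols. If the resulting stack matches an
+    # existing state's stack, a backward (recursive) edge is added instead of a
+    # new state.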
+ global gram_data, state_count, pda, worklist, state_stacks, stack_limit, unexpanded_rules
+ state = element[0]
+ try:
+ nonterminal = element[1][0]
+ except IndexError:
+ # Final state was encountered, pop from worklist without doing anything
+ return
+ rules = gram_data[nonterminal]
+ count = 1
+ for rule in rules:
+ isRecursive = False
+ # print ('Current state:', state)
+ terminal, ss, termIsRegex = tokenize(rule)
+ transition = get_template()
+ transition['trigger'] = '_'.join([state, str(count)])
+ transition['source'] = state
+ transition['dest'] = str(state_count)
+ transition['ss'] = ss
+ transition['terminal'] = terminal
+ transition['rule'] = "{} -> {}".format(nonterminal, rule )
+ if termIsRegex:
+ transition['termIsRegex'] = True
+
+ # Creating a state stack for the new state
+ try:
+ state_stack = state_stacks[state][:]
+        except KeyError:
+ state_stack = []
+ if len(state_stack):
+ state_stack.pop(0)
+ if ss:
+ for symbol in ss[::-1]:
+ state_stack.insert(0, symbol)
+ transition['stack'] = state_stack
+
+        # Check if a recursive transition state is being created; if so, make a
+        # backward edge and don't add anything to the worklist
+ # print (state_stacks)
+ if state_stacks:
+ for state_element, stack in state_stacks.items():
+ # print ('Stack:', sorted(stack))
+ # print ('State stack:', sorted(state_stack))
+ if sorted(stack) == sorted(state_stack):
+ transition['dest'] = state_element
+ # print ('Recursive:', transition)
+ pda.append(transition)
+ count += 1
+ isRecursive = True
+ break
+        # If a recursive transition was exercised, don't add the same transition
+        # as a new edge; continue on to the next transition
+ if isRecursive:
+ continue
+
+ # If the generated state has a stack size > stack_limit then that state is abandoned
+ # and not added to the FSA or the worklist for further expansion
+ if stack_limit:
+ if (len(transition['stack']) > stack_limit):
+ unexpanded_rules.add(transition['rule'])
+ continue
+
+ # Create transitions for the non-recursive relations and add to the worklist
+ # print ('Normal:', transition)
+ # print ('State2:', state)
+ pda.append(transition)
+ worklist.append([transition['dest'], transition['stack']])
+ state_stacks[transition['dest']] = state_stack
+ state_count += 1
+ count += 1
+
+def tokenize(rule):
+ '''
+ Gets the terminal and the corresponding stack symbols from a rule in GNF form
+ '''
+    pattern = re.compile(r"([r])*'([\s\S]+)'([\s\S]*)")
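+    # Illustrative examples of GNF rules handled here:
+    #   "'a' A B"   -> terminal 'a',      ss = ['A', 'B'], termIsRegex = False
+    #   "r'[0-9]+'" -> terminal '[0-9]+', ss = None,       termIsRegex = True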
+ terminal = None
+ ss = None
+ termIsRegex = False
+ match = pattern.match(rule)
+ if match.group(1):
+ termIsRegex = True
+ if match.group(2):
+ terminal = match.group(2)
+ else:
+ raise AssertionError("Rule is not in GNF form")
+
+ if match.group(3):
+ ss = (match.group(3)).split()
+
+ return terminal, ss, termIsRegex
+
+def get_template():
+ transition_template = {
+ 'trigger':None,
+ 'source': None,
+ 'dest': None,
+ 'termIsRegex': False,
+ 'terminal' : None,
+ 'stack': []
+ }
+ return transition_template
+
+def postprocess():
+ '''
+ Creates a representation to be passed on to the C-module
+ '''
+ global pda
+ final_struct = {}
+ memoized = defaultdict(list)
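+    # memoized maps each source state to a list of [trigger, dest, terminal] entries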
+    # Supporting data structures used if a stack limit is imposed
+ culled_pda = []
+ culled_final = []
+ num_transitions = 0 # Keep track of number of transitions
+
+
+ states, final, initial = _get_states()
+
+ print (initial)
+ assert len(initial) == 1, 'More than one init state found'
+
+ # Cull transitions to states which were not expanded owing to the stack limit
+ if stack_limit:
+
+ blocklist = []
+ for final_state in final:
+ for transition in pda:
+                if (transition["dest"] == final_state) and (len(transition["stack"]) > 0):
+                    blocklist.append(transition["dest"])
+                else:
+                    culled_pda.append(transition)
+
+ culled_final = [state for state in final if state not in blocklist]
+
+ assert len(culled_final) == 1, 'More than one final state found'
+
+ for transition in culled_pda:
+ state = transition["source"]
+ if transition["dest"] in blocklist:
+ continue
+ num_transitions += 1
+ memoized[state].append([transition["trigger"], transition["dest"],
+ transition["terminal"]])
+ final_struct["init_state"] = initial
+ final_struct["final_state"] = culled_final[0]
+        # We do this because, when states are culled, the transition indices still
+        # refer to the original state numbering, so numstates records the original
+        # number of states rather than the culled count
+ print ('[X] Actual Number of states:', len(memoized.keys()))
+ print ('[X] Number of transitions:', num_transitions)
+ print ('[X] Original Number of states:', len(states))
+ final_struct["numstates"] = len(states)
+ final_struct["pda"] = memoized
+ return final_struct
+
+    # FSA construction ran in exact approximation mode (no stack limit), so postprocess all transitions directly
+ for transition in pda:
+ state = transition["source"]
+ memoized[state].append([transition["trigger"], transition["dest"],
+ transition["terminal"]])
+
+ final_struct["init_state"] = initial
+ final_struct["final_state"] = final[0]
+ print ('[X] Actual Number of states:', len(memoized.keys()))
+ final_struct["numstates"] = len(memoized.keys())
+ final_struct["pda"] = memoized
+ return final_struct
+
+
+def _get_states():
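+    # Initial state: appears as a source but never as a destination.
+    # Final states: appear as a destination but never as a source (no outgoing edges).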
+ source = set()
+ dest = set()
+ global pda
+ for transition in pda:
+ source.add(transition["source"])
+ dest.add(transition["dest"])
+ source_copy = source.copy()
+ source_copy.update(dest)
+ return list(source_copy), list(dest.difference(source)), str(''.join(list(source.difference(dest))))
+
+if __name__ == '__main__':
+ import argparse
+ parser = argparse.ArgumentParser(description = 'Script to convert GNF grammar to PDA')
+ parser.add_argument(
+ '--gf',
+ type = str,
+ help = 'Location of GNF grammar')
+ parser.add_argument(
+ '--limit',
+ type = int,
+ default = None,
+ help = 'Specify the upper bound for the stack size')
+ args = parser.parse_args()
+ main(args.gf, args.limit)
diff --git a/custom_mutators/gramatron/preprocess/gnf_converter.py b/custom_mutators/gramatron/preprocess/gnf_converter.py
new file mode 100644
index 00000000..1e7c8b6c
--- /dev/null
+++ b/custom_mutators/gramatron/preprocess/gnf_converter.py
@@ -0,0 +1,289 @@
+import sys
+import re
+import copy
+import json
+from string import ascii_uppercase
+from itertools import combinations
+from collections import defaultdict
+
+NONTERMINALSET = []
+COUNT = 1
+
+def main(grammar_file, out, start):
+ grammar = None
+ # If grammar file is a preprocessed NT file, then skip preprocessing
+ if '.json' in grammar_file:
+ with open(grammar_file, 'r') as fd:
+ grammar = json.load(fd)
+ elif '.g4' in grammar_file:
+ with open(grammar_file, 'r') as fd:
+ data = fd.readlines()
+ grammar = preprocess(data)
+ else:
+        raise ValueError('Unknown file format passed. Accepts (.g4/.json)')
+
+ with open('debug_preprocess.json', 'w+') as fd:
+ json.dump(grammar, fd)
+ grammar = remove_unit(grammar) # eliminates unit productions
+ with open('debug_unit.json', 'w+') as fd:
+ json.dump(grammar, fd)
+ grammar = remove_mixed(grammar) # eliminate terminals existing with non-terminals
+ with open('debug_mixed.json', 'w+') as fd:
+ json.dump(grammar, fd)
+ grammar = break_rules(grammar) # eliminate rules with more than two non-terminals
+ with open('debug_break.json', 'w+') as fd:
+ json.dump(grammar, fd)
+ grammar = gnf(grammar)
+
+ # Dump GNF form of the grammar with only reachable rules
+ # reachable_grammar = get_reachable(grammar, start)
+ # with open('debug_gnf_reachable.json', 'w+') as fd:
+ # json.dump(reachable_grammar, fd)
+ with open('debug_gnf.json', 'w+') as fd:
+ json.dump(grammar, fd)
+
+ grammar["Start"] = [start]
+ with open(out, 'w+') as fd:
+ json.dump(grammar, fd)
+
+def get_reachable(grammar, start):
+ '''
+ Returns a grammar without dead rules
+ '''
+ reachable_nt = set()
+ worklist = list()
+ processed = set()
+ reachable_grammar = dict()
+ worklist.append(start)
+
+ while worklist:
+ nt = worklist.pop(0)
+ processed.add(nt)
+ reachable_grammar[nt] = grammar[nt]
+ rules = grammar[nt]
+ for rule in rules:
+ tokens = gettokens(rule)
+ for token in tokens:
+ if not isTerminal(token):
+ if token not in processed:
+ worklist.append(token)
+ return reachable_grammar
+
+
+def gettokens(rule):
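+    # Splits a rule into tokens, keeping quoted terminals intact, e.g.
+    # "ifstmt 'if' '(' expr ')'" -> ["ifstmt", "'if'", "'('", "expr", "')'"]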
+    pattern = re.compile(r"([^\s\"']+)|\"([^\"]*)\"|'([^']*)'")
+ return [matched.group(0) for matched in pattern.finditer(rule)]
+
+def gnf(grammar):
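+    # Repeatedly substitutes the first token of each rule with that nonterminal's
+    # expansions until every rule starts with a terminal (Greibach Normal Form)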
+ old_grammar = copy.deepcopy(grammar)
+ new_grammar = defaultdict(list)
+ isgnf = False
+ while not isgnf:
+ for lhs, rules in old_grammar.items():
+ for rule in rules:
+ tokens = gettokens(rule)
+ if len(tokens) == 1 and isTerminal(rule):
+ new_grammar[lhs].append(rule)
+ continue
+ startoken = tokens[0]
+ endrule = tokens[1:]
+ if not isTerminal(startoken):
+ newrules = []
+ extendrules = old_grammar[startoken]
+ for extension in extendrules:
+ temprule = endrule[:]
+ temprule.insert(0, extension)
+ newrules.append(temprule)
+ for newnew in newrules:
+ new_grammar[lhs].append(' '.join(newnew))
+ else:
+ new_grammar[lhs].append(rule)
+ isgnf = True
+ for lhs, rules in new_grammar.items():
+ for rule in rules:
+ # if "\' \'" or isTerminal(rule):
+ tokens = gettokens(rule)
+ if len(tokens) == 1 and isTerminal(rule):
+ continue
+ startoken = tokens[0]
+ if not isTerminal(startoken):
+ isgnf = False
+ break
+ if not isgnf:
+ old_grammar = copy.deepcopy(new_grammar)
+ new_grammar = defaultdict(list)
+ return new_grammar
+
+
+def preprocess(data):
+ productions = []
+ production = []
+ for line in data:
+ if line != '\n':
+ production.append(line)
+ else:
+ productions.append(production)
+ production = []
+ final_rule_set = {}
+ for production in productions:
+ rules = []
+ init = production[0]
+ nonterminal = init.split(':')[0]
+ rules.append(strip_chars(init.split(':')[1]).strip('| '))
+ for production_rule in production[1:]:
+ rules.append(strip_chars(production_rule.split('|')[0]))
+ final_rule_set[nonterminal] = rules
+ # for line in data:
+ # if line != '\n':
+ # production.append(line)
+ return final_rule_set
+
+def remove_unit(grammar):
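+    # Replaces unit productions (a rule that is a single nonterminal) with the
+    # productions of that nonterminal, iterating until none remain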
+ nounitproductions = False
+ old_grammar = copy.deepcopy(grammar)
+ new_grammar = defaultdict(list)
+ while not nounitproductions:
+ for lhs, rules in old_grammar.items():
+ for rhs in rules:
+ # Checking if the rule is a unit production rule
+ if len(gettokens(rhs)) == 1:
+ if not isTerminal(rhs):
+ new_grammar[lhs].extend([rule for rule in old_grammar[rhs]])
+ else:
+ new_grammar[lhs].append(rhs)
+ else:
+ new_grammar[lhs].append(rhs)
+ # Checking there are no unit productions left in the grammar
+ nounitproductions = True
+ for lhs, rules in new_grammar.items():
+ for rhs in rules:
+ if len(gettokens(rhs)) == 1:
+ if not isTerminal(rhs):
+ nounitproductions = False
+ break
+ if not nounitproductions:
+ break
+ # Unit productions are still there in the grammar -- repeat the process
+ if not nounitproductions:
+ old_grammar = copy.deepcopy(new_grammar)
+ new_grammar = defaultdict(list)
+ return new_grammar
+
+def isTerminal(rule):
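+    # A token counts as terminal if it starts with a single-quoted literal, e.g. 'abc'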
+ # pattern = re.compile("([r]*\'[\s\S]+\')")
+ pattern = re.compile("\'(.*?)\'")
+ match = pattern.match(rule)
+ if match:
+ return True
+ else:
+ return False
+
+def remove_mixed(grammar):
+ '''
+ Remove rules where there are terminals mixed in with non-terminals
+ '''
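+    # e.g. the rule "'if' expr" becomes "A expr" with a new rule A -> 'if', where
+    # A is a freshly generated nonterminal (names here are illustrative)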
+ new_grammar = defaultdict(list)
+ for lhs, rules in grammar.items():
+ for rhs in rules:
+ # tokens = rhs.split(' ')
+ regen_rule = []
+ tokens = gettokens(rhs)
+ if len(gettokens(rhs)) == 1:
+ new_grammar[lhs].append(rhs)
+ continue
+ for token in tokens:
+ # Identify if there is a terminal in the RHS
+ if isTerminal(token):
+ # Check if a corresponding nonterminal already exists
+ nonterminal = terminal_exist(token, new_grammar)
+ if nonterminal:
+ regen_rule.append(nonterminal)
+ else:
+ new_nonterm = get_nonterminal()
+ new_grammar[new_nonterm].append(token)
+ regen_rule.append(new_nonterm)
+ else:
+ regen_rule.append(token)
+ new_grammar[lhs].append(' '.join(regen_rule))
+ return new_grammar
+
+def break_rules(grammar):
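+    # Splits rules with more than two nonterminals, e.g. "A B C" becomes "X C"
+    # plus a new rule X -> A B, where X is a fresh nonterminal (illustrative names)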
+ new_grammar = defaultdict(list)
+ old_grammar = copy.deepcopy(grammar)
+ nomulti = False
+ while not nomulti:
+ for lhs, rules in old_grammar.items():
+ for rhs in rules:
+ tokens = gettokens(rhs)
+ if len(tokens) > 2 and (not isTerminal(rhs)):
+ split = tokens[:-1]
+ nonterminal = terminal_exist(' '.join(split), new_grammar)
+ if nonterminal:
+ newrule = ' '.join([nonterminal, tokens[-1]])
+ new_grammar[lhs].append(newrule)
+ else:
+ nonterminal = get_nonterminal()
+ new_grammar[nonterminal].append(' '.join(split))
+ newrule = ' '.join([nonterminal, tokens[-1]])
+ new_grammar[lhs].append(newrule)
+ else:
+ new_grammar[lhs].append(rhs)
+ nomulti = True
+ for lhs, rules in new_grammar.items():
+ for rhs in rules:
+ # tokens = rhs.split(' ')
+ tokens = gettokens(rhs)
+ if len(tokens) > 2 and (not isTerminal(rhs)):
+ nomulti = False
+ break
+ if not nomulti:
+ old_grammar = copy.deepcopy(new_grammar)
+ new_grammar = defaultdict(list)
+ return new_grammar
+
+def strip_chars(rule):
+ return rule.strip('\n\t ')
+
+def get_nonterminal():
+ global NONTERMINALSET
+ if NONTERMINALSET:
+ return NONTERMINALSET.pop(0)
+ else:
+ _repopulate()
+ return NONTERMINALSET.pop(0)
+
+def _repopulate():
+ global COUNT
+ global NONTERMINALSET
+ NONTERMINALSET = [''.join(x) for x in list(combinations(ascii_uppercase, COUNT))]
+ COUNT += 1
+
+def terminal_exist(token, grammar):
+ for nonterminal, rules in grammar.items():
+ if token in rules:
+ return nonterminal
+ return None
+
+
+
+if __name__ == '__main__':
+ import argparse
+ parser = argparse.ArgumentParser(description = 'Script to convert grammar to GNF form')
+ parser.add_argument(
+ '--gf',
+ type = str,
+ required = True,
+ help = 'Location of grammar file')
+ parser.add_argument(
+ '--out',
+ type = str,
+ required = True,
+ help = 'Location of output file')
+ parser.add_argument(
+ '--start',
+ type = str,
+ required = True,
+ help = 'Start token')
+ args = parser.parse_args()
+
+ main(args.gf, args.out, args.start)
diff --git a/custom_mutators/gramatron/preprocess/prep_automaton.sh b/custom_mutators/gramatron/preprocess/prep_automaton.sh
new file mode 100755
index 00000000..28d99fb0
--- /dev/null
+++ b/custom_mutators/gramatron/preprocess/prep_automaton.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# This script creates an FSA describing the input grammar (*.g4)
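+# Illustrative invocation (hypothetical grammar file and start symbol):
+#   ./prep_automaton.sh grammars/ruby.g4 PROGRAM 5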
+
+if [ "$#" -lt 2 ] || [ "$#" -gt 3 ]; then
+  echo "Usage: ./prep_automaton.sh <grammar_file> <start> [stack_limit]"
+  exit 1
+fi
+
+GRAMMAR_FILE=$1
+GRAMMAR_DIR="$(dirname "$GRAMMAR_FILE")"
+START="$2"
+STACK_LIMIT="$3"
+
+# Get filename
+FILE=$(basename -- "$GRAMMAR_FILE")
+echo "File:$FILE"
+FILENAME="${FILE%.*}"
+echo "Name:$FILENAME"
+
+
+# Create the GNF form of the grammar
+CMD="python3 gnf_converter.py --gf $GRAMMAR_FILE --out ${FILENAME}.json --start $START"
+$CMD
+
+# Generate grammar automaton
+# Check if user provided a stack limit
+if [ -z "${STACK_LIMIT}" ]; then
+CMD="python3 construct_automata.py --gf ${FILENAME}.json"
+else
+CMD="python3 construct_automata.py --gf ${FILENAME}.json --limit ${STACK_LIMIT}"
+fi
+echo $CMD
+$CMD
+
+# Move PDA to the source dir of the grammar
+echo "Moving ${FILENAME}_automata.json to $GRAMMAR_DIR"
+mv "${FILENAME}_automata.json" "$GRAMMAR_DIR"/