| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
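
"""
A small scanner/parser toolkit: grammar productions are written in a BNF-like
text form (see ParsingRuleSet), compiled into matcher objects, and used both
to parse lexed token streams and to drive interactive completion.
"""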
| |
import re
import sys
| |
| from cqlshlib.saferscanner import SaferScanner |
| |
| |
| class LexingError(Exception): |
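    """
    Raised when part of the input text (or of a rule definition string)
    cannot be matched by the scanner; records the line and character
    number at which lexing failed.
    """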
| |
| @classmethod |
| def from_text(cls, rulestr, unmatched, msg='Lexing error'): |
| bad_char = len(rulestr) - len(unmatched) |
| linenum = rulestr[:bad_char].count('\n') + 1 |
| charnum = len(rulestr[:bad_char].rsplit('\n', 1)[-1]) + 1 |
| snippet_start = max(0, min(len(rulestr), bad_char - 10)) |
| snippet_end = max(0, min(len(rulestr), bad_char + 10)) |
| msg += " (Error at: '...%s...')" % (rulestr[snippet_start:snippet_end],) |
| raise cls(linenum, charnum, msg) |
| |
| def __init__(self, linenum, charnum, msg='Lexing error'): |
| self.linenum = linenum |
| self.charnum = charnum |
| self.msg = msg |
| self.args = (linenum, charnum, msg) |
| |
| def __str__(self): |
| return '%s at line %d, char %d' % (self.msg, self.linenum, self.charnum) |
| |
| |
| class Hint: |
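    """
    A completion candidate that is a placeholder to display to the user
    (for example '<identifier>') rather than literal text to insert;
    callers can distinguish these with is_hint().
    """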
| |
| def __init__(self, text): |
| self.text = text |
| |
| def __hash__(self): |
| return hash((id(self.__class__), self.text)) |
| |
| def __eq__(self, other): |
| return isinstance(other, self.__class__) and other.text == self.text |
| |
| def __repr__(self): |
        return '%s(%r)' % (self.__class__.__name__, self.text)
| |
| |
| def is_hint(x): |
| return isinstance(x, Hint) |
| |
| |
| class ParseContext: |
| """ |
| These are meant to be immutable, although it would be something of a |
| pain to enforce that in python. |
| """ |
| |
| def __init__(self, ruleset, bindings, matched, remainder, productionname): |
| self.ruleset = ruleset |
| self.bindings = bindings |
| self.matched = matched |
| self.remainder = remainder |
| self.productionname = productionname |
| |
| def get_production_by_name(self, name): |
| return self.ruleset[name] |
| |
| def get_completer(self, symname): |
| return self.ruleset[(self.productionname, symname)] |
| |
| def get_binding(self, name, default=None): |
| return self.bindings.get(name, default) |
| |
| def with_binding(self, name, val): |
| newbinds = self.bindings.copy() |
| newbinds[name] = val |
| return self.__class__(self.ruleset, newbinds, self.matched, |
| self.remainder, self.productionname) |
| |
| def with_match(self, num): |
| return self.__class__(self.ruleset, self.bindings, |
| self.matched + self.remainder[:num], |
| self.remainder[num:], self.productionname) |
| |
| def with_production_named(self, newname): |
| return self.__class__(self.ruleset, self.bindings, self.matched, |
| self.remainder, newname) |
| |
| def extract_orig(self, tokens=None): |
| if tokens is None: |
| tokens = self.matched |
| if not tokens: |
| return '' |
| orig = self.bindings.get('*SRC*', None) |
| if orig is None: |
| # pretty much just guess |
| return ' '.join([t[1] for t in tokens]) |
| # low end of span for first token, to high end of span for last token |
| orig_text = orig[tokens[0][2][0]:tokens[-1][2][1]] |
| return orig_text |
| |
| def __repr__(self): |
| return '<%s matched=%r remainder=%r prodname=%r bindings=%r>' \ |
| % (self.__class__.__name__, self.matched, self.remainder, self.productionname, self.bindings) |
| |
| |
| class matcher: |
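    """
    Base class for all grammar matchers.  match() takes a ParseContext and
    a completions set (or None) and returns a list of new ParseContexts,
    one for each way this matcher can consume tokens from ctxt.remainder.
    When the remaining input is exhausted, matchers add their completion
    candidates (literal strings or Hints) to `completions` instead.
    """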
| |
| def __init__(self, arg): |
| self.arg = arg |
| |
| def match(self, ctxt, completions): |
| raise NotImplementedError |
| |
| def match_with_results(self, ctxt, completions): |
| matched_before = len(ctxt.matched) |
| newctxts = self.match(ctxt, completions) |
| return [(newctxt, newctxt.matched[matched_before:]) for newctxt in newctxts] |
| |
| @staticmethod |
| def try_registered_completion(ctxt, symname, completions): |
| debugging = ctxt.get_binding('*DEBUG*', False) |
| if ctxt.remainder or completions is None: |
| return False |
| try: |
| completer = ctxt.get_completer(symname) |
| except KeyError: |
| return False |
| if debugging: |
| print("Trying completer %r with %r" % (completer, ctxt)) |
| try: |
| new_compls = completer(ctxt) |
| except Exception: |
| if debugging: |
| import traceback |
| traceback.print_exc() |
| return False |
| if debugging: |
| print("got %r" % (new_compls,)) |
| completions.update(new_compls) |
| return True |
| |
| def __repr__(self): |
| return '%s(%r)' % (self.__class__.__name__, self.arg) |
| |
| |
| class choice(matcher): |
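    """Alternation: collects matches from every branch that succeeds (grammar syntax: a | b)."""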
| |
| def match(self, ctxt, completions): |
| foundctxts = [] |
| for a in self.arg: |
| subctxts = a.match(ctxt, completions) |
| foundctxts.extend(subctxts) |
| return foundctxts |
| |
| |
| class one_or_none(matcher): |
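    """Optional element: matches zero or one occurrence (grammar syntax: a?)."""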
| |
| def match(self, ctxt, completions): |
| return [ctxt] + list(self.arg.match(ctxt, completions)) |
| |
| |
| class repeat(matcher): |
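    """Zero or more repetitions of the wrapped matcher (grammar syntax: a*)."""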
| |
| def match(self, ctxt, completions): |
| found = [ctxt] |
| ctxts = [ctxt] |
| while True: |
| new_ctxts = [] |
| for c in ctxts: |
| new_ctxts.extend(self.arg.match(c, completions)) |
| if not new_ctxts: |
| return found |
| found.extend(new_ctxts) |
| ctxts = new_ctxts |
| |
| |
| class rule_reference(matcher): |
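    """Delegates matching to the production named by arg (grammar syntax: <name>)."""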
| |
| def match(self, ctxt, completions): |
| prevname = ctxt.productionname |
| try: |
| rule = ctxt.get_production_by_name(self.arg) |
| except KeyError: |
| raise ValueError("Can't look up production rule named %r" % (self.arg,)) |
| output = rule.match(ctxt.with_production_named(self.arg), completions) |
| return [c.with_production_named(prevname) for c in output] |
| |
| |
| class rule_series(matcher): |
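    """Concatenation: every piece must match in sequence, or no match is returned."""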
| |
| def match(self, ctxt, completions): |
| ctxts = [ctxt] |
| for patpiece in self.arg: |
| new_ctxts = [] |
| for c in ctxts: |
| new_ctxts.extend(patpiece.match(c, completions)) |
| if not new_ctxts: |
| return () |
| ctxts = new_ctxts |
| return ctxts |
| |
| |
| class named_symbol(matcher): |
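    """
    Binds the original source text consumed by the wrapped matcher to
    `name` in the resulting contexts (grammar syntax: name=<element>).
    """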
| |
| def __init__(self, name, arg): |
| matcher.__init__(self, arg) |
| self.name = name |
| |
| def match(self, ctxt, completions): |
| pass_in_compls = completions |
| if self.try_registered_completion(ctxt, self.name, completions): |
| # don't collect other completions under this; use a dummy |
| pass_in_compls = set() |
| results = self.arg.match_with_results(ctxt, pass_in_compls) |
| return [c.with_binding(self.name, ctxt.extract_orig(matchtoks)) for (c, matchtoks) in results] |
| |
| def __repr__(self): |
| return '%s(%r, %r)' % (self.__class__.__name__, self.name, self.arg) |
| |
| |
| class named_collector(named_symbol): |
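    """
    Like named_symbol, but appends each match's text to a tuple bound to
    `name` instead of replacing it (grammar syntax: [name]=<element>).
    """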
| |
| def match(self, ctxt, completions): |
| pass_in_compls = completions |
| if self.try_registered_completion(ctxt, self.name, completions): |
| # don't collect other completions under this; use a dummy |
| pass_in_compls = set() |
| output = [] |
| for ctxt, matchtoks in self.arg.match_with_results(ctxt, pass_in_compls): |
| oldval = ctxt.get_binding(self.name, ()) |
| output.append(ctxt.with_binding(self.name, oldval + (ctxt.extract_orig(matchtoks),))) |
| return output |
| |
| |
| class terminal_matcher(matcher): |
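    """
    A matcher for a single lexed token.  Terminal productions also serve
    as lexer rules, so pattern() must return the regex used to recognize
    the token in raw text.
    """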
| |
| def pattern(self): |
| raise NotImplementedError |
| |
| |
| class regex_rule(terminal_matcher): |
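    """
    Terminal matched by a regular expression (grammar syntax: /pattern/);
    completion offers a Hint naming the production.
    """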
| |
| def __init__(self, pat): |
| terminal_matcher.__init__(self, pat) |
| self.regex = pat |
| self.re = re.compile(pat + '$', re.I | re.S) |
| |
| def match(self, ctxt, completions): |
| if ctxt.remainder: |
| if self.re.match(ctxt.remainder[0][1]): |
| return [ctxt.with_match(1)] |
| elif completions is not None: |
| completions.add(Hint('<%s>' % ctxt.productionname)) |
| return [] |
| |
| def pattern(self): |
| return self.regex |
| |
| |
| class text_match(terminal_matcher): |
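    """Terminal matching literal text, case-insensitively (grammar syntax: "text")."""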
| alpha_re = re.compile(r'[a-zA-Z]') |
| |
    def __init__(self, text):
        # `text` is the quoted string literal straight from the rule
        # definition; eval() strips the quotes and interprets escapes.
        try:
            terminal_matcher.__init__(self, eval(text))
        except SyntaxError:
            print("bad syntax %r" % (text,))
| |
| def match(self, ctxt, completions): |
| if ctxt.remainder: |
| if self.arg.lower() == ctxt.remainder[0][1].lower(): |
| return [ctxt.with_match(1)] |
| elif completions is not None: |
| completions.add(self.arg) |
| return [] |
| |
| def pattern(self): |
        # can't use (?i) here - Scanner component regex flags won't be applied
| def ignorecaseify(matchobj): |
| c = matchobj.group(0) |
| return '[%s%s]' % (c.upper(), c.lower()) |
| return self.alpha_re.sub(ignorecaseify, re.escape(self.arg)) |
| |
| |
| class case_match(text_match): |
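    """Like text_match, but case-sensitive (grammar syntax: @"text")."""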
| |
| def match(self, ctxt, completions): |
| if ctxt.remainder: |
| if self.arg == ctxt.remainder[0][1]: |
| return [ctxt.with_match(1)] |
| elif completions is not None: |
| completions.add(self.arg) |
| return [] |
| |
| def pattern(self): |
| return re.escape(self.arg) |
| |
| |
| class word_match(text_match): |
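    """text_match whose lexer pattern is anchored at word boundaries."""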
| |
| def pattern(self): |
| return r'\b' + text_match.pattern(self) + r'\b' |
| |
| |
| class case_word_match(case_match): |
| |
| def pattern(self): |
| return r'\b' + case_match.pattern(self) + r'\b' |
| |
| |
| class terminal_type_matcher(matcher): |
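    """
    Matches a token by its lexed type (the name of the terminal production
    that produced it); for completion, defers to the wrapped terminal
    matcher so it can offer its literal text or Hint.
    """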
| |
| def __init__(self, tokentype, submatcher): |
| matcher.__init__(self, tokentype) |
| self.tokentype = tokentype |
| self.submatcher = submatcher |
| |
| def match(self, ctxt, completions): |
| if ctxt.remainder: |
| if ctxt.remainder[0][0] == self.tokentype: |
| return [ctxt.with_match(1)] |
| elif completions is not None: |
| self.submatcher.match(ctxt, completions) |
| return [] |
| |
| def __repr__(self): |
| return '%s(%r, %r)' % (self.__class__.__name__, self.tokentype, self.submatcher) |
| |
| |
| class ParsingRuleSet: |
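    r"""
    A collection of productions compiled from a BNF-like rule definition
    string, with the machinery to lex and parse input against them and to
    suggest completions.

    Rule definitions (scanned by RuleSpecScanner below) look like
    '<name> ::= ... ;' and may combine <reference>s, "literal" text,
    /regex/ terminals, grouping with ( ), alternation with |, ? and * for
    optional or repeated elements, @"literal" for case-sensitive text,
    sym= and [sym]= to bind the matched text of the following element, and
    '#' comments.  A production named JUNK describes tokens that are lexed
    but discarded.

    A minimal usage sketch (this grammar is illustrative only):

        prs = ParsingRuleSet.from_rule_defs(r'''
            JUNK      ::= /[ \t]+/ ;
            <integer> ::= /[0-9]+/ ;
            <name>    ::= /[a-z][a-z0-9_]*/ ;
            <Start>   ::= "add" n=<integer> ;
        ''')
        tokens = prs.lex('add 42')
        match = prs.whole_match('Start', tokens, srcstr='add 42')
        match.get_binding('n')    # -> '42'

    ('add' is lexed as a <name> token; the "add" literal in <Start> then
    matches against that token's text.)
    """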
| RuleSpecScanner = SaferScanner([ |
| (r'::=', lambda s, t: t), |
| (r'\[[a-z0-9_]+\]=', lambda s, t: ('named_collector', t[1:-2])), |
| (r'[a-z0-9_]+=', lambda s, t: ('named_symbol', t[:-1])), |
| (r'/(\[\^?.[^]]*\]|[^/]|\\.)*/', lambda s, t: ('regex', t[1:-1].replace(r'\/', '/'))), |
| (r'"([^"]|\\.)*"', lambda s, t: ('litstring', t)), |
| (r'<[^>]*>', lambda s, t: ('reference', t[1:-1])), |
| (r'\bJUNK\b', lambda s, t: ('junk', t)), |
| (r'[@()|?*;]', lambda s, t: t), |
| (r'\s+', None), |
| (r'#[^\n]*', None), |
| ], re.I | re.S | re.U) |
| |
| def __init__(self): |
| self.ruleset = {} |
| self.scanner = None |
| self.terminals = [] |
| |
| @classmethod |
| def from_rule_defs(cls, rule_defs): |
| prs = cls() |
| prs.ruleset, prs.terminals = cls.parse_rules(rule_defs) |
| return prs |
| |
| @classmethod |
| def parse_rules(cls, rulestr): |
| tokens, unmatched = cls.RuleSpecScanner.scan(rulestr) |
| if unmatched: |
| raise LexingError.from_text(rulestr, unmatched, msg="Syntax rules unparseable") |
| rules = {} |
| terminals = [] |
| tokeniter = iter(tokens) |
| for t in tokeniter: |
| if isinstance(t, tuple) and t[0] in ('reference', 'junk'): |
| assign = next(tokeniter) |
| if assign != '::=': |
| raise ValueError('Unexpected token %r; expected "::="' % (assign,)) |
| name = t[1] |
| production = cls.read_rule_tokens_until(';', tokeniter) |
| if isinstance(production, terminal_matcher): |
| terminals.append((name, production)) |
| production = terminal_type_matcher(name, production) |
| rules[name] = production |
| else: |
| raise ValueError('Unexpected token %r; expected name' % (t,)) |
| return rules, terminals |
| |
| @staticmethod |
| def mkrule(pieces): |
| if isinstance(pieces, (tuple, list)): |
| if len(pieces) == 1: |
| return pieces[0] |
| return rule_series(pieces) |
| return pieces |
| |
| @classmethod |
| def read_rule_tokens_until(cls, endtoks, tokeniter): |
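        """
        Consume tokens from `tokeniter` and build a matcher.  `endtoks` is
        either the token (or tokens) that terminate this subrule, or an int
        N meaning "read exactly N elements", which is how name= and [name]=
        bind just the single element that follows them.
        """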
| if isinstance(endtoks, str): |
| endtoks = (endtoks,) |
| counttarget = None |
| if isinstance(endtoks, int): |
| counttarget = endtoks |
| endtoks = () |
| countsofar = 0 |
| myrules = [] |
| mybranches = [myrules] |
| for t in tokeniter: |
| countsofar += 1 |
| if t in endtoks: |
| if len(mybranches) == 1: |
| return cls.mkrule(mybranches[0]) |
| return choice(list(map(cls.mkrule, mybranches))) |
| if isinstance(t, tuple): |
| if t[0] == 'reference': |
| t = rule_reference(t[1]) |
| elif t[0] == 'litstring': |
| if t[1][1].isalnum() or t[1][1] == '_': |
| t = word_match(t[1]) |
| else: |
| t = text_match(t[1]) |
| elif t[0] == 'regex': |
| t = regex_rule(t[1]) |
| elif t[0] == 'named_collector': |
| t = named_collector(t[1], cls.read_rule_tokens_until(1, tokeniter)) |
| elif t[0] == 'named_symbol': |
| t = named_symbol(t[1], cls.read_rule_tokens_until(1, tokeniter)) |
| elif t == '(': |
| t = cls.read_rule_tokens_until(')', tokeniter) |
| elif t == '?': |
| t = one_or_none(myrules.pop(-1)) |
| elif t == '*': |
| t = repeat(myrules.pop(-1)) |
| elif t == '@': |
| x = next(tokeniter) |
| if not isinstance(x, tuple) or x[0] != 'litstring': |
| raise ValueError("Unexpected token %r following '@'" % (x,)) |
| t = case_match(x[1]) |
| elif t == '|': |
| myrules = [] |
| mybranches.append(myrules) |
| continue |
            else:
                raise ValueError('Unparseable rule token %r after %r' % (t, myrules[-1] if myrules else None))
| myrules.append(t) |
| if countsofar == counttarget: |
| if len(mybranches) == 1: |
| return cls.mkrule(mybranches[0]) |
| return choice(list(map(cls.mkrule, mybranches))) |
| raise ValueError('Unexpected end of rule tokens') |
| |
| def append_rules(self, rulestr): |
| rules, terminals = self.parse_rules(rulestr) |
| self.ruleset.update(rules) |
| self.terminals.extend(terminals) |
| if terminals: |
| self.scanner = None # recreate it if/when necessary |
| |
| def register_completer(self, func, rulename, symname): |
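        """
        Register `func` as the completion callback for the symbol bound as
        `symname` within the production named `rulename`.  The callback is
        invoked with the current ParseContext when completion is requested
        at that point in the input, and should return an iterable of
        completion strings (or Hint objects).  A hypothetical example (the
        names below are illustrative, not from any real grammar):

            def complete_keyspace(ctxt):
                return ['system', 'system_traces']

            prs.register_completer(complete_keyspace, 'useStatement', 'ksname')
        """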
| self.ruleset[(rulename, symname)] = func |
| |
| def make_lexer(self): |
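        """
        Build a scanner function from the terminal productions.  Terminals
        named JUNK are matched but produce no tokens, so they can be used
        for whitespace and comments.
        """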
| def make_handler(name): |
| if name == 'JUNK': |
| return None |
| return lambda s, t: (name, t, s.match.span()) |
| regexes = [(p.pattern(), make_handler(name)) for (name, p) in self.terminals] |
| return SaferScanner(regexes, re.I | re.S | re.U).scan |
| |
| def lex(self, text): |
| if self.scanner is None: |
| self.scanner = self.make_lexer() |
| tokens, unmatched = self.scanner(text) |
| if unmatched: |
| raise LexingError.from_text(text, unmatched, 'text could not be lexed') |
| return tokens |
| |
| def parse(self, startsymbol, tokens, init_bindings=None): |
| if init_bindings is None: |
| init_bindings = {} |
| ctxt = ParseContext(self.ruleset, init_bindings, (), tuple(tokens), startsymbol) |
| pattern = self.ruleset[startsymbol] |
| return pattern.match(ctxt, None) |
| |
| def whole_match(self, startsymbol, tokens, srcstr=None): |
| bindings = {} |
| if srcstr is not None: |
| bindings['*SRC*'] = srcstr |
| for c in self.parse(startsymbol, tokens, init_bindings=bindings): |
| if not c.remainder: |
| return c |
| |
| def lex_and_parse(self, text, startsymbol='Start'): |
| return self.parse(startsymbol, self.lex(text), init_bindings={'*SRC*': text}) |
| |
| def lex_and_whole_match(self, text, startsymbol='Start'): |
| tokens = self.lex(text) |
| return self.whole_match(startsymbol, tokens, srcstr=text) |
| |
| def complete(self, startsymbol, tokens, init_bindings=None): |
| if init_bindings is None: |
| init_bindings = {} |
| ctxt = ParseContext(self.ruleset, init_bindings, (), tuple(tokens), startsymbol) |
| pattern = self.ruleset[startsymbol] |
| if init_bindings.get('*DEBUG*', False): |
| completions = Debugotron(stream=sys.stderr) |
| else: |
| completions = set() |
| pattern.match(ctxt, completions) |
| return completions |
| |
| |
| class Debugotron(set): |
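    """
    A set that reports each added completion along with a short stack
    trace of the matcher that added it; used by complete() when the
    *DEBUG* binding is set.
    """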
| depth = 10 |
| |
| def __init__(self, initializer=(), stream=sys.stdout): |
| set.__init__(self, initializer) |
| self.stream = stream |
| |
| def add(self, item): |
| self._note_addition(item) |
| set.add(self, item) |
| |
| def _note_addition(self, foo): |
| self.stream.write("\nitem %r added by:\n" % (foo,)) |
| frame = sys._getframe().f_back.f_back |
| for i in range(self.depth): |
| name = frame.f_code.co_name |
| filename = frame.f_code.co_filename |
| lineno = frame.f_lineno |
| if 'self' in frame.f_locals: |
| clsobj = frame.f_locals['self'] |
| line = '%s.%s() (%s:%d)' % (clsobj, name, filename, lineno) |
| else: |
| line = '%s (%s:%d)' % (name, filename, lineno) |
| self.stream.write(' - %s\n' % (line,)) |
| if i == 0 and 'ctxt' in frame.f_locals: |
| self.stream.write(' - %s\n' % (frame.f_locals['ctxt'],)) |
| frame = frame.f_back |
| |
| def update(self, items): |
| if items: |
| self._note_addition(items) |
| set.update(self, items) |