# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from cqlshlib.saferscanner import SaferScanner
class LexingError(Exception):
    """
    Raised when some input text cannot be tokenized.

    Carries the 1-based line number and character position of the first
    character that could not be matched, along with a message.
    """

    @classmethod
    def from_text(cls, rulestr, unmatched, msg='Lexing error'):
        """
        Build and raise an error locating the start of `unmatched` in `rulestr`.

        `unmatched` is the unconsumed tail of `rulestr`; its length is used
        to compute the offset of the offending character. Up to 10
        characters on each side of that offset are appended to the message.

        This method raises the exception itself rather than returning it,
        so a caller's `raise LexingError.from_text(...)` never reaches its
        own `raise`.
        """
        bad_char = len(rulestr) - len(unmatched)
        consumed = rulestr[:bad_char]
        linenum = consumed.count('\n') + 1
        # position within the last (possibly partial) consumed line
        charnum = len(consumed.rsplit('\n', 1)[-1]) + 1
        snippet_start = max(0, min(len(rulestr), bad_char - 10))
        snippet_end = max(0, min(len(rulestr), bad_char + 10))
        msg += " (Error at: '...%s...')" % (rulestr[snippet_start:snippet_end],)
        raise cls(linenum, charnum, msg)

    def __init__(self, linenum, charnum, msg='Lexing error'):
        self.linenum = linenum
        self.charnum = charnum
        self.msg = msg
        self.args = (linenum, charnum, msg)

    def __str__(self):
        return '%s at line %d, char %d' % (self.msg, self.linenum, self.charnum)
class Hint:
    """
    A completion hint wrapping a piece of text.

    Two hints of the same class with equal text compare equal and hash
    alike, so duplicates collapse inside a set.
    """

    def __init__(self, text):
        self.text = text

    def __hash__(self):
        return hash((id(self.__class__), self.text))

    def __eq__(self, other):
        return isinstance(other, self.__class__) and other.text == self.text

    def __repr__(self):
        # Use the class *name*, consistent with the other __repr__ methods
        # in this module, rather than the "<class '...'>" form.
        return '%s(%r)' % (self.__class__.__name__, self.text)
def is_hint(x):
    """Return True when `x` is a Hint instance (or an instance of a subclass)."""
    result = isinstance(x, Hint)
    return result
class ParseContext:
    """
    The state of one parse attempt: the rule set, the current bindings, the
    tokens matched so far, the tokens remaining, and the name of the
    production being matched.

    These are meant to be immutable, although it would be something of a
    pain to enforce that in python. The with_* methods return new
    instances instead of modifying this one.
    """

    def __init__(self, ruleset, bindings, matched, remainder, productionname):
        self.ruleset = ruleset
        self.bindings = bindings
        self.matched = matched
        self.remainder = remainder
        self.productionname = productionname

    def get_production_by_name(self, name):
        """Look up `name` in the rule set; raises KeyError when absent."""
        return self.ruleset[name]

    def get_completer(self, symname):
        """
        Look up the entry stored under (current production name, symname)
        in the rule set; raises KeyError when absent.
        """
        return self.ruleset[(self.productionname, symname)]

    def get_binding(self, name, default=None):
        """Return the value bound to `name`, or `default` when unbound."""
        return self.bindings.get(name, default)

    def with_binding(self, name, val):
        """Return a new context with `name` bound to `val`."""
        # copy so the bindings of this context are left untouched
        newbinds = self.bindings.copy()
        newbinds[name] = val
        return self.__class__(self.ruleset, newbinds, self.matched,
                              self.remainder, self.productionname)

    def with_match(self, num):
        """Return a new context with the next `num` tokens moved from remainder to matched."""
        return self.__class__(self.ruleset, self.bindings,
                              self.matched + self.remainder[:num],
                              self.remainder[num:], self.productionname)

    def with_production_named(self, newname):
        """Return a new context identical to this one except for the production name."""
        return self.__class__(self.ruleset, self.bindings, self.matched,
                              self.remainder, newname)

    def extract_orig(self, tokens=None):
        """
        Return the source text covered by `tokens` (default: the matched tokens).

        When the '*SRC*' binding is present, the text is sliced out of it
        using the span stored at index 2 of the first and last tokens.
        Otherwise the token texts (index 1) are joined with single spaces.
        An empty token sequence yields ''.
        """
        if tokens is None:
            tokens = self.matched
        if not tokens:
            return ''
        orig = self.bindings.get('*SRC*', None)
        if orig is None:
            # pretty much just guess
            return ' '.join([t[1] for t in tokens])
        # low end of span for first token, to high end of span for last token
        orig_text = orig[tokens[0][2][0]:tokens[-1][2][1]]
        return orig_text

    def __repr__(self):
        return '<%s matched=%r remainder=%r prodname=%r bindings=%r>' \
               % (self.__class__.__name__, self.matched, self.remainder, self.productionname, self.bindings)
class matcher:
    """
    Base class for grammar matchers.

    A matcher wraps an argument (`self.arg`) and, given a parse context,
    returns the contexts that result from each way it can match.
    Subclasses implement match().
    """

    def __init__(self, arg):
        self.arg = arg

    def match(self, ctxt, completions):
        """
        Return the contexts reachable from `ctxt` by matching this rule.

        `completions` is either None or a set-like object to which
        completion candidates may be added. Must be overridden.
        """
        raise NotImplementedError

    def match_with_results(self, ctxt, completions):
        """
        Like match(), but pair each resulting context with the tokens that
        were newly matched (those beyond what `ctxt` had already matched).
        """
        matched_before = len(ctxt.matched)
        newctxts = self.match(ctxt, completions)
        return [(newctxt, newctxt.matched[matched_before:]) for newctxt in newctxts]

    @staticmethod
    def try_registered_completion(ctxt, symname, completions):
        """
        Run the completer registered for `symname` under the current
        production, if any, and add its results to `completions`.

        Returns True when a completer ran successfully, False otherwise:
        tokens still remain, `completions` is None, no completer is
        registered, or the completer raised. A failing completer is
        deliberately not fatal; its traceback is printed only when the
        '*DEBUG*' binding is set.
        """
        debugging = ctxt.get_binding('*DEBUG*', False)
        if ctxt.remainder or completions is None:
            return False
        try:
            completer = ctxt.get_completer(symname)
        except KeyError:
            return False
        if debugging:
            print("Trying completer %r with %r" % (completer, ctxt))
        try:
            new_compls = completer(ctxt)
        except Exception:
            if debugging:
                import traceback
                traceback.print_exc()
            return False
        if debugging:
            print("got %r" % (new_compls,))
        completions.update(new_compls)
        return True

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.arg)
class choice(matcher):
    """Matches any one of the alternative matchers held in `self.arg`."""

    def match(self, ctxt, completions):
        """
        Try every alternative against `ctxt` and return the combined list
        of resulting contexts, in the order the alternatives are listed.
        """
        foundctxts = []
        for a in self.arg:
            # every alternative's results must be collected, otherwise
            # a choice could never match anything
            foundctxts.extend(a.match(ctxt, completions))
        return foundctxts
class one_or_none(matcher):
    """Matches the wrapped matcher zero times or once."""

    def match(self, ctxt, completions):
        """
        Return the unchanged context (the "none" case) followed by every
        context produced by the wrapped matcher (the "one" case).
        """
        outcomes = [ctxt]
        outcomes.extend(self.arg.match(ctxt, completions))
        return outcomes
class repeat(matcher):
    """Matches the wrapped matcher zero or more times."""

    def match(self, ctxt, completions):
        """
        Return `ctxt` itself (zero repetitions) plus every context reached
        by applying the wrapped matcher one or more times in succession.

        Iteration stops once a round produces no new contexts.
        """
        found = [ctxt]
        ctxts = [ctxt]
        while True:
            new_ctxts = []
            for c in ctxts:
                new_ctxts.extend(self.arg.match(c, completions))
            if not new_ctxts:
                return found
            # record this round's matches before trying another repetition
            found.extend(new_ctxts)
            ctxts = new_ctxts
class rule_reference(matcher):
    """Matches by delegating to the production rule named by `self.arg`."""

    def match(self, ctxt, completions):
        """
        Match the referenced rule with the context temporarily renamed to
        that rule, then restore the previous production name on each result.

        Raises ValueError when no rule with that name exists.
        """
        prevname = ctxt.productionname
        try:
            rule = ctxt.get_production_by_name(self.arg)
        except KeyError as err:
            raise ValueError("Can't look up production rule named %r" % (self.arg,)) from err
        output = rule.match(ctxt.with_production_named(self.arg), completions)
        return [c.with_production_named(prevname) for c in output]
class rule_series(matcher):
    """Matches each matcher in `self.arg` one after another."""

    def match(self, ctxt, completions):
        """
        Feed every surviving context through each piece in turn.

        Returns the contexts left after the last piece, or an empty tuple
        as soon as one piece matches nothing.
        """
        frontier = [ctxt]
        for piece in self.arg:
            advanced = [nxt for cur in frontier for nxt in piece.match(cur, completions)]
            if not advanced:
                return ()
            frontier = advanced
        return frontier
class named_symbol(matcher):
    """
    Wraps a matcher and binds the text it matched under a name.

    `name` is also the symbol name used to look up a registered completer.
    """

    def __init__(self, name, arg):
        matcher.__init__(self, arg)
        self.name = name

    def match(self, ctxt, completions):
        """
        Match the wrapped matcher and return each resulting context with
        `self.name` bound to the original text of the newly matched tokens.
        """
        pass_in_compls = completions
        if self.try_registered_completion(ctxt, self.name, completions):
            # don't collect other completions under this; use a dummy
            pass_in_compls = set()
        results = self.arg.match_with_results(ctxt, pass_in_compls)
        return [c.with_binding(self.name, ctxt.extract_orig(matchtoks)) for (c, matchtoks) in results]

    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.name, self.arg)
class named_collector(named_symbol):
    """
    Like named_symbol, but accumulates every match under the name as a
    tuple instead of overwriting the previous binding.
    """

    def match(self, ctxt, completions):
        """
        Match the wrapped matcher and return each resulting context with
        the newly matched text appended to the tuple bound to `self.name`
        (an empty tuple is assumed when nothing is bound yet).
        """
        pass_in_compls = completions
        if self.try_registered_completion(ctxt, self.name, completions):
            # don't collect other completions under this; use a dummy
            pass_in_compls = set()
        output = []
        # a separate name for the loop variable keeps the incoming context
        # from being shadowed
        for newctxt, matchtoks in self.arg.match_with_results(ctxt, pass_in_compls):
            oldval = newctxt.get_binding(self.name, ())
            output.append(newctxt.with_binding(self.name, oldval + (newctxt.extract_orig(matchtoks),)))
        return output
class terminal_matcher(matcher):
    """Base class for matchers that can supply a pattern via pattern()."""

    def pattern(self):
        """Return this terminal's pattern. Must be overridden by subclasses."""
        raise NotImplementedError
class regex_rule(terminal_matcher):
    """A terminal that matches a single token whose text fits a regex."""

    def __init__(self, pat):
        terminal_matcher.__init__(self, pat)
        # the raw pattern text, returned by pattern()
        self.regex = pat
        # compiled form anchored at the end, used to test one token's text
        self.re = re.compile(pat + '$', re.I | re.S)

    def match(self, ctxt, completions):
        """
        Consume the next token when its text matches the regex.

        With no tokens left and completions being collected, add a Hint
        naming the current production, since the possible texts cannot be
        enumerated.
        """
        if ctxt.remainder:
            if self.re.match(ctxt.remainder[0][1]):
                return [ctxt.with_match(1)]
        elif completions is not None:
            completions.add(Hint('<%s>' % ctxt.productionname))
        return []

    def pattern(self):
        return self.regex
class text_match(terminal_matcher):
    """A terminal that matches one token against literal text, ignoring case."""

    alpha_re = re.compile(r'[a-zA-Z]')

    def __init__(self, text):
        """
        `text` is the source form of a quoted string literal; it is
        evaluated to obtain the actual text to match.
        """
        try:
            # NOTE(review): eval() runs arbitrary code; this is only
            # acceptable while `text` comes from trusted grammar
            # definitions. Confirm no caller passes external input.
            terminal_matcher.__init__(self, eval(text))
        except SyntaxError:
            # best-effort: report and continue; self.arg is left unset
            print("bad syntax %r" % (text,))

    def match(self, ctxt, completions):
        """
        Consume the next token when its text equals the literal,
        case-insensitively. With no tokens left and completions being
        collected, offer the literal itself as a completion.
        """
        if ctxt.remainder:
            if self.arg.lower() == ctxt.remainder[0][1].lower():
                return [ctxt.with_match(1)]
        elif completions is not None:
            completions.add(self.arg)
        return []

    def pattern(self):
        """
        Return a regex for the escaped literal in which every ASCII letter
        is replaced by a two-character class of its upper and lower forms.
        """
        # can't use (?i) here- Scanner component regex flags won't be applied
        def ignorecaseify(matchobj):
            c = matchobj.group(0)
            return '[%s%s]' % (c.upper(), c.lower())

        return self.alpha_re.sub(ignorecaseify, re.escape(self.arg))
class case_match(text_match):
    """A terminal that matches one token against literal text, case-sensitively."""

    def match(self, ctxt, completions):
        """
        Consume the next token when its text equals the literal exactly.
        With no tokens left and completions being collected, offer the
        literal itself as a completion.
        """
        if ctxt.remainder:
            if self.arg == ctxt.remainder[0][1]:
                return [ctxt.with_match(1)]
        elif completions is not None:
            completions.add(self.arg)
        return []

    def pattern(self):
        """Return the literal escaped for use as a regex."""
        return re.escape(self.arg)
class word_match(text_match):
    """A text_match whose pattern is wrapped in word-boundary anchors."""

    def pattern(self):
        """Return the text_match pattern with a word boundary on each side."""
        boundary = r'\b'
        inner = text_match.pattern(self)
        return boundary + inner + boundary
class case_word_match(case_match):
    """A case_match whose pattern is wrapped in word-boundary anchors."""

    def pattern(self):
        """Return the case_match pattern with a word boundary on each side."""
        boundary = r'\b'
        inner = case_match.pattern(self)
        return boundary + inner + boundary
class terminal_type_matcher(matcher):
    """
    Matches a single token by its type tag (index 0 of the token) rather
    than by its text, keeping the underlying matcher around for completions.
    """

    def __init__(self, tokentype, submatcher):
        matcher.__init__(self, tokentype)
        self.submatcher = submatcher
        self.tokentype = tokentype

    def match(self, ctxt, completions):
        """
        Consume the next token when its type equals `self.tokentype`.

        With no tokens left and completions being collected, the wrapped
        matcher is run only so it can add its completions; its return
        value is ignored and an empty list is returned.
        """
        pending = ctxt.remainder
        if not pending:
            if completions is not None:
                self.submatcher.match(ctxt, completions)
            return []
        if pending[0][0] == self.tokentype:
            return [ctxt.with_match(1)]
        return []

    def __repr__(self):
        parts = (self.__class__.__name__, self.tokentype, self.submatcher)
        return '%s(%r, %r)' % parts
class ParsingRuleSet:
    """
    A collection of named production rules built from a textual grammar,
    together with the terminals needed to lex input for those rules.

    `ruleset` maps rule names to matchers; it also holds completer
    functions under (rulename, symname) tuple keys. `terminals` is a list
    of (name, matcher) pairs, and `scanner` caches the lexer built from it.
    """

    # Tokenizer for the grammar-definition language itself.
    RuleSpecScanner = SaferScanner([
        (r'::=', lambda s, t: t),
        (r'\[[a-z0-9_]+\]=', lambda s, t: ('named_collector', t[1:-2])),
        (r'[a-z0-9_]+=', lambda s, t: ('named_symbol', t[:-1])),
        (r'/(\[\^?.[^]]*\]|[^/]|\\.)*/', lambda s, t: ('regex', t[1:-1].replace(r'\/', '/'))),
        (r'"([^"]|\\.)*"', lambda s, t: ('litstring', t)),
        (r'<[^>]*>', lambda s, t: ('reference', t[1:-1])),
        (r'\bJUNK\b', lambda s, t: ('junk', t)),
        (r'[@()|?*;]', lambda s, t: t),
        (r'\s+', None),
        (r'#[^\n]*', None),
    ], re.I | re.S | re.U)

    def __init__(self):
        self.ruleset = {}
        self.scanner = None
        self.terminals = []

    @classmethod
    def from_rule_defs(cls, rule_defs):
        """Create a rule set from the grammar text `rule_defs`."""
        prs = cls()
        prs.ruleset, prs.terminals = cls.parse_rules(rule_defs)
        return prs

    @classmethod
    def parse_rules(cls, rulestr):
        """
        Parse grammar text into (rules, terminals).

        `rules` maps each rule name to its matcher. A rule whose whole
        production is a terminal matcher is also recorded in `terminals`
        as (name, matcher) and is wrapped in a terminal_type_matcher.

        Raises LexingError when the text cannot be tokenized and
        ValueError when the token sequence is not a valid list of rules.
        """
        tokens, unmatched = cls.RuleSpecScanner.scan(rulestr)
        if unmatched:
            raise LexingError.from_text(rulestr, unmatched, msg="Syntax rules unparseable")
        rules = {}
        terminals = []
        tokeniter = iter(tokens)
        for t in tokeniter:
            if isinstance(t, tuple) and t[0] in ('reference', 'junk'):
                # default of None turns a truncated rule into the
                # ValueError below instead of a stray StopIteration
                assign = next(tokeniter, None)
                if assign != '::=':
                    raise ValueError('Unexpected token %r; expected "::="' % (assign,))
                name = t[1]
                production = cls.read_rule_tokens_until(';', tokeniter)
                if isinstance(production, terminal_matcher):
                    terminals.append((name, production))
                    production = terminal_type_matcher(name, production)
                rules[name] = production
            else:
                raise ValueError('Unexpected token %r; expected name' % (t,))
        return rules, terminals

    @staticmethod
    def mkrule(pieces):
        """
        Turn a sequence of matchers into one matcher: the sole element when
        there is exactly one, otherwise a rule_series. Anything that is
        not a tuple or list is returned unchanged.
        """
        if isinstance(pieces, (tuple, list)):
            if len(pieces) == 1:
                return pieces[0]
            return rule_series(pieces)
        return pieces

    @classmethod
    def _finish_rule(cls, branches):
        # Collapse the collected '|' alternatives into a single matcher.
        if len(branches) == 1:
            return cls.mkrule(branches[0])
        return choice(list(map(cls.mkrule, branches)))

    @classmethod
    def read_rule_tokens_until(cls, endtoks, tokeniter):
        """
        Consume tokens from `tokeniter` and build a matcher from them.

        `endtoks` is either a terminating token (a string, or a tuple of
        them) at which reading stops, or an int giving the number of
        tokens to read at this level.

        Raises ValueError on an unrecognized token or when the tokens run
        out before the end condition is met.
        """
        if isinstance(endtoks, str):
            endtoks = (endtoks,)
        counttarget = None
        if isinstance(endtoks, int):
            counttarget = endtoks
            endtoks = ()
        countsofar = 0
        myrules = []
        mybranches = [myrules]
        for t in tokeniter:
            countsofar += 1
            if t in endtoks:
                return cls._finish_rule(mybranches)
            if isinstance(t, tuple):
                if t[0] == 'reference':
                    t = rule_reference(t[1])
                elif t[0] == 'litstring':
                    # t[1] still carries its quotes, so t[1][1] is the
                    # first character of the literal itself
                    if t[1][1].isalnum() or t[1][1] == '_':
                        t = word_match(t[1])
                    else:
                        t = text_match(t[1])
                elif t[0] == 'regex':
                    t = regex_rule(t[1])
                elif t[0] == 'named_collector':
                    t = named_collector(t[1], cls.read_rule_tokens_until(1, tokeniter))
                elif t[0] == 'named_symbol':
                    t = named_symbol(t[1], cls.read_rule_tokens_until(1, tokeniter))
            elif t == '(':
                t = cls.read_rule_tokens_until(')', tokeniter)
            elif t == '?':
                # postfix operators wrap the most recently read piece
                t = one_or_none(myrules.pop(-1))
            elif t == '*':
                t = repeat(myrules.pop(-1))
            elif t == '@':
                x = next(tokeniter, None)
                if not isinstance(x, tuple) or x[0] != 'litstring':
                    raise ValueError("Unexpected token %r following '@'" % (x,))
                t = case_match(x[1])
            elif t == '|':
                # start collecting a new alternative
                myrules = []
                mybranches.append(myrules)
                continue
            else:
                previous = myrules[-1] if myrules else None
                raise ValueError('Unparseable rule token %r after %r' % (t, previous))
            myrules.append(t)
            if countsofar == counttarget:
                return cls._finish_rule(mybranches)
        raise ValueError('Unexpected end of rule tokens')

    def append_rules(self, rulestr):
        """Parse `rulestr` and merge its rules and terminals into this set."""
        rules, terminals = self.parse_rules(rulestr)
        self.ruleset.update(rules)
        self.terminals.extend(terminals)
        if terminals:
            self.scanner = None  # recreate it if/when necessary

    def register_completer(self, func, rulename, symname):
        """Store `func` as the completer for symbol `symname` within rule `rulename`."""
        self.ruleset[(rulename, symname)] = func

    def make_lexer(self):
        """
        Build a scan function from the current terminals.

        Each token is produced as (terminal name, text, span); text matched
        by the terminal named 'JUNK' gets no handler.
        """
        def make_handler(name):
            if name == 'JUNK':
                return None
            return lambda s, t: (name, t, s.match.span())

        regexes = [(p.pattern(), make_handler(name)) for (name, p) in self.terminals]
        return SaferScanner(regexes, re.I | re.S | re.U).scan

    def lex(self, text):
        """
        Tokenize `text`, building the lexer on first use.

        Raises LexingError when part of the text cannot be tokenized.
        """
        if self.scanner is None:
            self.scanner = self.make_lexer()
        tokens, unmatched = self.scanner(text)
        if unmatched:
            raise LexingError.from_text(text, unmatched, 'text could not be lexed')
        return tokens

    def parse(self, startsymbol, tokens, init_bindings=None):
        """
        Match `tokens` against the rule named `startsymbol` and return the
        resulting contexts. No completions are collected.
        """
        if init_bindings is None:
            init_bindings = {}
        ctxt = ParseContext(self.ruleset, init_bindings, (), tuple(tokens), startsymbol)
        pattern = self.ruleset[startsymbol]
        return pattern.match(ctxt, None)

    def whole_match(self, startsymbol, tokens, srcstr=None):
        """
        Return the first parse result that consumed every token, or None
        when there is none. `srcstr`, when given, is bound as '*SRC*'.
        """
        bindings = {}
        if srcstr is not None:
            bindings['*SRC*'] = srcstr
        for c in self.parse(startsymbol, tokens, init_bindings=bindings):
            if not c.remainder:
                return c
        return None

    def lex_and_parse(self, text, startsymbol='Start'):
        """Tokenize `text` and parse it, binding the text as '*SRC*'."""
        return self.parse(startsymbol, self.lex(text), init_bindings={'*SRC*': text})

    def lex_and_whole_match(self, text, startsymbol='Start'):
        """Tokenize `text` and return the result of whole_match() on it."""
        tokens = self.lex(text)
        return self.whole_match(startsymbol, tokens, srcstr=text)

    def complete(self, startsymbol, tokens, init_bindings=None):
        """
        Match `tokens` against the rule named `startsymbol` and return the
        set of completions gathered along the way.

        When the '*DEBUG*' binding is truthy, a Debugotron writing to
        stderr is used so that the origin of each completion is reported.
        """
        if init_bindings is None:
            init_bindings = {}
        ctxt = ParseContext(self.ruleset, init_bindings, (), tuple(tokens), startsymbol)
        pattern = self.ruleset[startsymbol]
        if init_bindings.get('*DEBUG*', False):
            completions = Debugotron(stream=sys.stderr)
        else:
            completions = set()
        pattern.match(ctxt, completions)
        return completions
import sys
class Debugotron(set):
    """
    A set that reports, on `stream`, where each addition came from.

    For every add() or non-empty update() it writes the item followed by
    up to `depth` calling frames.
    """

    # maximum number of stack frames reported per addition
    depth = 10

    def __init__(self, initializer=(), stream=sys.stdout):
        set.__init__(self, initializer)
        self.stream = stream

    def add(self, item):
        self._note_addition(item)
        set.add(self, item)

    def _note_addition(self, foo):
        """Write `foo` and the call stack of whoever added it to the stream."""
        self.stream.write("\nitem %r added by:\n" % (foo,))
        # skip this frame and the add()/update() frame that called it
        frame = sys._getframe().f_back.f_back
        for i in range(self.depth):
            if frame is None:
                # the stack is shallower than `depth`; nothing more to show
                break
            name = frame.f_code.co_name
            filename = frame.f_code.co_filename
            lineno = frame.f_lineno
            if 'self' in frame.f_locals:
                clsobj = frame.f_locals['self']
                line = '%s.%s() (%s:%d)' % (clsobj, name, filename, lineno)
            else:
                line = '%s (%s:%d)' % (name, filename, lineno)
            self.stream.write(' - %s\n' % (line,))
            # for the immediate caller, also show the context it was working on
            if i == 0 and 'ctxt' in frame.f_locals:
                self.stream.write(' - %s\n' % (frame.f_locals['ctxt'],))
            frame = frame.f_back

    def update(self, items):
        # only report non-empty updates; the update itself always happens
        if items:
            self._note_addition(items)
        set.update(self, items)