Improve lisp compiler and interpreter.

parent 4180e3b427
commit be97b30e86

1 changed file with 363 additions and 287 deletions: i3toolwait
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 
 
+import string
 import typing
 import asyncio
 import signal
@@ -21,317 +22,391 @@ try:
 except ImportError:
     from yaml import SafeLoader
 
-class Expression:
-    def __init__(self):
-        pass
-
-    def reduce(self, ipc_data):
-        if self.should_call:
-            return self.call(ipc_data)
-        return functools.reduce(self.reduce_function(ipc_data), self.children)
-
-    @property
-    def should_call(self):
-        return False
-
-    @property
-    def children(self):
-        raise NotImplemented('TODO: implement in subclass')
-
-    def reduce_function(self, ipc_data):
-        raise NotImplemented('TODO: implement in subclass')
-
-    def call(self, ipc_data):
-        raise NotImplementedError('TODO: implement in subclass')
-
-
-class LiteralExpression(Expression):
-    def __init__(self, value):
-        self._value = value
-
-    def __repr__(self) -> str:
-        return f'"{self._value}"'
-
-    @property
-    def children(self):
-        return [self._value]
-
-    def reduce_function(self, ipc_data):
-        def reduce(a, b):
-            raise NotImplemented('I should never be called')
-
-
-class IntLiteralExpression(LiteralExpression):
-    def __repr__(self) -> str:
-        return str(self._value)
-
-
-class BoolLiteralExpression(LiteralExpression):
-    def __repr__(self) -> str:
-        return str(self._value)
-
-
-class AndExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(& {cs})'
-
-    @property
-    def children(self):
-        return self._children
-
-    def reduce_function(self, ipc_data):
-        return lambda a, b: a.reduce(ipc_data) and b.reduce(ipc_data)
-
-
-class OrExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(| {cs})'
-
-    @property
-    def children(self):
-        return self._children
-
-    def reduce_function(self, ipc_data):
-        return lambda a, b: a.reduce(ipc_data) or b.reduce(ipc_data)
-
-
-class IfExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(? {cs})'
-
-    @property
-    def should_call(self):
-        return True
-
-    @property
-    def children(self):
-        return self._children
-
-    def call(self, ipc_data):
-        if self._children[0].reduce(ipc_data):
-            i = 1
-        else:
-            i = 2
-        return self._children[i].reduce(ipc_data)
-
-
-class EqExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(= {cs})'
-
-    @property
-    def children(self):
-        return self._children
-
-    def reduce_function(self, ipc_data):
-        def reduce(v0, v1):
-            print(f'reducing: {repr(self)}')
-            return v0.reduce(ipc_data) == v1.reduce(ipc_data)
-        return reduce
-
-
-class NeqExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(!= {cs})'
-
-    @property
-    def children(self):
-        return self._children
-
-    def reduce_function(self, ipc_data):
-        def reduce(v0, v1):
-            return v0.reduce(ipc_data) != v1.reduce(ipc_data)
-        return reduce
-
-
-class GtExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(> {cs})'
-
-    @property
-    def children(self):
-        return self._children
-
-    def reduce_function(self, ipc_data):
-        def reduce(v0, v1):
-            return v0.reduce(ipc_data) > v1.reduce(ipc_data)
-        return reduce
-
-
-class LtExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(< {cs})'
-
-    @property
-    def children(self):
-        return self._children
-
-    def reduce_function(self, ipc_data):
-        def reduce(v0, v1):
-            return v0.reduce(ipc_data) < v1.reduce(ipc_data)
-        return reduce
-
-
-class LoadExpression(Expression):
-    def __init__(self, value, *args, **kwargs):
-        self._value = value
-        super().__init__(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        return f'(load {self._value})'
-
-    @property
-    def should_call(self):
-        return True
-
-    @property
-    def children(self):
-        return [self._value]
-
-    def call(self, ipc_data):
-        ipc_value = ipc_data
-        for k in self._value[0].children[0].strip('.').split('.'):
-            ipc_value = ipc_value[k]
-        return ipc_value
-
-
-class HasKeyExpression(Expression):
-    def __init__(self, value, *args, **kwargs):
-        self._value = value
-        super().__init__(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        return f'(has-key {self._value})'
-
-    @property
-    def should_call(self):
-        return True
-
-    @property
-    def children(self):
-        return [self._value]
-
-    def call(self, ipc_data):
-        ipc_value = ipc_data
-        for k in self._value[0].children[0].strip('.').split('.'):
-            try:
-                ipc_value = ipc_value[k]
-            except KeyError:
-                return False
-        return True
-
-
-expression_mapping = {
-    '&': AndExpression,
-    '|': OrExpression,
-    '?': IfExpression,
-    '=': EqExpression,
-    '!=': NeqExpression,
-    '>': GtExpression,
-    '<': LtExpression,
-    'load': LoadExpression,
-    'has-key': HasKeyExpression,
-}
-
-
-def group_tokens(tokens: list[str]) -> list[list[str]]:
-    groups = []
-    current_group = []
-
-    brace_count = 0
-    for token in tokens:
-        if token == '(':
-            brace_count += 1
-        elif token == ')':
-            brace_count -= 1
-            if brace_count == 0:
-                groups += [current_group]
-                current_group = []
-        elif brace_count == 0:
-            groups += [[token]]
-        else:
-            current_group += [token]
-    return groups
-
-
-def build_expression(tokens: list[str]) -> Expression:
-    if tokens[0] == '(' and tokens[-1] == ')':
-        tokens = tokens[1:-1]
-    token_groups = group_tokens(tokens)
-
-    expressions = [build_expression(ts) for ts in token_groups[1:]] if len(token_groups) > 1 else []
-    root_expression = None
-    token = token_groups[0][0]
-    if token in expression_mapping:
-        root_expression = expression_mapping[token](expressions)
-    elif token.startswith('"'):
-        root_expression = LiteralExpression(token[1:-1])
-    elif token.isnumeric():
-        root_expression = IntLiteralExpression(int(token))
-    elif token in ('True', 'False'):
-        root_expression = BoolLiteralExpression(token == 'True')
-    assert isinstance(root_expression, Expression)
-    return root_expression
-
-
-def take_space(s: str) -> tuple:
-    if s[0] in ' \n':
-        return None, s[1:]
-    return None, s
-
-
-def take_operator(s: str) -> tuple:
-    token = ''
-    for c in s:
-        if c in ''.join(set(expression_mapping.keys())):
-            token += c
-        else:
-            break
-    if token == '':
-        return None, s
-    else:
-        return token, s[len(token):]
-
-
-def take_brace(s: str) -> tuple:
-    if s[0] in '()':
-        return s[0], s[1:]
-    else:
-        return None, s
-
-
-def take_literal(s: str) -> tuple:
-    token = '"'
-    if s[0] != '"':
-        return None, s
-    for c in s[1:]:
-        token += c
-        if c == '"':
-            break
-    if not token.endswith('"'):
-        raise ValueError('Missing closing quotes (`"`)')
-    return token, s[len(token):]
-
-
-def take_int_literal(s: str) -> tuple:
-    token = ''
-    for c in s:
-        if not c.isnumeric():
-            break
-        token += c
-    if token == '':
-        return None, s
-    return token, s[len(token):]
-
-
-def take_bool_literal(s: str) -> tuple:
-    if s.startswith('True'):
-        return 'True', s[len('True'):]
-    if s.startswith('False'):
-        return 'False', s[len('False'):]
-    return None, s
-
-
-def tokenize(s: str) -> list[str]:
-    operator_extractors = [
-        take_operator,
-        take_brace,
-        take_literal,
-        take_int_literal,
-        take_bool_literal,
-        take_space,
-    ]
-    tokens = []
-    while s != '':
-        previous_len = len(s)
-        for operator_extractor in operator_extractors:
-            token, s = operator_extractor(s)
-            if token is not None:
-                tokens += [token]
-                break
-        if len(s) == previous_len:
-            raise ValueError(f'Could not tokenize string {s}')
-    return tokens
-
-
-def parse(s: str) -> Expression:
-    tokens = tokenize(s)
-    return build_expression(tokens)
+def lazy_fc_if(env, a, b, c):
+    a.reduce(env)
+    if a.reduced:
+        b.reduce(env)
+        return b
+    c.reduce(env)
+    return c
+
+
+def lazy_fc_nif(env, a, b, c):
+    a.reduce(env)
+    if not a.reduced:
+        b.reduce(env)
+        return b
+    c.reduce(env)
+    return c
+
+
+def fc_load(env, path):
+    ipc_value = env.input
+    for k in path.strip('.').split('.'):
+        ipc_value = ipc_value[k]
+    return ipc_value
+
+
+def fc_has_key(env, path):
+    ipc_value = env.input
+    for k in path.strip('.').split('.'):
+        try:
+            ipc_value = ipc_value[k]
+        except KeyError:
+            return False
+    return True
+
+
+class Environment:
+
+    def __init__(self, input):
+        self._input = input
+        self._variables = {}
+        self._functions = {
+            '__last__': lambda *a: a[-1],  # special function, if multiple expressions, execute all and return result of last one
+            'defvar': lambda env, n, v: env.set_variable(n, v),
+            'write': lambda _, a: print(a),
+            'load': fc_load,
+            'has-key': fc_has_key,
+            '=': lambda _, a, b: a == b,
+            '!=': lambda _, a, b: a != b,
+            '>': lambda _, a, b: a > b,
+            '<': lambda _, a, b: a < b,
+            '>=': lambda _, a, b: a >= b,
+            '<=': lambda _, a, b: a <= b,
+            '+': lambda _, *a: sum(a),
+            '-': lambda _, a, b: a - b,
+            '*': lambda _, *a: functools.reduce(lambda a, b: a * b, a),
+            '/': lambda _, a, b: a // b,
+        }
+        self._lazy_functions = {
+            '?': lazy_fc_if,
+            '!?': lazy_fc_nif,
+        }
+
+    @property
+    def input(self):
+        return self._input
+
+    def set_variable(self, name: str, value: object):
+        self._variables[name] = value
+
+    def get_variable(self, name: str):
+        return self._variables[name]
+
+    def get_function(self, name: str):
+        return self._functions[name]
+
+    def get_lazy_function(self, name: str):
+        return self._lazy_functions[name]
+
+
+class Expression:
+
+    STATE_CONSTRUCTED = 0
+    STATE_REDUCED = 1
+
+    def __init__(self):
+        self._state = Expression.STATE_CONSTRUCTED
+        self._reduced = None
+
+    def _reduce(self, env: Environment, args: list[object]):
+        _ = env, args
+        raise NotImplementedError('Implement in subclass')
+
+    def reduce(self, env: Environment):
+        if self._state == Expression.STATE_REDUCED:
+            return
+        self._reduced = self._reduce(env, [])
+        self._state = Expression.STATE_REDUCED
+
+    @property
+    def reduced(self) -> object:
+        if self._state != Expression.STATE_REDUCED:
+            raise RuntimeError('Tried to get the reduced value before reducing')
+        return self._reduced
+
+
+class Constant(Expression):
+
+    def __init__(self, value):
+        super().__init__()
+        self._value = value
+
+    def __repr__(self):
+        return repr(self._value)
+
+    def _reduce(self, env: Environment, args: list[Expression]):
+        _ = env, args
+        return self._value
+
+
+class VariableSet(Constant):
+    pass
+
+
+class VariableGet(Constant):
+
+    def _reduce(self, env: Environment, args: list[Expression]):
+        _ = args
+        return env.get_variable(self._value)
+
+
+class Function(Expression):
+
+    def __init__(self, fc, args: list[Expression]):
+        super().__init__()
+        self._fc = fc
+        self._args = args
+
+    def __repr__(self):
+        return f'({self._fc} {self._args})'
+
+    def _reduce(self, env: Environment, args: list[Expression]):
+        try:
+            fc = env.get_function(self._fc)
+            [a.reduce(env) for a in args]
+            r = fc(env, *[a.reduced for a in args])
+        except KeyError as e:
+            fc = env.get_lazy_function(self._fc)
+            r = fc(env, *args)
+        return r
+
+    def reduce(self, env: Environment):
+        if self._state == Expression.STATE_REDUCED:
+            return
+        self._reduced = self._reduce(env, self._args)
+        self._state = Expression.STATE_REDUCED
+
+
+class Token:
+
+    CONSTANT_STRING = 0
+    CONSTANT_INTEGER = 10
+    CONSTANT_BOOLEAN = 20
+    KEYWORD = 30
+    VARIABLE_SET = 40
+    VARIABLE_GET = 50
+    FUNCTION = 60
+    GROUPING_OPEN = 70
+    GROUPING_CLOSE = 80
+    WHITESPACE = 90
+
+    def __init__(self, t, v):
+        self.t = t
+        self.v = v
+
+    def __repr__(self):
+        return f'{self.v}::{self.t}'
+
+    def to_expression(self):
+        if self.t == Token.CONSTANT_STRING:
+            return Constant(self.v[1:-1])  # slice away the quotes
+        if self.t == Token.CONSTANT_INTEGER:
+            return Constant(int(self.v, base=0))
+        if self.t == Token.CONSTANT_BOOLEAN:
+            return Constant(self.v == 'True')
+        if self.t == Token.KEYWORD:
+            raise RuntimeError(f'This is a meta token type and should be swallowed by the sanitizer: {self}')
+        if self.t == Token.VARIABLE_GET:
+            return VariableGet(self.v)
+        if self.t == Token.VARIABLE_SET:
+            return VariableSet(self.v)
+        if self.t == Token.FUNCTION:
+            raise RuntimeError('Cant construct function just from its token')
+        if self.t == Token.GROUPING_OPEN or self.t == Token.GROUPING_CLOSE:
+            raise RuntimeError('Groupings should never be constructed, this is a bug')
+        if self.t == Token.WHITESPACE:
+            raise RuntimeError('Whitespaces should not be present in this stage of the build')
+        raise RuntimeError(f'The token type {self.t} is not implemented')
+
+
+def token_extract_string(stream: str) -> tuple[Token, str]:
+    if stream[0] != '"':
+        raise ValueError('No such token in stream')
+    i = stream.find('"', 1)
+    return Token(Token.CONSTANT_STRING, stream[:i+1]), stream[i+1:]
+
+
+def token_extract_integer(stream: str) -> tuple[Token, str]:
+    i = 0
+    base = None
+    if stream[i] in '+-':
+        i += 1
+    if stream[i] in '0123456789':
+        i += 1
+    else:
+        raise ValueError('Malformed integer')
+    if stream[i] in 'xbo':
+        base = stream[i]
+        i += 1
+    int_set = {None: '0123456789', 'x': '0123456789abcdefABCDEF', 'b': '01', 'o': '01234567'}[base]
+    while stream[i] in int_set:
+        i += 1
+    return Token(Token.CONSTANT_INTEGER, stream[:i]), stream[i:]
+
+
+def token_extract_boolean(stream: str) -> tuple[Token, str]:
+    if stream.startswith('True'):
+        return Token(Token.CONSTANT_BOOLEAN, stream[:4]), stream[4:]
+    elif stream.startswith('False'):
+        return Token(Token.CONSTANT_BOOLEAN, stream[:5]), stream[5:]
+    raise ValueError('No such token in stream')
+
+
+def token_extract_keyword(stream: str) -> tuple[Token, str]:
+    i = 0
+    if stream[i] in string.ascii_letters + '_-><=!+-*/?':
+        i += 1
+    else:
+        raise ValueError('No keyword in stream')
+    while stream[i] in string.ascii_letters + '_-><=!+-*/?':
+        i += 1
+    return Token(Token.KEYWORD, stream[:i]), stream[i:]
+
+
+def token_extract_grouping_open(stream: str) -> tuple[Token, str]:
+    if stream[0] == '(':
+        return Token(Token.GROUPING_OPEN, '('), stream[1:]
+    raise ValueError('No such token in stream')
+
+
+def token_extract_grouping_close(stream: str) -> tuple[Token, str]:
+    if stream[0] == ')':
+        return Token(Token.GROUPING_CLOSE, ')'), stream[1:]
+    raise ValueError('No such token in stream')
+
+
+def token_extract_space(stream: str) -> tuple[Token, str]:
+    i = 0
+    while stream[i] in string.whitespace:
+        i += 1
+    return Token(Token.WHITESPACE, stream[:i]), stream[i:]
+
+
+def tokenize(program: str) -> list[Token]:
+    extractors = [
+        token_extract_boolean,
+        token_extract_integer,
+        token_extract_string,
+        token_extract_keyword,
+        token_extract_grouping_open,
+        token_extract_grouping_close,
+        token_extract_space,
+    ]
+    p = program
+    tokens = []
+    while p:
+        success = False
+        for e in extractors:
+            try:
+                t, p = e(p)
+                tokens += [t]
+                success = True
+                break
+            except ValueError:
+                pass
+        if not success:
+            raise ValueError('Program is invalid')
+    return [t for t in tokens if t.t != Token.WHITESPACE]
+
+
+def tokenize_sanitize_function(token_before: Token | None, token: Token, token_after: Token | None) -> Token | None:
+    if token_before is None:
+        return
+    if token_before.t == Token.GROUPING_OPEN and token.t == Token.KEYWORD:
+        return Token(Token.FUNCTION, token.v)
+
+
+def tokenize_sanitize_setvar(token_before: Token | None, token: Token, token_after: Token | None) -> Token | None:
+    if token_before is None:
+        return
+    if (token_before.t == Token.FUNCTION and token_before.v == 'defvar') and token.t == Token.KEYWORD:
+        return Token(Token.VARIABLE_SET, token.v)
+
+
+def tokenize_sanitize_getvar(token_before: Token | None, token: Token, token_after: Token | None) -> Token | None:
+    if token_before is None:
+        if token.t == Token.KEYWORD:
+            return Token(Token.VARIABLE_GET, token.v)
+        return
+    if (token_before.t != Token.FUNCTION or token_before.v != 'defvar') and token.t == Token.KEYWORD:
+        return Token(Token.VARIABLE_GET, token.v)
+
+
+def _tokenize_sanitize(tokens: list[Token]) -> tuple[bool, list[Token]]:
+    sanitizers = [
+        tokenize_sanitize_function,
+        tokenize_sanitize_setvar,
+        tokenize_sanitize_getvar,
+    ]
+    new_tokens = []
+    changed = False
+    for i in range(len(tokens)):
+        for s in sanitizers:
+            p_token = new_tokens[i-1] if i > 0 else None
+            n_token = tokens[i+1] if i < (len(tokens)-1) else None
+            new_token = s(p_token, tokens[i], n_token)
+            if new_token is not None:
+                changed = True
+                new_tokens += [new_token]
+                break
+        else:
+            new_tokens += [tokens[i]]
+    return changed, new_tokens
+
+
+def tokenize_sanitize(tokens: list[Token]) -> list[Token]:
+    _, tokens = _tokenize_sanitize(tokens)
+    return tokens
+
+
+def take_token_group(tokens: list[Token], n: int = 1) -> list[Token]:
+    i = 0
+    start = i
+    group_count = 0
+    consider_groups = False
+    while n:
+        if tokens[i].t == Token.GROUPING_OPEN:
+            consider_groups = True
+            if group_count == 0:
+                start = i
+            group_count += 1
+        elif tokens[i].t == Token.GROUPING_CLOSE:
+            group_count -= 1
+            if group_count == 0:
+                consider_groups = False
+        else:
+            if not consider_groups:
+                start = i
+        if group_count == 0:
+            n -= 1
+        if group_count < 0:
+            raise ValueError('reached past end')
+        i += 1
+    return tokens[start:i]
+
+
+def unwrap_token_group(tokens: list[Token]) -> list[Token]:
+    if tokens[0].t != Token.GROUPING_OPEN:
+        return tokens
+
+    brace_count = 0
+    for i, t in enumerate(tokens):
+        brace_count += int(t.t == Token.GROUPING_OPEN)
+        brace_count -= int(t.t == Token.GROUPING_CLOSE)
+        if i == len(tokens) - 2:
+            if brace_count > 0:
+                tokens = tokens[1:-1]
+            break
+    return tokens
+
+
+def build(tokens: list[Token]) -> Expression:
+    tokens = unwrap_token_group(tokens)
+    token_groups: list[list[Token]] = []
+    i = 1
+    while True:
+        try:
+            token_groups += [take_token_group(tokens, n=i)]
+            i += 1
+        except IndexError:
+            break
+
+    # special function case
+    if len(token_groups[0]) == 1 and token_groups[0][0].t == Token.FUNCTION:
+        token_0 = token_groups[0][0]
+        args = [build(tg) for tg in token_groups[1:]]
+        return Function(token_0.v, args)
+
+    # combine to multiple statements
+    if len(token_groups) > 1:
+        return Function('__last__', [build(tg) for tg in token_groups])
+
+    # create a basic expression
+    if len(token_groups) == 1 and len(token_groups[0]) == 1:
+        return token_groups[0][0].to_expression()
+
+    raise RuntimeError(f'Did not handle token case in build function, token_groups: {token_groups}')
+
+
+def parse(program: str) -> Expression:
+    tokens = tokenize_sanitize(tokenize(program))
+    expression = build(tokens)
+    return expression
 
 
 class Filter(Expression):
@@ -453,7 +528,8 @@ def window_new(runtime_data: RuntimeData, *, debug):
         print(json.dumps(e.ipc_data))
     async with runtime_data.lock:
         for i, cfg in enumerate(runtime_data.programs):
-            if cfg.match.reduce(e.ipc_data):
+            cfg.match.reduce(Environment(e.ipc_data))
+            if cfg.match.reduced:
                 container_id = e.ipc_data['container']['id']
                 await ipc.command(f'for_window [con_id="{container_id}"] focus')
                 await ipc.command(f'move container to workspace {cfg.workspace}')
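For review context, a minimal sketch of how the new pipeline fits together, assuming the code added above: parse() runs tokenize -> sanitize -> build, and evaluation is split into a reduce() side effect plus a cached .reduced read. The sample payload is a made-up stand-in for sway/i3 IPC data, not part of the commit:

    # Hypothetical round trip through the new interpreter (illustration only).
    expression = parse('(= (load ".container.name") "firefox")')
    env = Environment({'container': {'name': 'firefox'}})  # made-up IPC payload
    expression.reduce(env)      # evaluate once against this environment
    print(expression.reduced)   # read the cached result back: True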
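And a sketch of the call-site protocol the last hunk switches to, mirroring window_new() under the same assumptions (the event payload is again made up, standing in for e.ipc_data):

    # Hypothetical match check against one window event (illustration only).
    ipc_data = {'container': {'id': 42, 'name': 'firefox'}}
    match = parse('(has-key ".container.name")')
    match.reduce(Environment(ipc_data))   # one evaluation per event
    if match.reduced:                     # branch on the cached match result
        print(ipc_data['container']['id'])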