From be97b30e8664132e0f7740ea6aeb08db98a285e4 Mon Sep 17 00:00:00 2001
From: redxef
Date: Sun, 27 Nov 2022 00:32:20 +0100
Subject: [PATCH] Improve lisp compiler and interpreter.

---
 i3toolwait | 650 ++++++++++++++++++++++++++++++-----------------------
 1 file changed, 363 insertions(+), 287 deletions(-)

diff --git a/i3toolwait b/i3toolwait
index d67e1bf..8e473f1 100755
--- a/i3toolwait
+++ b/i3toolwait
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 
+import string
 import typing
 import asyncio
 import signal
@@ -21,317 +22,391 @@ try:
 except ImportError:
     from yaml import SafeLoader
 
-class Expression:
-    def __init__(self):
-        pass
-    def reduce(self, ipc_data):
-        if self.should_call:
-            return self.call(ipc_data)
-        return functools.reduce(self.reduce_function(ipc_data), self.children)
-    @property
-    def should_call(self):
-        return False
-    @property
-    def children(self):
-        raise NotImplemented('TODO: implement in subclass')
-    def reduce_function(self, ipc_data):
-        raise NotImplemented('TODO: implement in subclass')
-    def call(self, ipc_data):
-        raise NotImplementedError('TODO: implement in subclass')
-
-class LiteralExpression(Expression):
-    def __init__(self, value):
-        self._value = value
-    def __repr__(self) -> str:
-        return f'"{self._value}"'
-    @property
-    def children(self):
-        return [self._value]
-    def reduce_function(self, ipc_data):
-        def reduce(a, b):
-            raise NotImplemented('I should never be called')
+def lazy_fc_if(env, a, b, c):
+    # Lazy conditional: only the condition and the taken branch are reduced,
+    # and the branch's reduced *value* is returned.
+    a.reduce(env)
+    if a.reduced:
+        b.reduce(env)
+        return b.reduced
+    c.reduce(env)
+    return c.reduced
 
-class IntLiteralExpression(LiteralExpression):
-    def __repr__(self) -> str:
-        return str(self._value)
+def lazy_fc_nif(env, a, b, c):
+    # Same as lazy_fc_if, with the condition inverted.
+    a.reduce(env)
+    if not a.reduced:
+        b.reduce(env)
+        return b.reduced
+    c.reduce(env)
+    return c.reduced
 
-class BoolLiteralExpression(LiteralExpression):
-    def __repr__(self) -> str:
-        return str(self._value)
+def fc_load(env, path):
+    ipc_value = env.input
+    for k in path.strip('.').split('.'):
+        ipc_value = ipc_value[k]
+    return ipc_value
 
-class AndExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(& {cs})'
-    @property
-    def children(self):
-        return self._children
-    def reduce_function(self, ipc_data):
-        return lambda a, b: a.reduce(ipc_data) and b.reduce(ipc_data)
-
-class OrExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(| {cs})'
-    @property
-    def children(self):
-        return self._children
-    def reduce_function(self, ipc_data):
-        return lambda a, b: a.reduce(ipc_data) or b.reduce(ipc_data)
-
-class IfExpression(Expression):
-    def __init__(self, children, *args, **kwargs):
-        self._children = children
-        super().__init__(*args, **kwargs)
-    def __repr__(self) -> str:
-        cs = ' '.join([repr(c) for c in self.children])
-        return f'(? 
{cs})' - @property - def should_call(self): - return True - @property - def children(self): - return self._children - def call(self, ipc_data): - if self._children[0].reduce(ipc_data): - i = 1 - else: - i = 2 - return self._children[i].reduce(ipc_data) - -class EqExpression(Expression): - def __init__(self, children, *args, **kwargs): - self._children = children - super().__init__(*args, **kwargs) - def __repr__(self) -> str: - cs = ' '.join([repr(c) for c in self.children]) - return f'(= {cs})' - @property - def children(self): - return self._children - def reduce_function(self, ipc_data): - def reduce(v0, v1): - print(f'reducing: {repr(self)}') - return v0.reduce(ipc_data) == v1.reduce(ipc_data) - return reduce - -class NeqExpression(Expression): - def __init__(self, children, *args, **kwargs): - self._children = children - super().__init__(*args, **kwargs) - def __repr__(self) -> str: - cs = ' '.join([repr(c) for c in self.children]) - return f'(!= {cs})' - @property - def children(self): - return self._children - def reduce_function(self, ipc_data): - def reduce(v0, v1): - return v0.reduce(ipc_data) != v1.reduce(ipc_data) - return reduce - -class GtExpression(Expression): - def __init__(self, children, *args, **kwargs): - self._children = children - super().__init__(*args, **kwargs) - def __repr__(self) -> str: - cs = ' '.join([repr(c) for c in self.children]) - return f'(> {cs})' - @property - def children(self): - return self._children - def reduce_function(self, ipc_data): - def reduce(v0, v1): - return v0.reduce(ipc_data) > v1.reduce(ipc_data) - return reduce - -class LtExpression(Expression): - def __init__(self, children, *args, **kwargs): - self._children = children - super().__init__(*args, **kwargs) - def __repr__(self) -> str: - cs = ' '.join([repr(c) for c in self.children]) - return f'(< {cs})' - @property - def children(self): - return self._children - def reduce_function(self, ipc_data): - def reduce(v0, v1): - return v0.reduce(ipc_data) < v1.reduce(ipc_data) - return reduce - -class LoadExpression(Expression): - def __init__(self, value, *args, **kwargs): - self._value = value - super().__init__(*args, **kwargs) - def __repr__(self) -> str: - return f'(load {self._value})' - @property - def should_call(self): - return True - @property - def children(self): - return [self._value] - def call(self, ipc_data): - ipc_value = ipc_data - for k in self._value[0].children[0].strip('.').split('.'): +def fc_has_key(env, path): + ipc_value = env.input + for k in path.strip('.').split('.'): + try: ipc_value = ipc_value[k] - return ipc_value + except KeyError: + return False + return True + +class Environment: + + def __init__(self, input): + self._input = input + self._variables = {} + self._functions = { + '__last__': lambda *a: a[-1], # special function, if multiple expressions, execute all and return result of last one + 'defvar': lambda env, n, v: env.set_variable(n, v), + 'write': lambda _, a: print(a), + 'load': fc_load, + 'has-key': fc_has_key, + '=': lambda _, a, b: a == b, + '!=': lambda _, a, b: a != b, + '>': lambda _, a, b: a > b, + '<': lambda _, a, b: a < b, + '>=': lambda _, a, b: a >= b, + '<=': lambda _, a, b: a <= b, + '+': lambda _, *a: sum(a), + '-': lambda _, a, b: a - b, + '*': lambda _, *a: functools.reduce(lambda a, b: a * b, a), + '/': lambda _, a, b: a // b, + } + self._lazy_functions = { + '?': lazy_fc_if, + '!?': lazy_fc_nif, + } + + @property + def input(self): + return self._input + + def set_variable(self, name: str, value: object): + 
self._variables[name] = value + + def get_variable(self, name: str): + return self._variables[name] + + def get_function(self, name: str): + return self._functions[name] + + def get_lazy_function(self, name: str): + return self._lazy_functions[name] + +class Expression: + + STATE_CONSTRUCTED = 0 + STATE_REDUCED = 1 + + def __init__(self): + self._state = Expression.STATE_CONSTRUCTED + self._reduced = None + + def _reduce(self, env: Environment, args: list[object]): + _ = env, args + raise NotImplementedError('Implement in subclass') + + def reduce(self, env: Environment): + if self._state == Expression.STATE_REDUCED: + return + self._reduced = self._reduce(env, []) + self._state = Expression.STATE_REDUCED -class HasKeyExpression(Expression): - def __init__(self, value, *args, **kwargs): + @property + def reduced(self) -> object: + if self._state != Expression.STATE_REDUCED: + raise RuntimeError('Tried to get the reduced value before reducing') + return self._reduced + +class Constant(Expression): + + def __init__(self, value): + super().__init__() self._value = value - super().__init__(*args, **kwargs) - def __repr__(self) -> str: - return f'(has-key {self._value})' - @property - def should_call(self): - return True - @property - def children(self): - return [self._value] - def call(self, ipc_data): - ipc_value = ipc_data - for k in self._value[0].children[0].strip('.').split('.'): - try: - ipc_value = ipc_value[k] - except KeyError: - return False - return True + def __repr__(self): + return repr(self._value) -expression_mapping = { - '&': AndExpression, - '|': OrExpression, - '?': IfExpression, - '=': EqExpression, - '!=': NeqExpression, - '>': GtExpression, - '<': LtExpression, - 'load': LoadExpression, - 'has-key': HasKeyExpression, -} + def _reduce(self, env: Environment, args: list[Expression]): + _ = env, args + return self._value -def group_tokens(tokens: list[str]) -> list[list[str]]: - groups = [] - current_group = [] - - brace_count = 0 - for token in tokens: - if token == '(': - brace_count += 1 - elif token == ')': - brace_count -= 1 - if brace_count == 0: - groups += [current_group] - current_group = [] - elif brace_count == 0: - groups += [[token]] - else: - current_group += [token] - return groups +class VariableSet(Constant): + pass -def build_expression(tokens: list[str]) -> Expression: - if tokens[0] == '(' and tokens[-1] == ')': - tokens = tokens[1:-1] - token_groups = group_tokens(tokens) +class VariableGet(Constant): - expressions = [build_expression(ts) for ts in token_groups[1:]] if len(token_groups) > 1 else [] - root_expression = None - token = token_groups[0][0] - if token in expression_mapping: - root_expression = expression_mapping[token](expressions) - elif token.startswith('"'): - root_expression = LiteralExpression(token[1:-1]) - elif token.isnumeric(): - root_expression = IntLiteralExpression(int(token)) - elif token in ('True', 'False'): - root_expression = BoolLiteralExpression(token == 'True') - assert isinstance(root_expression, Expression) - return root_expression - -def take_space(s: str) -> tuple: - if s[0] in ' \n': - return None, s[1:] - return None, s + def _reduce(self, env: Environment, args: list[Expression]): + _ = args + return env.get_variable(self._value) -def take_operator(s: str) -> tuple: - token = '' - for c in s: - if c in ''.join(set(expression_mapping.keys())): - token += c - else: - break - if token == '': - return None, s +class Function(Expression): + + def __init__(self, fc, args: list[Expression]): + super().__init__() + 
self._fc = fc
+        self._args = args
+
+    def __repr__(self):
+        return f'({self._fc} {self._args})'
+
+    def _reduce(self, env: Environment, args: list[Expression]):
+        try:
+            fc = env.get_function(self._fc)
+        except KeyError:
+            # Not a regular function: fall back to a lazy one, which gets the
+            # unreduced argument expressions and decides what to evaluate.
+            # Only the lookup is guarded, so a KeyError raised while actually
+            # evaluating (e.g. by fc_load) is not mistaken for a missing
+            # function.
+            fc = env.get_lazy_function(self._fc)
+            return fc(env, *args)
+        # Regular functions receive eagerly reduced argument values.
+        for a in args:
+            a.reduce(env)
+        return fc(env, *[a.reduced for a in args])
+
+    def reduce(self, env: Environment):
+        if self._state == Expression.STATE_REDUCED:
+            return
+        self._reduced = self._reduce(env, self._args)
+        self._state = Expression.STATE_REDUCED
 
-    else:
-        return token, s[len(token):]
-
-def take_brace(s: str) -> tuple:
-    if s[0] in '()':
-        return s[0], s[1:]
-    else:
-        return None, s
+class Token:
+
+    CONSTANT_STRING = 0
+    CONSTANT_INTEGER = 10
+    CONSTANT_BOOLEAN = 20
+    KEYWORD = 30
+    VARIABLE_SET = 40
+    VARIABLE_GET = 50
+    FUNCTION = 60
+    GROUPING_OPEN = 70
+    GROUPING_CLOSE = 80
+    WHITESPACE = 90
+
+    def __init__(self, t, v):
+        self.t = t
+        self.v = v
+
+    def __repr__(self):
+        return f'{self.v}::{self.t}'
+
+    def to_expression(self):
+        if self.t == Token.CONSTANT_STRING:
+            return Constant(self.v[1:-1])  # slice away the quotes
+        if self.t == Token.CONSTANT_INTEGER:
+            return Constant(int(self.v, base=0))
+        if self.t == Token.CONSTANT_BOOLEAN:
+            return Constant(self.v == 'True')
+        if self.t == Token.KEYWORD:
+            raise RuntimeError(f'This is a meta token type and should be swallowed by the sanitizer: {self}')
+        if self.t == Token.VARIABLE_GET:
+            return VariableGet(self.v)
+        if self.t == Token.VARIABLE_SET:
+            return VariableSet(self.v)
+        if self.t == Token.FUNCTION:
+            raise RuntimeError("Can't construct a function from its token alone")
+        if self.t == Token.GROUPING_OPEN or self.t == Token.GROUPING_CLOSE:
+            raise RuntimeError('Groupings should never be constructed; this is a bug')
+        if self.t == Token.WHITESPACE:
+            raise RuntimeError('Whitespace tokens should not be present at this stage of the build')
+        raise RuntimeError(f'The token type {self.t} is not implemented')
 
-def take_literal(s: str) -> tuple:
-    token = '"'
-    if s[0] != '"':
-        return None, s
-    for c in s[1:]:
-        token += c
-        if c == '"':
-            break
-    if not token.endswith('"'):
-        raise ValueError('Missing closing quotes (`"`)')
-    return token, s[len(token):]
+def token_extract_string(stream: str) -> tuple[Token, str]:
+    if stream[0] != '"':
+        raise ValueError('No such token in stream')
+    i = stream.find('"', 1)
+    if i == -1:
+        # Without this check an unterminated string would yield an empty
+        # token and tokenize() would never advance.
+        raise ValueError('Missing closing quote (`"`)')
+    return Token(Token.CONSTANT_STRING, stream[:i+1]), stream[i+1:]
+
+def token_extract_integer(stream: str) -> tuple[Token, str]:
+    i = 0
+    base = None
+    if stream[i] in '+-':
+        i += 1
+    if i < len(stream) and stream[i] in '0123456789':
+        i += 1
+    else:
+        raise ValueError('Malformed integer')
+    if i < len(stream) and stream[i] in 'xbo':
+        base = stream[i]
+        i += 1
+    int_set = {None: '0123456789', 'x': '0123456789abcdefABCDEF', 'b': '01', 'o': '01234567'}[base]
+    while i < len(stream) and stream[i] in int_set:
+        i += 1
+    return Token(Token.CONSTANT_INTEGER, stream[:i]), stream[i:]
+
+def token_extract_boolean(stream: str) -> tuple[Token, str]:
+    if stream.startswith('True'):
+        return Token(Token.CONSTANT_BOOLEAN, stream[:4]), stream[4:]
+    elif stream.startswith('False'):
+        return Token(Token.CONSTANT_BOOLEAN, stream[:5]), stream[5:]
+    raise ValueError('No such token in stream')
+
+def token_extract_keyword(stream: str) -> tuple[Token, str]:
+    keyword_chars = string.ascii_letters + '_-><=!+-*/?'
+    i = 0
+    if stream[i] in keyword_chars:
+        i += 1
+    else:
+        raise ValueError('No keyword in stream')
+    while i < len(stream) and stream[i] in keyword_chars:
+        i += 1
+    return Token(Token.KEYWORD, stream[:i]), stream[i:]
+
+def 
token_extract_grouping_open(stream: str) -> tuple[Token, str]:
+    if stream[0] == '(':
+        return Token(Token.GROUPING_OPEN, '('), stream[1:]
+    raise ValueError('No such token in stream')
 
-def take_int_literal(s: str) -> tuple:
-    token = ''
-    for c in s:
-        if not c.isnumeric():
-            break
-        token += c
-    if token == '':
-        return None, s
-    return token, s[len(token):]
+def token_extract_grouping_close(stream: str) -> tuple[Token, str]:
+    if stream[0] == ')':
+        return Token(Token.GROUPING_CLOSE, ')'), stream[1:]
+    raise ValueError('No such token in stream')
 
-def take_bool_literal(s: str) -> tuple:
-    if s.startswith('True'):
-        return 'True', s[len('True'):]
-    if s.startswith('False'):
-        return 'False', s[len('False'):]
-    return None, s
+def token_extract_space(stream: str) -> tuple[Token, str]:
+    i = 0
+    while i < len(stream) and stream[i] in string.whitespace:
+        i += 1
+    if i == 0:
+        # No whitespace consumed: report failure instead of returning an
+        # empty token, which would make tokenize() loop forever on input
+        # that no extractor accepts.
+        raise ValueError('No such token in stream')
+    return Token(Token.WHITESPACE, stream[:i]), stream[i:]
 
-def tokenize(s: str) -> list[str]:
-    operator_extractors = [
-        take_operator,
-        take_brace,
-        take_literal,
-        take_int_literal,
-        take_bool_literal,
-        take_space,
+def tokenize(program: str) -> list[Token]:
+    extractors = [
+        token_extract_boolean,
+        token_extract_integer,
+        token_extract_string,
+        token_extract_keyword,
+        token_extract_grouping_open,
+        token_extract_grouping_close,
+        token_extract_space,
     ]
+    p = program
     tokens = []
-    while s != '':
-        previous_len = len(s)
-        for operator_extractor in operator_extractors:
-            token, s = operator_extractor(s)
-            if token is not None:
-                tokens += [token]
+    while p:
+        success = False
+        for e in extractors:
+            try:
+                t, p = e(p)
+                tokens += [t]
+                success = True
                 break
-        if len(s) == previous_len:
-            raise ValueError(f'Could not tokenize string {s}')
+            except ValueError:
+                pass
+        if not success:
+            raise ValueError('Program is invalid')
+    return [t for t in tokens if t.t != Token.WHITESPACE]
+
+def tokenize_sanitize_function(token_before: Token | None, token: Token, token_after: Token | None) -> Token | None:
+    # A keyword directly after `(` names the function being called.
+    if token_before is None:
+        return
+    if token_before.t == Token.GROUPING_OPEN and token.t == Token.KEYWORD:
+        return Token(Token.FUNCTION, token.v)
+
+def tokenize_sanitize_setvar(token_before: Token | None, token: Token, token_after: Token | None) -> Token | None:
+    # The keyword directly after `defvar` is the variable being assigned.
+    if token_before is None:
+        return
+    if (token_before.t == Token.FUNCTION and token_before.v == 'defvar') and token.t == Token.KEYWORD:
+        return Token(Token.VARIABLE_SET, token.v)
+
+def tokenize_sanitize_getvar(token_before: Token | None, token: Token, token_after: Token | None) -> Token | None:
+    # Any remaining keyword is a variable read.
+    if token_before is None:
+        if token.t == Token.KEYWORD:
+            return Token(Token.VARIABLE_GET, token.v)
+        return
+    if (token_before.t != Token.FUNCTION or token_before.v != 'defvar') and token.t == Token.KEYWORD:
+        return Token(Token.VARIABLE_GET, token.v)
+
+def _tokenize_sanitize(tokens: list[Token]) -> tuple[bool, list[Token]]:
+    sanitizers = [
+        tokenize_sanitize_function,
+        tokenize_sanitize_setvar,
+        tokenize_sanitize_getvar,
+    ]
+    new_tokens = []
+    changed = False
+    for i in range(len(tokens)):
+        for s in sanitizers:
+            p_token = new_tokens[i-1] if i > 0 else None
+            n_token = tokens[i+1] if i < (len(tokens)-1) else None
+            new_token = s(p_token, tokens[i], n_token)
+            if new_token is not None:
+                changed = True
+                new_tokens += [new_token]
+                break
+        else:
+            new_tokens += [tokens[i]]
+    return changed, new_tokens
+
+def tokenize_sanitize(tokens: list[Token]) -> list[Token]:
+    _, tokens = _tokenize_sanitize(tokens)
     return tokens
 
-def parse(s: str) -> Expression:
-    tokens = tokenize(s)
-    return 
build_expression(tokens)
+def take_token_group(tokens: list[Token], n: int = 1) -> list[Token]:
+    # Return the n-th top-level group: a single token, or a complete
+    # parenthesized (...) group counted as one unit.
+    i = 0
+    start = i
+    group_count = 0
+    consider_groups = False
+    while n:
+        if tokens[i].t == Token.GROUPING_OPEN:
+            consider_groups = True
+            if group_count == 0:
+                start = i
+            group_count += 1
+        elif tokens[i].t == Token.GROUPING_CLOSE:
+            group_count -= 1
+            if group_count == 0:
+                # The group just closed is complete and counts as one
+                # argument, so it must consume one step of n as well.
+                consider_groups = False
+                n -= 1
+        else:
+            if not consider_groups:
+                start = i
+            if group_count == 0:
+                n -= 1
+        if group_count < 0:
+            raise ValueError('reached past end')
+        i += 1
+    return tokens[start:i]
+
+def unwrap_token_group(tokens: list[Token]) -> list[Token]:
+    # Strip the outer parentheses only if they enclose the whole token list.
+    if tokens[0].t != Token.GROUPING_OPEN or tokens[-1].t != Token.GROUPING_CLOSE:
+        return tokens
+    brace_count = 0
+    for t in tokens[:-1]:
+        brace_count += int(t.t == Token.GROUPING_OPEN)
+        brace_count -= int(t.t == Token.GROUPING_CLOSE)
+        if brace_count == 0:
+            # The first group closes before the end, e.g. `(a) (b)`:
+            # there is no common outer group to strip.
+            return tokens
+    return tokens[1:-1]
+
+def build(tokens: list[Token]) -> Expression:
+    tokens = unwrap_token_group(tokens)
+    token_groups: list[list[Token]] = []
+    i = 1
+    while True:
+        try:
+            token_groups += [take_token_group(tokens, n=i)]
+            i += 1
+        except IndexError:
+            break
+
+    # special function case
+    if len(token_groups[0]) == 1 and token_groups[0][0].t == Token.FUNCTION:
+        token_0 = token_groups[0][0]
+        args = [build(tg) for tg in token_groups[1:]]
+        return Function(token_0.v, args)
+
+    # combine to multiple statements
+    if len(token_groups) > 1:
+        return Function('__last__', [build(tg) for tg in token_groups])
+
+    # create a basic expression
+    if len(token_groups) == 1 and len(token_groups[0]) == 1:
+        return token_groups[0][0].to_expression()
+
+    raise RuntimeError(f'Did not handle token case in build function, token_groups: {token_groups}')
+
+def parse(program: str) -> Expression:
+    tokens = tokenize_sanitize(tokenize(program))
+    expression = build(tokens)
+    return expression
 
 class Filter(Expression):
 
@@ -453,7 +528,8 @@ def window_new(runtime_data: RuntimeData, *, debug):
             print(json.dumps(e.ipc_data))
         async with runtime_data.lock:
             for i, cfg in enumerate(runtime_data.programs):
-                if cfg.match.reduce(e.ipc_data):
+                cfg.match.reduce(Environment(e.ipc_data))
+                if cfg.match.reduced:
                     container_id = e.ipc_data['container']['id']
                     await ipc.command(f'for_window [con_id="{container_id}"] focus')
                     await ipc.command(f'move container to workspace {cfg.workspace}')
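
---

Usage sketch (illustrative only, not part of the patch): the snippet below
assumes the definitions above are in scope and feeds them a hand-written IPC
payload. It walks the whole pipeline (tokenize -> sanitize -> build -> reduce)
and shows the two evaluation styles: `?` dispatches to a lazy function that
only reduces the taken branch, while `=` and `load` are regular functions
whose arguments are reduced eagerly.

    ipc_data = {'container': {'window_properties': {'class': 'firefox'}}}

    # Guard the load with has-key: `?` is lazy, so the (load ...) branch is
    # only reduced when the key actually exists.
    expr = parse(
        '(? (has-key ".container.window_properties.class")'
        '   (= (load ".container.window_properties.class") "firefox")'
        '   False)'
    )
    expr.reduce(Environment(ipc_data))
    print(expr.reduced)  # True

    # Several top-level expressions are wrapped in the special `__last__`
    # function: all of them are evaluated and the last value is the result.
    expr = parse('(defvar workspace 2) (> workspace 1)')
    expr.reduce(Environment(ipc_data))
    print(expr.reduced)  # True

Note that an Expression caches its result after the first reduce(), so a
fresh parse (or a fresh Expression tree) is needed per evaluation.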