i3toolwait/i3toolwait

373 lines
11 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import collections.abc
import asyncio
import signal
import os
import functools
import json
import yaml
import click
import i3ipc
import i3ipc.aio
try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader
class Expression:
    """Base class for the nodes of a filter-expression tree.

    Subclasses provide ``children`` (the operands) and ``reduce_function``
    (a binary function used to fold those operands). ``reduce`` evaluates the
    node against the raw IPC dictionary of an event.
    """

    def __init__(self):
        pass

    def reduce(self, ipc_data):
        """Fold ``children`` with ``reduce_function(ipc_data)`` and return the result."""
        return functools.reduce(self.reduce_function(ipc_data), self.children)

    @property
    def children(self):
        """Operands of this node; must be provided by subclasses."""
        # NotImplemented is a comparison sentinel and is not callable; the
        # exception meant for abstract members is NotImplementedError.
        raise NotImplementedError('TODO: implement in subclass')

    def reduce_function(self, ipc_data):
        """Return the binary fold function; must be provided by subclasses."""
        raise NotImplementedError('TODO: implement in subclass')
class LiteralExpression(Expression):
    """Leaf node holding a constant value (a string for this class)."""

    def __init__(self, value):
        self._value = value
        super().__init__()

    def __repr__(self) -> str:
        return f'"{self._value}"'

    @property
    def children(self):
        return [self._value]

    def reduce(self, ipc_data):
        """Return the stored value; a literal does not depend on ``ipc_data``."""
        # Same result as folding the single-element ``children`` list, but
        # without relying on functools.reduce never calling the fold function.
        return self._value

    def reduce_function(self, ipc_data):
        """Return a fold function that must never be invoked for a literal."""
        def reduce(a, b):
            # NotImplemented is not callable; raise the proper exception type.
            raise NotImplementedError('I should never be called')
        return reduce
class IntLiteralExpression(LiteralExpression):
    """Literal whose value is an integer; rendered without quotes."""

    def __repr__(self) -> str:
        return f'{self._value!s}'
class BoolLiteralExpression(LiteralExpression):
    """Literal whose value is a boolean; rendered without quotes."""

    def __repr__(self) -> str:
        return f'{self._value!s}'
class AndExpression(Expression):
    """Logical conjunction (``&``) of its child expressions."""

    def __init__(self, children, *args, **kwargs):
        self._children = children
        super().__init__(*args, **kwargs)

    def __repr__(self) -> str:
        cs = ' '.join([repr(c) for c in self.children])
        return f'(& {cs})'

    @property
    def children(self):
        return self._children

    def reduce(self, ipc_data):
        """Evaluate children left to right with short-circuit ``and`` semantics.

        Returns the first falsy child result, otherwise the last child result
        (``True`` when there are no children). Unlike a plain fold this works
        for any number of children: with the generic fold a single child was
        returned unevaluated, and three or more children failed because the
        accumulator was no longer an Expression.
        """
        result = True
        for child in self.children:
            result = child.reduce(ipc_data)
            if not result:
                break
        return result

    def reduce_function(self, ipc_data):
        """Return a binary ``and`` that accepts Expressions or already-reduced values."""
        def value_of(operand):
            # The accumulator of a fold is an already-reduced value, not a node.
            if isinstance(operand, Expression):
                return operand.reduce(ipc_data)
            return operand
        return lambda a, b: value_of(a) and value_of(b)
class OrExpression(Expression):
    """Logical disjunction (``|``) of its child expressions."""

    def __init__(self, children, *args, **kwargs):
        self._children = children
        super().__init__(*args, **kwargs)

    def __repr__(self) -> str:
        cs = ' '.join([repr(c) for c in self.children])
        return f'(| {cs})'

    @property
    def children(self):
        return self._children

    def reduce(self, ipc_data):
        """Evaluate children left to right with short-circuit ``or`` semantics.

        Returns the first truthy child result, otherwise the last child result
        (``False`` when there are no children). Unlike a plain fold this works
        for any number of children: with the generic fold a single child was
        returned unevaluated, and three or more children failed because the
        accumulator was no longer an Expression.
        """
        result = False
        for child in self.children:
            result = child.reduce(ipc_data)
            if result:
                break
        return result

    def reduce_function(self, ipc_data):
        """Return a binary ``or`` that accepts Expressions or already-reduced values."""
        def value_of(operand):
            # The accumulator of a fold is an already-reduced value, not a node.
            if isinstance(operand, Expression):
                return operand.reduce(ipc_data)
            return operand
        return lambda a, b: value_of(a) or value_of(b)
class EqExpression(Expression):
    """Equality test (``=``): looked-up IPC value == second operand.

    The first child reduces to a dot-separated key path into the IPC
    dictionary, the second child to the value it is compared with.
    """

    def __init__(self, children, *args, **kwargs):
        self._children = children
        super().__init__(*args, **kwargs)

    def __repr__(self) -> str:
        rendered = ' '.join(map(repr, self.children))
        return f'(= {rendered})'

    @property
    def children(self):
        return self._children

    def reduce_function(self, ipc_data):
        def compare(path_expr, expected_expr):
            # Resolve the dotted path one key at a time, starting at the root.
            path = path_expr.reduce(ipc_data).strip('.').split('.')
            actual = functools.reduce(lambda node, part: node[part], path, ipc_data)
            return actual == expected_expr.reduce(ipc_data)
        return compare
class NeqExpression(Expression):
    """Inequality test (``!=``): looked-up IPC value != second operand.

    The first child reduces to a dot-separated key path into the IPC
    dictionary, the second child to the value it is compared with.
    """

    def __init__(self, children, *args, **kwargs):
        self._children = children
        super().__init__(*args, **kwargs)

    def __repr__(self) -> str:
        rendered = ' '.join(map(repr, self.children))
        return f'(!= {rendered})'

    @property
    def children(self):
        return self._children

    def reduce_function(self, ipc_data):
        def compare(path_expr, expected_expr):
            # Resolve the dotted path one key at a time, starting at the root.
            path = path_expr.reduce(ipc_data).strip('.').split('.')
            actual = functools.reduce(lambda node, part: node[part], path, ipc_data)
            return actual != expected_expr.reduce(ipc_data)
        return compare
class GtExpression(Expression):
    """Greater-than test (``>``): looked-up IPC value > second operand.

    The first child reduces to a dot-separated key path into the IPC
    dictionary, the second child to the value it is compared with.
    """

    def __init__(self, children, *args, **kwargs):
        self._children = children
        super().__init__(*args, **kwargs)

    def __repr__(self) -> str:
        rendered = ' '.join(map(repr, self.children))
        return f'(> {rendered})'

    @property
    def children(self):
        return self._children

    def reduce_function(self, ipc_data):
        def compare(path_expr, expected_expr):
            # Resolve the dotted path one key at a time, starting at the root.
            path = path_expr.reduce(ipc_data).strip('.').split('.')
            actual = functools.reduce(lambda node, part: node[part], path, ipc_data)
            return actual > expected_expr.reduce(ipc_data)
        return compare
class LtExpression(Expression):
    """Less-than test (``<``): looked-up IPC value < second operand.

    The first child reduces to a dot-separated key path into the IPC
    dictionary, the second child to the value it is compared with.
    """

    def __init__(self, children, *args, **kwargs):
        self._children = children
        super().__init__(*args, **kwargs)

    def __repr__(self) -> str:
        rendered = ' '.join(map(repr, self.children))
        return f'(< {rendered})'

    @property
    def children(self):
        return self._children

    def reduce_function(self, ipc_data):
        def compare(path_expr, expected_expr):
            # Resolve the dotted path one key at a time, starting at the root.
            path = path_expr.reduce(ipc_data).strip('.').split('.')
            actual = functools.reduce(lambda node, part: node[part], path, ipc_data)
            return actual < expected_expr.reduce(ipc_data)
        return compare
# Maps each operator token of the filter language to the Expression subclass
# implementing it. build_expression looks operators up here, and take_operator
# derives the set of operator characters from these keys.
expression_mapping = {
    '&': AndExpression,
    '|': OrExpression,
    '=': EqExpression,
    '!=': NeqExpression,
    '>': GtExpression,
    '<': LtExpression,
}
def group_tokens(tokens: list[str]) -> list[list[str]]:
    """Split a flat token list into its top-level groups.

    A token outside any braces forms a group of its own. A brace-delimited
    run forms one group containing everything between the outermost braces;
    the outermost braces themselves are dropped, but braces nested inside
    are kept so the group can be grouped again recursively.

    Raises:
        ValueError: if the braces in ``tokens`` are not balanced.
    """
    groups = []
    current_group = []
    depth = 0
    for token in tokens:
        if token == '(':
            # Only the outermost opening brace is structural at this level.
            if depth > 0:
                current_group.append(token)
            depth += 1
        elif token == ')':
            depth -= 1
            if depth < 0:
                raise ValueError('Unbalanced closing brace (`)`)')
            if depth == 0:
                groups.append(current_group)
                current_group = []
            else:
                # Nested closing brace: keep it so nesting is not flattened.
                current_group.append(token)
        elif depth == 0:
            groups.append([token])
        else:
            current_group.append(token)
    if depth != 0:
        raise ValueError('Missing closing brace (`)`)')
    return groups
def build_expression(tokens: list[str]) -> Expression:
    """Build an expression tree from a token list.

    An enclosing pair of braces is stripped, the remaining tokens are split
    into top-level groups, and the first group's token decides the node type:
    an operator from ``expression_mapping`` (the other groups become its
    children), a quoted string literal, an integer literal or a boolean
    literal.

    Raises:
        ValueError: if ``tokens`` is empty or its leading token is not a
            known operator or literal.
    """
    if not tokens:
        raise ValueError('Cannot build an expression from an empty token list')
    if tokens[0] == '(' and tokens[-1] == ')':
        tokens = tokens[1:-1]
    token_groups = group_tokens(tokens)
    if not token_groups or not token_groups[0]:
        raise ValueError('Cannot build an expression from an empty group')
    expressions = [build_expression(ts) for ts in token_groups[1:]]
    token = token_groups[0][0]
    if token in expression_mapping:
        return expression_mapping[token](expressions)
    if token.startswith('"'):
        # Strip the surrounding quotes of the literal.
        return LiteralExpression(token[1:-1])
    if token.isnumeric():
        return IntLiteralExpression(int(token))
    if token in ('True', 'False'):
        return BoolLiteralExpression(token == 'True')
    # Explicit error instead of an assert, which is stripped under ``-O``.
    raise ValueError(f'Unknown token `{token}`')
def take_space(s: str) -> tuple:
    """Consume one leading whitespace character, if present.

    Whitespace never yields a token, so the first element of the result is
    always ``None``; the second is ``s`` without the consumed character.
    Any whitespace character is skipped (space, newline, tab, ...), and an
    empty string is returned unchanged.
    """
    # Slicing instead of indexing keeps this safe for the empty string.
    if s[:1].isspace():
        return None, s[1:]
    return None, s
def take_operator(s: str) -> tuple:
    """Consume the longest leading run of operator characters.

    Returns ``(token, rest)``, or ``(None, s)`` when ``s`` does not start
    with a character that occurs in a key of ``expression_mapping``.
    """
    operator_chars = ''.join(set(expression_mapping.keys()))
    length = 0
    for ch in s:
        if ch not in operator_chars:
            break
        length += 1
    if length == 0:
        return None, s
    return s[:length], s[length:]
def take_brace(s: str) -> tuple:
    """Consume a single leading ``(`` or ``)``.

    Returns ``(brace, rest)``, or ``(None, s)`` when ``s`` starts with
    anything else.
    """
    head = s[0]
    if head not in '()':
        return None, s
    return head, s[1:]
def take_literal(s: str) -> tuple:
    """Consume a leading double-quoted string literal.

    Returns ``(token, rest)`` where ``token`` includes both quotes, or
    ``(None, s)`` when ``s`` does not start with a quote.

    Raises:
        ValueError: if the opening quote has no matching closing quote.
    """
    if not s.startswith('"'):
        return None, s
    # Search from index 1 so the opening quote can never be mistaken for the
    # closing one (a lone `"` must be reported as unterminated).
    closing = s.find('"', 1)
    if closing == -1:
        raise ValueError('Missing closing quotes (`"`)')
    return s[:closing + 1], s[closing + 1:]
def take_int_literal(s: str) -> tuple:
    """Consume a leading run of decimal digits.

    Returns ``(token, rest)``, or ``(None, s)`` when ``s`` does not start
    with a digit.
    """
    length = 0
    for c in s:
        # isdecimal() matches exactly the digits int() can parse; isnumeric()
        # would also accept characters such as superscripts and fractions,
        # which make int() fail later on.
        if not c.isdecimal():
            break
        length += 1
    if length == 0:
        return None, s
    return s[:length], s[length:]
def take_bool_literal(s: str) -> tuple:
    """Consume a leading ``True`` or ``False`` keyword.

    Returns ``(keyword, rest)``, or ``(None, s)`` when ``s`` starts with
    neither keyword.
    """
    for keyword in ('True', 'False'):
        if s.startswith(keyword):
            return keyword, s[len(keyword):]
    return None, s
def tokenize(s: str) -> list[str]:
    """Split a filter string into operator, brace and literal tokens.

    The extractors are tried in a fixed order on the remaining input; the
    first one that yields a token wins. Whitespace is consumed without
    producing a token.

    Raises:
        ValueError: if no extractor can consume anything from the input.
    """
    extractors = (
        take_operator,
        take_brace,
        take_literal,
        take_int_literal,
        take_bool_literal,
        take_space,
    )
    tokens = []
    while s:
        length_before = len(s)
        for extract in extractors:
            token, s = extract(s)
            if token is None:
                continue
            tokens.append(token)
            break
        # Nothing was consumed in a full pass: the input cannot be tokenized.
        if len(s) == length_before:
            raise ValueError(f'Could not tokenize string {s}')
    return tokens
def parse(s: str) -> Expression:
    """Parse a filter string into an expression tree (tokenize, then build)."""
    return build_expression(tokenize(s))
def window_new(configs: list[dict], debug: bool):
    """Create the ``window::new`` event handler for the pending configs.

    Args:
        configs: Pending entries, each with a parsed ``filter`` expression
            and a target ``workspace``. The list is mutated: an entry is
            removed once a window matched it.
        debug: When true, the raw IPC dictionary of every event is printed
            as JSON.

    Returns:
        An async callback ``(ipc, event)``. It moves the first matching
        window to its workspace and stops the main loop once no entries
        remain.
    """
    async def callback(ipc: i3ipc.aio.Connection, e: i3ipc.WorkspaceEvent):
        assert e.change == 'new'
        if debug:
            print(json.dumps(e.ipc_data))
        for i, cfg in enumerate(configs):
            window_filter = cfg['filter']
            workspace = cfg['workspace']
            if window_filter.reduce(e.ipc_data):
                container_id = e.ipc_data['container']['id']
                await ipc.command(f'for_window [con_id="{container_id}"] focus')
                await ipc.command(f'move container to workspace {workspace}')
                del configs[i]
                # One window satisfies exactly one entry. Stopping here also
                # avoids iterating over the list after it has been mutated,
                # which would skip the entry following the removed one.
                break
        if not configs:
            ipc.main_quit()
    return callback
async def wait_signal(s):
    """Suspend until signal ``s`` is delivered to this process.

    A handler for ``s`` is installed on the running event loop for the
    duration of the wait and is always removed again, including when the
    wait is cancelled.
    """
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    loop.add_signal_handler(s, event.set)
    try:
        await event.wait()
    finally:
        # Do not leak the handler if the wait is cancelled or fails.
        loop.remove_signal_handler(s)
async def run(configs: list[dict], *, timeout: int, debug: bool):
    """Start every configured program and wait for the expected windows.

    Args:
        configs: One dict per program. Keys read here: ``program`` (a string
            or an iterable of strings joined with spaces), ``filter`` (filter
            source string, defaults to ``'True'`` when missing or ``None``),
            ``workspace`` (optional) and ``signal_continue`` (optional signal
            number to wait for after starting the program). Each entry's
            ``filter`` is replaced in place by its parsed expression.
        timeout: Maximum time to wait for the windows, in milliseconds.
        debug: Forwarded to the window event handler.

    Returns:
        ``0`` on success, ``1`` if the wait for windows timed out.
    """
    # we only wait for configs which spawn a window
    window_configs = [c for c in configs if c.get('workspace') is not None]
    ipc = await i3ipc.aio.Connection().connect()
    ipc.on('window::new', window_new(window_configs, debug=debug))
    # Placeholders available in ``program`` via str.format, e.g. ``{pid}``.
    variables = {
        'pid': os.getpid(),
    }
    coroutines = []
    for cfg in configs:
        # Same default as the ``--filter`` option of the ``simple`` command,
        # so entries without a filter do not fail with a KeyError.
        filter_source = cfg.get('filter')
        if filter_source is None:
            filter_source = 'True'
        cfg['filter'] = parse(filter_source)
        p = cfg['program']
        if isinstance(p, collections.abc.Iterable) and not isinstance(p, str):
            p = ' '.join(p)
        p = p.format(**variables)
        coro = ipc.command(f'exec {p}')
        signal_continue = cfg.get('signal_continue')
        if signal_continue:
            # Start this program right away and block until it signals us
            # before moving on to the next entry.
            await coro
            await wait_signal(signal.Signals(signal_continue))
        else:
            coroutines += [coro]
    await asyncio.gather(*coroutines)
    try:
        if window_configs:
            # run main loop only if we wait for something
            await asyncio.wait_for(ipc.main(), timeout=timeout/1000)
    except asyncio.TimeoutError:
        return 1
    return 0
# Root command group. Deliberately has no docstring: click would display it
# as the group's help text.
@click.group()
@click.pass_context
@click.option('--debug', '-d', default=False, is_flag=True, help="Enable debug mode, will log ipc dictionary.")
def main(ctx, debug):
    # ensure_object() returns the context's dict (creating it if necessary),
    # so the flag can be stored for the subcommands in one step.
    ctx.ensure_object(dict)['DEBUG'] = debug
@main.command()
@click.pass_context
@click.option('--filter', '-f', default='True', help="A filter expression for the raw ipc dictionary.")
@click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.")
@click.option('--workspace', '-w', default=None, help="The workspace to move to.")
@click.argument('program', nargs=-1)
def simple(ctx, filter, timeout, workspace, program):
    """
    Start a program and move its created window to the desired i3 workspace.

    \b
    Exit status:
        0 on success,
        1 when no window has been found.
    """
    # NOTE: the docstring above is the help text click shows to the user.
    debug = ctx.obj['DEBUG']
    # Wrap the command-line options into the single-entry list that run() expects.
    configs = [{"filter": filter, "workspace": workspace, "program": program}]
    ctx.exit(asyncio.run(run(configs, timeout=timeout, debug=debug)))
@main.command()
@click.pass_context
@click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.")
@click.argument('config', type=click.File('r'), default='-')
def config(ctx, timeout, config):
    """
    Start the programs listed in a YAML config and move their created windows
    to the desired i3 workspaces.

    \b
    Exit status:
        0 on success,
        1 when no window has been found.
    """
    # NOTE: the docstring above is the help text click shows to the user.
    debug = ctx.obj['DEBUG']
    # SafeLoader only constructs plain YAML data types. An empty document
    # loads as None, which is treated as "nothing to start".
    configs = yaml.load(config, Loader=SafeLoader) or []
    ctx.exit(asyncio.run(run(configs, timeout=timeout, debug=debug)))
# Invoke the click command group only when this file is executed as a script.
if __name__ == '__main__':
    main()