510 lines
15 KiB
Executable file
510 lines
15 KiB
Executable file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import typing
import asyncio
import signal
import os
import time
import functools
import json
import yaml
import click
import pydantic
import i3ipc
import i3ipc.aio
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader
class Expression:
def __init__(self):
def reduce(self, ipc_data):
return functools.reduce(self.reduce_function(ipc_data), self.children)
def children(self):
raise NotImplemented('TODO: implement in subclass')
def reduce_function(self, ipc_data):
raise NotImplemented('TODO: implement in subclass')
class LiteralExpression(Expression):
def __init__(self, value):
self._value = value
def __repr__(self) -> str:
return f'"{self._value}"'
def children(self):
return [self._value]
def reduce_function(self, ipc_data):
def reduce(a, b):
raise NotImplemented('I should never be called')
class IntLiteralExpression(LiteralExpression):
def __repr__(self) -> str:
return str(self._value)
class BoolLiteralExpression(LiteralExpression):
def __repr__(self) -> str:
return str(self._value)
class AndExpression(Expression):
def __init__(self, children, *args, **kwargs):
self._children = children
super().__init__(*args, **kwargs)
def __repr__(self) -> str:
cs = ' '.join([repr(c) for c in self.children])
return f'(& {cs})'
def children(self):
return self._children
def reduce_function(self, ipc_data):
return lambda a, b: a.reduce(ipc_data) and b.reduce(ipc_data)
class OrExpression(Expression):
def __init__(self, children, *args, **kwargs):
self._children = children
super().__init__(*args, **kwargs)
def __repr__(self) -> str:
cs = ' '.join([repr(c) for c in self.children])
return f'(| {cs})'
def children(self):
return self._children
def reduce_function(self, ipc_data):
return lambda a, b: a.reduce(ipc_data) or b.reduce(ipc_data)
class EqExpression(Expression):
def __init__(self, children, *args, **kwargs):
self._children = children
super().__init__(*args, **kwargs)
def __repr__(self) -> str:
cs = ' '.join([repr(c) for c in self.children])
return f'(= {cs})'
def children(self):
return self._children
def reduce_function(self, ipc_data):
def reduce(key, value):
ipc_value = ipc_data
for k in key.reduce(ipc_data).strip('.').split('.'):
ipc_value = ipc_value[k]
return ipc_value == value.reduce(ipc_data)
return reduce
class NeqExpression(Expression):
def __init__(self, children, *args, **kwargs):
self._children = children
super().__init__(*args, **kwargs)
def __repr__(self) -> str:
cs = ' '.join([repr(c) for c in self.children])
return f'(!= {cs})'
def children(self):
return self._children
def reduce_function(self, ipc_data):
def reduce(key, value):
ipc_value = ipc_data
for k in key.reduce(ipc_data).strip('.').split('.'):
ipc_value = ipc_value[k]
return ipc_value != value.reduce(ipc_data)
return reduce
class GtExpression(Expression):
def __init__(self, children, *args, **kwargs):
self._children = children
super().__init__(*args, **kwargs)
def __repr__(self) -> str:
cs = ' '.join([repr(c) for c in self.children])
return f'(> {cs})'
def children(self):
return self._children
def reduce_function(self, ipc_data):
def reduce(key, value):
ipc_value = ipc_data
for k in key.reduce(ipc_data).strip('.').split('.'):
ipc_value = ipc_value[k]
return ipc_value > value.reduce(ipc_data)
return reduce
class LtExpression(Expression):
def __init__(self, children, *args, **kwargs):
self._children = children
super().__init__(*args, **kwargs)
def __repr__(self) -> str:
cs = ' '.join([repr(c) for c in self.children])
return f'(< {cs})'
def children(self):
return self._children
def reduce_function(self, ipc_data):
def reduce(key, value):
ipc_value = ipc_data
for k in key.reduce(ipc_data).strip('.').split('.'):
ipc_value = ipc_value[k]
return ipc_value < value.reduce(ipc_data)
return reduce
expression_mapping = {
'&': AndExpression,
'|': OrExpression,
'=': EqExpression,
'!=': NeqExpression,
'>': GtExpression,
'<': LtExpression,
def group_tokens(tokens: list[str]) -> list[list[str]]:
groups = []
current_group = []
brace_count = 0
for token in tokens:
if token == '(':
brace_count += 1
elif token == ')':
brace_count -= 1
if brace_count == 0:
groups += [current_group]
current_group = []
elif brace_count == 0:
groups += [[token]]
current_group += [token]
return groups
def build_expression(tokens: list[str]) -> Expression:
if tokens[0] == '(' and tokens[-1] == ')':
tokens = tokens[1:-1]
token_groups = group_tokens(tokens)
expressions = [build_expression(ts) for ts in token_groups[1:]] if len(token_groups) > 1 else []
root_expression = None
token = token_groups[0][0]
if token in expression_mapping:
root_expression = expression_mapping[token](expressions)
elif token.startswith('"'):
root_expression = LiteralExpression(token[1:-1])
elif token.isnumeric():
root_expression = IntLiteralExpression(int(token))
elif token in ('True', 'False'):
root_expression = BoolLiteralExpression(token == 'True')
assert isinstance(root_expression, Expression)
return root_expression
def take_space(s: str) -> tuple:
if s[0] in ' \n':
return None, s[1:]
return None, s
def take_operator(s: str) -> tuple:
token = ''
for c in s:
if c in ''.join(set(expression_mapping.keys())):
token += c
if token == '':
return None, s
return token, s[len(token):]
def take_brace(s: str) -> tuple:
if s[0] in '()':
return s[0], s[1:]
return None, s
def take_literal(s: str) -> tuple:
token = '"'
if s[0] != '"':
return None, s
for c in s[1:]:
token += c
if c == '"':
if not token.endswith('"'):
raise ValueError('Missing closing quotes (`"`)')
return token, s[len(token):]
def take_int_literal(s: str) -> tuple:
token = ''
for c in s:
if not c.isnumeric():
token += c
if token == '':
return None, s
return token, s[len(token):]
def take_bool_literal(s: str) -> tuple:
if s.startswith('True'):
return 'True', s[len('True'):]
if s.startswith('False'):
return 'False', s[len('False'):]
return None, s
def tokenize(s: str) -> list[str]:
operator_extractors = [
tokens = []
while s != '':
previous_len = len(s)
for operator_extractor in operator_extractors:
token, s = operator_extractor(s)
if token is not None:
tokens += [token]
if len(s) == previous_len:
raise ValueError(f'Could not tokenize string {s}')
return tokens
def parse(s: str) -> Expression:
tokens = tokenize(s)
return build_expression(tokens)
class Filter(Expression):
def __get_validators__(cls):
yield cls.validate
def __modify_schema__(cls, field_schema):
def validate(cls, v):
if not isinstance(v, str):
raise TypeError('Must be string')
return parse(v)
class Command(str):
def __get_validators__(cls):
yield cls.validate
def __modify_schema__(cls, field_schema):
def validate(cls, v):
if not isinstance(v, (str, list, tuple)):
raise TypeError('Must be string or list')
if isinstance(v, (list, tuple)):
v = ' '.join([f"'{x}'" for x in v])
return v
class Signal(int):
def __get_validators__(cls):
yield cls.validate
def __modify_schema__(cls, field_schema):
def validate(cls, v):
if not isinstance(v, (str, int)):
raise TypeError('Must be string or int')
if isinstance(v, str) and v.isnumeric():
return signal.Signals(int(v))
elif isinstance(v, int):
return signal.Signals(v)
return getattr(signal.Signals, v)
class Lock(asyncio.Lock):
def __get_validators__(cls):
yield cls.validate
def __modify_schema__(cls, field_schema):
def validate(cls, v):
if not isinstance(v, asyncio.Lock):
raise TypeError('Must be a asyncio.Lock')
return v
class Event(asyncio.Event):
def __get_validators__(cls):
yield cls.validate
def __modify_schema__(cls, field_schema):
def validate(cls, v):
if not isinstance(v, asyncio.Event):
raise TypeError('Must be a asyncio.Event')
return v
class Connection(i3ipc.aio.Connection):
def __get_validators__(cls):
yield cls.validate
def __modify_schema__(cls, field_schema):
def validate(cls, v):
if not isinstance(v, i3ipc.aio.Connection):
raise TypeError('Must be a i3ipc.aio.Connection')
return v
class ProgramConfig(pydantic.BaseModel):
cmd: Command
workspace: typing.Optional[str] = None
signal: bool = False
timeout: int = 1000
match: Filter
class Config(pydantic.BaseModel):
signal: typing.Optional[Signal] = None
timeout: int = 3000
programs: typing.List[ProgramConfig]
final_workspace: typing.Optional[str] = None
class RuntimeData(pydantic.BaseModel):
programs: typing.List[ProgramConfig] = []
lock: Lock
event: Event
ipc: Connection
def window_new(runtime_data: RuntimeData, *, debug):
async def callback(ipc: i3ipc.aio.Connection, e: i3ipc.WorkspaceEvent):
assert e.change == 'new'
if debug:
async with runtime_data.lock:
for i, cfg in enumerate(runtime_data.programs):
if cfg.match.reduce(e.ipc_data):
container_id = e.ipc_data['container']['id']
await ipc.command(f'for_window [con_id="{container_id}"] focus')
await ipc.command(f'move container to workspace {cfg.workspace}')
if not runtime_data.programs:
return callback
async def wait_signal(rt: RuntimeData):
await rt.event.wait()
async def coro_wait_signal(coro, rt: RuntimeData):
await coro
await wait_signal(rt)
async def init(config: Config, *, debug: bool) -> RuntimeData:
rd = RuntimeData(
programs=[p for p in config.programs if p.workspace is not None],
if config.signal is not None:
asyncio.get_running_loop().add_signal_handler(config.signal, lambda: rd.event.set())
return rd
async def run(config: Config, *, debug: bool):
runtime_data = await init(config, debug=debug)
await runtime_data.ipc.connect()
runtime_data.ipc.on('window::new', window_new(runtime_data, debug=debug))
variables = {
'pid': os.getpid(),
coroutines = []
timeout = config.timeout
started_at = time.monotonic_ns()
for cfg in config.programs:
p = cfg.cmd.format(**variables)
coro = runtime_data.ipc.command(f'exec {p}')
if cfg.signal:
coro = coro_wait_signal(coro, runtime_data)
if cfg.timeout is not None:
timeout = max(timeout, cfg.timeout)
await asyncio.wait_for(coro, timeout=cfg.timeout/1000 if cfg.timeout is not None else 0)
except asyncio.TimeoutError:
coroutines += [coro]
await asyncio.gather(*coroutines)
if runtime_data.programs:
# run main loop only if we wait for something
diff = (time.monotonic_ns() - started_at) / (1000*1000)
new_timeout = max(timeout - diff, 0)
await asyncio.wait_for(runtime_data.ipc.main(), timeout=new_timeout/1000)
except asyncio.TimeoutError:
return 1
if config.final_workspace is not None:
await runtime_data.ipc.command(f'workspace {config.final_workspace}')
return 0
@click.option('--debug', '-d', default=False, is_flag=True, help="Enable debug mode, will log ipc dictionary.")
def main(ctx, debug):
ctx.obj['DEBUG'] = debug
@click.option('--filter', '-f', default='True', help="A filter expression for the raw ipc dictionary.")
@click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.")
@click.option('--workspace', '-w', default=None, help="The workspace to move to.")
@click.argument('command', nargs=-1)
def simple(ctx, filter, timeout, workspace, command):
Start a program and move it's created window to the desired i3 workspace.
Exist status:
0 on success,
1 when no window has been found.
debug = ctx.obj['DEBUG']
config = Config(programs=[ProgramConfig(
)], timeout=timeout)
ctx.exit(asyncio.run(run(config, debug=debug)))
@click.argument('config', type=click.File('r'), default='-')
def config(ctx, config):
Start a program and move it's created window to the desired i3 workspace.
Exist status:
0 on success,
1 when no window has been found.
debug = ctx.obj['DEBUG']
config = Config(**yaml.load(config, Loader=SafeLoader))
ctx.exit(asyncio.run(run(config, debug=debug)))
if __name__ == '__main__':