510 lines
15 KiB
Python
Executable file
510 lines
15 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
import typing
|
|
import asyncio
|
|
import signal
|
|
import os
|
|
import time
|
|
import functools
|
|
import json
|
|
|
|
import yaml
|
|
import click
|
|
import pydantic
|
|
import i3ipc
|
|
import i3ipc.aio
|
|
|
|
try:
|
|
from yaml import CSafeLoader as SafeLoader
|
|
except ImportError:
|
|
from yaml import SafeLoader
|
|
|
|
class Expression:
    """Base class for the nodes of a filter expression tree.

    Subclasses supply ``children`` (the operands of the node) and
    ``reduce_function`` (a binary callable built for a given IPC dictionary);
    ``reduce`` folds the operands with that callable.
    """

    def __init__(self):
        pass

    def reduce(self, ipc_data):
        """Fold ``children`` left to right using ``reduce_function(ipc_data)``.

        When there is exactly one child, ``functools.reduce`` returns it
        unchanged and never calls the binary function.
        """
        return functools.reduce(self.reduce_function(ipc_data), self.children)

    @property
    def children(self):
        """Operands of this node; subclasses must override this.

        Raises:
            NotImplementedError: always, in the base class.
        """
        # ``NotImplemented`` is a comparison sentinel and is not callable;
        # "raising" it only produced an unrelated TypeError.
        raise NotImplementedError('TODO: implement in subclass')

    def reduce_function(self, ipc_data):
        """Return the binary callable used by ``reduce``; subclasses must override.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError('TODO: implement in subclass')
|
|
|
|
class LiteralExpression(Expression):
    """Leaf node wrapping a constant value (rendered as a quoted string)."""

    def __init__(self, value):
        self._value = value

    def __repr__(self) -> str:
        return f'"{self._value}"'

    @property
    def children(self):
        """Return the wrapped value as a one-element list.

        Folding a single element yields that element, so ``reduce`` on a
        literal evaluates to the raw value.
        """
        return [self._value]

    def reduce_function(self, ipc_data):
        """Return a placeholder binary callable.

        It is never invoked because a literal has exactly one child; calling
        it raises ``NotImplementedError``.
        """
        def reduce(a, b):
            # ``NotImplemented`` is not an exception type; use the error class.
            raise NotImplementedError('I should never be called')

        # The original built this function but forgot to return it.
        return reduce
|
|
|
|
class IntLiteralExpression(LiteralExpression):
    """Literal node for integers; its repr is the bare value without quotes."""

    def __repr__(self) -> str:
        return f'{self._value!s}'
|
|
|
|
class BoolLiteralExpression(LiteralExpression):
    """Literal node for booleans; its repr is the bare value without quotes."""

    def __repr__(self) -> str:
        return f'{self._value!s}'
|
|
|
|
class AndExpression(Expression):
    """Logical conjunction ``(& a b ...)`` over any number of operands."""

    def __init__(self, children, *args, **kwargs):
        self._children = children
        super().__init__(*args, **kwargs)

    def __repr__(self) -> str:
        cs = ' '.join([repr(c) for c in self.children])
        return f'(& {cs})'

    @property
    def children(self):
        """Operand expressions of this node."""
        return self._children

    @staticmethod
    def _evaluate(operand, ipc_data):
        """Evaluate ``operand`` if it is still an expression node.

        During a fold the left argument of every step after the first is an
        already computed result, which must be passed through untouched.
        """
        if isinstance(operand, Expression):
            return operand.reduce(ipc_data)
        return operand

    def reduce(self, ipc_data):
        """Evaluate the conjunction against ``ipc_data``.

        A single operand is returned unfolded by ``functools.reduce``, so the
        result is evaluated once more here; for two or more operands this is
        a no-op because the fold already produced a plain value.
        """
        return self._evaluate(super().reduce(ipc_data), ipc_data)

    def reduce_function(self, ipc_data):
        """Return the binary ``and`` used for folding (short-circuiting)."""
        evaluate = self._evaluate
        return lambda a, b: evaluate(a, ipc_data) and evaluate(b, ipc_data)
|
|
|
|
class OrExpression(Expression):
    """Logical disjunction ``(| a b ...)`` over any number of operands."""

    def __init__(self, children, *args, **kwargs):
        self._children = children
        super().__init__(*args, **kwargs)

    def __repr__(self) -> str:
        cs = ' '.join([repr(c) for c in self.children])
        return f'(| {cs})'

    @property
    def children(self):
        """Operand expressions of this node."""
        return self._children

    @staticmethod
    def _evaluate(operand, ipc_data):
        """Evaluate ``operand`` if it is still an expression node.

        During a fold the left argument of every step after the first is an
        already computed result, which must be passed through untouched.
        """
        if isinstance(operand, Expression):
            return operand.reduce(ipc_data)
        return operand

    def reduce(self, ipc_data):
        """Evaluate the disjunction against ``ipc_data``.

        A single operand is returned unfolded by ``functools.reduce``, so the
        result is evaluated once more here; for two or more operands this is
        a no-op because the fold already produced a plain value.
        """
        return self._evaluate(super().reduce(ipc_data), ipc_data)

    def reduce_function(self, ipc_data):
        """Return the binary ``or`` used for folding (short-circuiting)."""
        evaluate = self._evaluate
        return lambda a, b: evaluate(a, ipc_data) or evaluate(b, ipc_data)
|
|
|
|
class EqExpression(Expression):
    """Comparison ``(= key value)``.

    ``key`` evaluates to a dot-separated path that is looked up step by step
    in the IPC dictionary; the value found is compared with ``value`` using
    ``==``.
    """

    def __init__(self, children, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._children = children

    def __repr__(self) -> str:
        inner = ' '.join(map(repr, self.children))
        return '(= ' + inner + ')'

    @property
    def children(self):
        return self._children

    def reduce_function(self, ipc_data):
        def compare(key, value):
            # Leading/trailing dots are ignored; each remaining part is a key.
            path = key.reduce(ipc_data).strip('.').split('.')
            found = functools.reduce(lambda node, part: node[part], path, ipc_data)
            return found == value.reduce(ipc_data)

        return compare
|
|
|
|
class NeqExpression(Expression):
    """Comparison ``(!= key value)``.

    ``key`` evaluates to a dot-separated path that is looked up step by step
    in the IPC dictionary; the value found is compared with ``value`` using
    ``!=``.
    """

    def __init__(self, children, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._children = children

    def __repr__(self) -> str:
        inner = ' '.join(map(repr, self.children))
        return '(!= ' + inner + ')'

    @property
    def children(self):
        return self._children

    def reduce_function(self, ipc_data):
        def compare(key, value):
            # Leading/trailing dots are ignored; each remaining part is a key.
            path = key.reduce(ipc_data).strip('.').split('.')
            found = functools.reduce(lambda node, part: node[part], path, ipc_data)
            return found != value.reduce(ipc_data)

        return compare
|
|
|
|
class GtExpression(Expression):
    """Comparison ``(> key value)``.

    ``key`` evaluates to a dot-separated path that is looked up step by step
    in the IPC dictionary; the value found is compared with ``value`` using
    ``>``.
    """

    def __init__(self, children, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._children = children

    def __repr__(self) -> str:
        inner = ' '.join(map(repr, self.children))
        return '(> ' + inner + ')'

    @property
    def children(self):
        return self._children

    def reduce_function(self, ipc_data):
        def compare(key, value):
            # Leading/trailing dots are ignored; each remaining part is a key.
            path = key.reduce(ipc_data).strip('.').split('.')
            found = functools.reduce(lambda node, part: node[part], path, ipc_data)
            return found > value.reduce(ipc_data)

        return compare
|
|
|
|
class LtExpression(Expression):
    """Comparison ``(< key value)``.

    ``key`` evaluates to a dot-separated path that is looked up step by step
    in the IPC dictionary; the value found is compared with ``value`` using
    ``<``.
    """

    def __init__(self, children, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._children = children

    def __repr__(self) -> str:
        inner = ' '.join(map(repr, self.children))
        return '(< ' + inner + ')'

    @property
    def children(self):
        return self._children

    def reduce_function(self, ipc_data):
        def compare(key, value):
            # Leading/trailing dots are ignored; each remaining part is a key.
            path = key.reduce(ipc_data).strip('.').split('.')
            found = functools.reduce(lambda node, part: node[part], path, ipc_data)
            return found < value.reduce(ipc_data)

        return compare
|
|
|
|
# Operator token -> Expression subclass instantiated by build_expression().
# take_operator() also derives the set of operator characters from these keys.
expression_mapping = {
    '&': AndExpression,
    '|': OrExpression,
    '=': EqExpression,
    '!=': NeqExpression,
    '>': GtExpression,
    '<': LtExpression,
}
|
|
|
|
def group_tokens(tokens: list[str]) -> list[list[str]]:
    """Split a flat token list into its top-level groups.

    A token outside any braces forms a group of its own. Everything between
    a top-level ``(`` and its matching ``)`` forms one group; the enclosing
    pair is dropped, but braces of nested sub-expressions are kept so the
    group can be split again recursively.

    Args:
        tokens: Tokens of one expression, without its own enclosing braces.

    Returns:
        The top-level groups, in order of appearance.

    Raises:
        ValueError: if the braces are unbalanced.
    """
    groups = []
    current_group = []
    depth = 0
    for token in tokens:
        if token == '(':
            depth += 1
            if depth > 1:
                # Nested opening brace: part of the group being collected.
                current_group.append(token)
        elif token == ')':
            depth -= 1
            if depth < 0:
                raise ValueError('Unbalanced braces: unexpected `)`')
            if depth == 0:
                groups.append(current_group)
                current_group = []
            else:
                # Nested closing brace: part of the group being collected.
                current_group.append(token)
        elif depth == 0:
            groups.append([token])
        else:
            current_group.append(token)
    if depth != 0:
        raise ValueError('Unbalanced braces: missing `)`')
    return groups
|
|
|
|
def build_expression(tokens: list[str]) -> Expression:
    """Build an expression tree from a token list.

    One pair of enclosing braces is stripped, the remaining tokens are split
    into top-level groups, and the first token of the first group selects the
    node type. All following groups are built recursively and handed to
    operator nodes as operands.

    Args:
        tokens: Tokens as produced by ``tokenize``.

    Returns:
        The root node of the expression tree.

    Raises:
        ValueError: if the expression is empty or starts with a token that is
            neither an operator nor a literal. (Explicit errors replace the
            former ``assert``, which vanishes under ``python -O``.)
    """
    if not tokens:
        raise ValueError('Cannot build an expression from no tokens')
    if tokens[0] == '(' and tokens[-1] == ')':
        tokens = tokens[1:-1]
    token_groups = group_tokens(tokens)
    if not token_groups or not token_groups[0]:
        raise ValueError('Empty expression')

    expressions = [build_expression(ts) for ts in token_groups[1:]]
    token = token_groups[0][0]
    if token in expression_mapping:
        return expression_mapping[token](expressions)
    if token.startswith('"'):
        # Strip the surrounding quotes of a string literal.
        return LiteralExpression(token[1:-1])
    if token.isnumeric():
        return IntLiteralExpression(int(token))
    if token in ('True', 'False'):
        return BoolLiteralExpression(token == 'True')
    raise ValueError(f'Unexpected token {token!r}')
|
|
|
|
def take_space(s: str) -> tuple:
    """Skip one leading space or newline.

    Never yields a token: the first tuple element is always ``None``; the
    second is ``s`` with the whitespace character removed, if there was one.
    """
    rest = s[1:] if s[0] in ' \n' else s
    return None, rest
|
|
|
|
def take_operator(s: str) -> tuple:
    """Consume the longest leading run of operator characters.

    Operator characters are all characters occurring in the keys of
    ``expression_mapping``. Returns ``(token, rest)``, or ``(None, s)`` when
    ``s`` does not start with an operator character.
    """
    operator_chars = {char for symbol in expression_mapping.keys() for char in symbol}
    end = 0
    while end < len(s) and s[end] in operator_chars:
        end += 1
    if end == 0:
        return None, s
    return s[:end], s[end:]
|
|
|
|
def take_brace(s: str) -> tuple:
    """Consume a single leading ``(`` or ``)``.

    Returns ``(brace, rest)``, or ``(None, s)`` when ``s`` starts with
    anything else.
    """
    head = s[0]
    if head not in '()':
        return None, s
    return head, s[1:]
|
|
|
|
def take_literal(s: str) -> tuple:
    """Consume a leading double-quoted string literal.

    Args:
        s: Remaining input.

    Returns:
        ``(token, rest)`` where ``token`` includes both quotes, or
        ``(None, s)`` when ``s`` does not start with ``"``.

    Raises:
        ValueError: if the opening quote is never closed. This includes a
            lone ``"``, which used to slip through because the check looked
            at a token pre-seeded with the opening quote.
    """
    if not s.startswith('"'):
        return None, s
    closing = s.find('"', 1)
    if closing == -1:
        raise ValueError('Missing closing quotes (`"`)')
    end = closing + 1
    return s[:end], s[end:]
|
|
|
|
def take_int_literal(s: str) -> tuple:
    """Consume the longest leading run of numeric characters.

    Returns ``(digits, rest)``, or ``(None, s)`` when ``s`` does not start
    with a numeric character.
    """
    count = 0
    for char in s:
        if not char.isnumeric():
            break
        count += 1
    return (s[:count], s[count:]) if count else (None, s)
|
|
|
|
def take_bool_literal(s: str) -> tuple:
    """Consume a leading ``True`` or ``False`` keyword.

    Returns ``(keyword, rest)``, or ``(None, s)`` when ``s`` starts with
    neither keyword.
    """
    for keyword in ('True', 'False'):
        if s.startswith(keyword):
            return keyword, s[len(keyword):]
    return None, s
|
|
|
|
def tokenize(s: str) -> list[str]:
    """Split a filter expression string into tokens.

    The extractors are tried in a fixed order on the remaining input; the
    first one that yields a token wins. ``take_space`` comes last and only
    consumes input without producing a token.

    Raises:
        ValueError: if no extractor can consume anything from the remaining
            input.
    """
    extractors = (
        take_operator,
        take_brace,
        take_literal,
        take_int_literal,
        take_bool_literal,
        take_space,
    )
    tokens = []
    remaining = s
    while remaining != '':
        length_before = len(remaining)
        for extract in extractors:
            token, remaining = extract(remaining)
            if token is None:
                continue
            tokens.append(token)
            break
        # No progress at all means the input contains something unknown.
        if len(remaining) == length_before:
            raise ValueError(f'Could not tokenize string {remaining}')
    return tokens
|
|
|
|
def parse(s: str) -> Expression:
    """Parse a filter expression string into an expression tree."""
    return build_expression(tokenize(s))
|
|
|
|
class Filter(Expression):
    """Field type that turns a filter expression string into an expression tree.

    Used as a field annotation on the pydantic models in this file; the
    ``__get_validators__`` hook hands pydantic the ``validate`` classmethod.
    """

    @classmethod
    def __get_validators__(cls):
        yield from (cls.validate,)

    @classmethod
    def __modify_schema__(cls, field_schema):
        # Nothing to add to the generated schema.
        return None

    @classmethod
    def validate(cls, v):
        """Parse ``v`` into an Expression; raise TypeError for non-strings."""
        if isinstance(v, str):
            return parse(v)
        raise TypeError('Must be string')
|
|
|
|
class Command(str):
    """Field type for a shell command given as a string or argument list.

    Used as a field annotation on the pydantic models in this file; the
    ``__get_validators__`` hook hands pydantic the ``validate`` classmethod.
    """

    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def __modify_schema__(cls, field_schema):
        pass

    @classmethod
    def validate(cls, v):
        """Normalise ``v`` into a single command string.

        Args:
            v: A command string (returned unchanged) or a list/tuple of
                arguments, which are single-quoted and joined with spaces.

        Returns:
            The command as one string.

        Raises:
            TypeError: if ``v`` is neither a string nor a list/tuple.
        """
        if not isinstance(v, (str, list, tuple)):
            raise TypeError('Must be string or list')
        if isinstance(v, (list, tuple)):
            # A single quote cannot appear inside a single-quoted shell word,
            # so close the word, emit a double-quoted quote, and reopen it.
            # Arguments without quotes render exactly as before.
            escaped = (str(x).replace("'", "'\"'\"'") for x in v)
            v = ' '.join(f"'{x}'" for x in escaped)
        return v
|
|
|
|
class Signal(int):
    """Field type accepting a signal number or name.

    Used as a field annotation on the pydantic models in this file; the
    ``__get_validators__`` hook hands pydantic the ``validate`` classmethod.
    """

    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def __modify_schema__(cls, field_schema):
        pass

    @classmethod
    def validate(cls, v):
        """Convert ``v`` into a ``signal.Signals`` member.

        Args:
            v: A signal number (``int`` or numeric string) or a member name
                of ``signal.Signals`` such as ``'SIGUSR1'``.

        Returns:
            The matching ``signal.Signals`` member.

        Raises:
            TypeError: if ``v`` is neither a string nor an int.
            ValueError: if the number or name does not denote a signal.
        """
        if not isinstance(v, (str, int)):
            raise TypeError('Must be string or int')
        if isinstance(v, int):
            return signal.Signals(v)
        if v.isnumeric():
            return signal.Signals(int(v))
        try:
            # Name lookup on the enum only matches real members, unlike
            # getattr(), which also returned unrelated class attributes and
            # raised AttributeError for unknown names.
            return signal.Signals[v]
        except KeyError:
            raise ValueError(f'Unknown signal name: {v!r}') from None
|
|
|
|
class Lock(asyncio.Lock):
    """``asyncio.Lock`` subclass usable as a field type on the pydantic models here."""

    @classmethod
    def __get_validators__(cls):
        yield from (cls.validate,)

    @classmethod
    def __modify_schema__(cls, field_schema):
        # Nothing to add to the generated schema.
        return None

    @classmethod
    def validate(cls, v):
        """Return ``v`` unchanged if it is an ``asyncio.Lock``, else raise TypeError."""
        if isinstance(v, asyncio.Lock):
            return v
        raise TypeError('Must be a asyncio.Lock')
|
|
|
|
class Event(asyncio.Event):
    """``asyncio.Event`` subclass usable as a field type on the pydantic models here."""

    @classmethod
    def __get_validators__(cls):
        yield from (cls.validate,)

    @classmethod
    def __modify_schema__(cls, field_schema):
        # Nothing to add to the generated schema.
        return None

    @classmethod
    def validate(cls, v):
        """Return ``v`` unchanged if it is an ``asyncio.Event``, else raise TypeError."""
        if isinstance(v, asyncio.Event):
            return v
        raise TypeError('Must be a asyncio.Event')
|
|
|
|
class Connection(i3ipc.aio.Connection):
    """``i3ipc.aio.Connection`` subclass usable as a field type on the pydantic models here."""

    @classmethod
    def __get_validators__(cls):
        yield from (cls.validate,)

    @classmethod
    def __modify_schema__(cls, field_schema):
        # Nothing to add to the generated schema.
        return None

    @classmethod
    def validate(cls, v):
        """Return ``v`` unchanged if it is an ``i3ipc.aio.Connection``, else raise TypeError."""
        if isinstance(v, i3ipc.aio.Connection):
            return v
        raise TypeError('Must be a i3ipc.aio.Connection')
|
|
|
|
|
|
class ProgramConfig(pydantic.BaseModel):
    # Settings for one program to launch.

    # Command string handed to `exec` by run(); a list/tuple is joined into
    # one string by Command.validate.
    cmd: Command
    # Target workspace for the program's window. init() only tracks programs
    # that have a workspace set.
    workspace: typing.Optional[str] = None
    # When true, run() waits for the runtime event after starting the program.
    signal: bool = False
    # Timeout in milliseconds (run() divides it by 1000 for asyncio.wait_for).
    timeout: int = 1000
    # Filter expression evaluated against the IPC data of each new window.
    match: Filter
|
|
|
|
class Config(pydantic.BaseModel):
    # Top-level configuration, built from CLI options or a YAML document.

    # Signal for which init() installs a handler that sets the runtime event;
    # no handler is installed when this is None.
    signal: typing.Optional[Signal] = None
    # Overall timeout in milliseconds used by run() for the IPC main loop.
    timeout: int = 3000
    # Programs to start, in order.
    programs: typing.List[ProgramConfig]
    # Workspace that run() switches to at the end, if set.
    final_workspace: typing.Optional[str] = None
|
|
|
|
class RuntimeData(pydantic.BaseModel):
    # Mutable state shared between run() and the window event callback.

    # Programs whose window has not been matched yet; window_new() removes
    # entries as windows are matched.
    programs: typing.List[ProgramConfig] = []
    # Held by the window_new() callback while it inspects `programs`.
    lock: Lock
    # Set by the signal handler installed in init(); awaited by wait_signal().
    event: Event
    # Connection used to send commands and receive window events.
    ipc: Connection
|
|
|
|
def window_new(runtime_data: RuntimeData, *, debug):
    """Create the ``window::new`` event handler.

    Args:
        runtime_data: Shared state; ``programs`` holds the entries still
            waiting for their window.
        debug: When true, every event's IPC dictionary is printed as JSON.

    Returns:
        An async callback ``(ipc, event)``. It looks for the first pending
        program whose filter matches the new window, focuses the window,
        moves it to that program's workspace and drops the program from the
        pending list. When no program is left, the IPC main loop is stopped.
    """
    async def callback(ipc: i3ipc.aio.Connection, e: i3ipc.WorkspaceEvent):
        assert e.change == 'new'
        if debug:
            print(json.dumps(e.ipc_data))
        async with runtime_data.lock:
            for i, cfg in enumerate(runtime_data.programs):
                if not cfg.match.reduce(e.ipc_data):
                    continue
                container_id = e.ipc_data['container']['id']
                await ipc.command(f'for_window [con_id="{container_id}"] focus')
                await ipc.command(f'move container to workspace {cfg.workspace}')
                runtime_data.programs.pop(i)
                # One window satisfies one program. Stopping here also avoids
                # mutating the list while still iterating over it, which used
                # to skip the entry following the removed one.
                break
            if not runtime_data.programs:
                ipc.main_quit()

    return callback
|
|
|
|
async def wait_signal(rt: RuntimeData):
    """Block until the runtime event is set, then reset it for the next wait."""
    event = rt.event
    await event.wait()
    event.clear()
|
|
|
|
async def coro_wait_signal(coro, rt: RuntimeData):
    """Await ``coro`` first and afterwards wait for the runtime event."""
    await coro
    # Same steps as wait_signal(): wait for the event, then reset it.
    event = rt.event
    await event.wait()
    event.clear()
|
|
|
|
async def init(config: Config, *, debug: bool) -> RuntimeData:
    """Create the shared runtime state for ``config``.

    Only programs with a workspace are tracked as pending. If a signal is
    configured, a handler that sets the runtime event is installed on the
    running event loop. ``debug`` is accepted but not used here.
    """
    pending = [program for program in config.programs if program.workspace is not None]
    rd = RuntimeData(programs=pending, lock=Lock(), event=Event(), ipc=Connection())
    if config.signal is None:
        return rd

    def on_signal():
        rd.event.set()

    asyncio.get_running_loop().add_signal_handler(config.signal, on_signal)
    return rd
|
|
|
|
async def run(config: Config, *, debug: bool):
    """Start all configured programs and wait for their windows.

    Returns 0 when finished, or 1 when the IPC main loop timed out while
    programs were still pending. The final workspace, if configured, is
    selected in either case.
    """
    # NOTE(review): the nesting of the launch loop below was reconstructed
    # from a source without indentation - confirm that only signal-driven
    # programs are awaited inline while all others are gathered afterwards.
    runtime_data = await init(config, debug=debug)
    await runtime_data.ipc.connect()
    runtime_data.ipc.on('window::new', window_new(runtime_data, debug=debug))

    # Values available for `{name}` placeholders in a program's command.
    variables = {
        'pid': os.getpid(),
    }
    coroutines = []
    timeout = config.timeout
    started_at = time.monotonic_ns()
    for cfg in config.programs:
        p = cfg.cmd.format(**variables)
        coro = runtime_data.ipc.command(f'exec {p}')
        if cfg.signal:
            # Start the program, then wait for the runtime event before
            # continuing with the next program.
            coro = coro_wait_signal(coro, runtime_data)
            if cfg.timeout is not None:
                timeout = max(timeout, cfg.timeout)
            try:
                # Timeouts are configured in milliseconds; wait_for takes seconds.
                await asyncio.wait_for(coro, timeout=cfg.timeout/1000 if cfg.timeout is not None else 0)
            except asyncio.TimeoutError:
                pass
        else:
            coroutines += [coro]
    await asyncio.gather(*coroutines)
    try:
        if runtime_data.programs:
            # run main loop only if we wait for something
            # Time already spent launching (in ms) is deducted from the timeout.
            diff = (time.monotonic_ns() - started_at) / (1000*1000)
            new_timeout = max(timeout - diff, 0)
            await asyncio.wait_for(runtime_data.ipc.main(), timeout=new_timeout/1000)
    except asyncio.TimeoutError:
        return 1
    finally:
        if config.final_workspace is not None:
            await runtime_data.ipc.command(f'workspace {config.final_workspace}')
    return 0
|
|
|
|
@click.group()
@click.pass_context
@click.option('--debug', '-d', is_flag=True, default=False, help="Enable debug mode, will log ipc dictionary.")
def main(ctx, debug):
    # Root command group. Documented with comments rather than a docstring
    # so that the generated --help text stays as it is.
    # The debug flag is stored on the context object for the subcommands.
    ctx.ensure_object(dict)
    ctx.obj.update(DEBUG=debug)
|
|
|
|
@main.command()
@click.pass_context
@click.option('--filter', '-f', default='True', help="A filter expression for the raw ipc dictionary.")
@click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.")
@click.option('--workspace', '-w', default=None, help="The workspace to move to.")
@click.argument('command', nargs=-1)
def simple(ctx, filter, timeout, workspace, command):
    """
    Start a program and move its created window to the desired i3 workspace.

    \b
    Exit status:
      0 on success,
      1 when no window has been found.
    """
    # The docstring above is the command's --help text (typos fixed:
    # "it's" -> "its", "Exist status" -> "Exit status").
    debug = ctx.obj['DEBUG']
    # A single-program configuration built from the command line options;
    # the model validators parse the filter and join the command arguments.
    config = Config(programs=[ProgramConfig(
        cmd=command,
        workspace=workspace,
        match=filter,
    )], timeout=timeout)
    # run() returns the process exit status.
    ctx.exit(asyncio.run(run(config, debug=debug)))
|
|
|
|
@main.command()
@click.pass_context
@click.argument('config', type=click.File('r'), default='-')
def config(ctx, config):
    """
    Start the programs described in a YAML configuration file and move their
    created windows to the desired i3 workspaces.

    \b
    Exit status:
      0 on success,
      1 when no window has been found.
    """
    # The docstring above is the command's --help text; it used to be a copy
    # of the single-program command's text, including its typos.
    debug = ctx.obj['DEBUG']
    # SafeLoader only constructs plain YAML data types.
    data = yaml.load(config, Loader=SafeLoader)
    if not isinstance(data, dict):
        # An empty document loads as None; `**None` would fail with an
        # unhelpful TypeError.
        raise click.BadParameter('The configuration must be a YAML mapping.')
    config = Config(**data)
    # run() returns the process exit status.
    ctx.exit(asyncio.run(run(config, debug=debug)))
|
|
|
|
# Invoke the click command group only when the file is executed as a script.
if __name__ == '__main__':
    main()
|