From 794b032ccae5e640d0dce9c5e0846c0e152d13db Mon Sep 17 00:00:00 2001 From: redxef Date: Sat, 29 Oct 2022 14:04:31 +0200 Subject: [PATCH] Improve stability and input validation. --- i3toolwait | 216 ++++++++++++++++++++++++++++++++++++----------- requirements.txt | 3 +- 2 files changed, 168 insertions(+), 51 deletions(-) diff --git a/i3toolwait b/i3toolwait index 6fc5c42..c24396f 100755 --- a/i3toolwait +++ b/i3toolwait @@ -3,6 +3,7 @@ import collections.abc +import typing import asyncio import signal import os @@ -12,6 +13,7 @@ import json import yaml import click +import pydantic import i3ipc import i3ipc.aio @@ -273,74 +275,185 @@ def parse(s: str) -> Expression: tokens = tokenize(s) return build_expression(tokens) -def window_new(configs: list[dict], debug: bool, *, lock): +class Filter(Expression): + + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def __modify_schema__(cls, field_schema): + pass + + @classmethod + def validate(cls, v): + if not isinstance(v, str): + raise TypeError('Must be string') + return parse(v) + +class Command(str): + + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def __modify_schema__(cls, field_schema): + pass + + @classmethod + def validate(cls, v): + if not isinstance(v, (str, list)): + raise TypeError('Must be string or list') + if isinstance(v, list): + v = ' '.join([f"'{x}'" for x in v]) + return v + +class Signal(int): + + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def __modify_schema__(cls, field_schema): + pass + + @classmethod + def validate(cls, v): + if not isinstance(v, (str, int)): + raise TypeError('Must be string or int') + if isinstance(v, str) and v.isnumeric(): + return signal.Signals(int(v)) + elif isinstance(v, int): + return signal.Signals(v) + return getattr(signal.Signals, v) + +class Lock(asyncio.Lock): + @classmethod + def __get_validators__(cls): + yield cls.validate + @classmethod + def __modify_schema__(cls, field_schema): + pass + @classmethod + def validate(cls, v): + if not isinstance(v, asyncio.Lock): + raise TypeError('Must be a asyncio.Lock') + return v + +class Event(asyncio.Event): + @classmethod + def __get_validators__(cls): + yield cls.validate + @classmethod + def __modify_schema__(cls, field_schema): + pass + @classmethod + def validate(cls, v): + if not isinstance(v, asyncio.Event): + raise TypeError('Must be a asyncio.Event') + return v + +class Connection(i3ipc.aio.Connection): + @classmethod + def __get_validators__(cls): + yield cls.validate + @classmethod + def __modify_schema__(cls, field_schema): + pass + @classmethod + def validate(cls, v): + if not isinstance(v, i3ipc.aio.Connection): + raise TypeError('Must be a i3ipc.aio.Connection') + return v + + +class ProgramConfig(pydantic.BaseModel): + cmd: Command + workspace: typing.Optional[str] = None + signal: bool = False + timeout: int = 1000 + match: Filter + +class Config(pydantic.BaseModel): + signal: typing.Optional[Signal] = None + timeout: int = 3000 + programs: typing.List[ProgramConfig] + +class RuntimeData(pydantic.BaseModel): + programs: typing.List[ProgramConfig] = [] + lock: Lock + event: Event + ipc: Connection + +def window_new(runtime_data: RuntimeData, *, debug): async def callback(ipc: i3ipc.aio.Connection, e: i3ipc.WorkspaceEvent): assert e.change == 'new' if debug: print(json.dumps(e.ipc_data)) - for i, cfg in enumerate(configs): - filter = cfg['filter'] - workspace = cfg['workspace'] - if filter.reduce(e.ipc_data): - container_id = e.ipc_data['container']['id'] - async with lock: + async with runtime_data.lock: + for i, cfg in enumerate(runtime_data.programs): + if cfg.match.reduce(e.ipc_data): + container_id = e.ipc_data['container']['id'] await ipc.command(f'for_window [con_id="{container_id}"] focus') - await ipc.command(f'move container to workspace {workspace}') + await ipc.command(f'move container to workspace {cfg.workspace}') await asyncio.sleep(1) - configs.pop(i) - if not configs: - ipc.main_quit() + runtime_data.programs.pop(i) + if not runtime_data.programs: + ipc.main_quit() return callback -async def wait_signal(s): - event = asyncio.Event() - asyncio.get_running_loop().add_signal_handler(s, lambda: event.set()) - await event.wait() - event.clear() - asyncio.get_running_loop().remove_signal_handler(s) - asyncio.get_running_loop().add_signal_handler(s, lambda: None) +async def wait_signal(rt: RuntimeData): + await rt.event.wait() + rt.event.clear() -async def coro_wait_signal(coro, s): +async def coro_wait_signal(coro, rt: RuntimeData): await coro - await wait_signal(s) + await wait_signal(rt) -async def run(configs: list[dict], *, timeout: int, debug: bool): - window_configs = [c for c in configs if c.get('workspace') is not None] - lock = asyncio.Lock() - ipc = await i3ipc.aio.Connection().connect() # we only wait for configs which spawn a window - ipc.on('window::new', window_new(window_configs, debug=debug, lock=lock)) +async def init(config: Config, *, debug: bool) -> RuntimeData: + rd = RuntimeData( + programs=[p for p in config.programs if p.workspace is not None], + lock=Lock(), + event=Event(), + ipc=Connection(), + ) + if config.signal is not None: + asyncio.get_running_loop().add_signal_handler(config.signal, lambda: rd.event.set()) + return rd + +async def run(config: Config, *, debug: bool): + runtime_data = await init(config, debug=debug) + await runtime_data.ipc.connect() + runtime_data.ipc.on('window::new', window_new(runtime_data, debug=debug)) variables = { 'pid': os.getpid(), } coroutines = [] - timeouts = [timeout] + timeout = config.timeout started_at = time.monotonic_ns() - for cfg in configs: - cfg['filter'] = parse(cfg['filter']) - p = cfg['program'] - t = cfg.get('timeout', timeout) - timeouts += [t] - if isinstance(p, collections.abc.Iterable) and not isinstance(p, str): - p = ' '.join(p) - p = p.format(**variables) - coro = ipc.command(f'exec {p}') - if cfg.get('signal_continue', False): - sig = signal.Signals(cfg.get('signal_continue')) - coro = coro_wait_signal(coro, sig) + for cfg in config.programs: + p = cfg.cmd.format(**variables) + coro = runtime_data.ipc.command(f'exec {p}') + if cfg.signal: + coro = coro_wait_signal(coro, runtime_data) + if cfg.timeout is not None: + timeout = max(timeout, cfg.timeout) try: - await asyncio.wait_for(coro, timeout=t/1000) + await asyncio.wait_for(coro, timeout=cfg.timeout/1000 if cfg.timeout is not None else 0) except asyncio.TimeoutError: pass else: coroutines += [coro] await asyncio.gather(*coroutines) try: - if window_configs: + if runtime_data.programs: # run main loop only if we wait for something diff = (time.monotonic_ns() - started_at) / (1000*1000) - new_timeout = max(max(*timeouts) - diff, 0) - await asyncio.wait_for(ipc.main(), timeout=new_timeout/1000) + new_timeout = max(timeout - diff, 0) + await asyncio.wait_for(runtime_data.ipc.main(), timeout=new_timeout/1000) except asyncio.TimeoutError: return 1 return 0 @@ -357,8 +470,8 @@ def main(ctx, debug): @click.option('--filter', '-f', default='True', help="A filter expression for the raw ipc dictionary.") @click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.") @click.option('--workspace', '-w', default=None, help="The workspace to move to.") -@click.argument('program', nargs=-1) -def simple(ctx, filter, timeout, workspace, program): +@click.argument('command', nargs=-1) +def simple(ctx, filter, timeout, workspace, command): """ Start a program and move it's created window to the desired i3 workspace. @@ -368,14 +481,17 @@ def simple(ctx, filter, timeout, workspace, program): 1 when no window has been found. """ debug = ctx.obj['DEBUG'] - configs=[{"filter": filter, "workspace": workspace, "program": program}] - ctx.exit(asyncio.run(run(configs, timeout=timeout, debug=debug))) + config = Config(programs=[ProgramConfig( + cmd=command, + workspace=workspace, + match=filter, + )], timeout=timeout) + ctx.exit(asyncio.run(run(config, debug=debug))) @main.command() @click.pass_context -@click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.") @click.argument('config', type=click.File('r'), default='-') -def config(ctx, timeout, config): +def config(ctx, config): """ Start a program and move it's created window to the desired i3 workspace. @@ -385,8 +501,8 @@ def config(ctx, timeout, config): 1 when no window has been found. """ debug = ctx.obj['DEBUG'] - config = yaml.load(config, Loader=SafeLoader) - ctx.exit(asyncio.run(run(config, timeout=timeout, debug=debug))) + config = Config(**yaml.load(config, Loader=SafeLoader)) + ctx.exit(asyncio.run(run(config, debug=debug))) if __name__ == '__main__': main() diff --git a/requirements.txt b/requirements.txt index 45c65c9..634d488 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ click -i3ipc +pydantic pyyaml +i3ipc