Improve stability and input validation.

This commit is contained in:
redxef 2022-10-29 14:04:31 +02:00
parent ba5d560a01
commit 794b032cca
Signed by: redxef
GPG key ID: 7DAC3AA211CBD921
2 changed files with 168 additions and 51 deletions

View file

@ -3,6 +3,7 @@
import collections.abc import collections.abc
import typing
import asyncio import asyncio
import signal import signal
import os import os
@ -12,6 +13,7 @@ import json
import yaml import yaml
import click import click
import pydantic
import i3ipc import i3ipc
import i3ipc.aio import i3ipc.aio
@ -273,74 +275,185 @@ def parse(s: str) -> Expression:
tokens = tokenize(s) tokens = tokenize(s)
return build_expression(tokens) return build_expression(tokens)
def window_new(configs: list[dict], debug: bool, *, lock): class Filter(Expression):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def __modify_schema__(cls, field_schema):
pass
@classmethod
def validate(cls, v):
if not isinstance(v, str):
raise TypeError('Must be string')
return parse(v)
class Command(str):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def __modify_schema__(cls, field_schema):
pass
@classmethod
def validate(cls, v):
if not isinstance(v, (str, list)):
raise TypeError('Must be string or list')
if isinstance(v, list):
v = ' '.join([f"'{x}'" for x in v])
return v
class Signal(int):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def __modify_schema__(cls, field_schema):
pass
@classmethod
def validate(cls, v):
if not isinstance(v, (str, int)):
raise TypeError('Must be string or int')
if isinstance(v, str) and v.isnumeric():
return signal.Signals(int(v))
elif isinstance(v, int):
return signal.Signals(v)
return getattr(signal.Signals, v)
class Lock(asyncio.Lock):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def __modify_schema__(cls, field_schema):
pass
@classmethod
def validate(cls, v):
if not isinstance(v, asyncio.Lock):
raise TypeError('Must be a asyncio.Lock')
return v
class Event(asyncio.Event):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def __modify_schema__(cls, field_schema):
pass
@classmethod
def validate(cls, v):
if not isinstance(v, asyncio.Event):
raise TypeError('Must be a asyncio.Event')
return v
class Connection(i3ipc.aio.Connection):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def __modify_schema__(cls, field_schema):
pass
@classmethod
def validate(cls, v):
if not isinstance(v, i3ipc.aio.Connection):
raise TypeError('Must be a i3ipc.aio.Connection')
return v
class ProgramConfig(pydantic.BaseModel):
cmd: Command
workspace: typing.Optional[str] = None
signal: bool = False
timeout: int = 1000
match: Filter
class Config(pydantic.BaseModel):
signal: typing.Optional[Signal] = None
timeout: int = 3000
programs: typing.List[ProgramConfig]
class RuntimeData(pydantic.BaseModel):
programs: typing.List[ProgramConfig] = []
lock: Lock
event: Event
ipc: Connection
def window_new(runtime_data: RuntimeData, *, debug):
async def callback(ipc: i3ipc.aio.Connection, e: i3ipc.WorkspaceEvent): async def callback(ipc: i3ipc.aio.Connection, e: i3ipc.WorkspaceEvent):
assert e.change == 'new' assert e.change == 'new'
if debug: if debug:
print(json.dumps(e.ipc_data)) print(json.dumps(e.ipc_data))
for i, cfg in enumerate(configs): async with runtime_data.lock:
filter = cfg['filter'] for i, cfg in enumerate(runtime_data.programs):
workspace = cfg['workspace'] if cfg.match.reduce(e.ipc_data):
if filter.reduce(e.ipc_data):
container_id = e.ipc_data['container']['id'] container_id = e.ipc_data['container']['id']
async with lock:
await ipc.command(f'for_window [con_id="{container_id}"] focus') await ipc.command(f'for_window [con_id="{container_id}"] focus')
await ipc.command(f'move container to workspace {workspace}') await ipc.command(f'move container to workspace {cfg.workspace}')
await asyncio.sleep(1) await asyncio.sleep(1)
configs.pop(i) runtime_data.programs.pop(i)
if not configs: if not runtime_data.programs:
ipc.main_quit() ipc.main_quit()
return callback return callback
async def wait_signal(s): async def wait_signal(rt: RuntimeData):
event = asyncio.Event() await rt.event.wait()
asyncio.get_running_loop().add_signal_handler(s, lambda: event.set()) rt.event.clear()
await event.wait()
event.clear()
asyncio.get_running_loop().remove_signal_handler(s)
asyncio.get_running_loop().add_signal_handler(s, lambda: None)
async def coro_wait_signal(coro, s): async def coro_wait_signal(coro, rt: RuntimeData):
await coro await coro
await wait_signal(s) await wait_signal(rt)
async def run(configs: list[dict], *, timeout: int, debug: bool): async def init(config: Config, *, debug: bool) -> RuntimeData:
window_configs = [c for c in configs if c.get('workspace') is not None] rd = RuntimeData(
lock = asyncio.Lock() programs=[p for p in config.programs if p.workspace is not None],
ipc = await i3ipc.aio.Connection().connect() # we only wait for configs which spawn a window lock=Lock(),
ipc.on('window::new', window_new(window_configs, debug=debug, lock=lock)) event=Event(),
ipc=Connection(),
)
if config.signal is not None:
asyncio.get_running_loop().add_signal_handler(config.signal, lambda: rd.event.set())
return rd
async def run(config: Config, *, debug: bool):
runtime_data = await init(config, debug=debug)
await runtime_data.ipc.connect()
runtime_data.ipc.on('window::new', window_new(runtime_data, debug=debug))
variables = { variables = {
'pid': os.getpid(), 'pid': os.getpid(),
} }
coroutines = [] coroutines = []
timeouts = [timeout] timeout = config.timeout
started_at = time.monotonic_ns() started_at = time.monotonic_ns()
for cfg in configs: for cfg in config.programs:
cfg['filter'] = parse(cfg['filter']) p = cfg.cmd.format(**variables)
p = cfg['program'] coro = runtime_data.ipc.command(f'exec {p}')
t = cfg.get('timeout', timeout) if cfg.signal:
timeouts += [t] coro = coro_wait_signal(coro, runtime_data)
if isinstance(p, collections.abc.Iterable) and not isinstance(p, str): if cfg.timeout is not None:
p = ' '.join(p) timeout = max(timeout, cfg.timeout)
p = p.format(**variables)
coro = ipc.command(f'exec {p}')
if cfg.get('signal_continue', False):
sig = signal.Signals(cfg.get('signal_continue'))
coro = coro_wait_signal(coro, sig)
try: try:
await asyncio.wait_for(coro, timeout=t/1000) await asyncio.wait_for(coro, timeout=cfg.timeout/1000 if cfg.timeout is not None else 0)
except asyncio.TimeoutError: except asyncio.TimeoutError:
pass pass
else: else:
coroutines += [coro] coroutines += [coro]
await asyncio.gather(*coroutines) await asyncio.gather(*coroutines)
try: try:
if window_configs: if runtime_data.programs:
# run main loop only if we wait for something # run main loop only if we wait for something
diff = (time.monotonic_ns() - started_at) / (1000*1000) diff = (time.monotonic_ns() - started_at) / (1000*1000)
new_timeout = max(max(*timeouts) - diff, 0) new_timeout = max(timeout - diff, 0)
await asyncio.wait_for(ipc.main(), timeout=new_timeout/1000) await asyncio.wait_for(runtime_data.ipc.main(), timeout=new_timeout/1000)
except asyncio.TimeoutError: except asyncio.TimeoutError:
return 1 return 1
return 0 return 0
@ -357,8 +470,8 @@ def main(ctx, debug):
@click.option('--filter', '-f', default='True', help="A filter expression for the raw ipc dictionary.") @click.option('--filter', '-f', default='True', help="A filter expression for the raw ipc dictionary.")
@click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.") @click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.")
@click.option('--workspace', '-w', default=None, help="The workspace to move to.") @click.option('--workspace', '-w', default=None, help="The workspace to move to.")
@click.argument('program', nargs=-1) @click.argument('command', nargs=-1)
def simple(ctx, filter, timeout, workspace, program): def simple(ctx, filter, timeout, workspace, command):
""" """
Start a program and move it's created window to the desired i3 workspace. Start a program and move it's created window to the desired i3 workspace.
@ -368,14 +481,17 @@ def simple(ctx, filter, timeout, workspace, program):
1 when no window has been found. 1 when no window has been found.
""" """
debug = ctx.obj['DEBUG'] debug = ctx.obj['DEBUG']
configs=[{"filter": filter, "workspace": workspace, "program": program}] config = Config(programs=[ProgramConfig(
ctx.exit(asyncio.run(run(configs, timeout=timeout, debug=debug))) cmd=command,
workspace=workspace,
match=filter,
)], timeout=timeout)
ctx.exit(asyncio.run(run(config, debug=debug)))
@main.command() @main.command()
@click.pass_context @click.pass_context
@click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.")
@click.argument('config', type=click.File('r'), default='-') @click.argument('config', type=click.File('r'), default='-')
def config(ctx, timeout, config): def config(ctx, config):
""" """
Start a program and move it's created window to the desired i3 workspace. Start a program and move it's created window to the desired i3 workspace.
@ -385,8 +501,8 @@ def config(ctx, timeout, config):
1 when no window has been found. 1 when no window has been found.
""" """
debug = ctx.obj['DEBUG'] debug = ctx.obj['DEBUG']
config = yaml.load(config, Loader=SafeLoader) config = Config(**yaml.load(config, Loader=SafeLoader))
ctx.exit(asyncio.run(run(config, timeout=timeout, debug=debug))) ctx.exit(asyncio.run(run(config, debug=debug)))
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View file

@ -1,4 +1,5 @@
click click
i3ipc pydantic
pyyaml pyyaml
i3ipc