724 lines
22 KiB
Python
Executable file
724 lines
22 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
import string
|
|
import typing
|
|
import asyncio
|
|
import signal
|
|
import os
|
|
import time
|
|
import functools
|
|
import json
|
|
import logging
|
|
|
|
import yaml
|
|
import click
|
|
import pydantic
|
|
import i3ipc
|
|
import i3ipc.aio
|
|
|
|
try:
|
|
from yaml import CSafeLoader as SafeLoader
|
|
except ImportError:
|
|
from yaml import SafeLoader
|
|
|
|
LOGGER = logging.getLogger('i3toolwait' if __name__ == '__main__' else __name__)
|
|
|
|
def lazy_fc_if(env, local, a, b, c):
|
|
a.reduce(env, local)
|
|
if a.reduced:
|
|
b.reduce(env, local)
|
|
return b.reduced
|
|
c.reduce(env, local)
|
|
return c.reduced
|
|
|
|
def lazy_fc_nif(env, local, a, b, c):
|
|
a.reduce(env, local)
|
|
if not a.reduced:
|
|
b.reduce(env, local)
|
|
return b.reduced
|
|
c.reduce(env, local)
|
|
c.reduced
|
|
|
|
def lazy_fc_defun(env, local, name, variables, func):
|
|
_ = local
|
|
# need ugly hack, because variables are actually a function with n-1 args
|
|
varnames = [variables._fc] + [v._value for v in variables._args]
|
|
env.set_lisp_function(name._value, varnames, func)
|
|
|
|
def fc_load(env, local, path):
|
|
_ = local
|
|
ipc_value = env.input
|
|
for k in path.strip('.').split('.'):
|
|
ipc_value = ipc_value[k]
|
|
return ipc_value
|
|
|
|
def fc_has_key(env, local, path):
|
|
_ = local
|
|
ipc_value = env.input
|
|
for k in path.strip('.').split('.'):
|
|
try:
|
|
ipc_value = ipc_value[k]
|
|
except KeyError:
|
|
return False
|
|
return True
|
|
|
|
class Environment:
|
|
|
|
def __init__(self, input):
|
|
self._input = input
|
|
self._variables = {}
|
|
self._functions = {
|
|
'__last__': lambda _env, _local, *a: a[-1], # special function, if multiple expressions, execute all and return result of last one
|
|
'setq': lambda env, _, n, v: env.set_variable(n, v),
|
|
'let': lambda _, local, n, v: local.set_variable(n, v),
|
|
'write': lambda _env, _local, a: print(a),
|
|
'load': fc_load,
|
|
'has-key': fc_has_key,
|
|
'=': lambda _, _l, a, b: a == b,
|
|
'!=': lambda _, _l, a, b: a != b,
|
|
'>': lambda _, _l, a, b: a > b,
|
|
'<': lambda _, _l, a, b: a < b,
|
|
'>=': lambda _, _l, a, b: a >= b,
|
|
'<=': lambda _, _l, a, b: a <= b,
|
|
'+': lambda _, _l, *a: functools.reduce(lambda a, b: a + b, a),
|
|
'-': lambda _, _l, a, b: a - b,
|
|
'*': lambda _, _l, *a: functools.reduce(lambda a, b: a * b, a),
|
|
'/': lambda _, _l, a, b: a // b,
|
|
'|': lambda _, _l, *a: functools.reduce(lambda a, b: a or b, a),
|
|
'&': lambda _, _l, *a: functools.reduce(lambda a, b: a and b, a),
|
|
}
|
|
self._lazy_functions = {
|
|
'?': lazy_fc_if,
|
|
'!?': lazy_fc_nif,
|
|
'defun': lazy_fc_defun,
|
|
}
|
|
self._lisp_functions = {}
|
|
|
|
@property
|
|
def input(self):
|
|
return self._input
|
|
|
|
def set_variable(self, name: str, value: object):
|
|
self._variables[name] = value
|
|
|
|
def get_variable(self, name: str):
|
|
return self._variables[name]
|
|
|
|
def get_function(self, name: str):
|
|
return self._functions[name]
|
|
|
|
def get_lazy_function(self, name: str):
|
|
return self._lazy_functions[name]
|
|
|
|
def set_lisp_function(self, name: str, vars: list[object], e: object):
|
|
self._lisp_functions[name] = vars, e
|
|
|
|
def get_lisp_function(self, name: str) -> tuple[list[str], object]:
|
|
return self._lisp_functions[name]
|
|
|
|
class LocalEnvironment:
|
|
|
|
def __init__(self):
|
|
self._variables = {}
|
|
|
|
def copy(self) -> 'LocalEnvironment':
|
|
n = LocalEnvironment()
|
|
n._variables = self._variables.copy()
|
|
return n
|
|
|
|
def set_variable(self, name: str, value: object):
|
|
self._variables[name] = value
|
|
|
|
def get_variable(self, name: str):
|
|
return self._variables[name]
|
|
|
|
class Expression:
|
|
|
|
STATE_CONSTRUCTED = 0
|
|
STATE_REDUCED = 1
|
|
|
|
def __init__(self):
|
|
self._state = Expression.STATE_CONSTRUCTED
|
|
self._reduced = None
|
|
|
|
def _reduce(self, env: Environment, local: LocalEnvironment, args: list[object]):
|
|
_ = env, local, args
|
|
raise NotImplementedError('Implement in subclass')
|
|
|
|
def reduce(self, env: Environment, local: LocalEnvironment):
|
|
self._reduced = self._reduce(env, local, [])
|
|
self._state = Expression.STATE_REDUCED
|
|
|
|
@property
|
|
def reduced(self) -> object:
|
|
if self._state != Expression.STATE_REDUCED:
|
|
raise RuntimeError('Tried to get the reduced value before reducing')
|
|
return self._reduced
|
|
|
|
class Constant(Expression):
|
|
|
|
def __init__(self, value):
|
|
super().__init__()
|
|
self._value = value
|
|
|
|
def __repr__(self):
|
|
if isinstance(self._value, str):
|
|
return f'"{self._value}"'
|
|
return repr(self._value)
|
|
|
|
def _reduce(self, env: Environment, local: LocalEnvironment, args: list[Expression]):
|
|
_ = env, local, args
|
|
return self._value
|
|
|
|
class VariableSet(Constant):
|
|
|
|
def __repr__(self):
|
|
return self._value
|
|
|
|
class VariableGet(Constant):
|
|
|
|
def __repr__(self):
|
|
return self._value
|
|
|
|
def _reduce(self, env: Environment, local: LocalEnvironment, args: list[Expression]):
|
|
_ = args
|
|
try:
|
|
return local.get_variable(self._value)
|
|
except KeyError:
|
|
return env.get_variable(self._value)
|
|
|
|
class Function(Expression):
|
|
|
|
def __init__(self, fc, args: list[Expression]):
|
|
super().__init__()
|
|
self._fc = fc
|
|
self._args = args
|
|
|
|
def __repr__(self):
|
|
a = ' '.join([repr(a) for a in self._args])
|
|
return f'({self._fc} {a})'
|
|
|
|
def _reduce(self, env: Environment, local: LocalEnvironment, args: list[Expression]):
|
|
try:
|
|
argnames, fc = env.get_lisp_function(self._fc)
|
|
assert isinstance(fc, Expression)
|
|
l = local.copy()
|
|
for an, av in zip(argnames, args):
|
|
av.reduce(env, l)
|
|
l.set_variable(an, av.reduced)
|
|
fc.reduce(env, l)
|
|
r = fc.reduced
|
|
except KeyError as e:
|
|
try:
|
|
fc = env.get_function(self._fc)
|
|
[a.reduce(env, local) for a in args]
|
|
r = fc(env, local, *[a.reduced for a in args])
|
|
except KeyError:
|
|
fc = env.get_lazy_function(self._fc)
|
|
r = fc(env, local, *args)
|
|
return r
|
|
|
|
def reduce(self, env: Environment, local: LocalEnvironment):
|
|
self._reduced = self._reduce(env, local, self._args)
|
|
self._state = Expression.STATE_REDUCED
|
|
|
|
class Token:
|
|
|
|
CONSTANT_STRING = 0
|
|
CONSTANT_INTEGER = 10
|
|
CONSTANT_BOOLEAN = 20
|
|
KEYWORD = 30
|
|
VARIABLE_SET = 40
|
|
VARIABLE_GET = 50
|
|
FUNCTION = 60
|
|
GROUPING_OPEN = 70
|
|
GROUPING_CLOSE = 80
|
|
WHITESPACE = 90
|
|
|
|
def __init__(self, t, v):
|
|
self.t = t
|
|
self.v = v
|
|
|
|
def __repr__(self):
|
|
return f'{self.v}::{self.t}'
|
|
|
|
def to_expression(self):
|
|
if self.t == Token.CONSTANT_STRING:
|
|
return Constant(self.v[1:-1]) # slice away the quotes
|
|
if self.t == Token.CONSTANT_INTEGER:
|
|
return Constant(int(self.v, base=0))
|
|
if self.t == Token.CONSTANT_BOOLEAN:
|
|
return Constant(self.v == 'True')
|
|
if self.t == Token.KEYWORD:
|
|
raise RuntimeError(f'This is a meta token type and should be swallowed by the sanitizer: {self}')
|
|
if self.t == Token.VARIABLE_GET:
|
|
return VariableGet(self.v)
|
|
if self.t == Token.VARIABLE_SET:
|
|
return VariableSet(self.v)
|
|
if self.t == Token.FUNCTION:
|
|
raise RuntimeError('Cant construct function just from its token')
|
|
if self.t == Token.GROUPING_OPEN or self.t == Token.GROUPING_CLOSE:
|
|
raise RuntimeError('Groupings should never be constructed, this is a bug')
|
|
if self.t == Token.WHITESPACE:
|
|
raise RuntimeError('Whitespaces should not be present in this stage of the build')
|
|
raise RuntimeError(f'The token type {self.t} is not implemented')
|
|
|
|
def token_extract_string(stream: str) -> tuple[Token, str]:
|
|
if stream[0] != '"':
|
|
raise ValueError('No such token in stream')
|
|
i = stream.find('"', 1)
|
|
return Token(Token.CONSTANT_STRING, stream[:i+1]), stream[i+1:]
|
|
|
|
def token_extract_integer(stream: str) -> tuple[Token, str]:
|
|
i = 0
|
|
base = None
|
|
if stream[i] in '+-':
|
|
i += 1
|
|
if stream[i] in '0123456789':
|
|
i += 1
|
|
else:
|
|
raise ValueError('Malformed integer')
|
|
|
|
if stream[i] in 'xbo':
|
|
base = stream[i]
|
|
i += 1
|
|
int_set = {None: '0123456789', 'x': '0123456789abcdefABCDEF', 'b': '01', 'o': '01234567'}[base]
|
|
while stream[i] in int_set:
|
|
i += 1
|
|
return Token(Token.CONSTANT_INTEGER, stream[:i]), stream[i:]
|
|
|
|
def token_extract_boolean(stream: str) -> tuple[Token, str]:
|
|
if stream.startswith('True'):
|
|
return Token(Token.CONSTANT_BOOLEAN, stream[:4]), stream[4:]
|
|
elif stream.startswith('False'):
|
|
return Token(Token.CONSTANT_BOOLEAN, stream[:5]), stream[5:]
|
|
raise ValueError('No such token in stream')
|
|
|
|
def token_extract_keyword(stream: str) -> tuple[Token, str]:
|
|
i = 0
|
|
if stream[i] in string.ascii_letters + '_-><=!+-*/?&|':
|
|
i += 1
|
|
else:
|
|
raise ValueError('No keyword in stream')
|
|
while stream[i] in string.ascii_letters + string.digits + '_-><=!+-*/?&|':
|
|
i += 1
|
|
return Token(Token.KEYWORD, stream[:i]), stream[i:]
|
|
|
|
def token_extract_grouping_open(stream: str) -> tuple[Token, str]:
|
|
if stream[0] == '(':
|
|
return Token(Token.GROUPING_OPEN, '('), stream[1:]
|
|
raise ValueError('No such token in stream')
|
|
|
|
def token_extract_grouping_close(stream: str) -> tuple[Token, str]:
|
|
if stream[0] == ')':
|
|
return Token(Token.GROUPING_CLOSE, ')'), stream[1:]
|
|
raise ValueError('No such token in stream')
|
|
|
|
def token_extract_space(stream: str) -> tuple[Token, str]:
|
|
i = 0
|
|
try:
|
|
while stream[i] in string.whitespace:
|
|
i += 1
|
|
except IndexError:
|
|
pass
|
|
return Token(Token.WHITESPACE, stream[:i]), stream[i:]
|
|
|
|
def tokenize(program: str) -> list[Token]:
|
|
extractors = [
|
|
token_extract_boolean,
|
|
token_extract_integer,
|
|
token_extract_string,
|
|
token_extract_keyword,
|
|
token_extract_grouping_open,
|
|
token_extract_grouping_close,
|
|
token_extract_space,
|
|
]
|
|
p = program
|
|
tokens = []
|
|
while p:
|
|
success = False
|
|
for e in extractors:
|
|
try:
|
|
t, p = e(p)
|
|
tokens += [t]
|
|
success = True
|
|
break
|
|
except ValueError:
|
|
pass
|
|
if not success:
|
|
raise ValueError('Program is invalid')
|
|
return [t for t in tokens if t.t != Token.WHITESPACE]
|
|
|
|
def tokenize_sanitize_function(token_before: Token | None, token: Token, token_after: Token | None) -> Token | None:
|
|
if token_before is None:
|
|
return
|
|
if token_before.t == Token.GROUPING_OPEN and token.t == Token.KEYWORD:
|
|
return Token(Token.FUNCTION, token.v)
|
|
|
|
def tokenize_sanitize_setvar(token_before: Token | None, token: Token, token_after: Token | None) -> Token | None:
|
|
if token_before is None:
|
|
return
|
|
if (token_before.t == Token.FUNCTION and token_before.v in ('setq', 'let')) and token.t == Token.KEYWORD:
|
|
return Token(Token.VARIABLE_SET, token.v)
|
|
|
|
def tokenize_sanitize_getvar(token_before: Token | None, token: Token, token_after: Token | None) -> Token | None:
|
|
if token_before is None:
|
|
if token.t == Token.KEYWORD:
|
|
return Token(Token.VARIABLE_GET, token.v)
|
|
return
|
|
if (token_before.t != Token.FUNCTION or token_before.v not in ('setq', 'let')) and token.t == Token.KEYWORD:
|
|
return Token(Token.VARIABLE_GET, token.v)
|
|
|
|
def _tokenize_sanitize(tokens: list[Token]) -> tuple[bool, list[Token]]:
|
|
sanitizers = [
|
|
tokenize_sanitize_function,
|
|
tokenize_sanitize_setvar,
|
|
tokenize_sanitize_getvar,
|
|
]
|
|
new_tokens = []
|
|
changed = False
|
|
for i in range(len(tokens)):
|
|
for s in sanitizers:
|
|
p_token = new_tokens[i-1] if i > 0 else None
|
|
n_token = tokens[i+1] if i < (len(tokens)-1) else None
|
|
new_token = s(p_token, tokens[i], n_token)
|
|
if new_token is not None:
|
|
changed = True
|
|
new_tokens += [new_token]
|
|
break
|
|
else:
|
|
new_tokens += [tokens[i]]
|
|
return changed, new_tokens
|
|
|
|
def tokenize_sanitize(tokens: list[Token]) -> list[Token]:
|
|
_, tokens = _tokenize_sanitize(tokens)
|
|
return tokens
|
|
|
|
def take_token_group(tokens: list[Token], n: int = 1) -> list[Token]:
|
|
i = 0
|
|
start = i
|
|
group_count = 0
|
|
consider_groups = False
|
|
while n:
|
|
if tokens[i].t == Token.GROUPING_OPEN:
|
|
consider_groups = True
|
|
if group_count == 0:
|
|
start = i
|
|
group_count += 1
|
|
elif tokens[i].t == Token.GROUPING_CLOSE:
|
|
group_count -= 1
|
|
if group_count == 0:
|
|
consider_groups = False
|
|
else:
|
|
if not consider_groups:
|
|
start = i
|
|
if group_count == 0:
|
|
n -= 1
|
|
if group_count < 0:
|
|
raise ValueError('reached past end')
|
|
i += 1
|
|
return tokens[start:i]
|
|
|
|
def unwrap_token_group(tokens: list[Token]) -> list[Token]:
|
|
if tokens[0].t != Token.GROUPING_OPEN:
|
|
return tokens
|
|
|
|
brace_count = 0
|
|
for i, t in enumerate(tokens):
|
|
brace_count += int(t.t == Token.GROUPING_OPEN)
|
|
brace_count -= int(t.t == Token.GROUPING_CLOSE)
|
|
if i == len(tokens) - 2:
|
|
if brace_count > 0:
|
|
tokens = tokens[1:-1]
|
|
break
|
|
return tokens
|
|
|
|
def build(tokens: list[Token]) -> Expression:
|
|
tokens = unwrap_token_group(tokens)
|
|
token_groups: list[list[Token]] = []
|
|
i = 1
|
|
while True:
|
|
try:
|
|
token_groups += [take_token_group(tokens, n=i)]
|
|
i += 1
|
|
except IndexError:
|
|
break
|
|
|
|
# special function case
|
|
if len(token_groups[0]) == 1 and token_groups[0][0].t == Token.FUNCTION:
|
|
token_0 = token_groups[0][0]
|
|
args = [build(tg) for tg in token_groups[1:]]
|
|
return Function(token_0.v, args)
|
|
|
|
# combine to multiple statements
|
|
if len(token_groups) > 1:
|
|
return Function('__last__', [build(tg) for tg in token_groups])
|
|
|
|
# create a basic expression
|
|
if len(token_groups) == 1 and len(token_groups[0]) == 1:
|
|
return token_groups[0][0].to_expression()
|
|
|
|
raise RuntimeError(f'Did not handle token case in build function, token_groups: {token_groups}')
|
|
|
|
def parse(program: str) -> Expression:
|
|
tokens = tokenize_sanitize(tokenize(program))
|
|
expression = build(tokens)
|
|
return expression
|
|
|
|
class Filter(Expression):
|
|
|
|
@classmethod
|
|
def __get_validators__(cls):
|
|
yield cls.validate
|
|
|
|
@classmethod
|
|
def __modify_schema__(cls, field_schema):
|
|
pass
|
|
|
|
@classmethod
|
|
def validate(cls, v):
|
|
if not isinstance(v, str):
|
|
raise TypeError('Must be string')
|
|
return parse(v)
|
|
|
|
class Command(str):
|
|
|
|
@classmethod
|
|
def __get_validators__(cls):
|
|
yield cls.validate
|
|
|
|
@classmethod
|
|
def __modify_schema__(cls, field_schema):
|
|
pass
|
|
|
|
@classmethod
|
|
def validate(cls, v):
|
|
if not isinstance(v, (str, list, tuple)):
|
|
raise TypeError('Must be string or list')
|
|
if isinstance(v, (list, tuple)):
|
|
v = ' '.join([f"'{x}'" for x in v])
|
|
return v
|
|
|
|
class Signal(int):
|
|
|
|
@classmethod
|
|
def __get_validators__(cls):
|
|
yield cls.validate
|
|
|
|
@classmethod
|
|
def __modify_schema__(cls, field_schema):
|
|
pass
|
|
|
|
@classmethod
|
|
def validate(cls, v):
|
|
if not isinstance(v, (str, int)):
|
|
raise TypeError('Must be string or int')
|
|
if isinstance(v, str) and v.isnumeric():
|
|
return signal.Signals(int(v))
|
|
elif isinstance(v, int):
|
|
return signal.Signals(v)
|
|
return getattr(signal.Signals, v)
|
|
|
|
class Lock(asyncio.Lock):
|
|
@classmethod
|
|
def __get_validators__(cls):
|
|
yield cls.validate
|
|
@classmethod
|
|
def __modify_schema__(cls, field_schema):
|
|
pass
|
|
@classmethod
|
|
def validate(cls, v):
|
|
if not isinstance(v, asyncio.Lock):
|
|
raise TypeError('Must be a asyncio.Lock')
|
|
return v
|
|
|
|
class Event(asyncio.Event):
|
|
@classmethod
|
|
def __get_validators__(cls):
|
|
yield cls.validate
|
|
@classmethod
|
|
def __modify_schema__(cls, field_schema):
|
|
pass
|
|
@classmethod
|
|
def validate(cls, v):
|
|
if not isinstance(v, asyncio.Event):
|
|
raise TypeError('Must be a asyncio.Event')
|
|
return v
|
|
|
|
class Connection(i3ipc.aio.Connection):
|
|
@classmethod
|
|
def __get_validators__(cls):
|
|
yield cls.validate
|
|
@classmethod
|
|
def __modify_schema__(cls, field_schema):
|
|
pass
|
|
@classmethod
|
|
def validate(cls, v):
|
|
if not isinstance(v, i3ipc.aio.Connection):
|
|
raise TypeError('Must be a i3ipc.aio.Connection')
|
|
return v
|
|
|
|
|
|
class ProgramConfig(pydantic.BaseModel):
|
|
cmd: Command
|
|
workspace: typing.Optional[str] = None
|
|
signal: bool = False
|
|
timeout: int = 1000
|
|
match: Filter
|
|
|
|
class Config(pydantic.BaseModel):
|
|
signal: typing.Optional[Signal] = None
|
|
timeout: int = 3000
|
|
init: typing.Optional[Filter] = None
|
|
programs: typing.List[ProgramConfig]
|
|
final_workspace: typing.Optional[str] = None
|
|
final_workspace_delay: int = 100
|
|
|
|
class RuntimeData(pydantic.BaseModel):
|
|
init: typing.Optional[str]
|
|
programs: typing.List[ProgramConfig] = []
|
|
lock: Lock
|
|
event: Event
|
|
ipc: Connection
|
|
|
|
def window_new(runtime_data: RuntimeData, *, debug):
|
|
async def callback(ipc: i3ipc.aio.Connection, e: i3ipc.WorkspaceEvent):
|
|
assert e.change == 'new'
|
|
LOGGER.debug('New window: %s', json.dumps(e.ipc_data))
|
|
async with runtime_data.lock:
|
|
env = Environment(e.ipc_data)
|
|
local = LocalEnvironment()
|
|
if runtime_data.init is not None:
|
|
parse(runtime_data.init).reduce(env, local)
|
|
for i, cfg in enumerate(runtime_data.programs):
|
|
cfg.match.reduce(env, local)
|
|
LOGGER.debug('Tried to match %s, result: %s', cfg.match, cfg.match.reduced)
|
|
if cfg.match.reduced:
|
|
container_id = e.ipc_data['container']['id']
|
|
await ipc.command(f'for_window [con_id="{container_id}"] focus')
|
|
await ipc.command(f'move container to workspace {cfg.workspace}')
|
|
runtime_data.programs.pop(i)
|
|
if not runtime_data.programs:
|
|
ipc.main_quit()
|
|
return callback
|
|
|
|
async def wait_signal(rt: RuntimeData):
|
|
await rt.event.wait()
|
|
rt.event.clear()
|
|
|
|
async def coro_wait_signal(coro, rt: RuntimeData):
|
|
await coro
|
|
await wait_signal(rt)
|
|
|
|
async def init(config: Config, *, debug: bool) -> RuntimeData:
|
|
rd = RuntimeData(
|
|
init=str(config.init),
|
|
programs=[p for p in config.programs if p.workspace is not None],
|
|
lock=Lock(),
|
|
event=Event(),
|
|
ipc=Connection(),
|
|
)
|
|
logging.basicConfig(level=logging.WARNING)
|
|
if debug:
|
|
LOGGER.setLevel(logging.DEBUG)
|
|
else:
|
|
LOGGER.setLevel(logging.INFO)
|
|
if config.signal is not None:
|
|
asyncio.get_running_loop().add_signal_handler(config.signal, lambda: rd.event.set())
|
|
return rd
|
|
|
|
async def run(config: Config, *, debug: bool):
|
|
runtime_data = await init(config, debug=debug)
|
|
await runtime_data.ipc.connect()
|
|
handler = window_new(runtime_data, debug=debug)
|
|
runtime_data.ipc.on('window::new', handler)
|
|
|
|
variables = {
|
|
'pid': os.getpid(),
|
|
}
|
|
coroutines = []
|
|
timeout = config.timeout
|
|
started_at = time.monotonic_ns()
|
|
for cfg in config.programs:
|
|
p = cfg.cmd.format(**variables)
|
|
coro = runtime_data.ipc.command(f'exec {p}')
|
|
if cfg.signal:
|
|
coro = coro_wait_signal(coro, runtime_data)
|
|
if cfg.timeout is not None:
|
|
timeout = max(timeout, cfg.timeout)
|
|
try:
|
|
await asyncio.wait_for(coro, timeout=cfg.timeout/1000 if cfg.timeout is not None else 0)
|
|
except asyncio.TimeoutError:
|
|
pass
|
|
else:
|
|
coroutines += [coro]
|
|
await asyncio.gather(*coroutines)
|
|
try:
|
|
if runtime_data.programs:
|
|
# run main loop only if we wait for something
|
|
diff = (time.monotonic_ns() - started_at) / (1000*1000)
|
|
new_timeout = max(timeout - diff, 0)
|
|
await asyncio.wait_for(runtime_data.ipc.main(), timeout=new_timeout/1000)
|
|
except asyncio.TimeoutError:
|
|
runtime_data.ipc.off(handler)
|
|
if runtime_data.programs:
|
|
LOGGER.debug('Not all programs consumed: %s', runtime_data.programs)
|
|
LOGGER.debug('Maybe the timeouts are too short?')
|
|
return 1
|
|
finally:
|
|
if config.final_workspace is not None:
|
|
await asyncio.sleep(config.final_workspace_delay/1000)
|
|
await runtime_data.ipc.command(f'workspace {config.final_workspace}')
|
|
return 0
|
|
|
|
@click.group()
|
|
@click.pass_context
|
|
@click.option('--debug', '-d', default=False, is_flag=True, help="Enable debug mode, will log ipc dictionary.")
|
|
def main(ctx, debug):
|
|
ctx.ensure_object(dict)
|
|
ctx.obj['DEBUG'] = debug
|
|
|
|
@main.command()
|
|
@click.pass_context
|
|
@click.option('--filter', '-f', default='True', help="A filter expression for the raw ipc dictionary.")
|
|
@click.option('--timeout', '-t', default=3000, help="Wait time for a window to appear (and match) in milliseconds.")
|
|
@click.option('--workspace', '-w', default=None, help="The workspace to move to.")
|
|
@click.argument('command', nargs=-1)
|
|
def simple(ctx, filter, timeout, workspace, command):
|
|
"""
|
|
Start a program and move it's created window to the desired i3 workspace.
|
|
|
|
\b
|
|
Exist status:
|
|
0 on success,
|
|
1 when no window has been found.
|
|
"""
|
|
debug = ctx.obj['DEBUG']
|
|
config = Config(programs=[ProgramConfig(
|
|
cmd=command,
|
|
workspace=workspace,
|
|
match=filter,
|
|
)], timeout=timeout)
|
|
ctx.exit(asyncio.run(run(config, debug=debug)))
|
|
|
|
@main.command()
|
|
@click.pass_context
|
|
@click.argument('config', type=click.File('r'), default='-')
|
|
def config(ctx, config):
|
|
"""
|
|
Start a program and move it's created window to the desired i3 workspace.
|
|
|
|
\b
|
|
Exist status:
|
|
0 on success,
|
|
1 when no window has been found.
|
|
"""
|
|
debug = ctx.obj['DEBUG']
|
|
config = Config(**yaml.load(config, Loader=SafeLoader))
|
|
ctx.exit(asyncio.run(run(config, debug=debug)))
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|