Switch to asyncio.

This commit is contained in:
redxef 2022-10-20 14:39:59 +02:00
parent 1d2623ec66
commit 6f4d9032b4
Signed by: redxef
GPG key ID: 7DAC3AA211CBD921

View file

@ -2,13 +2,14 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import asyncio
import functools import functools
import json import json
import time
import yaml import yaml
import click import click
import i3ipc import i3ipc
import i3ipc.aio
try: try:
from yaml import CSafeLoader as SafeLoader from yaml import CSafeLoader as SafeLoader
@ -153,7 +154,7 @@ expression_mapping = {
'<': LtExpression, '<': LtExpression,
} }
def group_tokens(tokens: list[str]): def group_tokens(tokens: list[str]) -> list[list[str]]:
groups = [] groups = []
current_group = [] current_group = []
@ -268,9 +269,8 @@ def parse(s: str) -> Expression:
tokens = tokenize(s) tokens = tokenize(s)
return build_expression(tokens) return build_expression(tokens)
def window_new(configs: list[dict], debug: bool):
def window_new(configs, debug): async def callback(ipc: i3ipc.aio.Connection, e: i3ipc.WorkspaceEvent):
def callback(ipc, e):
assert e.change == 'new' assert e.change == 'new'
if debug: if debug:
print(json.dumps(e.ipc_data)) print(json.dumps(e.ipc_data))
@ -279,13 +279,31 @@ def window_new(configs, debug):
workspace = cfg['workspace'] workspace = cfg['workspace']
if filter.reduce(e.ipc_data): if filter.reduce(e.ipc_data):
container_id = e.ipc_data['container']['id'] container_id = e.ipc_data['container']['id']
ipc.command(f'for_window [con_id="{container_id}"] focus') await ipc.command(f'for_window [con_id="{container_id}"] focus')
ipc.command(f'for_window [con_id="{container_id}"] move container to workspace {workspace}') await ipc.command(f'for_window [con_id="{container_id}"] move container to workspace {workspace}')
configs.pop(i) configs.pop(i)
if not configs: if not configs:
ipc.main_quit() ipc.main_quit()
return callback return callback
async def run(configs: list[dict], *, timeout: int, debug: bool):
ipc = await i3ipc.aio.Connection().connect()
ipc.on('window::new', window_new(configs, debug=debug))
coroutines = []
for cfg in configs:
cfg['filter'] = parse(cfg['filter'])
p = cfg['program']
if isinstance(p, list):
p = ' '.join(p)
coroutines += [ipc.command(f'exec {p}')]
await asyncio.gather(*coroutines)
try:
await asyncio.wait_for(ipc.main(), timeout=timeout/1000)
except asyncio.TimeoutError:
return 1
return 0
@click.group() @click.group()
@click.pass_context @click.pass_context
@click.option('--debug', '-d', default=False, is_flag=True, help="Enable debug mode, will log ipc dictionary.") @click.option('--debug', '-d', default=False, is_flag=True, help="Enable debug mode, will log ipc dictionary.")
@ -309,22 +327,8 @@ def simple(ctx, filter, timeout, workspace, program):
1 when no window has been found. 1 when no window has been found.
""" """
debug = ctx.obj['DEBUG'] debug = ctx.obj['DEBUG']
filter = parse(filter) configs=[{"filter": filter, "workspace": workspace, "program": program}]
program = ' '.join(program) ctx.exit(asyncio.run(run(configs, timeout=timeout, debug=debug)))
configs=[
{
"filter": filter,
"workspace": workspace,
},
]
ipc = i3ipc.Connection()
ipc.on('window::new', window_new(configs, debug=debug))
ipc.command(f'exec {program}')
started_at = time.monotonic_ns() // (1000*1000)
ipc.main(timeout=timeout / 1000)
total_time = time.monotonic_ns() // (1000*1000) - started_at
ctx.exit(int(total_time >= timeout))
@main.command() @main.command()
@click.pass_context @click.pass_context
@ -341,20 +345,7 @@ def config(ctx, timeout, configs):
""" """
debug = ctx.obj['DEBUG'] debug = ctx.obj['DEBUG']
configs = yaml.load(configs, Loader=SafeLoader) configs = yaml.load(configs, Loader=SafeLoader)
ctx.exit(asyncio.run(run(configs, timeout=timeout, debug=debug)))
ipc = i3ipc.Connection()
ipc.on('window::new', window_new(configs, debug=debug))
for cfg in configs:
cfg['filter'] = parse(cfg['filter'])
p = cfg['program']
if isinstance(p, list):
p = ' '.join(p)
ipc.command(f'exec {p}')
started_at = time.monotonic_ns() // (1000*1000)
ipc.main(timeout=timeout / 1000)
total_time = time.monotonic_ns() // (1000*1000) - started_at
ctx.exit(int(total_time >= timeout))
if __name__ == '__main__': if __name__ == '__main__':
main() main()