From 5189ba323b90df198563c306fe39c79a8a0e1509 Mon Sep 17 00:00:00 2001 From: redxef Date: Sun, 15 Oct 2023 14:39:59 +0200 Subject: [PATCH] Add signal processing, more logs. --- Cargo.toml | 1 + src/config.rs | 54 +++++++++++---- src/i3ipc.rs | 22 ++++-- src/lisp.rs | 3 +- src/main.rs | 180 ++++++++++++++++++++++++++++++++++++++------------ 5 files changed, 197 insertions(+), 63 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b034ae7..f832c4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,5 +16,6 @@ rust_lisp = { git = "https://github.com/brundonsmith/rust_lisp.git", branch = "a serde = { version = "1.0.188", features = ["std", "derive", "serde_derive"] } serde_json = "1.0.107" serde_yaml = "0.9.25" +strfmt = "0.2.4" tokio = { version = "1.33.0", features = ["full"] } xdg = "2.5.2" diff --git a/src/config.rs b/src/config.rs index 00b60be..7605c56 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,7 @@ use std::fmt::{Display, Formatter}; -use serde::{Serialize, Deserialize, Serializer, Deserializer}; + use rust_lisp::model::Value as RValue; +use serde::{Deserialize, Deserializer}; #[derive(Clone, Debug)] pub struct Value(Vec); @@ -34,16 +35,17 @@ impl Display for Value { impl<'de> Deserialize<'de> for Value { fn deserialize(deserializer: D) -> Result where - D: Deserializer<'de> + D: Deserializer<'de>, { let s: String = Deserialize::deserialize(deserializer)?; - let r: Vec = rust_lisp::parser::parse(&s).filter_map(|x| x.ok()).collect(); + let r: Vec = rust_lisp::parser::parse(&s) + .filter_map(|x| x.ok()) + .collect(); Ok(Value(r)) } } -#[derive(Clone, Debug)] -#[derive(Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct Program { #[serde(rename = "match")] pub match_: Value, @@ -51,29 +53,53 @@ pub struct Program { #[serde(default)] pub run: Option, } -unsafe impl Send for Program {} -unsafe impl Sync for Program {} -#[derive(Clone, Debug)] -#[derive(Deserialize)] +#[derive(Clone, Debug, Deserialize)] +pub struct Signal { + #[serde(default)] + pub run: Option, + #[serde(default = "Signal::default_timeout")] + pub timeout: u64, +} +impl Signal { + fn default_timeout() -> u64 { + 500 + } +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(untagged)] +pub enum ProgramEntry { + Program(Program), + Signal(Signal), +} + +// Program is only unsafe because Value has dyn Any in it (via Foreign). +// if we don't use !Send in Foreign everything is fine. +unsafe impl Send for Program {} + +#[derive(Clone, Debug, Deserialize)] pub struct Config { #[serde(default = "Config::default_timeout")] - pub timeout: u32, + pub timeout: u64, #[serde(default = "Config::default_init")] pub init: Value, #[serde(default = "Config::default_programs")] - pub programs: Vec, + pub programs: Vec, + #[serde(default)] + pub cmd: Option, } +// Config is only unsafe because Value has dyn Any in it (via Foreign). +// if we don't use !Send in Foreign everything is fine. unsafe impl Send for Config {} -unsafe impl Sync for Config {} impl Config { - fn default_timeout() -> u32 { + fn default_timeout() -> u64 { 3000 } fn default_init() -> Value { Value(vec![]) } - fn default_programs() -> Vec { + fn default_programs() -> Vec { vec![] } } diff --git a/src/i3ipc.rs b/src/i3ipc.rs index 4a6cd7c..b4c2024 100644 --- a/src/i3ipc.rs +++ b/src/i3ipc.rs @@ -1,10 +1,11 @@ -use anyhow::Result; -use futures::future::BoxFuture; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::pin::Pin; use std::str::FromStr; -use std::sync::Arc; + + +use anyhow::Result; + +use serde::{Deserialize, Serialize}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream}; use tokio::net::UnixStream; @@ -192,9 +193,18 @@ impl<'a> Connection<'a> { (*cb.unwrap())(*subscription, response).await } - pub async fn run(&mut self) -> Result<(), anyhow::Error> { + pub async fn run( + &mut self, + rx: &mut tokio::sync::broadcast::Receiver<()>, + ) -> Result<(), anyhow::Error> { loop { - let (message_type, response) = self.receive_message().await?; + let stop_task = rx.recv(); + let receive_message_task = self.receive_message(); + let result = tokio::select! { + _ = stop_task => {return Ok(())}, + result = receive_message_task => result?, + }; + let (message_type, response) = result; if !message_type.is_subscription() { continue; } diff --git a/src/lisp.rs b/src/lisp.rs index 2715cc9..3e2a0b4 100644 --- a/src/lisp.rs +++ b/src/lisp.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; -use rust_lisp::model::{IntType, FloatType, Value, List, Env, reference, reference::Reference}; - +use rust_lisp::model::{reference, reference::Reference, Env, FloatType, IntType, List, Value}; fn serde_lisp_value(value: &serde_json::Value) -> Value { match value { diff --git a/src/main.rs b/src/main.rs index 11b83c0..c88504d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,21 @@ -use anyhow::Result; -use tokio::time::{timeout, Duration}; -use tokio::io::AsyncReadExt; -use clap::Parser; -use log::{info, debug, warn}; +use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; +use anyhow::Result; +use clap::Parser; +use log::{debug, info, warn}; +use tokio::io::AsyncReadExt; +use tokio::time::{timeout, Duration}; + mod config; mod i3ipc; mod lisp; -use config::Config; +use config::{Config, ProgramEntry}; use i3ipc::{Connection, MessageType}; - -#[derive(Debug, Clone)] -#[derive(Parser)] +#[derive(Debug, Clone, Parser)] #[command(author, version, about, long_about = None)] struct Args { #[arg(short, long, value_name = "FILE")] @@ -26,64 +26,138 @@ impl Args { fn finish(&mut self) { // TODO maybe return separate type if self.config.is_none() { - self.config = Some(xdg::BaseDirectories::with_prefix("i3toolwait").unwrap().get_config_file("config.yaml")); + self.config = Some( + xdg::BaseDirectories::with_prefix("i3toolwait") + .unwrap() + .get_config_file("config.yaml"), + ); } } } fn new_window_cb( - b: MessageType, + _b: MessageType, c: serde_json::Value, config: &Config, - args: &Args, + _args: &Args, + programs: &std::sync::Arc>>, + tx: &tokio::sync::broadcast::Sender<()>, ) -> futures::future::BoxFuture<'static, Vec<(MessageType, Vec)>> { let config_ = config.clone(); + let tx_ = tx.clone(); + let programs_ = programs.clone(); Box::pin(async move { + let mut command = None; + let mut index = None; debug!("Received window event: {}", &c); - for p in config_.programs.iter() { - debug!("Evaluating program: {}", &p.match_); - let e = lisp::env(&c); - let init: Vec = config_.init.clone().into(); - let prog: Vec = p.match_.clone().into(); - let m = init.into_iter().chain(prog.into_iter()); - let result = rust_lisp::interpreter::eval_block(rust_lisp::model::reference::new(e), m); - if let Ok(rust_lisp::model::Value::True) = result { - debug!("Match found"); - return vec![(MessageType::Command, p.cmd.clone().into_bytes())]; + for (i, p) in programs_.lock().await.iter().enumerate() { + match p { + ProgramEntry::Program(p) => { + debug!("Evaluating program: {}", &p.match_); + let e = lisp::env(&c); + let init: Vec = config_.init.clone().into(); + let prog: Vec = p.match_.clone().into(); + let m = init.into_iter().chain(prog.into_iter()); + let result = + rust_lisp::interpreter::eval_block(rust_lisp::model::reference::new(e), m); + if let Ok(v) = &result { + debug!("Received result: {}", v); + if *v == rust_lisp::model::Value::False { + continue; + } + debug!("Match found"); + let mut vars = HashMap::with_capacity(1); + vars.insert("result".to_string(), v.to_string()); + let cmd = strfmt::strfmt(&p.cmd, &vars).unwrap(); + debug!("Command: {}", &cmd); + + index = Some(i); + command = Some(cmd); + break; + } else { + warn!("Program produced an error: {:?}", &result); + } + } + _ => { + // Ignore signal entries + () + } + }; + } + if let Some(index) = index { + let mut plock = programs_.lock().await; + plock.remove(index); + if plock.len() == 0 { + tx_.send(()).unwrap(); } + return vec![(MessageType::Command, command.unwrap().into_bytes())]; } debug!("No match found"); Vec::new() }) } +async fn run_command<'a>( + connection: &mut Connection<'a>, + command: &str, +) -> Result<(), anyhow::Error> { + let (_, responses) = connection + .communicate(&MessageType::Command, command.as_bytes()) + .await?; + match responses { + serde_json::Value::Array(responses) => { + for response in responses { + if let serde_json::Value::Bool(v) = response.get("success").unwrap() { + if !v { + warn!("Failed to run command {}: {}", command, response); + } + } + } + } + _ => panic!("invalid response"), + }; + Ok(()) +} + async fn run<'a>(connection: &mut Connection<'a>, config: &Config) -> Result<(), anyhow::Error> { let (_, resp) = connection.communicate(&MessageType::Version, b"").await?; info!("i3 version is {}", resp.get("human_readable").unwrap()); + let mut signal_stream = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined1())?; + for p in config.programs.iter() { - if let Some(r) = &p.run { - let (_, responses) = connection.communicate(&MessageType::Command, r.as_bytes()).await?; - match responses { - serde_json::Value::Array(responses) => { - for response in responses { - if let serde_json::Value::Bool(v) = response.get("success").unwrap() { - if !v { - warn!("Failed to run command {}: {}", r, response); - } - } - } - }, - _ => panic!("invalid response"), - }; - } + match p { + ProgramEntry::Program(p) => { + if let Some(r) = &p.run { + run_command(connection, r).await?; + } + } + ProgramEntry::Signal(p) => { + if let Some(r) = &p.run { + run_command(connection, r).await?; + } + if let Err(_) = + timeout(Duration::from_millis(p.timeout), signal_stream.recv()).await + { + warn!( + "Ran into timeout when waiting for signal, program: {:?}", + p.run + ); + } + } + }; } Ok(()) } #[tokio::main] async fn main() -> Result<()> { - env_logger::init_from_env(env_logger::Env::new().filter("I3TOOLWAIT_LOG").write_style("I3TOOLWAIT_LOG_STYLE")); + env_logger::init_from_env( + env_logger::Env::new() + .filter("I3TOOLWAIT_LOG") + .write_style("I3TOOLWAIT_LOG_STYLE"), + ); let mut args = Args::parse(); args.finish(); @@ -92,24 +166,48 @@ async fn main() -> Result<()> { if args.config.as_ref().unwrap() == &PathBuf::from_str("-").unwrap() { tokio::io::stdin().read_to_string(&mut config).await?; } else { - tokio::fs::File::open(args.config.as_ref().unwrap()).await?.read_to_string(&mut config).await?; + tokio::fs::File::open(args.config.as_ref().unwrap()) + .await? + .read_to_string(&mut config) + .await?; } let config: Config = serde_yaml::from_str(&config)?; let config = std::sync::Arc::new(config); + let programs = std::sync::Arc::new(tokio::sync::Mutex::new(config.programs.clone())); let mut connection = Connection::connect((i3ipc::get_socket_path().await?).as_ref())?; let mut sub_connection = connection.clone(); let cb_config = config.clone(); let cb_args = args.clone(); - let cb = move |a, b| {new_window_cb(a, b, &cb_config, &cb_args)}; + + let (tx, mut rx) = tokio::sync::broadcast::channel::<()>(1); + + let cb_programs = programs.clone(); + let cb = move |a, b| new_window_cb(a, b, &cb_config, &cb_args, &cb_programs, &tx); sub_connection .subscribe(&[MessageType::SubWindow], &cb) .await?; tokio::join!( - timeout(Duration::from_millis(config.timeout as u64), sub_connection.run()), + timeout( + Duration::from_millis(config.timeout), + sub_connection.run(&mut rx) + ), run(&mut connection, &config), ) .1?; + { + let p = programs.lock().await; + if p.len() != 0 { + warn!("Not all programs consumed: {:?}", &p); + info!("Maybe the timouts are too short?"); + } + } + + if let Some(cmd) = &config.cmd { + connection + .communicate(&MessageType::Command, cmd.as_bytes()) + .await?; + } Ok(()) }