Add signal processing, more logs.

This commit is contained in:
redxef 2023-10-15 14:39:59 +02:00
parent 1b6c9e46be
commit 5189ba323b
Signed by: redxef
GPG key ID: 7DAC3AA211CBD921
5 changed files with 197 additions and 63 deletions

View file

@ -16,5 +16,6 @@ rust_lisp = { git = "https://github.com/brundonsmith/rust_lisp.git", branch = "a
serde = { version = "1.0.188", features = ["std", "derive", "serde_derive"] } serde = { version = "1.0.188", features = ["std", "derive", "serde_derive"] }
serde_json = "1.0.107" serde_json = "1.0.107"
serde_yaml = "0.9.25" serde_yaml = "0.9.25"
strfmt = "0.2.4"
tokio = { version = "1.33.0", features = ["full"] } tokio = { version = "1.33.0", features = ["full"] }
xdg = "2.5.2" xdg = "2.5.2"

View file

@ -1,6 +1,7 @@
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use serde::{Serialize, Deserialize, Serializer, Deserializer};
use rust_lisp::model::Value as RValue; use rust_lisp::model::Value as RValue;
use serde::{Deserialize, Deserializer};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Value(Vec<RValue>); pub struct Value(Vec<RValue>);
@ -34,16 +35,17 @@ impl Display for Value {
impl<'de> Deserialize<'de> for Value { impl<'de> Deserialize<'de> for Value {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where where
D: Deserializer<'de> D: Deserializer<'de>,
{ {
let s: String = Deserialize::deserialize(deserializer)?; let s: String = Deserialize::deserialize(deserializer)?;
let r: Vec<RValue> = rust_lisp::parser::parse(&s).filter_map(|x| x.ok()).collect(); let r: Vec<RValue> = rust_lisp::parser::parse(&s)
.filter_map(|x| x.ok())
.collect();
Ok(Value(r)) Ok(Value(r))
} }
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug, Deserialize)]
#[derive(Deserialize)]
pub struct Program { pub struct Program {
#[serde(rename = "match")] #[serde(rename = "match")]
pub match_: Value, pub match_: Value,
@ -51,29 +53,53 @@ pub struct Program {
#[serde(default)] #[serde(default)]
pub run: Option<String>, pub run: Option<String>,
} }
unsafe impl Send for Program {}
unsafe impl Sync for Program {}
#[derive(Clone, Debug)] #[derive(Clone, Debug, Deserialize)]
#[derive(Deserialize)] pub struct Signal {
#[serde(default)]
pub run: Option<String>,
#[serde(default = "Signal::default_timeout")]
pub timeout: u64,
}
impl Signal {
fn default_timeout() -> u64 {
500
}
}
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
pub enum ProgramEntry {
Program(Program),
Signal(Signal),
}
// Program is only unsafe because Value has dyn Any in it (via Foreign).
// if we don't use !Send in Foreign everything is fine.
unsafe impl Send for Program {}
#[derive(Clone, Debug, Deserialize)]
pub struct Config { pub struct Config {
#[serde(default = "Config::default_timeout")] #[serde(default = "Config::default_timeout")]
pub timeout: u32, pub timeout: u64,
#[serde(default = "Config::default_init")] #[serde(default = "Config::default_init")]
pub init: Value, pub init: Value,
#[serde(default = "Config::default_programs")] #[serde(default = "Config::default_programs")]
pub programs: Vec<Program>, pub programs: Vec<ProgramEntry>,
#[serde(default)]
pub cmd: Option<String>,
} }
// Config is only unsafe because Value has dyn Any in it (via Foreign).
// if we don't use !Send in Foreign everything is fine.
unsafe impl Send for Config {} unsafe impl Send for Config {}
unsafe impl Sync for Config {}
impl Config { impl Config {
fn default_timeout() -> u32 { fn default_timeout() -> u64 {
3000 3000
} }
fn default_init() -> Value { fn default_init() -> Value {
Value(vec![]) Value(vec![])
} }
fn default_programs() -> Vec<Program> { fn default_programs() -> Vec<ProgramEntry> {
vec![] vec![]
} }
} }

View file

@ -1,10 +1,11 @@
use anyhow::Result;
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::pin::Pin; use std::pin::Pin;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream};
use tokio::net::UnixStream; use tokio::net::UnixStream;
@ -192,9 +193,18 @@ impl<'a> Connection<'a> {
(*cb.unwrap())(*subscription, response).await (*cb.unwrap())(*subscription, response).await
} }
pub async fn run(&mut self) -> Result<(), anyhow::Error> { pub async fn run(
&mut self,
rx: &mut tokio::sync::broadcast::Receiver<()>,
) -> Result<(), anyhow::Error> {
loop { loop {
let (message_type, response) = self.receive_message().await?; let stop_task = rx.recv();
let receive_message_task = self.receive_message();
let result = tokio::select! {
_ = stop_task => {return Ok(())},
result = receive_message_task => result?,
};
let (message_type, response) = result;
if !message_type.is_subscription() { if !message_type.is_subscription() {
continue; continue;
} }

View file

@ -1,7 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use rust_lisp::model::{IntType, FloatType, Value, List, Env, reference, reference::Reference};
use rust_lisp::model::{reference, reference::Reference, Env, FloatType, IntType, List, Value};
fn serde_lisp_value(value: &serde_json::Value) -> Value { fn serde_lisp_value(value: &serde_json::Value) -> Value {
match value { match value {

View file

@ -1,21 +1,21 @@
use anyhow::Result; use std::collections::HashMap;
use tokio::time::{timeout, Duration};
use tokio::io::AsyncReadExt;
use clap::Parser;
use log::{info, debug, warn};
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use anyhow::Result;
use clap::Parser;
use log::{debug, info, warn};
use tokio::io::AsyncReadExt;
use tokio::time::{timeout, Duration};
mod config; mod config;
mod i3ipc; mod i3ipc;
mod lisp; mod lisp;
use config::Config; use config::{Config, ProgramEntry};
use i3ipc::{Connection, MessageType}; use i3ipc::{Connection, MessageType};
#[derive(Debug, Clone, Parser)]
#[derive(Debug, Clone)]
#[derive(Parser)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
struct Args { struct Args {
#[arg(short, long, value_name = "FILE")] #[arg(short, long, value_name = "FILE")]
@ -26,64 +26,138 @@ impl Args {
fn finish(&mut self) { fn finish(&mut self) {
// TODO maybe return separate type // TODO maybe return separate type
if self.config.is_none() { if self.config.is_none() {
self.config = Some(xdg::BaseDirectories::with_prefix("i3toolwait").unwrap().get_config_file("config.yaml")); self.config = Some(
xdg::BaseDirectories::with_prefix("i3toolwait")
.unwrap()
.get_config_file("config.yaml"),
);
} }
} }
} }
fn new_window_cb( fn new_window_cb(
b: MessageType, _b: MessageType,
c: serde_json::Value, c: serde_json::Value,
config: &Config, config: &Config,
args: &Args, _args: &Args,
programs: &std::sync::Arc<tokio::sync::Mutex<Vec<ProgramEntry>>>,
tx: &tokio::sync::broadcast::Sender<()>,
) -> futures::future::BoxFuture<'static, Vec<(MessageType, Vec<u8>)>> { ) -> futures::future::BoxFuture<'static, Vec<(MessageType, Vec<u8>)>> {
let config_ = config.clone(); let config_ = config.clone();
let tx_ = tx.clone();
let programs_ = programs.clone();
Box::pin(async move { Box::pin(async move {
let mut command = None;
let mut index = None;
debug!("Received window event: {}", &c); debug!("Received window event: {}", &c);
for p in config_.programs.iter() { for (i, p) in programs_.lock().await.iter().enumerate() {
match p {
ProgramEntry::Program(p) => {
debug!("Evaluating program: {}", &p.match_); debug!("Evaluating program: {}", &p.match_);
let e = lisp::env(&c); let e = lisp::env(&c);
let init: Vec<rust_lisp::model::Value> = config_.init.clone().into(); let init: Vec<rust_lisp::model::Value> = config_.init.clone().into();
let prog: Vec<rust_lisp::model::Value> = p.match_.clone().into(); let prog: Vec<rust_lisp::model::Value> = p.match_.clone().into();
let m = init.into_iter().chain(prog.into_iter()); let m = init.into_iter().chain(prog.into_iter());
let result = rust_lisp::interpreter::eval_block(rust_lisp::model::reference::new(e), m); let result =
if let Ok(rust_lisp::model::Value::True) = result { rust_lisp::interpreter::eval_block(rust_lisp::model::reference::new(e), m);
debug!("Match found"); if let Ok(v) = &result {
return vec![(MessageType::Command, p.cmd.clone().into_bytes())]; debug!("Received result: {}", v);
if *v == rust_lisp::model::Value::False {
continue;
} }
debug!("Match found");
let mut vars = HashMap::with_capacity(1);
vars.insert("result".to_string(), v.to_string());
let cmd = strfmt::strfmt(&p.cmd, &vars).unwrap();
debug!("Command: {}", &cmd);
index = Some(i);
command = Some(cmd);
break;
} else {
warn!("Program produced an error: {:?}", &result);
}
}
_ => {
// Ignore signal entries
()
}
};
}
if let Some(index) = index {
let mut plock = programs_.lock().await;
plock.remove(index);
if plock.len() == 0 {
tx_.send(()).unwrap();
}
return vec![(MessageType::Command, command.unwrap().into_bytes())];
} }
debug!("No match found"); debug!("No match found");
Vec::new() Vec::new()
}) })
} }
async fn run<'a>(connection: &mut Connection<'a>, config: &Config) -> Result<(), anyhow::Error> { async fn run_command<'a>(
let (_, resp) = connection.communicate(&MessageType::Version, b"").await?; connection: &mut Connection<'a>,
info!("i3 version is {}", resp.get("human_readable").unwrap()); command: &str,
) -> Result<(), anyhow::Error> {
for p in config.programs.iter() { let (_, responses) = connection
if let Some(r) = &p.run { .communicate(&MessageType::Command, command.as_bytes())
let (_, responses) = connection.communicate(&MessageType::Command, r.as_bytes()).await?; .await?;
match responses { match responses {
serde_json::Value::Array(responses) => { serde_json::Value::Array(responses) => {
for response in responses { for response in responses {
if let serde_json::Value::Bool(v) = response.get("success").unwrap() { if let serde_json::Value::Bool(v) = response.get("success").unwrap() {
if !v { if !v {
warn!("Failed to run command {}: {}", r, response); warn!("Failed to run command {}: {}", command, response);
}
} }
} }
} }
},
_ => panic!("invalid response"), _ => panic!("invalid response"),
}; };
Ok(())
}
async fn run<'a>(connection: &mut Connection<'a>, config: &Config) -> Result<(), anyhow::Error> {
let (_, resp) = connection.communicate(&MessageType::Version, b"").await?;
info!("i3 version is {}", resp.get("human_readable").unwrap());
let mut signal_stream =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined1())?;
for p in config.programs.iter() {
match p {
ProgramEntry::Program(p) => {
if let Some(r) = &p.run {
run_command(connection, r).await?;
} }
} }
ProgramEntry::Signal(p) => {
if let Some(r) = &p.run {
run_command(connection, r).await?;
}
if let Err(_) =
timeout(Duration::from_millis(p.timeout), signal_stream.recv()).await
{
warn!(
"Ran into timeout when waiting for signal, program: {:?}",
p.run
);
}
}
};
}
Ok(()) Ok(())
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
env_logger::init_from_env(env_logger::Env::new().filter("I3TOOLWAIT_LOG").write_style("I3TOOLWAIT_LOG_STYLE")); env_logger::init_from_env(
env_logger::Env::new()
.filter("I3TOOLWAIT_LOG")
.write_style("I3TOOLWAIT_LOG_STYLE"),
);
let mut args = Args::parse(); let mut args = Args::parse();
args.finish(); args.finish();
@ -92,24 +166,48 @@ async fn main() -> Result<()> {
if args.config.as_ref().unwrap() == &PathBuf::from_str("-").unwrap() { if args.config.as_ref().unwrap() == &PathBuf::from_str("-").unwrap() {
tokio::io::stdin().read_to_string(&mut config).await?; tokio::io::stdin().read_to_string(&mut config).await?;
} else { } else {
tokio::fs::File::open(args.config.as_ref().unwrap()).await?.read_to_string(&mut config).await?; tokio::fs::File::open(args.config.as_ref().unwrap())
.await?
.read_to_string(&mut config)
.await?;
} }
let config: Config = serde_yaml::from_str(&config)?; let config: Config = serde_yaml::from_str(&config)?;
let config = std::sync::Arc::new(config); let config = std::sync::Arc::new(config);
let programs = std::sync::Arc::new(tokio::sync::Mutex::new(config.programs.clone()));
let mut connection = Connection::connect((i3ipc::get_socket_path().await?).as_ref())?; let mut connection = Connection::connect((i3ipc::get_socket_path().await?).as_ref())?;
let mut sub_connection = connection.clone(); let mut sub_connection = connection.clone();
let cb_config = config.clone(); let cb_config = config.clone();
let cb_args = args.clone(); let cb_args = args.clone();
let cb = move |a, b| {new_window_cb(a, b, &cb_config, &cb_args)};
let (tx, mut rx) = tokio::sync::broadcast::channel::<()>(1);
let cb_programs = programs.clone();
let cb = move |a, b| new_window_cb(a, b, &cb_config, &cb_args, &cb_programs, &tx);
sub_connection sub_connection
.subscribe(&[MessageType::SubWindow], &cb) .subscribe(&[MessageType::SubWindow], &cb)
.await?; .await?;
tokio::join!( tokio::join!(
timeout(Duration::from_millis(config.timeout as u64), sub_connection.run()), timeout(
Duration::from_millis(config.timeout),
sub_connection.run(&mut rx)
),
run(&mut connection, &config), run(&mut connection, &config),
) )
.1?; .1?;
{
let p = programs.lock().await;
if p.len() != 0 {
warn!("Not all programs consumed: {:?}", &p);
info!("Maybe the timouts are too short?");
}
}
if let Some(cmd) = &config.cmd {
connection
.communicate(&MessageType::Command, cmd.as_bytes())
.await?;
}
Ok(()) Ok(())
} }