From deebb77ffb4a454278b447d24bd53c31bc973efa Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 9 Sep 2024 14:40:50 +0300 Subject: [PATCH] rework signals handler. status OK --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/main.rs | 11 ++++++++--- src/signals.rs | 44 ++++++++++++++++++++++---------------------- src/utils.rs | 17 +++++++++++++++-- 5 files changed, 47 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 029d9d5..2706e0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -499,7 +499,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "runner-rs" -version = "0.5.1" +version = "0.5.5" dependencies = [ "chrono", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index 4fac11d..258c689 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "runner-rs" -version = "0.5.1" +version = "0.5.5" edition = "2021" [dependencies] diff --git a/src/main.rs b/src/main.rs index 737c3d2..2d79fa1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ mod signals; use tokio::sync::mpsc; use std::sync::Arc; -use log::error; +use log::{error, info}; use structs::*; use config::*; use utils::*; @@ -38,6 +38,8 @@ async fn main() { return; } let mut handler: Vec> = vec![]; + // is in need to send to the signals handler thread + let mut senders: Vec>> = vec![]; for proc in processes.processes.iter() { log::info!("Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", @@ -50,9 +52,11 @@ async fn main() { // creating msg channel // can or should be executed in new thread let (tx, mut rx) = mpsc::channel::(1); - let proc = Arc::new(proc.clone()); let tx = Arc::new(tx.clone()); + + senders.push(Arc::clone(&tx.clone())); + let event = tokio::spawn(async move { run_daemons(proc.clone(), tx.clone(), &mut rx).await; }); @@ -61,7 +65,7 @@ async fn main() { // destructor addition handler.push( tokio::spawn(async move { - if let Err(_) = set_valid_destructor(Arc::new(&processes.processes)).await { + if let Err(_) = set_valid_destructor(Arc::new(senders)).await { error!("Linux signals handler creation failed. Returning..."); return; } @@ -70,5 +74,6 @@ async fn main() { for i in handler { i.await.unwrap(); } + info!("End of job. Terminating main thread..."); return; } \ No newline at end of file diff --git a/src/signals.rs b/src/signals.rs index 08f350e..f0a1a40 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -1,28 +1,27 @@ -use crate::structs::{CustomError, TrackingProcess}; +use crate::structs::CustomError; use std::sync::Arc; -use std::time::Duration; use tokio::io; use tokio::{ select, signal::unix::{signal, Signal, SignalKind}, }; +use tokio::sync::mpsc; -type PendingProcesses<'a> = Arc<&'a Vec>; +type SendersVec = Arc>>>; -pub async fn set_valid_destructor<'a>(prcs: PendingProcesses<'a>) -> Result<(), CustomError> { +pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError> { let (mut int, mut term, mut stop) = ( - Sig::new(Signals::Sigint), - Sig::new(Signals::Sigterm), - Sig::new(Signals::Sigstop), + Sig::new(Signals::Sigint, senders.clone()), + Sig::new(Signals::Sigterm, senders.clone()), + Sig::new(Signals::Sigstop, senders.clone()), ); - - // todo: select! for handlers and exec destructor + select! { - _ = int.post_processing(prcs.clone()) => {log::info!("Interrupting main thread...")}, - _ = term.post_processing(prcs.clone()) => {log::info!("Terminating main thread...")}, - _ = stop.post_processing(prcs.clone()) => {log::info!("Freezing main thread...")}, + _ = int.post_processing() => {log::info!("Initializing interruption...")}, + _ = term.post_processing() => {log::info!("Initializing termination...")}, + _ = stop.post_processing() => {log::info!("Initializing freezing...")}, } - std::process::exit(1); + Ok(()) } enum Signals { Sigint, @@ -32,12 +31,14 @@ enum Signals { struct Sig { signal: Signal, sig_type: Signals, + senders: SendersVec, } impl Sig { - fn new(signal_type: Signals) -> Self { + fn new(signal_type: Signals, sends: SendersVec) -> Self { Sig { signal: signal_type.get_signal().unwrap(), sig_type: signal_type, + senders: sends, } } } @@ -59,18 +60,17 @@ impl Signals { } } } -trait SigPostProcessing<'a> { - async fn post_processing(&mut self, prcs: PendingProcesses<'a>) -> io::Result<()>; +trait SigPostProcessing { + async fn post_processing(&mut self) -> io::Result<()>; } -impl<'a> SigPostProcessing<'a> for Sig { - async fn post_processing(&mut self, prcs: PendingProcesses<'a>) -> io::Result<()> { +impl SigPostProcessing for Sig { + async fn post_processing(&mut self) -> io::Result<()> { // manipulations ... if let Some(_) = self.signal.recv().await { log::info!("Got {}", self.sig_type); - prcs.iter().for_each(|proc| { - log::info!("Terminating {}", &proc.name); - let _ = crate::prcs::terminate_process(&proc.name); - }); + for prc in self.senders.clone().iter() { + prc.send(111).await.unwrap(); + } } Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index ee18711..62f24e5 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -124,6 +124,20 @@ pub async fn run_daemons( } break; }, + // 111 - global thread termination with killing current child in a face + // of a current process + 111 => { + warn!("Terminating {}'s child processes...", &proc.name); + match is_active(&proc.name).await { + true => { + terminate_process(&proc.name).await; + }, + false => { + log::info!("Process {} is already terminated!", proc.name); + }, + } + break; + }, _ => {}, } }, @@ -139,7 +153,6 @@ pub async fn running_handler watchers: Arc>> ) { - // println!("running daemon on {}", prc.name); // services and files check (once) let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone()); let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); @@ -157,7 +170,7 @@ pub async fn running_handler tx.send(10).await.unwrap(); return; } - tokio::time::sleep(Duration::from_millis(100)).await; + // tokio::time::sleep(Duration::from_millis(100)).await; tokio::task::yield_now().await; }