rework signals handler. status OK

pull/9/head
prplV 2024-09-09 14:40:50 +03:00
parent 75d348f3b8
commit deebb77ffb
5 changed files with 47 additions and 29 deletions

2
Cargo.lock generated
View File

@ -499,7 +499,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
[[package]] [[package]]
name = "runner-rs" name = "runner-rs"
version = "0.5.1" version = "0.5.5"
dependencies = [ dependencies = [
"chrono", "chrono",
"env_logger", "env_logger",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "runner-rs" name = "runner-rs"
version = "0.5.1" version = "0.5.5"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -9,7 +9,7 @@ mod signals;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use log::error; use log::{error, info};
use structs::*; use structs::*;
use config::*; use config::*;
use utils::*; use utils::*;
@ -38,6 +38,8 @@ async fn main() {
return; return;
} }
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![]; let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// is in need to send to the signals handler thread
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
for proc in processes.processes.iter() { for proc in processes.processes.iter() {
log::info!("Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", log::info!("Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
@ -50,9 +52,11 @@ async fn main() {
// creating msg channel // creating msg channel
// can or should be executed in new thread // can or should be executed in new thread
let (tx, mut rx) = mpsc::channel::<u8>(1); let (tx, mut rx) = mpsc::channel::<u8>(1);
let proc = Arc::new(proc.clone()); let proc = Arc::new(proc.clone());
let tx = Arc::new(tx.clone()); let tx = Arc::new(tx.clone());
senders.push(Arc::clone(&tx.clone()));
let event = tokio::spawn(async move { let event = tokio::spawn(async move {
run_daemons(proc.clone(), tx.clone(), &mut rx).await; run_daemons(proc.clone(), tx.clone(), &mut rx).await;
}); });
@ -61,7 +65,7 @@ async fn main() {
// destructor addition // destructor addition
handler.push( handler.push(
tokio::spawn(async move { tokio::spawn(async move {
if let Err(_) = set_valid_destructor(Arc::new(&processes.processes)).await { if let Err(_) = set_valid_destructor(Arc::new(senders)).await {
error!("Linux signals handler creation failed. Returning..."); error!("Linux signals handler creation failed. Returning...");
return; return;
} }
@ -70,5 +74,6 @@ async fn main() {
for i in handler { for i in handler {
i.await.unwrap(); i.await.unwrap();
} }
info!("End of job. Terminating main thread...");
return; return;
} }

View File

@ -1,28 +1,27 @@
use crate::structs::{CustomError, TrackingProcess}; use crate::structs::CustomError;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio::io; use tokio::io;
use tokio::{ use tokio::{
select, select,
signal::unix::{signal, Signal, SignalKind}, signal::unix::{signal, Signal, SignalKind},
}; };
use tokio::sync::mpsc;
type PendingProcesses<'a> = Arc<&'a Vec<TrackingProcess>>; type SendersVec = Arc<Vec<Arc<mpsc::Sender<u8>>>>;
pub async fn set_valid_destructor<'a>(prcs: PendingProcesses<'a>) -> Result<(), CustomError> { pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError> {
let (mut int, mut term, mut stop) = ( let (mut int, mut term, mut stop) = (
Sig::new(Signals::Sigint), Sig::new(Signals::Sigint, senders.clone()),
Sig::new(Signals::Sigterm), Sig::new(Signals::Sigterm, senders.clone()),
Sig::new(Signals::Sigstop), Sig::new(Signals::Sigstop, senders.clone()),
); );
// todo: select! for handlers and exec destructor
select! { select! {
_ = int.post_processing(prcs.clone()) => {log::info!("Interrupting main thread...")}, _ = int.post_processing() => {log::info!("Initializing interruption...")},
_ = term.post_processing(prcs.clone()) => {log::info!("Terminating main thread...")}, _ = term.post_processing() => {log::info!("Initializing termination...")},
_ = stop.post_processing(prcs.clone()) => {log::info!("Freezing main thread...")}, _ = stop.post_processing() => {log::info!("Initializing freezing...")},
} }
std::process::exit(1); Ok(())
} }
enum Signals { enum Signals {
Sigint, Sigint,
@ -32,12 +31,14 @@ enum Signals {
struct Sig { struct Sig {
signal: Signal, signal: Signal,
sig_type: Signals, sig_type: Signals,
senders: SendersVec,
} }
impl Sig { impl Sig {
fn new(signal_type: Signals) -> Self { fn new(signal_type: Signals, sends: SendersVec) -> Self {
Sig { Sig {
signal: signal_type.get_signal().unwrap(), signal: signal_type.get_signal().unwrap(),
sig_type: signal_type, sig_type: signal_type,
senders: sends,
} }
} }
} }
@ -59,18 +60,17 @@ impl Signals {
} }
} }
} }
trait SigPostProcessing<'a> { trait SigPostProcessing {
async fn post_processing(&mut self, prcs: PendingProcesses<'a>) -> io::Result<()>; async fn post_processing(&mut self) -> io::Result<()>;
} }
impl<'a> SigPostProcessing<'a> for Sig { impl SigPostProcessing for Sig {
async fn post_processing(&mut self, prcs: PendingProcesses<'a>) -> io::Result<()> { async fn post_processing(&mut self) -> io::Result<()> {
// manipulations ... // manipulations ...
if let Some(_) = self.signal.recv().await { if let Some(_) = self.signal.recv().await {
log::info!("Got {}", self.sig_type); log::info!("Got {}", self.sig_type);
prcs.iter().for_each(|proc| { for prc in self.senders.clone().iter() {
log::info!("Terminating {}", &proc.name); prc.send(111).await.unwrap();
let _ = crate::prcs::terminate_process(&proc.name); }
});
} }
Ok(()) Ok(())
} }

View File

@ -124,6 +124,20 @@ pub async fn run_daemons(
} }
break; break;
}, },
// 111 - global thread termination with killing current child in a face
// of a current process
111 => {
warn!("Terminating {}'s child processes...", &proc.name);
match is_active(&proc.name).await {
true => {
terminate_process(&proc.name).await;
},
false => {
log::info!("Process {} is already terminated!", proc.name);
},
}
break;
},
_ => {}, _ => {},
} }
}, },
@ -139,7 +153,6 @@ pub async fn running_handler
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>> watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>
) )
{ {
// println!("running daemon on {}", prc.name);
// services and files check (once) // services and files check (once)
let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone()); let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone());
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
@ -157,7 +170,7 @@ pub async fn running_handler
tx.send(10).await.unwrap(); tx.send(10).await.unwrap();
return; return;
} }
tokio::time::sleep(Duration::from_millis(100)).await; // tokio::time::sleep(Duration::from_millis(100)).await;
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }