From 8c71453f2930b1589fb0b930e8194e89176de0ea Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 10 Sep 2024 17:43:19 +0300 Subject: [PATCH] refactor + service checking bug with repeating twice --- src/config.rs | 118 ++++++++++++++++++++++++------------------------ src/files.rs | 70 +++++++++++++--------------- src/logger.rs | 23 +++++----- src/main.rs | 45 +++++++++--------- src/prcs.rs | 96 +++++++++++++++++++-------------------- src/services.rs | 89 +++++++++++++++++++++--------------- src/signals.rs | 6 +-- src/structs.rs | 46 +++++++++---------- src/utils.rs | 88 +++++++++++++++++------------------- 9 files changed, 290 insertions(+), 291 deletions(-) diff --git a/src/config.rs b/src/config.rs index 615a53f..e8701ae 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,12 +1,14 @@ -use std::fs; use crate::structs::*; use log::{error, info, warn}; -use redis::{Client, Commands, Connection}; +use redis::{Client, Commands, Connection, RedisResult}; +use std::fs; use tokio::time::Duration; -static CONFIG_PATH : &'static str = "settings.json"; +static CONFIG_PATH: &str = "settings.json"; + +type Res = RedisResult)>>>; // 4ever sync -fn load_processes(json_filename: &str) -> Option{ +fn load_processes(json_filename: &str) -> Option { if let Ok(res) = fs::read_to_string(json_filename) { if let Ok(conf) = serde_json::from_str::(&res) { return Some(conf); @@ -16,41 +18,41 @@ fn load_processes(json_filename: &str) -> Option{ } pub fn get_actual_config() -> Option { - // * if no conf -> loop and +inf getting conf from redis server - // let mut local = load_processes(&CONFIG_PATH); - match load_processes(&CONFIG_PATH) { + // * if no conf -> loop and +inf getting conf from redis server + // let mut local = load_processes(&CONFIG_PATH); + match load_processes(CONFIG_PATH) { Some(local_conf) => { if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") { - match config_comparing(&local_conf, &remote_conf) { + return match config_comparing(&local_conf, &remote_conf) { ConfigActuality::Local => { info!("Local config is actual"); - return Some(local_conf); - }, + Some(local_conf) + } ConfigActuality::Remote => { info!("Pulled config is more actual. Saving changes!"); - if save_new_config(&remote_conf, &CONFIG_PATH).is_err() { + if save_new_config(&remote_conf, CONFIG_PATH).is_err() { error!("Saving changes process failed due to unexpected error...") } - return Some(remote_conf); - }, - } + Some(remote_conf) + } + }; } Some(local_conf) - }, + } None => { - // ? ? OUTSTANDING CONSTRUCTION ? + // ? ? OUTSTANDING CONSTRUCTION ? let mut conn = get_connection_watcher(&open_watcher("redis://localhost")); get_stream_info_watcher(&mut conn); - let remote_config = invalid_config_watcher(&mut conn); - let _ = save_new_config(&remote_config, &CONFIG_PATH); + let remote_config = invalid_config_watcher(&mut conn); + let _ = save_new_config(&remote_config, CONFIG_PATH); Some(remote_config) - }, + } } } // ! once iter exec // ! only for situation when local isn't None (no need to fck redis server) fn once_get_remote_configuration(serv_info: &str) -> Option { - match redis::Client::open(serv_info) { + match Client::open(serv_info) { Ok(client) => { match client.get_connection() { Ok(mut conn) => { @@ -59,15 +61,17 @@ fn once_get_remote_configuration(serv_info: &str) -> Option { warn!("No configuration in DB yet"); None } else { - let conf: redis::RedisResult)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); + let conf: Res = conn.xrevrange_count("config_stream", "+", "-", 1); let config: &Vec<(String, Vec<(String, String)>)>; - + if conf.is_ok() { // guaranteed and safe unwrapping let conf = conf.unwrap(); config = &conf[0]; if config.is_empty() { - error!("Empty config was pulled. Check stream and configs state!"); + error!( + "Empty config was pulled. Check stream and configs state!" + ); return None; } match parse_extern_config(&config[0].1[0].1) { @@ -75,7 +79,7 @@ fn once_get_remote_configuration(serv_info: &str) -> Option { None => { error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); None - }, + } } } else { error!("Configuration pulling from Redis stream failed. Check stream state!"); @@ -86,29 +90,29 @@ fn once_get_remote_configuration(serv_info: &str) -> Option { error!("Cannot find config_stream. Check Redis-stream accessibility!"); None } - }, + } Err(_) => { error!("Redis connection attempt is failed. Check Redis configuration!"); None - }, + } } - }, + } Err(_) => { error!("Redis-Client opening attempt is failed. Check network configuration!"); None - }, + } } } -// ! watchers +// ! watchers -fn open_watcher(serv_info: &str) -> redis::Client { +fn open_watcher(serv_info: &str) -> Client { loop { - match redis::Client::open(serv_info) { + match Client::open(serv_info) { Ok(redis) => { info!("Successfully opened Redis-Client"); - return redis - }, + return redis; + } Err(_) => { error!("Redis-Client opening attempt is failed. Check network configuration! Retrying..."); std::thread::sleep(Duration::from_secs(4)); @@ -121,11 +125,13 @@ fn get_connection_watcher(client: &Client) -> Connection { loop { match client.get_connection() { Ok(conn) => { - info!("Succesfully got Redis connection object"); + info!("Successfully got Redis connection object"); return conn; - }, + } Err(_) => { - error!("Redis connection attempt is failed. Check Redis configuration! Retrying..."); + error!( + "Redis connection attempt is failed. Check Redis configuration! Retrying..." + ); std::thread::sleep(Duration::from_secs(4)); } } @@ -144,15 +150,14 @@ fn get_stream_info_watcher(conn: &mut Connection) { } } fn invalid_config_watcher(conn: &mut Connection) -> Processes { - // let res: redis::RedisResult)>>>; loop { - let res: redis::RedisResult)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); + let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1); if res.is_ok() { let config = &res.unwrap()[0]; if !config.is_empty() { - if let Some(conf) = parse_extern_config(&config[0].1[0].1) { - return conf; - } + if let Some(conf) = parse_extern_config(&config[0].1[0].1) { + return conf; + } } } error!("Got INVALID configuration. Update config! Retrying..."); @@ -160,21 +165,20 @@ fn invalid_config_watcher(conn: &mut Connection) -> Processes { } } -// ! end of watchers +// ! end of watchers fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { let local_date: u64 = local.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap(); match local_date.cmp(&remote_date) { - std::cmp::Ordering::Equal | - std::cmp::Ordering::Greater => return ConfigActuality::Local, - std::cmp::Ordering::Less => return ConfigActuality::Remote, + std::cmp::Ordering::Equal | std::cmp::Ordering::Greater => ConfigActuality::Local, + std::cmp::Ordering::Less => ConfigActuality::Remote, } } -// ! TEMPORARLY DEPRICATED ! -// fn native_date_from_milis(mls: &str) -> Option> { +// ! TEMPORARILY DEPRECATED ! +// fn native_date_from_millis(mls: &str) -> Option> { // match mls.parse::(){ // Ok(val) => return chrono::DateTime::from_timestamp_millis(val), // Err(_) => return None, @@ -183,21 +187,17 @@ fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> { match serde_json::to_string_pretty(&config) { - Ok(st) => { - match fs::write(config_file, st) { - Ok(_) => return Ok(()), - Err(_) => return Err(CustomError::Fatal), - } + Ok(st) => match fs::write(config_file, st) { + Ok(_) => Ok(()), + Err(_) => Err(CustomError::Fatal), }, - Err(_) => return Err(CustomError::Fatal), + Err(_) => Err(CustomError::Fatal), } } fn parse_extern_config(json_string: &str) -> Option { - let des = serde_json::from_str::(json_string); - if des.is_err() { - return None; - } else { - return Some(des.unwrap()); + if let Ok(des) = serde_json::from_str::(json_string){ + return Some(des); } -} \ No newline at end of file + None +} diff --git a/src/files.rs b/src/files.rs index d1d10ee..88c5331 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1,37 +1,30 @@ -use crate::structs::{Files, CustomError}; -use inotify::{ EventMask, Inotify, WatchMask }; +use crate::prcs::{is_active, is_frozen}; +use crate::structs::{CustomError, Files}; +use inotify::{EventMask, Inotify, WatchMask}; use log::error; -use tokio::sync::mpsc; -use crate::prcs::{is_frozen, is_active}; -use tokio::time::Duration; -use std::sync::Arc; -use std::path::Path; use std::borrow::BorrowMut; +use std::path::Path; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::time::Duration; pub async fn create_watcher(filename: &str, path: &str) -> Result { let src = format!("{}{}", path, filename); let inotify: Inotify = Inotify::init().unwrap_or_else(|_| { - error!("{}",format!("Cannot create watcher for {}", &src)); + error!("{}", format!("Cannot create watcher for {}", &src)); std::process::exit(101); - }); - _ = inotify - .watches() - .add( - &src, - WatchMask::ALL_EVENTS - ); - + }); + _ = inotify.watches().add(&src, WatchMask::ALL_EVENTS); + Ok(inotify) } -pub async fn file_handler -( +pub async fn file_handler( name: &str, - files: &Vec, - tx: Arc>, - watchers: Arc>> -) -> Result<(), CustomError> -{ + files: &[Files], + tx: Arc>, + watchers: Arc>>, +) -> Result<(), CustomError> { for (i, file) in files.iter().enumerate() { // let src = format!("{}{}", file.src, file.filename); if check_file(&file.filename, &file.src).await.is_err() { @@ -41,37 +34,37 @@ pub async fn file_handler match file.triggers.on_delete.as_str() { "stay" => { continue; - }, + } "stop" => { if is_active(name).await { tx.send(1).await.unwrap(); } return Err(CustomError::Fatal); - }, + } "hold" => { if is_active(name).await { tx.send(2).await.unwrap(); return Err(CustomError::Fatal); } - }, + } _ => { tokio::time::sleep(Duration::from_millis(50)).await; tx.send(101).await.unwrap(); return Err(CustomError::Fatal); - }, + } } - } else if is_active(name).await && !is_frozen(name).await{ + } else if is_active(name).await && !is_frozen(name).await { let watchers = watchers.clone(); // println!("mutex: {:?}", watchers); - let mut buffer = [0; 128]; + let mut buffer = [0; 128]; let mut mutex_guard = watchers.lock().await; if let Some(notify) = mutex_guard.get_mut(i) { let events = notify.read_events(&mut buffer); // println!("{:?}", events); - if events.is_ok(){ - let events: Vec = events.unwrap() - .into_iter() - .map(|mask| {mask.mask}) + if events.is_ok() { + let events: Vec = events + .unwrap() + .map(|mask| mask.mask) .filter(|mask| { *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF }) @@ -88,16 +81,16 @@ pub async fn file_handler match file.triggers.on_change.as_str() { "stop" => { let _ = tx.send(7).await; - }, + } "restart" => { let _ = tx.send(8).await; - }, + } "stay" => { let _ = tx.send(9).await; - }, + } _ => { let _ = tx.send(101).await; - }, + } } } } @@ -108,7 +101,6 @@ pub async fn file_handler Ok(()) } - pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { let arc_name = Arc::new(filename.to_string()); let arc_path = Arc::new(path.to_string()); @@ -125,4 +117,4 @@ pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { .unwrap_or_else(|_| { panic!("Corrupted while file check process"); }) -} \ No newline at end of file +} diff --git a/src/logger.rs b/src/logger.rs index 10e92b1..b8a486d 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -2,8 +2,8 @@ // use std::process::Command; use chrono::Local; use env_logger::Builder; -use std::io::Write; use log::LevelFilter; +use std::io::Write; // use tokio::fs::File; // use tokio::io::BufWriter; use crate::utils::get_container_id; @@ -24,24 +24,23 @@ pub fn setup_logger() -> Result<(), crate::structs::CustomError> { // std::process::exit(1); // })); - // building logger with current output format Builder::new() - .format(move |buf, record|{ - writeln!(buf, - "Container-{}| {} [{}] - {}", - get_container_id().unwrap_or("||NODE|".to_string()), - Local::now().format("%d-%m-%Y %H:%M:%S"), - record.level(), - record.args(), + .format(move |buf, record| { + writeln!( + buf, + "Container-{}| {} [{}] - {}", + get_container_id().unwrap_or("||NODE|".to_string()), + Local::now().format("%d-%m-%Y %H:%M:%S"), + record.level(), + record.args(), ) }) .filter(None, LevelFilter::Info) .target(env_logger::Target::Stdout) - // temporary deprecated + // temporary deprecated // .target(env_logger::Target::Pipe(log_target)) .init(); - Ok(()) -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index 2d79fa1..21e8c72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,20 @@ -mod structs; mod config; mod files; -mod prcs; -mod utils; -mod services; mod logger; +mod prcs; +mod services; mod signals; +mod structs; +mod utils; -use tokio::sync::mpsc; -use std::sync::Arc; -use log::{error, info}; -use structs::*; use config::*; -use utils::*; +use log::{error, info}; use logger::setup_logger; use signals::set_valid_destructor; - +use std::sync::Arc; +use structs::*; +use tokio::sync::mpsc; +use utils::*; #[tokio::main(flavor = "multi_thread")] async fn main() { @@ -30,7 +29,10 @@ async fn main() { std::process::exit(101); }); - log::info!("Current runner configuration: {}", &processes.date_of_creation); + log::info!( + "Current runner configuration: {}", + &processes.date_of_creation + ); log::info!("Runner is ready. Initializing..."); if processes.processes.len() == 0 { @@ -42,7 +44,8 @@ async fn main() { let mut senders: Vec>> = vec![]; for proc in processes.processes.iter() { - log::info!("Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", + log::info!( + "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", proc.name, proc.path, proc.dependencies.files.len(), @@ -62,18 +65,16 @@ async fn main() { }); handler.push(event); } - // destructor addition - handler.push( - tokio::spawn(async move { - if let Err(_) = set_valid_destructor(Arc::new(senders)).await { - error!("Linux signals handler creation failed. Returning..."); - return; - } - }) - ); + // destructor addition + handler.push(tokio::spawn(async move { + if let Err(_) = set_valid_destructor(Arc::new(senders)).await { + error!("Linux signals handler creation failed. Returning..."); + return; + } + })); for i in handler { i.await.unwrap(); } info!("End of job. Terminating main thread..."); return; -} \ No newline at end of file +} diff --git a/src/prcs.rs b/src/prcs.rs index b6ee183..a42b2ea 100644 --- a/src/prcs.rs +++ b/src/prcs.rs @@ -1,65 +1,65 @@ -use std::sync::Arc; -use std::process::{ Command, Output }; -use log::{error, warn}; -use tokio::time::Duration; use crate::structs::CustomError; +use log::{error, warn}; +use std::process::{Command, Output}; +use std::sync::Arc; +use tokio::time::Duration; pub async fn get_pid(name: &str) -> Output { let name = Arc::new(name.to_string()); tokio::task::spawn_blocking(move || { Command::new("pidof") - .arg(&*name) - .output() - .unwrap_or_else(|_| { - error!("Failed to execute command 'pidof'"); - std::process::exit(101); - }) + .arg(&*name) + .output() + .unwrap_or_else(|_| { + error!("Failed to execute command 'pidof'"); + std::process::exit(101); + }) }) .await .unwrap() } -// ! can be with bug !!! +// ! can be with bug !!! // * APPROVED -pub async fn is_active(name: &str)-> bool { +pub async fn is_active(name: &str) -> bool { let arc_name = Arc::new(name.to_string()); tokio::task::spawn_blocking(move || { let output = Command::new("pidof") - .arg(&*arc_name) - .output() - .unwrap_or_else(|_| { - error!("Failed to execute command 'pidof'"); - std::process::exit(101); - }); - !String::from_utf8_lossy(&output.stdout).trim().is_empty() + .arg(&*arc_name) + .output() + .unwrap_or_else(|_| { + error!("Failed to execute command 'pidof'"); + std::process::exit(101); + }); + !String::from_utf8_lossy(&output.stdout).trim().is_empty() }) .await .unwrap() } -// T is for stopped processes +// T is for stopped processes pub async fn is_frozen(name: &str) -> bool { let temp = get_pid(name).await; let pid = String::from_utf8_lossy(&temp.stdout); let pid = pid.trim(); let arc_pid = Arc::new(pid.to_string()); - if pid.is_empty(){ - return false; + if pid.is_empty() { + false } else { tokio::task::spawn_blocking(move || { let cmd = Command::new("ps") - .args(["-o", "stat=", "-p", &arc_pid]) - .output() - .unwrap_or_else(|_| { - error!("Failed to execute ps command"); - std::process::exit(101); - }); + .args(["-o", "stat=", "-p", &arc_pid]) + .output() + .unwrap_or_else(|_| { + error!("Failed to execute ps command"); + std::process::exit(101); + }); String::from_utf8_lossy(&cmd.stdout).contains("T") }) .await .unwrap() } } -pub async fn terminate_process (name: &str) { +pub async fn terminate_process(name: &str) { let _ = Command::new("pkill") .arg(name) .output() @@ -70,27 +70,27 @@ pub async fn terminate_process (name: &str) { } // another test pub async fn freeze_process(name: &str) { - let _ = Command::new("pkill") - .args(["-STOP", name]) - .output() - .unwrap_or_else(|_| { - error!("Failed to freeze process"); - std::process::exit(101); - }); + let _ = Command::new("pkill") + .args(["-STOP", name]) + .output() + .unwrap_or_else(|_| { + error!("Failed to freeze process"); + std::process::exit(101); + }); } pub async fn unfreeze_process(name: &str) { - let _ = Command::new("pkill") - .args(["-CONT", name]) - .output() - .unwrap_or_else(|_| { - error!("Failed to unfreeze process"); - std::process::exit(101); - }); + let _ = Command::new("pkill") + .args(["-CONT", name]) + .output() + .unwrap_or_else(|_| { + error!("Failed to unfreeze process"); + std::process::exit(101); + }); } pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> { terminate_process(name).await; tokio::time::sleep(Duration::from_millis(100)).await; - return start_process(name, path).await; + start_process(name, path).await } pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { @@ -102,9 +102,7 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { Ok(_) => { warn!("Process {} is running now!", name); Ok(()) - }, - Err(_) => { - return Err(CustomError::Fatal) - }, + } + Err(_) => Err(CustomError::Fatal), } -} \ No newline at end of file +} diff --git a/src/services.rs b/src/services.rs index 8f16bcb..b420ad7 100644 --- a/src/services.rs +++ b/src/services.rs @@ -1,69 +1,77 @@ -use std::sync::Arc; -use tokio::time::{ Duration, Instant }; -use tokio::sync::mpsc; -use crate::structs::{Services, CustomError}; use crate::prcs::{is_active, is_frozen}; +use crate::structs::{CustomError, Services}; use log::{error, warn}; use std::net::{TcpStream, ToSocketAddrs}; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::time::{Duration, Instant}; - -pub async fn service_handler(name: &str, services: &Vec, tx: Arc>) -> Result<(), CustomError> { +pub async fn service_handler( + name: &str, + services: &Vec, + tx: Arc>, +) -> Result<(), CustomError> { // println!("service daemon on {}", name); for serv in services { if check_service(&serv.hostname, &serv.port).await.is_err() { if !is_active(name).await || is_frozen(name).await { return Err(CustomError::Fatal); } - error!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); + error!( + "Service {}:{} is unreachable for process {}", + &serv.hostname, &serv.port, &name + ); match serv.triggers.on_lost.as_str() { - "stay" => { - }, + "stay" => {} "stop" => { if looped_service_connecting(name, serv).await.is_err() { tx.send(5).await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(400)).await; return Err(CustomError::Fatal); } - }, + } "hold" => { if is_frozen(name).await { return Err(CustomError::Fatal); } if looped_service_connecting(name, serv).await.is_err() { - tx.send(6).await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; + tx.send(6).await.unwrap(); + tokio::time::sleep(Duration::from_millis(400)).await; return Err(CustomError::Fatal); } - }, + } _ => { tx.send(101).await.unwrap(); return Err(CustomError::Fatal); - }, + } } - } + } } tokio::time::sleep(Duration::from_millis(100)).await; tokio::task::yield_now().await; - Ok(()) + Ok(()) } -async fn looped_service_connecting( - name: &str, - serv: &Services -) -> Result<(), CustomError> -{ +async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { if serv.triggers.wait == 0 { - loop { + loop { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; - warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); + warn!( + "Attempting to connect from {} process to {}:{}", + &name, &serv.hostname, &serv.port + ); match check_service(&serv.hostname, &serv.port).await { Ok(_) => { - log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name); + log::info!( + "Successfully connected to {} from {} process!", + &serv.hostname, + &name + ); break; - }, + } Err(_) => { continue; - }, + } } } Ok(()) @@ -71,34 +79,41 @@ async fn looped_service_connecting( let start = Instant::now(); while start.elapsed().as_secs() < serv.triggers.wait.into() { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; - warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); - match check_service(&serv.hostname, &serv.port).await { + warn!( + "Attempting to connect from {} process to {}:{}", + &name, &serv.hostname, &serv.port + ); + match check_service(&serv.hostname, &serv.port).await { Ok(_) => { - log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name); + log::info!( + "Successfully connected to {} from {} process!", + &serv.hostname, + &name + ); return Ok(()); - }, + } Err(_) => { continue; - }, + } } } Err(CustomError::Fatal) } } -// ! have to be rewritten -// todo: rewrite use +// ! have to be rewritten +// todo: rewrite use async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { let addr = format!("{}:{}", hostname, port); match addr.to_socket_addrs() { Ok(mut addrs) => { - if let Some(_) = addrs.find(|a| TcpStream::connect_timeout(a, std::time::Duration::new(1, 0)).is_ok()) { + if addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { Ok(()) } else { Err(CustomError::Fatal) } - }, + } Err(_) => Err(CustomError::Fatal), } -} \ No newline at end of file +} diff --git a/src/signals.rs b/src/signals.rs index f0a1a40..c7b3599 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -1,11 +1,11 @@ use crate::structs::CustomError; use std::sync::Arc; use tokio::io; +use tokio::sync::mpsc; use tokio::{ select, signal::unix::{signal, Signal, SignalKind}, }; -use tokio::sync::mpsc; type SendersVec = Arc>>>; @@ -15,7 +15,7 @@ pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError Sig::new(Signals::Sigterm, senders.clone()), Sig::new(Signals::Sigstop, senders.clone()), ); - + select! { _ = int.post_processing() => {log::info!("Initializing interruption...")}, _ = term.post_processing() => {log::info!("Initializing termination...")}, @@ -67,7 +67,7 @@ impl SigPostProcessing for Sig { async fn post_processing(&mut self) -> io::Result<()> { // manipulations ... if let Some(_) = self.signal.recv().await { - log::info!("Got {}", self.sig_type); + log::info!("Got {} signal", self.sig_type); for prc in self.senders.clone().iter() { prc.send(111).await.unwrap(); } diff --git a/src/structs.rs b/src/structs.rs index 3872e3d..724dfc0 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -1,11 +1,11 @@ -use serde::{ Deserialize, Serialize }; +use serde::{Deserialize, Serialize}; /// # an Error enum (next will be deleted and replaced) pub enum CustomError { Fatal, } pub enum ConfigActuality { - Local, + Local, Remote, } @@ -15,18 +15,18 @@ pub enum ConfigActuality { pub struct Processes { // #[serde(rename="id")] // runner_id: usize, - #[serde(rename="dateOfCreation")] - pub date_of_creation : String, + #[serde(rename = "dateOfCreation")] + pub date_of_creation: String, #[serde(default)] - pub processes : Vec, + pub processes: Vec, } /// # struct for each process to contain info, such as name, path and dependencies /// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] pub struct TrackingProcess { - pub name : String, - pub path : String, + pub name: String, + pub path: String, pub dependencies: Dependencies, } @@ -35,7 +35,7 @@ pub struct TrackingProcess { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Dependencies { #[serde(default)] - pub files : Vec, + pub files: Vec, #[serde(default)] pub services: Vec, } @@ -44,36 +44,36 @@ pub struct Dependencies { /// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Files { - pub filename : String, - pub src : String, - pub triggers : FIleTriggers, + pub filename: String, + pub src: String, + pub triggers: FIleTriggers, } /// # struct for containing service object with its triggers to manipulate in daemons /// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Services { - pub hostname : String, - pub port : u32, - pub triggers : ServiceTriggers, -} + pub hostname: String, + pub port: u32, + pub triggers: ServiceTriggers, +} /// # struct for instancing each service's policies such as on lost or time to wait till reachable /// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ServiceTriggers { - pub wait : u32, + pub wait: u32, pub delay: u32, - #[serde(rename="onLost")] - pub on_lost : String, + #[serde(rename = "onLost")] + pub on_lost: String, } /// # struct for instancing each file's policies such as on-delete or onupdate events /// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] pub struct FIleTriggers { - #[serde(rename="onDelete")] - pub on_delete : String, - #[serde(rename="onChange")] - pub on_change : String, -} \ No newline at end of file + #[serde(rename = "onDelete")] + pub on_delete: String, + #[serde(rename = "onChange")] + pub on_change: String, +} diff --git a/src/utils.rs b/src/utils.rs index 62f24e5..c68e6e2 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,41 +1,37 @@ -use std::sync::Arc; -use crate::structs::TrackingProcess; -use tokio::sync::mpsc; -use inotify::Inotify; -use std::process::Command; use crate::files::create_watcher; -use log::{error, warn}; -use crate::prcs::{ - is_active, - is_frozen, - terminate_process, - restart_process, - freeze_process, - unfreeze_process, - start_process -}; -use tokio::time::Duration; -use tokio::join; use crate::files::file_handler; +use crate::prcs::{ + freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process, + unfreeze_process, +}; use crate::services::service_handler; +use crate::structs::TrackingProcess; +use inotify::Inotify; +use log::{error, warn}; +use std::process::Command; +use std::sync::Arc; +use tokio::join; +use tokio::sync::mpsc; +use tokio::time::Duration; -static GET_ID_CMD : &'static str = r"cat /proc/self/mountinfo | grep '/docker/containers/' | head -1 | awk -F '/' '{print \$6}'"; +static GET_ID_CMD: &str = + r"cat /proc/self/mountinfo | grep '/docker/containers/' | head -1 | awk -F '/' '{print \$6}'"; /// # async func to run 3 main daemons (now it's more like tree-form than classical 0.1.0 form ) /// > hint : give mpsc with capacity 1 to jump over potential errors during running process /// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") ** pub async fn run_daemons( - proc: Arc, + proc: Arc, tx: Arc>, - rx: &mut mpsc::Receiver -) -{ + rx: &mut mpsc::Receiver, +) { // creating watchers + ---buffers--- let mut watchers: Vec = vec![]; for file in proc.dependencies.files.clone().into_iter() { watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); } - let watchers_clone: Arc>> = Arc::new(tokio::sync::Mutex::new(watchers)); + let watchers_clone: Arc>> = + Arc::new(tokio::sync::Mutex::new(watchers)); loop { let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); @@ -64,7 +60,7 @@ pub async fn run_daemons( 3 => { error!("Error due to starting {} process", &proc.name); }, - // 4 - Timeout of waiting service-dependency -> staying (after waiting) + // 4 - Timeout of waiting service-dependency -> staying (after waiting) 4 => { warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name); }, @@ -138,35 +134,38 @@ pub async fn run_daemons( } break; }, - _ => {}, + _ => {}, } }, } // tokio::task::yield_now().await; - } + } } -// check process status daemon -pub async fn running_handler -( - prc: Arc, - tx: Arc>, - watchers: Arc>> -) -{ - // services and files check (once) - let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone()); +// check process status daemon +pub async fn running_handler( + prc: Arc, + tx: Arc>, + watchers: Arc>>, +) { + // services and files check (once) + let files_check = file_handler( + &prc.name, + &prc.dependencies.files, + tx.clone(), + watchers.clone(), + ); let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); let res = join!(files_check, services_check); - // if inactive -> spawn checks -> active is true - if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok(){ + // if inactive -> spawn checks -> active is true + if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { if start_process(&prc.name, &prc.path).await.is_err() { tx.send(3).await.unwrap(); return; } - } + } // if frozen -> spawn checks -> unfreeze is true - else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok(){ + else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { tx.send(10).await.unwrap(); return; } @@ -174,15 +173,10 @@ pub async fn running_handler tokio::task::yield_now().await; } - // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' pub fn get_container_id() -> Option { match Command::new("sh -c").arg(GET_ID_CMD).output() { - Ok(output) => { - Some(String::from_utf8_lossy(&output.stdout).to_string()) - }, - Err(_) => { - None - }, + Ok(output) => Some(String::from_utf8_lossy(&output.stdout).to_string()), + Err(_) => None, } }