From 59edb218217ab7e0c26b9a289c9a221ddd47241e Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 5 Sep 2024 10:51:29 +0300 Subject: [PATCH 1/2] #refactoring --- src/config.rs | 43 ++++++++++++++++--------------------------- src/files.rs | 9 ++++----- src/logger.rs | 9 +++++---- src/main.rs | 1 + src/services.rs | 10 +++++----- src/signals.rs | 0 src/structs.rs | 6 +++--- 7 files changed, 34 insertions(+), 44 deletions(-) create mode 100644 src/signals.rs diff --git a/src/config.rs b/src/config.rs index a7c714d..615a53f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,26 +7,15 @@ use tokio::time::Duration; static CONFIG_PATH : &'static str = "settings.json"; // 4ever sync fn load_processes(json_filename: &str) -> Option{ - match fs::read_to_string(json_filename) { - Ok(res) => { - match serde_json::from_str::(&res) { - Ok(conf) => { - return Some(conf); - }, - Err(_) => { - return None; - }, - } - }, - Err(_) => { - return None; - }, + if let Ok(res) = fs::read_to_string(json_filename) { + if let Ok(conf) = serde_json::from_str::(&res) { + return Some(conf); + } } + None } pub fn get_actual_config() -> Option { - - // todo: local check Some|None -> redis check // * if no conf -> loop and +inf getting conf from redis server // let mut local = load_processes(&CONFIG_PATH); match load_processes(&CONFIG_PATH) { @@ -46,7 +35,7 @@ pub fn get_actual_config() -> Option { }, } } - return Some(local_conf); + Some(local_conf) }, None => { // ? ? OUTSTANDING CONSTRUCTION ? @@ -59,7 +48,7 @@ pub fn get_actual_config() -> Option { } } // ! once iter exec -// ! only for situation when local isnt None (no need to fck redis server) +// ! only for situation when local isn't None (no need to fck redis server) fn once_get_remote_configuration(serv_info: &str) -> Option { match redis::Client::open(serv_info) { Ok(client) => { @@ -68,13 +57,13 @@ fn once_get_remote_configuration(serv_info: &str) -> Option { if let Ok(len) = conn.xlen::<&str, usize>("config_stream") { if len == 0 { warn!("No configuration in DB yet"); - return None; + None } else { let conf: redis::RedisResult)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); let config: &Vec<(String, Vec<(String, String)>)>; if conf.is_ok() { - // guarranted safe unwrapping + // guaranteed and safe unwrapping let conf = conf.unwrap(); config = &conf[0]; if config.is_empty() { @@ -82,31 +71,31 @@ fn once_get_remote_configuration(serv_info: &str) -> Option { return None; } match parse_extern_config(&config[0].1[0].1) { - Some(prcs) => return Some(prcs), + Some(prcs) => Some(prcs), None => { error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); - return None; + None }, } } else { error!("Configuration pulling from Redis stream failed. Check stream state!"); - return None; + None } } } else { error!("Cannot find config_stream. Check Redis-stream accessibility!"); - return None; + None } }, Err(_) => { error!("Redis connection attempt is failed. Check Redis configuration!"); - return None; + None }, } }, Err(_) => { error!("Redis-Client opening attempt is failed. Check network configuration!"); - return None; + None }, } } @@ -117,7 +106,7 @@ fn open_watcher(serv_info: &str) -> redis::Client { loop { match redis::Client::open(serv_info) { Ok(redis) => { - info!("Succesfully opened Redis-Client"); + info!("Successfully opened Redis-Client"); return redis }, Err(_) => { diff --git a/src/files.rs b/src/files.rs index 1047a9e..d1d10ee 100644 --- a/src/files.rs +++ b/src/files.rs @@ -10,7 +10,7 @@ use std::borrow::BorrowMut; pub async fn create_watcher(filename: &str, path: &str) -> Result { let src = format!("{}{}", path, filename); - let mut inotify = Inotify::init().unwrap_or_else(|_| { + let inotify: Inotify = Inotify::init().unwrap_or_else(|_| { error!("{}",format!("Cannot create watcher for {}", &src)); std::process::exit(101); }); @@ -32,7 +32,6 @@ pub async fn file_handler watchers: Arc>> ) -> Result<(), CustomError> { - // println!("file daemon on {}", name); for (i, file) in files.iter().enumerate() { // let src = format!("{}{}", file.src, file.filename); if check_file(&file.filename, &file.src).await.is_err() { @@ -81,7 +80,7 @@ pub async fn file_handler if let EventMask::DELETE_SELF = event { // ! warning (DELETE_SELF event) ! // println!("! warning (DELETE_SELF event) !"); - // * watcher recreation after dealing with file recreation mech in text editors + // * watcher recreation after dealing with file recreation mechanism in text editors let mutex = notify.borrow_mut(); *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); @@ -114,8 +113,8 @@ pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { let arc_name = Arc::new(filename.to_string()); let arc_path = Arc::new(path.to_string()); tokio::task::spawn_blocking(move || { - let fileconcat = format!("{}{}", arc_path, arc_name); - let path = Path::new(&fileconcat); + let file_concat = format!("{}{}", arc_path, arc_name); + let path = Path::new(&file_concat); if path.exists() { Ok(()) } else { diff --git a/src/logger.rs b/src/logger.rs index 2e0a449..10e92b1 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -1,11 +1,11 @@ -use std::fs::OpenOptions; -use std::process::Command; +// use std::fs::OpenOptions; +// use std::process::Command; use chrono::Local; use env_logger::Builder; use std::io::Write; use log::LevelFilter; -use tokio::fs::File; -use tokio::io::BufWriter; +// use tokio::fs::File; +// use tokio::io::BufWriter; use crate::utils::get_container_id; // struct CustomLogger { @@ -38,6 +38,7 @@ pub fn setup_logger() -> Result<(), crate::structs::CustomError> { }) .filter(None, LevelFilter::Info) .target(env_logger::Target::Stdout) + // temporary deprecated // .target(env_logger::Target::Pipe(log_target)) .init(); diff --git a/src/main.rs b/src/main.rs index 0c557d5..7376b83 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod prcs; mod utils; mod services; mod logger; +mod signals; use tokio::sync::mpsc; use std::sync::Arc; diff --git a/src/services.rs b/src/services.rs index c4025e8..8f16bcb 100644 --- a/src/services.rs +++ b/src/services.rs @@ -66,7 +66,7 @@ async fn looped_service_connecting( }, } } - return Ok(()); + Ok(()) } else { let start = Instant::now(); while start.elapsed().as_secs() < serv.triggers.wait.into() { @@ -82,7 +82,7 @@ async fn looped_service_connecting( }, } } - return Err(CustomError::Fatal); + Err(CustomError::Fatal) } } @@ -94,11 +94,11 @@ async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { match addr.to_socket_addrs() { Ok(mut addrs) => { if let Some(_) = addrs.find(|a| TcpStream::connect_timeout(a, std::time::Duration::new(1, 0)).is_ok()) { - return Ok(()); + Ok(()) } else { - return Err(CustomError::Fatal); + Err(CustomError::Fatal) } }, - Err(_) => return Err(CustomError::Fatal), + Err(_) => Err(CustomError::Fatal), } } \ No newline at end of file diff --git a/src/signals.rs b/src/signals.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/structs.rs b/src/structs.rs index 74f8bfd..3872e3d 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -1,6 +1,6 @@ use serde::{ Deserialize, Serialize }; -/// # an Error enum (nextly will be deleted and replaced) +/// # an Error enum (next will be deleted and replaced) pub enum CustomError { Fatal, } @@ -30,7 +30,7 @@ pub struct TrackingProcess { pub dependencies: Dependencies, } -/// # struct for processes' dependecies including files and services +/// # struct for processes' dependencies including files and services /// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Dependencies { @@ -68,7 +68,7 @@ pub struct ServiceTriggers { pub on_lost : String, } -/// # struct for instancing each file's policies such as ondelete or onupdate events +/// # struct for instancing each file's policies such as on-delete or onupdate events /// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] pub struct FIleTriggers { From c63d7177ba716b860e361528f596572920167d27 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 5 Sep 2024 14:55:38 +0300 Subject: [PATCH 2/2] added linux signals handling + new aliases --- .cargo/config.toml | 3 +- src/main.rs | 10 ++++++ src/signals.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++++ src/utils.rs | 6 +--- 4 files changed, 89 insertions(+), 6 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index e02e0f9..f09e756 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -3,8 +3,9 @@ runner = "riscv64-unknown-elf-gdb -q -x gdb_init" linker = "riscv64-linux-gnu-gcc" [alias] -x86_64 = "build --release" +x86_64 = "build --release --target x86_64-unknown-linux-gnu" riscv64 = "build --release --target riscv64gc-unknown-linux-gnu" +rbuild = "build --release" [build] target = "x86_64-unknown-linux-gnu" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 7376b83..737c3d2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use structs::*; use config::*; use utils::*; use logger::setup_logger; +use signals::set_valid_destructor; #[tokio::main(flavor = "multi_thread")] @@ -57,6 +58,15 @@ async fn main() { }); handler.push(event); } + // destructor addition + handler.push( + tokio::spawn(async move { + if let Err(_) = set_valid_destructor(Arc::new(&processes.processes)).await { + error!("Linux signals handler creation failed. Returning..."); + return; + } + }) + ); for i in handler { i.await.unwrap(); } diff --git a/src/signals.rs b/src/signals.rs index e69de29..ee13053 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -0,0 +1,76 @@ +use crate::structs::{CustomError, TrackingProcess}; +use std::sync::Arc; +use tokio::io; +use tokio::{ + select, + signal::unix::{signal, Signal, SignalKind}, +}; + +type PendingProcesses<'a> = Arc<&'a Vec>; + +pub async fn set_valid_destructor<'a>(prcs: PendingProcesses<'a>) -> Result<(), CustomError> { + let (mut int, mut term, mut stop) = ( + Sig::new(Signals::Sigint), + Sig::new(Signals::Sigterm), + Sig::new(Signals::Sigstop), + ); + + // todo: select! for handlers and exec destructor + select! { + _ = int.post_processing(prcs.clone()) => {log::info!("Interrupting main thread...")}, + _ = term.post_processing(prcs.clone()) => {log::info!("Terminating main thread...")}, + _ = stop.post_processing(prcs.clone()) => {log::info!("Freezing main thread...")}, + } + std::process::exit(1); +} +enum Signals { + Sigint, + Sigterm, + Sigstop, +} +struct Sig { + signal: Signal, + sig_type: Signals, +} +impl Sig { + fn new(signal_type: Signals) -> Self { + Sig { + signal: signal_type.get_signal().unwrap(), + sig_type: signal_type, + } + } +} +impl std::fmt::Display for Signals { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Signals::Sigint => write!(f, "SIGINT"), + Signals::Sigterm => write!(f, "SIGTERM"), + Signals::Sigstop => write!(f, "SIGSTOP"), + } + } +} +impl Signals { + fn get_signal(&self) -> io::Result { + match self { + Signals::Sigint => signal(SignalKind::interrupt()), + Signals::Sigterm => signal(SignalKind::terminate()), + Signals::Sigstop => signal(SignalKind::quit()), + } + } +} +trait SigPostProcessing<'a> { + async fn post_processing(&mut self, prcs: PendingProcesses<'a>) -> io::Result<()>; +} +impl<'a> SigPostProcessing<'a> for Sig { + async fn post_processing(&mut self, prcs: PendingProcesses<'a>) -> io::Result<()> { + // manipulations ... + if let Some(_) = self.signal.recv().await { + log::info!("Got {}", self.sig_type); + prcs.iter().for_each(|proc| { + log::info!("Terminating {}", &proc.name); + let _ = crate::prcs::terminate_process(&proc.name); + }) + } + Ok(()) + } +} diff --git a/src/utils.rs b/src/utils.rs index 171b957..ee18711 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,13 +1,10 @@ -use std::fs::OpenOptions; use std::sync::Arc; use crate::structs::TrackingProcess; use tokio::sync::mpsc; use inotify::Inotify; use std::process::Command; -use chrono::Local; -use env_logger::Builder; use crate::files::create_watcher; -use log::{error, warn, LevelFilter}; +use log::{error, warn}; use crate::prcs::{ is_active, is_frozen, @@ -19,7 +16,6 @@ use crate::prcs::{ }; use tokio::time::Duration; use tokio::join; -use std::io::Write; use crate::files::file_handler; use crate::services::service_handler;