diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..a7c714d --- /dev/null +++ b/src/config.rs @@ -0,0 +1,214 @@ +use std::fs; +use crate::structs::*; +use log::{error, info, warn}; +use redis::{Client, Commands, Connection}; +use tokio::time::Duration; + +static CONFIG_PATH : &'static str = "settings.json"; +// 4ever sync +fn load_processes(json_filename: &str) -> Option{ + match fs::read_to_string(json_filename) { + Ok(res) => { + match serde_json::from_str::(&res) { + Ok(conf) => { + return Some(conf); + }, + Err(_) => { + return None; + }, + } + }, + Err(_) => { + return None; + }, + } +} + +pub fn get_actual_config() -> Option { + + // todo: local check Some|None -> redis check + // * if no conf -> loop and +inf getting conf from redis server + // let mut local = load_processes(&CONFIG_PATH); + match load_processes(&CONFIG_PATH) { + Some(local_conf) => { + if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") { + match config_comparing(&local_conf, &remote_conf) { + ConfigActuality::Local => { + info!("Local config is actual"); + return Some(local_conf); + }, + ConfigActuality::Remote => { + info!("Pulled config is more actual. Saving changes!"); + if save_new_config(&remote_conf, &CONFIG_PATH).is_err() { + error!("Saving changes process failed due to unexpected error...") + } + return Some(remote_conf); + }, + } + } + return Some(local_conf); + }, + None => { + // ? ? OUTSTANDING CONSTRUCTION ? + let mut conn = get_connection_watcher(&open_watcher("redis://localhost")); + get_stream_info_watcher(&mut conn); + let remote_config = invalid_config_watcher(&mut conn); + let _ = save_new_config(&remote_config, &CONFIG_PATH); + Some(remote_config) + }, + } +} +// ! once iter exec +// ! 
only for situation when local isnt None (no need to fck redis server) +fn once_get_remote_configuration(serv_info: &str) -> Option { + match redis::Client::open(serv_info) { + Ok(client) => { + match client.get_connection() { + Ok(mut conn) => { + if let Ok(len) = conn.xlen::<&str, usize>("config_stream") { + if len == 0 { + warn!("No configuration in DB yet"); + return None; + } else { + let conf: redis::RedisResult)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); + let config: &Vec<(String, Vec<(String, String)>)>; + + if conf.is_ok() { + // guarranted safe unwrapping + let conf = conf.unwrap(); + config = &conf[0]; + if config.is_empty() { + error!("Empty config was pulled. Check stream and configs state!"); + return None; + } + match parse_extern_config(&config[0].1[0].1) { + Some(prcs) => return Some(prcs), + None => { + error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); + return None; + }, + } + } else { + error!("Configuration pulling from Redis stream failed. Check stream state!"); + return None; + } + } + } else { + error!("Cannot find config_stream. Check Redis-stream accessibility!"); + return None; + } + }, + Err(_) => { + error!("Redis connection attempt is failed. Check Redis configuration!"); + return None; + }, + } + }, + Err(_) => { + error!("Redis-Client opening attempt is failed. Check network configuration!"); + return None; + }, + } +} + +// ! watchers + +fn open_watcher(serv_info: &str) -> redis::Client { + loop { + match redis::Client::open(serv_info) { + Ok(redis) => { + info!("Succesfully opened Redis-Client"); + return redis + }, + Err(_) => { + error!("Redis-Client opening attempt is failed. Check network configuration! 
Retrying..."); + std::thread::sleep(Duration::from_secs(4)); + } + } + } +} + +fn get_connection_watcher(client: &Client) -> Connection { + loop { + match client.get_connection() { + Ok(conn) => { + info!("Succesfully got Redis connection object"); + return conn; + }, + Err(_) => { + error!("Redis connection attempt is failed. Check Redis configuration! Retrying..."); + std::thread::sleep(Duration::from_secs(4)); + } + } + } +} +fn get_stream_info_watcher(conn: &mut Connection) { + loop { + if let Ok(val) = conn.xlen::<&str, usize>("config_stream") { + if val != 0 { + info!("Redis stream is able and not empty now"); + return; + } + } + error!("Configuration pulling from Redis stream failed. Check stream state! Retrying..."); + std::thread::sleep(Duration::from_secs(4)); + } +} +fn invalid_config_watcher(conn: &mut Connection) -> Processes { + // let res: redis::RedisResult)>>>; + loop { + let res: redis::RedisResult)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); + if res.is_ok() { + let config = &res.unwrap()[0]; + if !config.is_empty() { + if let Some(conf) = parse_extern_config(&config[0].1[0].1) { + return conf; + } + } + } + error!("Got INVALID configuration. Update config! Retrying..."); + std::thread::sleep(Duration::from_secs(4)); + } +} + +// ! end of watchers + +fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { + let local_date: u64 = local.date_of_creation.parse().unwrap(); + let remote_date: u64 = remote.date_of_creation.parse().unwrap(); + + match local_date.cmp(&remote_date) { + std::cmp::Ordering::Equal | + std::cmp::Ordering::Greater => return ConfigActuality::Local, + std::cmp::Ordering::Less => return ConfigActuality::Remote, + } +} + +// ! TEMPORARLY DEPRICATED ! 
+// fn native_date_from_milis(mls: &str) -> Option> { +// match mls.parse::(){ +// Ok(val) => return chrono::DateTime::from_timestamp_millis(val), +// Err(_) => return None, +// } +// } + +fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> { + match serde_json::to_string_pretty(&config) { + Ok(st) => { + match fs::write(config_file, st) { + Ok(_) => return Ok(()), + Err(_) => return Err(CustomError::Fatal), + } + }, + Err(_) => return Err(CustomError::Fatal), + } +} + +fn parse_extern_config(json_string: &str) -> Option { + let des = serde_json::from_str::(json_string); + if des.is_err() { + return None; + } else { + return Some(des.unwrap()); + } +} \ No newline at end of file diff --git a/src/files.rs b/src/files.rs new file mode 100644 index 0000000..1047a9e --- /dev/null +++ b/src/files.rs @@ -0,0 +1,129 @@ +use crate::structs::{Files, CustomError}; +use inotify::{ EventMask, Inotify, WatchMask }; +use log::error; +use tokio::sync::mpsc; +use crate::prcs::{is_frozen, is_active}; +use tokio::time::Duration; +use std::sync::Arc; +use std::path::Path; +use std::borrow::BorrowMut; + +pub async fn create_watcher(filename: &str, path: &str) -> Result { + let src = format!("{}{}", path, filename); + let mut inotify = Inotify::init().unwrap_or_else(|_| { + error!("{}",format!("Cannot create watcher for {}", &src)); + std::process::exit(101); + }); + _ = inotify + .watches() + .add( + &src, + WatchMask::ALL_EVENTS + ); + + Ok(inotify) +} + +pub async fn file_handler +( + name: &str, + files: &Vec, + tx: Arc>, + watchers: Arc>> +) -> Result<(), CustomError> +{ + // println!("file daemon on {}", name); + for (i, file) in files.iter().enumerate() { + // let src = format!("{}{}", file.src, file.filename); + if check_file(&file.filename, &file.src).await.is_err() { + if !is_active(name).await || is_frozen(name).await { + return Err(CustomError::Fatal); + } + match file.triggers.on_delete.as_str() { + "stay" => { + continue; + }, + "stop" => 
{ + if is_active(name).await { + tx.send(1).await.unwrap(); + } + return Err(CustomError::Fatal); + }, + "hold" => { + if is_active(name).await { + tx.send(2).await.unwrap(); + return Err(CustomError::Fatal); + } + }, + _ => { + tokio::time::sleep(Duration::from_millis(50)).await; + tx.send(101).await.unwrap(); + return Err(CustomError::Fatal); + }, + } + } else if is_active(name).await && !is_frozen(name).await{ + let watchers = watchers.clone(); + // println!("mutex: {:?}", watchers); + let mut buffer = [0; 128]; + let mut mutex_guard = watchers.lock().await; + if let Some(notify) = mutex_guard.get_mut(i) { + let events = notify.read_events(&mut buffer); + // println!("{:?}", events); + if events.is_ok(){ + let events: Vec = events.unwrap() + .into_iter() + .map(|mask| {mask.mask}) + .filter(|mask| { + *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF + }) + .collect(); + for event in events { + if let EventMask::DELETE_SELF = event { + // ! warning (DELETE_SELF event) ! + // println!("! 
warning (DELETE_SELF event) !"); + // * watcher recreation after dealing with file recreation mech in text editors + let mutex = notify.borrow_mut(); + + *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); + } + match file.triggers.on_change.as_str() { + "stop" => { + let _ = tx.send(7).await; + }, + "restart" => { + let _ = tx.send(8).await; + }, + "stay" => { + let _ = tx.send(9).await; + }, + _ => { + let _ = tx.send(101).await; + }, + } + } + } + } + } + } + tokio::task::yield_now().await; + Ok(()) +} + + +pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { + let arc_name = Arc::new(filename.to_string()); + let arc_path = Arc::new(path.to_string()); + tokio::task::spawn_blocking(move || { + let fileconcat = format!("{}{}", arc_path, arc_name); + let path = Path::new(&fileconcat); + if path.exists() { + Ok(()) + } else { + Err(CustomError::Fatal) + } + }) + .await + .unwrap_or_else(|_| { + panic!("Corrupted while file check process"); + }) +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 8214122..2709728 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,109 +1,23 @@ -use redis::{Client, Commands, Connection}; -// json parsing -use serde::{ Deserialize, Serialize }; -use serde_json; -// async multi-threaded execution -use tokio::time::{ Duration, Instant }; +mod structs; +mod config; +mod files; +mod prcs; +mod utils; +mod services; + use tokio::sync::mpsc; -use tokio::join; -// fatal errors handler -use core::panic; -use std::borrow::BorrowMut; -// utils -use std::fmt::Debug; -use std::fs; -use std::path::Path; -use std::process::{ Command, Output }; +use std::process::Command; use std::sync::Arc; -// file change handler -use inotify::{ EventMask, Inotify, WatchMask }; -// logging use std::io::Write; use chrono::Local; use env_logger::Builder; -use log::{error, info, warn, LevelFilter}; -use std::net::{TcpStream, ToSocketAddrs}; +use log::{error, LevelFilter}; +use structs::*; +use config::*; 
+use utils::*; -static CONFIG_PATH : &'static str = "settings.json"; static GET_ID_CMD : &'static str = "cat /proc/self/mountinfo | grep '/docker/containers/' | head -1 | awk -F '/' '{print $5}'"; -/// # an Error enum (nextly will be deleted and replaced) -enum CustomError { - Fatal, -} -enum ConfigActuality { - Local, - Remote, -} - -/// # struct for the 1st level in json conf file -/// > (needed in serialization and deserialization) -#[derive(Debug, Serialize, Deserialize, Clone)] -struct Processes { - // #[serde(rename="id")] - // runner_id: usize, - #[serde(rename="dateOfCreation")] - date_of_creation : String, - #[serde(default)] - processes : Vec, -} - -/// # struct for each process to contain info, such as name, path and dependencies -/// > (needed in serialization and deserialization) -#[derive(Debug, Serialize, Deserialize, Clone)] -struct TrackingProcess { - name : String, - path : String, - dependencies: Dependencies, -} - -/// # struct for processes' dependecies including files and services -/// > (needed in serialization and deserialization) -#[derive(Debug, Serialize, Deserialize, Clone)] -struct Dependencies { - #[serde(default)] - files : Vec, - #[serde(default)] - services: Vec, -} - -/// # struct for containing file object with its triggers to manipulate in daemons -/// > (needed in serialization and deserialization) -#[derive(Debug, Serialize, Deserialize, Clone)] -struct Files { - filename : String, - src : String, - triggers : FIleTriggers, -} - -/// # struct for containing service object with its triggers to manipulate in daemons -/// > (needed in serialization and deserialization) -#[derive(Debug, Serialize, Deserialize, Clone)] -struct Services { - hostname : String, - port : u32, - triggers : ServiceTriggers, -} - -/// # struct for instancing each service's policies such as on lost or time to wait till reachable -/// > (needed in serialization and deserialization) -#[derive(Debug, Serialize, Deserialize, Clone)] -struct ServiceTriggers { - 
wait : u32, - delay: u32, - #[serde(rename="onLost")] - on_lost : String, -} - -/// # struct for instancing each file's policies such as ondelete or onupdate events -/// > (needed in serialization and deserialization) -#[derive(Debug, Serialize, Deserialize, Clone)] -struct FIleTriggers { - #[serde(rename="onDelete")] - on_delete : String, - #[serde(rename="onChange")] - on_change : String, -} #[tokio::main(flavor = "multi_thread")] async fn main() { @@ -165,673 +79,6 @@ async fn main() { return; } -fn get_actual_config() -> Option { - - // todo: local check Some|None -> redis check - // * if no conf -> loop and +inf getting conf from redis server - // let mut local = load_processes(&CONFIG_PATH); - match load_processes(&CONFIG_PATH) { - Some(local_conf) => { - if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") { - match config_comparing(&local_conf, &remote_conf) { - ConfigActuality::Local => { - info!("Local config is actual"); - return Some(local_conf); - }, - ConfigActuality::Remote => { - info!("Pulled config is more actual. Saving changes!"); - if save_new_config(&remote_conf, &CONFIG_PATH).is_err() { - error!("Saving changes process failed due to unexpected error...") - } - return Some(remote_conf); - }, - } - } - return Some(local_conf); - }, - None => { - // ? ? OUTSTANDING CONSTRUCTION ? - let mut conn = get_connection_watcher(&open_watcher("redis://localhost")); - get_stream_info_watcher(&mut conn); - let remote_config = invalid_config_watcher(&mut conn); - let _ = save_new_config(&remote_config, &CONFIG_PATH); - Some(remote_config) - }, - } -} -// ! once iter exec -// ! 
only for situation when local isnt None (no need to fck redis server) -fn once_get_remote_configuration(serv_info: &str) -> Option { - match redis::Client::open(serv_info) { - Ok(client) => { - match client.get_connection() { - Ok(mut conn) => { - if let Ok(len) = conn.xlen::<&str, usize>("config_stream") { - if len == 0 { - warn!("No configuration in DB yet"); - return None; - } else { - let conf: redis::RedisResult)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); - let config: &Vec<(String, Vec<(String, String)>)>; - - if conf.is_ok() { - // guarranted safe unwrapping - let conf = conf.unwrap(); - config = &conf[0]; - if config.is_empty() { - error!("Empty config was pulled. Check stream and configs state!"); - return None; - } - match parse_extern_config(&config[0].1[0].1) { - Some(prcs) => return Some(prcs), - None => { - error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); - return None; - }, - } - } else { - error!("Configuration pulling from Redis stream failed. Check stream state!"); - return None; - } - } - } else { - error!("Cannot find config_stream. Check Redis-stream accessibility!"); - return None; - } - }, - Err(_) => { - error!("Redis connection attempt is failed. Check Redis configuration!"); - return None; - }, - } - }, - Err(_) => { - error!("Redis-Client opening attempt is failed. Check network configuration!"); - return None; - }, - } -} - -// ! watchers - -fn open_watcher(serv_info: &str) -> redis::Client { - loop { - match redis::Client::open(serv_info) { - Ok(redis) => { - info!("Succesfully opened Redis-Client"); - return redis - }, - Err(_) => { - error!("Redis-Client opening attempt is failed. Check network configuration! 
Retrying..."); - std::thread::sleep(Duration::from_secs(4)); - } - } - } -} - -fn get_connection_watcher(client: &Client) -> Connection { - loop { - match client.get_connection() { - Ok(conn) => { - info!("Succesfully got Redis connection object"); - return conn; - }, - Err(_) => { - error!("Redis connection attempt is failed. Check Redis configuration! Retrying..."); - std::thread::sleep(Duration::from_secs(4)); - } - } - } -} -fn get_stream_info_watcher(conn: &mut Connection) { - loop { - if let Ok(val) = conn.xlen::<&str, usize>("config_stream") { - if val != 0 { - info!("Redis stream is able and not empty now"); - return; - } - } - error!("Configuration pulling from Redis stream failed. Check stream state! Retrying..."); - std::thread::sleep(Duration::from_secs(4)); - } -} -fn invalid_config_watcher(conn: &mut Connection) -> Processes { - // let res: redis::RedisResult)>>>; - loop { - let res: redis::RedisResult)>>> = conn.xrevrange_count("config_stream", "+", "-", 1); - if res.is_ok() { - let config = &res.unwrap()[0]; - if !config.is_empty() { - if let Some(conf) = parse_extern_config(&config[0].1[0].1) { - return conf; - } - } - } - error!("Got INVALID configuration. Update config! Retrying..."); - std::thread::sleep(Duration::from_secs(4)); - } -} - -// ! end of watchers - -fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { - let local_date: u64 = local.date_of_creation.parse().unwrap(); - let remote_date: u64 = remote.date_of_creation.parse().unwrap(); - - match local_date.cmp(&remote_date) { - std::cmp::Ordering::Equal | - std::cmp::Ordering::Greater => return ConfigActuality::Local, - std::cmp::Ordering::Less => return ConfigActuality::Remote, - } -} - -// ! TEMPORARLY DEPRICATED ! 
-// fn native_date_from_milis(mls: &str) -> Option> { -// match mls.parse::(){ -// Ok(val) => return chrono::DateTime::from_timestamp_millis(val), -// Err(_) => return None, -// } -// } - -fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> { - match serde_json::to_string_pretty(&config) { - Ok(st) => { - match fs::write(config_file, st) { - Ok(_) => return Ok(()), - Err(_) => return Err(CustomError::Fatal), - } - }, - Err(_) => return Err(CustomError::Fatal), - } -} - -async fn create_watcher(filename: &str, path: &str) -> Result { - let src = format!("{}{}", path, filename); - let mut inotify = Inotify::init().unwrap_or_else(|_| { - error!("{}",format!("Cannot create watcher for {}", &src)); - std::process::exit(101); - }); - _ = inotify - .watches() - .add( - &src, - WatchMask::ALL_EVENTS - ); - - Ok(inotify) -} - -/// # async func to run 3 main daemons (now its more like tree-form than classiacl 0.1.0 form ) -/// > hint : give mpsc with capacity 1 to jump over potential errors during running process -/// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") ** -async fn run_daemons( - proc: Arc, - tx: Arc>, - rx: &mut mpsc::Receiver -) -{ - // creating watchers + ---buffers--- - let mut watchers: Vec = vec![]; - for file in proc.dependencies.files.clone().into_iter() { - watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); - } - let watchers_clone: Arc>> = Arc::new(tokio::sync::Mutex::new(watchers)); - - loop { - let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); - tokio::select! { - _ = run_hand => {}, - _val = rx.recv() => { - match _val.unwrap() { - // 1 - File-dependency handling error -> terminating (after waiting) - 1 => { - if is_active(&proc.name).await { - error!("Dependency handling error: Terminating {} process ..." 
, &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - // break; - }, - // 2 - File-dependency handling error -> holding (after waiting) - 2 => { - if !is_frozen(&proc.name).await { - error!("Dependency handling error: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - }, - // 3 - Running process error - 3 => { - error!("Error due to starting {} process", &proc.name); - }, - // 4 - Timeout of waiting service-dependency -> staying (after waiting) - 4 => { - warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name); - }, - // 5 - Timeout of waiting service-dependency -> terminating (after waiting) - 5 => { - if is_active(&proc.name).await { - error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - // break; - }, - // 6 - Timeout of waiting service-dependency -> holding (after waiting) - 6 => { - if !is_frozen(&proc.name).await { - error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - }, - // // 7 - File-dependency change -> terminating (after check) - 7 => { - error!("File-dependency warning (file changed). Terminating {} process...", &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // // 8 - File-dependency change -> restarting (after check) - 8 => { - warn!("File-dependency warning (file changed). 
Restarting {} process...", &proc.name); - let _ = restart_process(&proc.name, &proc.path).await; - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // // 9 - File-dependency change -> staying (after check) - 9 => { - warn!("File-dependency warning (file changed). Ignoring on {} process...", &proc.name); - }, - - // 10 - Process unfreaze call via file handler - 10 => { - if is_frozen(&proc.name).await { - warn!("Unfreezing process {} call...", &proc.name); - unfreeze_process(&proc.name).await; - } - }, - // 11 - Process unfreaze call via service handler - 11 => { - if is_frozen(&proc.name).await { - warn!("Unfreezing process {} call...", &proc.name); - unfreeze_process(&proc.name).await; - } - }, - // 101 - Impermissible trigger values in JSON - 101 => { - error!("Impermissible trigger values in JSON"); - if is_active(&proc.name).await { - terminate_process(&proc.name).await; - } - break; - }, - _ => {}, - } - }, - } - // tokio::task::yield_now().await; - } -} - -// 4ever sync -fn load_processes(json_filename: &str) -> Option{ - match fs::read_to_string(json_filename) { - Ok(res) => { - match serde_json::from_str::(&res) { - Ok(conf) => { - return Some(conf); - }, - Err(_) => { - return None; - }, - } - }, - Err(_) => { - return None; - }, - } -} -fn parse_extern_config(json_string: &str) -> Option { - let des = serde_json::from_str::(json_string); - if des.is_err() { - return None; - } else { - return Some(des.unwrap()); - } -} - -async fn get_pid(name: &str) -> Output { - let name = Arc::new(name.to_string()); - tokio::task::spawn_blocking(move || { - Command::new("pidof") - .arg(&*name) - .output() - .unwrap_or_else(|_| { - error!("Failed to execute command 'pidof'"); - std::process::exit(101); - }) - }) - .await - .unwrap() -} -// ! can be with bug !!! 
-// * APPROVED -async fn is_active(name: &str)-> bool { - let arc_name = Arc::new(name.to_string()); - tokio::task::spawn_blocking(move || { - let output = Command::new("pidof") - .arg(&*arc_name) - .output() - .unwrap_or_else(|_| { - error!("Failed to execute command 'pidof'"); - std::process::exit(101); - }); - !String::from_utf8_lossy(&output.stdout).trim().is_empty() - }) - .await - .unwrap() -} - -// T is for stopped processes -async fn is_frozen(name: &str) -> bool { - let temp = get_pid(name).await; - let pid = String::from_utf8_lossy(&temp.stdout); - let pid = pid.trim(); - let arc_pid = Arc::new(pid.to_string()); - if pid.is_empty(){ - return false; - } else { - tokio::task::spawn_blocking(move || { - let cmd = Command::new("ps") - .args(["-o", "stat=", "-p", &arc_pid]) - .output() - .unwrap_or_else(|_| { - error!("Failed to execute ps command"); - std::process::exit(101); - }); - String::from_utf8_lossy(&cmd.stdout).contains("T") - }) - .await - .unwrap() - } -} -async fn terminate_process (name: &str) { - let _ = Command::new("pkill") - .arg(name) - .output() - .unwrap_or_else(|_| { - error!("Failed to execute command 'pkill'"); - std::process::exit(101); - }); -} -// another test -async fn freeze_process(name: &str) { - let _ = Command::new("pkill") - .args(["-STOP", name]) - .output() - .unwrap_or_else(|_| { - error!("Failed to freeze process"); - std::process::exit(101); - }); -} -async fn unfreeze_process(name: &str) { - let _ = Command::new("pkill") - .args(["-CONT", name]) - .output() - .unwrap_or_else(|_| { - error!("Failed to unfreeze process"); - std::process::exit(101); - }); -} -async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> { - terminate_process(name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - return start_process(name, path).await; -} - -async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { - let runsh = format!("{}{}", path, "/run.sh"); - let mut command = 
Command::new("bash"); - command.arg(runsh); - - match command.spawn() { - Ok(_) => { - warn!("Process {} is running now!", name); - Ok(()) - }, - Err(_) => { - return Err(CustomError::Fatal) - }, - } -} -// check process status daemon -async fn running_handler -( - prc: Arc, - tx: Arc>, - watchers: Arc>> -) -{ - // println!("running daemon on {}", prc.name); - // services and files check (once) - let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone()); - let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); - - let res = join!(files_check, services_check); - // if inactive -> spawn checks -> active is true - if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok(){ - if start_process(&prc.name, &prc.path).await.is_err() { - tx.send(3).await.unwrap(); - return; - } - } - // if frozen -> spawn checks -> unfreeze is true - else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok(){ - tx.send(10).await.unwrap(); - return; - } - tokio::time::sleep(Duration::from_millis(100)).await; - tokio::task::yield_now().await; -} - -async fn file_handler -( - name: &str, - files: &Vec, - tx: Arc>, - watchers: Arc>> -) -> Result<(), CustomError> -{ - // println!("file daemon on {}", name); - for (i, file) in files.iter().enumerate() { - // let src = format!("{}{}", file.src, file.filename); - if check_file(&file.filename, &file.src).await.is_err() { - if !is_active(name).await || is_frozen(name).await { - return Err(CustomError::Fatal); - } - match file.triggers.on_delete.as_str() { - "stay" => { - continue; - }, - "stop" => { - if is_active(name).await { - tx.send(1).await.unwrap(); - } - return Err(CustomError::Fatal); - }, - "hold" => { - if is_active(name).await { - tx.send(2).await.unwrap(); - return Err(CustomError::Fatal); - } - }, - _ => { - tokio::time::sleep(Duration::from_millis(50)).await; - tx.send(101).await.unwrap(); - return Err(CustomError::Fatal); - }, - } - } else if 
is_active(name).await && !is_frozen(name).await{ - let watchers = watchers.clone(); - // println!("mutex: {:?}", watchers); - let mut buffer = [0; 128]; - let mut mutex_guard = watchers.lock().await; - if let Some(notify) = mutex_guard.get_mut(i) { - let events = notify.read_events(&mut buffer); - // println!("{:?}", events); - if events.is_ok(){ - let events: Vec = events.unwrap() - .into_iter() - .map(|mask| {mask.mask}) - .filter(|mask| { - *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF - }) - .collect(); - for event in events { - if let EventMask::DELETE_SELF = event { - // ! warning (DELETE_SELF event) ! - // println!("! warning (DELETE_SELF event) !"); - // * watcher recreation after dealing with file recreation mech in text editors - let mutex = notify.borrow_mut(); - - *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); - } - match file.triggers.on_change.as_str() { - "stop" => { - let _ = tx.send(7).await; - }, - "restart" => { - let _ = tx.send(8).await; - }, - "stay" => { - let _ = tx.send(9).await; - }, - _ => { - let _ = tx.send(101).await; - }, - } - } - } - } - } - } - tokio::task::yield_now().await; - Ok(()) -} - -async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { - let arc_name = Arc::new(filename.to_string()); - let arc_path = Arc::new(path.to_string()); - tokio::task::spawn_blocking(move || { - let fileconcat = format!("{}{}", arc_path, arc_name); - let path = Path::new(&fileconcat); - if path.exists() { - Ok(()) - } else { - Err(CustomError::Fatal) - } - }) - .await - .unwrap_or_else(|_| { - panic!("Corrupted while file check process"); - }) -} - -async fn service_handler(name: &str, services: &Vec, tx: Arc>) -> Result<(), CustomError> { - // println!("service daemon on {}", name); - for serv in services { - if check_service(&serv.hostname, &serv.port).await.is_err() { - if !is_active(name).await || is_frozen(name).await { - return Err(CustomError::Fatal); - } - error!("Service {}:{} is 
unreachable for process {}", &serv.hostname, &serv.port, &name); - match serv.triggers.on_lost.as_str() { - "stay" => { - }, - "stop" => { - if looped_service_connecting(name, serv).await.is_err() { - tx.send(5).await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - return Err(CustomError::Fatal); - } - }, - "hold" => { - if is_frozen(name).await { - return Err(CustomError::Fatal); - } - if looped_service_connecting(name, serv).await.is_err() { - tx.send(6).await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - return Err(CustomError::Fatal); - } - }, - _ => { - tx.send(101).await.unwrap(); - return Err(CustomError::Fatal); - }, - } - } - } - tokio::time::sleep(Duration::from_millis(100)).await; - tokio::task::yield_now().await; - Ok(()) -} - -async fn looped_service_connecting( - name: &str, - serv: &Services -) -> Result<(), CustomError> -{ - if serv.triggers.wait == 0 { - loop { - tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; - warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); - match check_service(&serv.hostname, &serv.port).await { - Ok(_) => { - log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name); - break; - }, - Err(_) => { - continue; - }, - } - } - return Ok(()); - } else { - let start = Instant::now(); - while start.elapsed().as_secs() < serv.triggers.wait.into() { - tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; - warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); - match check_service(&serv.hostname, &serv.port).await { - Ok(_) => { - log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name); - return Ok(()); - }, - Err(_) => { - continue; - }, - } - } - return Err(CustomError::Fatal); - } -} - -// ! 
have to be rewritten -// todo: rewrite use -async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { - let addr = format!("{}:{}", hostname, port); - - match addr.to_socket_addrs() { - Ok(mut addrs) => { - if let Some(_) = addrs.find(|a| TcpStream::connect_timeout(a, std::time::Duration::new(1, 0)).is_ok()) { - return Ok(()); - } else { - return Err(CustomError::Fatal); - } - }, - Err(_) => return Err(CustomError::Fatal), - } -} - // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' fn get_container_id() -> Option { match Command::new(GET_ID_CMD).output() { diff --git a/src/prcs.rs b/src/prcs.rs new file mode 100644 index 0000000..b6ee183 --- /dev/null +++ b/src/prcs.rs @@ -0,0 +1,110 @@ +use std::sync::Arc; +use std::process::{ Command, Output }; +use log::{error, warn}; +use tokio::time::Duration; +use crate::structs::CustomError; + +pub async fn get_pid(name: &str) -> Output { + let name = Arc::new(name.to_string()); + tokio::task::spawn_blocking(move || { + Command::new("pidof") + .arg(&*name) + .output() + .unwrap_or_else(|_| { + error!("Failed to execute command 'pidof'"); + std::process::exit(101); + }) + }) + .await + .unwrap() +} +// ! can be with bug !!! 
+// * APPROVED +pub async fn is_active(name: &str)-> bool { + let arc_name = Arc::new(name.to_string()); + tokio::task::spawn_blocking(move || { + let output = Command::new("pidof") + .arg(&*arc_name) + .output() + .unwrap_or_else(|_| { + error!("Failed to execute command 'pidof'"); + std::process::exit(101); + }); + !String::from_utf8_lossy(&output.stdout).trim().is_empty() + }) + .await + .unwrap() +} + +// T is for stopped processes +pub async fn is_frozen(name: &str) -> bool { + let temp = get_pid(name).await; + let pid = String::from_utf8_lossy(&temp.stdout); + let pid = pid.trim(); + let arc_pid = Arc::new(pid.to_string()); + if pid.is_empty(){ + return false; + } else { + tokio::task::spawn_blocking(move || { + let cmd = Command::new("ps") + .args(["-o", "stat=", "-p", &arc_pid]) + .output() + .unwrap_or_else(|_| { + error!("Failed to execute ps command"); + std::process::exit(101); + }); + String::from_utf8_lossy(&cmd.stdout).contains("T") + }) + .await + .unwrap() + } +} +pub async fn terminate_process (name: &str) { + let _ = Command::new("pkill") + .arg(name) + .output() + .unwrap_or_else(|_| { + error!("Failed to execute command 'pkill'"); + std::process::exit(101); + }); +} +// another test +pub async fn freeze_process(name: &str) { + let _ = Command::new("pkill") + .args(["-STOP", name]) + .output() + .unwrap_or_else(|_| { + error!("Failed to freeze process"); + std::process::exit(101); + }); +} +pub async fn unfreeze_process(name: &str) { + let _ = Command::new("pkill") + .args(["-CONT", name]) + .output() + .unwrap_or_else(|_| { + error!("Failed to unfreeze process"); + std::process::exit(101); + }); +} +pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> { + terminate_process(name).await; + tokio::time::sleep(Duration::from_millis(100)).await; + return start_process(name, path).await; +} + +pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { + let runsh = format!("{}{}", path, "/run.sh"); + 
let mut command = Command::new("bash");
+    command.arg(runsh);
+
+    // spawn() detaches: the child is never waited on from here.
+    // NOTE(review): without a later wait/reap, exited children may linger
+    // as zombies — confirm something else collects them.
+    // NOTE(review): success is logged at warn! level — info! looks more
+    // appropriate; confirm the intended log policy.
+    match command.spawn() {
+        Ok(_) => {
+            warn!("Process {} is running now!", name);
+            Ok(())
+        },
+        Err(_) => {
+            return Err(CustomError::Fatal)
+        },
+    }
+}
\ No newline at end of file
diff --git a/src/services.rs b/src/services.rs
new file mode 100644
index 0000000..c4025e8
--- /dev/null
+++ b/src/services.rs
@@ -0,0 +1,104 @@
+use std::sync::Arc;
+use tokio::time::{ Duration, Instant };
+use tokio::sync::mpsc;
+use crate::structs::{Services, CustomError};
+use crate::prcs::{is_active, is_frozen};
+use log::{error, warn};
+use std::net::{TcpStream, ToSocketAddrs};
+
+
+// Checks every service dependency of `name`; on an unreachable service,
+// applies that service's onLost policy ("stay" = ignore, "stop"/"hold" =
+// retry and, on timeout, notify run_daemons via tx with code 5/6).
+// Returns Err(Fatal) when the supervised process is gone/frozen or a
+// policy escalated; Ok(()) after all checks pass.
+// NOTE(review): the generic parameters of `&Vec` and `Arc>` were lost in
+// extraction — likely `&Vec<Services>` and `Arc<mpsc::Sender<u8>>`;
+// verify against the repository.
+// NOTE(review): the "stay" arm is a no-op, yet run_daemons handles a
+// code 4 "staying" event that nothing here sends — confirm whether
+// `tx.send(4)` was intended in this arm.
+pub async fn service_handler(name: &str, services: &Vec, tx: Arc>) -> Result<(), CustomError> {
+    // println!("service daemon on {}", name);
+    for serv in services {
+        if check_service(&serv.hostname, &serv.port).await.is_err() {
+            // supervised process already dead or frozen: nothing to police
+            if !is_active(name).await || is_frozen(name).await {
+                return Err(CustomError::Fatal);
+            }
+            error!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name);
+            match serv.triggers.on_lost.as_str() {
+                "stay" => {
+                },
+                "stop" => {
+                    // code 5: dispatcher terminates the process
+                    if looped_service_connecting(name, serv).await.is_err() {
+                        tx.send(5).await.unwrap();
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                        return Err(CustomError::Fatal);
+                    }
+                },
+                "hold" => {
+                    if is_frozen(name).await {
+                        return Err(CustomError::Fatal);
+                    }
+                    // code 6: dispatcher freezes the process
+                    if looped_service_connecting(name, serv).await.is_err() {
+                        tx.send(6).await.unwrap();
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                        return Err(CustomError::Fatal);
+                    }
+                },
+                _ => {
+                    // code 101: unknown onLost value in the JSON config
+                    tx.send(101).await.unwrap();
+                    return Err(CustomError::Fatal);
+                },
+            }
+        }
+    }
+    tokio::time::sleep(Duration::from_millis(100)).await;
+    tokio::task::yield_now().await;
+    Ok(())
+}
+
+// Retries the service until reachable. wait == 0 means "retry forever";
+// otherwise gives up (Err) once `wait` seconds have elapsed. Each attempt
+// is preceded by a `delay`-second sleep.
+async fn looped_service_connecting(
+    name: &str,
+    serv: &Services
+) -> Result<(), CustomError>
+{
+    if serv.triggers.wait == 0 {
+        loop {
+            // (garbled multi-byte residue removed here — likely an emoji comment)
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
+            // NOTE(review): if delay is configured as 0 this becomes a
+            // tight reconnect loop with no backoff — confirm acceptable.
+            warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
+            match check_service(&serv.hostname, &serv.port).await {
+                Ok(_) => {
+                    log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name);
+                    break;
+                },
+                Err(_) => {
+                    continue;
+                },
+            }
+        }
+        return Ok(());
+    } else {
+        // bounded variant: attempts are made until `wait` seconds elapse;
+        // note the first attempt only happens after the initial delay.
+        let start = Instant::now();
+        while start.elapsed().as_secs() < serv.triggers.wait.into() {
+            tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
+            warn!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
+            match check_service(&serv.hostname, &serv.port).await {
+                Ok(_) => {
+                    log::info!("Successfully connected to {} from {} process!", &serv.hostname, &name);
+                    return Ok(());
+                },
+                Err(_) => {
+                    continue;
+                },
+            }
+        }
+        return Err(CustomError::Fatal);
+    }
+}
+
+// ! have to be rewritten
+// todo: rewrite use
+// Resolves hostname:port and tries a 1-second TCP connect to each
+// resolved address; Ok(()) on the first success.
+// NOTE(review): std::net DNS resolution and TcpStream::connect_timeout
+// are synchronous — they block the async executor thread (hence the
+// author's own "rewrite" todo; tokio::net would be the async equivalent).
+// NOTE(review): `port: &u32` borrows a Copy type; `u16` by value would
+// model a TCP port more precisely.
+async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
+    let addr = format!("{}:{}", hostname, port);
+
+    match addr.to_socket_addrs() {
+        Ok(mut addrs) => {
+            if let Some(_) = addrs.find(|a| TcpStream::connect_timeout(a, std::time::Duration::new(1, 0)).is_ok()) {
+                return Ok(());
+            } else {
+                return Err(CustomError::Fatal);
+            }
+        },
+        Err(_) => return Err(CustomError::Fatal),
+    }
+}
\ No newline at end of file
diff --git a/src/structs.rs b/src/structs.rs
new file mode 100644
index 0000000..74f8bfd
--- /dev/null
+++ b/src/structs.rs
@@ -0,0 +1,79 @@
+use serde::{ Deserialize, Serialize };
+
+/// # an Error enum (to be replaced later)
+pub enum CustomError {
+    Fatal,
+}
+// Which of the two configuration copies (local file vs Redis) is newer.
+pub enum ConfigActuality {
+    Local,
+    Remote,
+}
+
+/// # struct for the 1st level in json conf file
+/// > (needed in serialization and deserialization)
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Processes {
+    // #[serde(rename="id")]
+    // runner_id: usize,
#[serde(rename="dateOfCreation")]
+    pub date_of_creation : String,
+    #[serde(default)]
+    pub processes : Vec,  // NOTE(review): generic lost in extraction — likely Vec<TrackingProcess>; verify
+}
+
+/// # struct for each process to contain info, such as name, path and dependencies
+/// > (needed in serialization and deserialization)
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct TrackingProcess {
+    pub name : String,
+    pub path : String,
+    pub dependencies: Dependencies,
+}
+
+/// # struct for processes' dependencies including files and services
+/// > (needed in serialization and deserialization)
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Dependencies {
+    #[serde(default)]
+    pub files : Vec,  // NOTE(review): generic lost in extraction — likely Vec<Files>; verify
+    #[serde(default)]
+    pub services: Vec,  // NOTE(review): generic lost in extraction — likely Vec<Services>; verify
+}
+
+/// # struct for containing file object with its triggers to manipulate in daemons
+/// > (needed in serialization and deserialization)
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Files {
+    pub filename : String,
+    pub src : String,
+    pub triggers : FIleTriggers,
+}
+
+/// # struct for containing service object with its triggers to manipulate in daemons
+/// > (needed in serialization and deserialization)
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Services {
+    pub hostname : String,
+    // NOTE(review): TCP ports fit in u16; u32 kept since check_service
+    // and the JSON schema already use it.
+    pub port : u32,
+    pub triggers : ServiceTriggers,
+}
+
+/// # struct for instancing each service's policies such as on lost or time to wait till reachable
+/// > (needed in serialization and deserialization)
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct ServiceTriggers {
+    pub wait : u32,
+    pub delay: u32,
+    #[serde(rename="onLost")]
+    pub on_lost : String,
+}
+
+/// # struct for instancing each file's policies such as ondelete or onupdate events
+/// > (needed in serialization and deserialization)
+// NOTE(review): type name has a casing typo (FIle → File); renaming
+// would touch its use in `Files` and any other module, so it is kept
+// as-is here — the serde renames below are unaffected either way.
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct FIleTriggers {
+    #[serde(rename="onDelete")]
+    pub on_delete : String,
+    #[serde(rename="onChange")]
+    pub on_change : String,
+}
\ No newline at end of file
diff --git a/src/utils.rs
b/src/utils.rs
new file mode 100644
index 0000000..f899086
--- /dev/null
+++ b/src/utils.rs
@@ -0,0 +1,159 @@
+use std::sync::Arc;
+use crate::structs::TrackingProcess;
+use tokio::sync::mpsc;
+use inotify::Inotify;
+use crate::files::create_watcher;
+use log::{error, warn};
+use crate::prcs::{
+    is_active,
+    is_frozen,
+    terminate_process,
+    restart_process,
+    freeze_process,
+    unfreeze_process,
+    start_process
+};
+use tokio::time::Duration;
+use tokio::join;
+use crate::files::file_handler;
+use crate::services::service_handler;
+
+/// # async func to run 3 main daemons (now it's more like tree-form than the classical 0.1.0 form)
+/// > hint : give mpsc with capacity 1 to jump over potential errors during running process
+/// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") **
+// Event loop: races one pass of running_handler against the next event
+// code on rx, then dispatches on that code (legend in the match arms).
+// NOTE(review): generic parameters were lost in extraction — the
+// signature is likely Arc<TrackingProcess>, Arc<mpsc::Sender<u8>>,
+// &mut mpsc::Receiver<u8>; verify against the repository.
+pub async fn run_daemons(
+    proc: Arc,
+    tx: Arc>,
+    rx: &mut mpsc::Receiver
+)
+{
+    // creating watchers + ---buffers---
+    // one inotify watcher per file dependency; unwrap aborts on a bad path
+    let mut watchers: Vec = vec![];
+    for file in proc.dependencies.files.clone().into_iter() {
+        watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
+    }
+    let watchers_clone: Arc>> = Arc::new(tokio::sync::Mutex::new(watchers));
+
+    loop {
+        let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
+        tokio::select! {
+            _ = run_hand => {},
+            _val = rx.recv() => {
+                // NOTE(review): recv() yields None once every sender is
+                // dropped, so this unwrap() would panic then — confirm
+                // senders outlive this loop.
+                match _val.unwrap() {
+                    // 1 - File-dependency handling error -> terminating (after waiting)
+                    1 => {
+                        if is_active(&proc.name).await {
+                            error!("Dependency handling error: Terminating {} process ..." , &proc.name);
+                            terminate_process(&proc.name).await;
+                            tokio::time::sleep(Duration::from_millis(100)).await;
+                        }
+                        // break;
+                    },
+                    // 2 - File-dependency handling error -> holding (after waiting)
+                    2 => {
+                        if !is_frozen(&proc.name).await {
+                            error!("Dependency handling error: Freezing {} process ..."
, &proc.name);
+                            freeze_process(&proc.name).await;
+                            tokio::time::sleep(Duration::from_millis(100)).await;
+                        }
+                    },
+                    // 3 - Running process error
+                    3 => {
+                        error!("Error due to starting {} process", &proc.name);
+                    },
+                    // 4 - Timeout of waiting service-dependency -> staying (after waiting)
+                    // NOTE(review): nothing in the visible service_handler
+                    // sends code 4 (its "stay" arm is empty) — confirm
+                    // this arm is reachable.
+                    4 => {
+                        warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
+                    },
+                    // 5 - Timeout of waiting service-dependency -> terminating (after waiting)
+                    5 => {
+                        if is_active(&proc.name).await {
+                            error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
+                            terminate_process(&proc.name).await;
+                            tokio::time::sleep(Duration::from_millis(100)).await;
+                        }
+                        // break;
+                    },
+                    // 6 - Timeout of waiting service-dependency -> holding (after waiting)
+                    6 => {
+                        if !is_frozen(&proc.name).await {
+                            error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
+                            freeze_process(&proc.name).await;
+                            tokio::time::sleep(Duration::from_millis(100)).await;
+                        }
+                    },
+                    // // 7 - File-dependency change -> terminating (after check)
+                    7 => {
+                        error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
+                        terminate_process(&proc.name).await;
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                    },
+                    // // 8 - File-dependency change -> restarting (after check)
+                    8 => {
+                        warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
+                        let _ = restart_process(&proc.name, &proc.path).await;
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                    },
+                    // // 9 - File-dependency change -> staying (after check)
+                    9 => {
+                        warn!("File-dependency warning (file changed).
Ignoring on {} process...", &proc.name);
+                    },
+
+                    // 10 - Process unfreeze call via file handler
+                    // NOTE(review): arms 10 and 11 are byte-identical;
+                    // they could be merged as `10 | 11 =>` if the split
+                    // is not kept deliberately for future divergence.
+                    10 => {
+                        if is_frozen(&proc.name).await {
+                            warn!("Unfreezing process {} call...", &proc.name);
+                            unfreeze_process(&proc.name).await;
+                        }
+                    },
+                    // 11 - Process unfreeze call via service handler
+                    11 => {
+                        if is_frozen(&proc.name).await {
+                            warn!("Unfreezing process {} call...", &proc.name);
+                            unfreeze_process(&proc.name).await;
+                        }
+                    },
+                    // 101 - Impermissible trigger values in JSON
+                    // the only arm that exits the supervision loop
+                    101 => {
+                        error!("Impermissible trigger values in JSON");
+                        if is_active(&proc.name).await {
+                            terminate_process(&proc.name).await;
+                        }
+                        break;
+                    },
+                    _ => {},
+                }
+            },
+        }
+        // tokio::task::yield_now().await;
+    }
+}
+// check process status daemon
+// One supervision pass: runs the file and service dependency checks
+// concurrently; if both pass, starts the process when it is not running,
+// or requests an unfreeze (code 10) when it is frozen. A failed start
+// reports code 3.
+// NOTE(review): generic parameters were lost in extraction — likely
+// Arc<TrackingProcess>, Arc<mpsc::Sender<u8>>,
+// Arc<tokio::sync::Mutex<Vec<Inotify>>>; verify against the repository.
+pub async fn running_handler
+(
+    prc: Arc,
+    tx: Arc>,
+    watchers: Arc>>
+)
+{
+    // println!("running daemon on {}", prc.name);
+    // services and files check (once)
+    let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone());
+    let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
+
+    let res = join!(files_check, services_check);
+    // if inactive -> spawn checks -> active is true
+    if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok(){
+        if start_process(&prc.name, &prc.path).await.is_err() {
+            tx.send(3).await.unwrap();
+            return;
+        }
+    }
+    // if frozen -> spawn checks -> unfreeze is true
+    else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok(){
+        tx.send(10).await.unwrap();
+        return;
+    }
+    tokio::time::sleep(Duration::from_millis(100)).await;
+    tokio::task::yield_now().await;
+}