use log::{error, warn}; use std::process::{Command, Output}; use std::sync::Arc; use tokio::time::Duration; use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit}; use std::collections::HashSet; use tokio::sync::mpsc::Receiver as MpscReciever; use async_trait::async_trait; use serde::Serialize; pub mod v2 { use log::info; use tokio::time::sleep; use crate::options::structs::DependencyType; use std::path::Path; use super::*; #[derive(Debug, Serialize, Clone, Copy)] pub struct Pid(u32); impl std::fmt::Display for Pid { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { return write!(f, "{}", self.0); } } impl Pid { fn new() -> Self { Pid(0) } fn new_from_output(pid: Option) -> Self { let temp = { match pid { Some(pid) => String::from_utf8_lossy(pid.stdout.trim_ascii()).into_owned(), None => return Pid(0), } }; Pid(temp.parse::().unwrap_or_else(|_| { 0 })) } pub fn new_sysinfo_pid(&self) -> sysinfo::Pid { sysinfo::Pid::from_u32(self.0 as u32) } } #[derive(Debug)] pub struct ProcessesController { pub name: Arc, pub pid : Pid, bin: String, // obj: Arc, state: ProcessState, event_reader: MpscReciever, negative_events: HashSet>, } impl PartialEq for ProcessesController { fn eq(&self, other: &Self) -> bool { self.bin == other.bin } } impl ProcessesController { pub fn new(name: &str, event_reader: MpscReciever) -> ProcessesController { ProcessesController { name : Arc::from(name), pid : Pid::new(), bin : String::new(), state : ProcessState::Stopped, event_reader, negative_events : HashSet::new(), } } pub fn with_exe(mut self, bin: impl AsRef) -> ProcessesController { self.bin = bin.as_ref().to_string_lossy().into_owned(); self } pub fn get_pid(&self) -> Pid { self.pid } async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) { match trigger { "stay" => { info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name); }, "stop" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); match terminate_process(&self.name).await { Ok(_) => { self.state = ProcessState::Stopped; self.pid = Pid::new(); }, Err(er) => { error!("Cannot stop process {} : {}", self.name, er); }, } } }, "user-stop" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); match terminate_process(&self.name).await { Ok(_) => { self.state = ProcessState::StoppedByCli; self.pid = Pid::new(); }, Err(er) => { error!("Cannot forcefully stop process {} : {}", self.name, er); }, } } }, "user-hold" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name); match freeze_process(&self.name).await { Ok(_) => { self.state = ProcessState::HoldingByCli; self.pid = Pid::new(); }, Err(er) => { error!("Cannot forcefully freeze process {} : {}", self.name, er); }, } } }, "hold" => { if !is_frozen(&self.name).await { info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); match freeze_process(&self.name).await { Ok(_) => { self.state = ProcessState::Holding; self.pid = Pid::new(); }, Err(er) => { error!("Cannot freeze process {} : {}", self.name, er); }, } } }, "restart" => { info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); let pid = restart_process(&self.name, &self.bin).await; sleep(Duration::from_millis(100)).await; if let Ok(pid) = pid { self.pid = Pid(pid); info!("{}: New PID - {}", self.name, self.pid); } }, _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), } tokio::time::sleep(Duration::from_micros(100)).await; } pub async fn stop_by_user_call(&mut self) -> anyhow::Result<()> { terminate_process(&self.name).await?; self.state = ProcessState::StoppedByCli; self.pid = Pid::new(); Ok(()) } pub async fn freeze_by_user_call(&mut self) -> anyhow::Result<()> { freeze_process(&self.name).await?; self.state = ProcessState::HoldingByCli; Ok(()) } pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> { let pid = start_process(&self.name, &self.bin).await?; self.state = ProcessState::Pending; self.pid = Pid(pid); Ok(()) } pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> { unfreeze_process(&self.name).await?; self.state = ProcessState::Pending; Ok(()) } } #[async_trait] impl ProcessUnit for ProcessesController { async fn process(&mut self) { if self.negative_events.len() == 0 { match self.state { ProcessState::Holding => { info!("No negative dependecies events on {} process. Unfreezing ...", self.name); if let Err(er) = unfreeze_process(&self.name).await { error!("Cannot unfreeze process {} : {}", self.name, er); } else { self.state = ProcessState::Pending; self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); info!("{}: New PID - {}", self.name, self.pid); } }, ProcessState::Stopped => { info!("No negative dependecies events on {} process. Starting ...", self.name); if let Err(er) = start_process(&self.name, &self.bin).await { error!("Cannot start process {} : {}", self.name, er); } else { self.state = ProcessState::Pending; self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); info!("{}: New PID - {}", self.name, self.pid); } }, _ => {}, } } while let Ok(event) = self.event_reader.try_recv() { match event { Events::Positive(target) => { if self.negative_events.contains(&target) { self.negative_events.remove(&target); } }, Events::Negative(event) => { match event { NegativeOutcomes::FileWasChanged(target, dep_type, trigger) | NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) | NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => { if !self.negative_events.contains(&target) { self.negative_events.insert(target.clone()); self.trigger_on( &target, &trigger, dep_type ).await; } }, } }, } } } } } /// # Fn `get_pid` /// ## for initializing process of unstoppable grubbing metrics. /// /// *input* : `&str` /// /// *output* : `None` if cant get process PID | `Some(Output)` on success /// /// *initiator* : fn `is_frozen`, fn `utils::files::file_handler`, fn `utils::files::service_handler` /// /// *managing* : process name /// /// *depends on* : - /// pub async fn get_pid(name: &str) -> Option { let name = Arc::new(name.to_string()); let res = // todo : unwrap change - can be unsafe tokio::task::spawn_blocking(move || Command::new("pidof").arg(&*name).output()).await.unwrap(); if let Ok(output) = res { if output.stderr.is_empty() && output.stdout.is_empty() { None } else { Some(output) } } else { None } } /// # Fn `is_active` /// ## for checking process's activity state /// /// *input* : `&str` /// /// *output* : `true` if process running | `false` if not /// /// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler` /// /// *managing* : process name /// /// *depends on* : - /// pub async fn is_active(name: &str) -> bool { let arc_name = Arc::new(name.to_string()); tokio::task::spawn_blocking(move || { let output = Command::new("pidof") .arg(&*arc_name) .output() .unwrap_or_else(|_| { error!("Failed to execute command 'pidof'"); std::process::exit(101); }); !String::from_utf8_lossy(&output.stdout).trim().is_empty() }) .await .unwrap() } /// # Fn `is_frozen` /// ## for checking process's hibernation state /// /// *input* : `&str` /// /// *output* : `true` if process is frozen | `false` if not /// /// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler` /// /// *managing* : process name /// /// *depends on* : fn `get_pid` /// pub async fn is_frozen(name: &str) -> bool { let temp: Output; if let Some(output) = get_pid(name).await { temp = output; } else { return false; } let pid = String::from_utf8_lossy(&temp.stdout); let pid = pid.trim(); let arc_pid = Arc::new(pid.to_string()); if pid.is_empty() { false } else { tokio::task::spawn_blocking(move || { let cmd = Command::new("ps") .args(["-o", "stat=", "-p", &arc_pid]) .output() .unwrap_or_else(|_| { error!("Failed to execute ps command"); std::process::exit(101); }); String::from_utf8_lossy(&cmd.stdout).contains("T") }) .await .unwrap() } } /// # Fn `terminate_process` /// ## for stop current process /// /// *input* : `&str` /// /// *output* : () /// /// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler`, fn `utils::run_daemons` /// /// *managing* : process name /// /// *depends on* : - /// pub async fn terminate_process(name: &str) -> anyhow::Result<()> { let _ = Command::new("pkill") .arg(name) .output()?; Ok(()) } /// # Fn `terminate_process` /// ## for freeze/hibernate current process /// /// *input* : `&str` /// /// *output* : () /// /// *initiator* : fn `utils::run_daemons` /// /// *managing* : process name /// /// *depends on* : - /// pub async fn freeze_process(name: &str) -> anyhow::Result<()> { let _ = Command::new("pkill") .args(["-STOP", name]) .output()?; Ok(()) } /// # Fn `unfreeze_process` /// ## for unfreeze/hibernate current process /// /// *input* : `&str` /// /// *output* : () /// /// *initiator* : fn `utils::run_daemons` /// /// *managing* : process name /// /// *depends on* : - /// pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> { let _ = Command::new("pkill") .args(["-CONT", name]) .output()?; Ok(()) } /// # Fn `restart_process` /// ## for restarting current process /// /// *input* : `&str`, &str /// /// *output* : () /// /// *initiator* : fn `utils::run_daemons` /// /// *managing* : process name and path to its exec file /// /// *depends on* : fn `start_process`, fn `terminate_process` /// pub async fn restart_process(name: &str, path: &str) -> anyhow::Result { terminate_process(name).await?; tokio::time::sleep(Duration::from_millis(100)).await; start_process(name, path).await } /// # Fn `start_process` /// ## for starting current process /// /// *input* : `&str`, &str /// /// *output* : () /// /// *initiator* : fn `restart_process` /// /// *managing* : process name and path to its exec file /// /// *depends on* : - /// pub async fn start_process(name: &str, path: &str) -> anyhow::Result { // let runsh = format!("{} {}", "exec", path); let mut command = Command::new(path); // command.arg(path); match command.spawn() { Ok(child) => { let pid = child.id(); warn!("Process {} is running now!", name); Ok(pid) } Err(er) => { Err(anyhow::Error::msg(format!("Cannot start process {} : {}", name, er))) } } } #[cfg(test)] mod process_unittests { use super::*; // 1 full cycle - start -> restart -> stop // 2 full cycle - start -> freeze -> unfreze -> stop // 2x is active // 2x is frozen // 2x pidof // rewrite, its a pipe #[tokio::test] async fn full_cycle_with_restart() { // let _ = std::io::stdout().write_all(b""); let res1 = start_process("restart-prc", "./tests/examples/restart-prc").await; assert!(res1.is_ok()); let res2 = restart_process("restart-prc", "./tests/examples/restart-prc").await; assert!(res2.is_ok()); let _ = terminate_process("restart-prc").await; let res3 = is_active("restart-prc").await; assert!(!res3); } // rewrite, its a pipe #[tokio::test] async fn full_cycle_with_freeze() { assert!(true); } #[tokio::test] async fn is_active_check() { let res1 = start_process("tmp-prc", "./tests/examples/tmp-prc").await; assert!(res1.is_ok()); assert!(is_active("tmp-prc").await); let _ = terminate_process("tmp-prc").await; } #[tokio::test] async fn isnt_active_check() { assert!(!is_active("invalid-process-name").await); } #[tokio::test] async fn is_frozen_check() { let res1 = start_process("freeze-check", "./tests/examples/freeze-check").await; assert!(res1.is_ok()); assert!(!is_frozen("freeze-check").await); } #[tokio::test] async fn pidof_active_process() { assert!(get_pid("pidof-prc").await.is_none()); let res1 = start_process("pidof-prc", "./tests/examples/pidof-prc").await; assert!(res1.is_ok()); assert!(get_pid("pidof-prc").await.is_some()); let _ = terminate_process("pidof-prc").await; } // broken mechanism need to check #[tokio::test] async fn pidof_disabled_process() { assert!(get_pid("invalid-process-name").await.is_none()); } }