From b4c42dfc000ef14b2a22b6344f77c976ff667da1 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 13 May 2025 10:57:02 +0300 Subject: [PATCH 01/31] version patch --- noxis-rs/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index 1ad650e..8867489 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "noxis-rs" -version = "0.11.26" +version = "0.11.27" edition = "2021" [dependencies] From 62af29b9e01e680791a4a4cb45f194da780ed5b6 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 13 May 2025 11:32:30 +0300 Subject: [PATCH 02/31] added config storing in supervisor --- noxis-rs/src/utils.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index b1e315a..03513db 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -12,7 +12,7 @@ use crate::options::structs::{CustomError, TrackingProcess, Processes}; // use inotify::Inotify; use log::{error, warn, info}; use prcs::{ - freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process, + freeze_process, is_active, is_frozen, restart_process, terminate_process, unfreeze_process, }; // use services::service_handler; @@ -47,14 +47,16 @@ pub mod v2 { prcs : LinkedList, files : LinkedList, services : LinkedList, + config : Arc, } impl Supervisor { pub fn new() -> Supervisor { - Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()} + Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()) } } - pub async fn with_config(mut self, config: &Processes) -> Supervisor { - let _ = config.processes.iter() + pub async fn with_config(mut self, config: Processes) -> Supervisor { + self.config = Arc::from(config); + let _ = self.config.processes.iter() .for_each(|prc| { let (rx, tx) = mpsc::channel::(10); let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); @@ -127,6 +129,9 @@ pub mod v2 { async fn process(&mut self) { info!("Initializing monitoring ..."); loop { + // + // todo: CHANNEL check and reaction + // // dbg!(&self); let mut tasks: Vec> = vec![]; // let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap()); @@ -177,7 +182,7 @@ pub mod v2 { pub async fn init_monitoring( config: Processes ) -> anyhow::Result<()> { - let mut supervisor = Supervisor::new().with_config(&config).await; + let mut supervisor = Supervisor::new().with_config(config).await; info!("Monitoring: {} ", &supervisor.get_stats()); supervisor.process().await; Ok(()) From 551223dd911bcbb43f3fa757c250791b77eb74c9 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 13 May 2025 12:09:47 +0300 Subject: [PATCH 03/31] added new status (HOS BY USER) --- noxis-rs/src/options/structs.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index d37011d..62e6e3e 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -93,7 +93,21 @@ pub enum ProcessState { Holding, Stopped, StoppedByCli, + HoldingByCli, } +impl std::fmt::Display for ProcessState { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + return match self { + ProcessState::Pending => write!(f, "Running"), + ProcessState::Holding => write!(f, "Holding"), + ProcessState::Stopped => write!(f, "Stopped"), + ProcessState::StoppedByCli => write!(f, "Forcibly stopped"), + ProcessState::HoldingByCli => write!(f, "Forcibly holding"), + } + } +} + + #[derive(Debug)] pub enum Events { Positive(Arc), From 4b9db02528dd72a58bb0ac30bc7a47c66bb6ecec Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 13 May 2025 12:10:37 +0300 Subject: [PATCH 04/31] #7. PId support + new status + todo --- noxis-rs/src/utils/prcs.rs | 52 +++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 3f10903..9e72d4e 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -13,10 +13,35 @@ pub mod v2 { use std::path::Path; use super::*; + + #[derive(Debug)] + struct Pid(i64); + + impl std::fmt::Display for Pid { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + return write!(f, "{}", self.0); + } +} + + impl Pid { + fn new() -> Self { + Pid(-1) + } + fn new_from_output(pid: Option) -> Self { + let temp = { + match pid { + Some(pid) => String::from_utf8_lossy(pid.stdout.trim_ascii()).into_owned(), + None => return Pid(-1), + } + }; + Pid(temp.parse::().unwrap_or_else(|_| { -1 })) + } + } #[derive(Debug)] pub struct ProcessesController { - name: Arc, + pub name: Arc, + pub pid : Pid, bin: String, // obj: Arc, state: ProcessState, @@ -34,6 +59,7 @@ pub mod v2 { pub fn new(name: &str, event_reader: MpscReciever) -> ProcessesController { ProcessesController { name : Arc::from(name), + pid : Pid::new(), bin : String::new(), state : ProcessState::Stopped, event_reader, @@ -55,6 +81,23 @@ pub mod v2 { info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); terminate_process(&self.name).await; self.state = ProcessState::Stopped; + self.pid = Pid::new(); + } + }, + "user-stop" => { + if is_active(&self.name).await { + info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); + terminate_process(&self.name).await; + self.state = ProcessState::StoppedByCli; + self.pid = Pid::new(); + } + }, + "user-hold" => { + if is_active(&self.name).await { + info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name); + freeze_process(&self.name).await; + self.state = ProcessState::HoldingByCli; + self.pid = Pid::new(); } }, "hold" => { @@ -62,11 +105,14 @@ pub mod v2 { info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); freeze_process(&self.name).await; self.state = ProcessState::Holding; + self.pid = Pid::new(); } }, "restart" => { info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); let _ = restart_process(&self.name, &self.bin).await; + self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); + info!("{}: New PID - {}", self.name, self.pid); }, _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), } @@ -85,6 +131,8 @@ pub mod v2 { error!("Cannot unfreeze process {} : {}", self.name, er); } else { self.state = ProcessState::Pending; + self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); + info!("{}: New PID - {}", self.name, self.pid); } }, ProcessState::Stopped => { @@ -93,6 +141,8 @@ pub mod v2 { error!("Cannot start process {} : {}", self.name, er); } else { self.state = ProcessState::Pending; + self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); + info!("{}: New PID - {}", self.name, self.pid); } }, _ => {}, From 6ff17d9620db1ed7cdff6857950f81e5e4aa9e14 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 13 May 2025 16:20:09 +0300 Subject: [PATCH 05/31] metrics added (without processes) --- noxis-rs/src/main.rs | 4 + noxis-rs/src/options/structs.rs | 1 + noxis-rs/src/utils/metrics.rs | 403 +++++++++++++++++--------------- noxis-rs/src/utils/prcs.rs | 12 +- 4 files changed, 232 insertions(+), 188 deletions(-) diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 4cc69f4..9a1d0da 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -16,9 +16,13 @@ use options::preboot::PrebootParams; use tokio::sync::{broadcast, oneshot}; use options::config::v2::init_config_mechanism; use utils::v2::init_monitoring; +use metrics::init_metrics_grubber; #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()>{ + init_metrics_grubber().await; + todo!(); + let preboot = Arc::new(PrebootParams::validate()); let _ = setup_logger(); diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 62e6e3e..3423d25 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use async_trait::async_trait; use std::sync::Arc; + #[derive(Debug)] pub enum DependencyType { File, diff --git a/noxis-rs/src/utils/metrics.rs b/noxis-rs/src/utils/metrics.rs index 756e44b..47b733f 100644 --- a/noxis-rs/src/utils/metrics.rs +++ b/noxis-rs/src/utils/metrics.rs @@ -2,18 +2,136 @@ // cpu load, ram/rom load and net activity // use std::sync::Mutex; -use std::sync::Arc; -use crate::options::structs::TrackingProcess; -use sysinfo::{Process, System}; -use tokio::join; -use crate::options::structs::{ProcessMetrics, ContainerMetrics}; -use super::get_container_id; +use std::{collections::BTreeMap, net, sync::Arc}; +use crate::options::structs::{ProcessState, TrackingProcess}; +use sysinfo::{System, Disks as DisksList, Networks}; +use crate::options::structs::Dependencies; +use serde::Serialize; +use super::prcs::v2::Pid; // use pcap::{Device, Capture, Active}; // use std::net::Ipv4Addr; // use anyhow::{Result, Ok}; // type PacketBuffer = Arc>>; +type CoreUsage = BTreeMap; +type Disks = Vec; +type Ifaces = Vec; +pub type MetricProcesses = Vec; + + +#[derive(Serialize, Debug)] +struct FullMetrics { + hostname : String, + os : String, + kernel : String, + cpu : Cpu, + ram : Ram, + disks : Disks, + networks : Ifaces, + processes : MetricProcesses, +} + +#[derive(Debug)] +struct HostInfo { + hostname : String, + os : String, + kernel : String, +} + + +#[derive(Serialize, Debug)] +struct Cpu { + global_usage : f32, + usage : CoreUsage, +} + +#[derive(Serialize, Debug)] +struct CoreInfo { + name: String, + brand : String, + frequency : u64, + vendor_id : String, + usage : f32, +} + +#[derive(Serialize, Debug)] +struct Ram { + free_mem : u64, + free_swap : u64, + total_mem : u64, + total_swap : u64 +} + +#[derive(Serialize, Debug)] +struct Disk { + name : String, + kind : String, + fs : String, + mount_point : String, + total_space : u64, + available_space : u64, + is_removable : bool, + is_readonly : bool, +} + + // vec +#[derive(Serialize, Debug)] +struct Network { + iname : String, + mac : String, + recieved : u64, + transmitted : u64, + total_recieved_bytes : u64, + total_transmitted_bytes : u64, + total_recieved_packets : u64, + total_transmitted_packets : u64, + errors_on_recieved : u64, + errors_on_transmitted : u64, +} + +#[derive(Serialize, Debug)] +pub struct ProcessesExtended { + name : String, + status : String, + pid : Pid, + dependencies : Dependencies, + cpu_usage : f32, + ram_usage : f32, + virtual_mem_usage : u64, + disks_usage_read_bytes: u64, + disks_usage_write_bytes: u64, +} + +impl ProcessesExtended { + pub fn from_old_with_params( + old : Arc, + pid : Pid, + status : ProcessState, + ) -> Self { + Self { + name : old.name.clone(), + status : status.to_string(), + pid, + dependencies : old.dependencies.clone(), + cpu_usage : 0.0, + ram_usage : 0.0, + virtual_mem_usage : 0, + disks_usage_read_bytes: 0, + disks_usage_write_bytes: 0, + } + } + fn add_metrics(&mut self, system : &mut System) { + if let Some(prc) = system.process(self.pid.new_sysinfo_pid()) { + self.cpu_usage = prc.cpu_usage() / system.cpus().len() as f32; + self.ram_usage = (system.total_memory() as f32) / (prc.memory() as f32); + self.disks_usage_read_bytes = prc.disk_usage().total_read_bytes; + self.disks_usage_write_bytes = prc.disk_usage().total_written_bytes; + self.virtual_mem_usage = prc.virtual_memory(); + } + } +} + /// # Fn `init_metrics_grubber` /// ## for initializing process of unstoppable grubbing metrics. /// @@ -28,204 +146,117 @@ use super::get_container_id; /// *depends on* : - /// #[allow(dead_code)] -pub async fn init_metrics_grubber() { +pub async fn init_metrics_grubber( + /* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */ +) { let mut system = System::new(); - // let mut buffer: Vec = vec![]; - // let shared_buf: PacketBuffer = Arc::new(Mutex::new(buffer)); + get_all_metrics(&mut system).await; +} +async fn get_all_metrics(system: &mut System) { system.refresh_all(); - // let temp = String::from_utf8(get_pid("systemd").await.unwrap().stdout).unwrap(); - // let prc = system.process(Pid::from_str(&temp).unwrap()).unwrap(); - // prc. - // let _ = capture_packets(shared_buf.clone()).await; + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + dbg!(get_host_info().await); + dbg!(get_cpu_metrics(system).await); + dbg!(get_ram_metrics(system).await); + dbg!(get_all_disks_metrics().await); + dbg!(get_all_ifaces_metrics().await); } -#[allow(dead_code)] -#[allow(unused_variables)] -async fn gather_metrics(proc: Arc) { - +async fn get_host_info() -> HostInfo { + HostInfo { + hostname : System::host_name().unwrap_or_default(), + os : System::long_os_version().unwrap_or_default(), + kernel : System::kernel_version().unwrap_or_default(), + } } -// DEPRECATED : for net monitoring -// async fn capture_packets(buffer: PacketBuffer) -> Result<()> { -// let mut cap = Capture::from_device(Device::lookup()?.unwrap())? -// .promisc(true) -// .open()?; +async fn get_cpu_metrics(system: &mut System) -> Cpu { + system.refresh_cpu_all(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; -// cap.filter("not broadcast and not multicast", true)?; + let mut buffer = CoreUsage::new(); + let global_usage = system.global_cpu_usage(); -// while let core::result::Result::Ok(packet) = cap.next_packet() { -// if let Some((src, dst, prot)) = get_packet_info(&packet.data).await { -// let packet_info = PacketInfo::new(String::from(prot), dst, src, packet.header.len as usize); -// let mut locked_buffer = buffer.lock().unwrap(); -// println!("{:?}", &packet_info); -// locked_buffer.push(packet_info); -// } -// } -// Ok(()) -// } -// async fn get_packet_info(data: &[u8]) -> Option<(Ipv4Addr, Ipv4Addr, &str)> { -// if data.len() >= 20 { -// let src_ip = Ipv4Addr::new(data[12], data[13], data[14], data[15]); -// let dst_ip = Ipv4Addr::new(data[16], data[17], data[18], data[19]); -// let protocol = match data[9] { -// 1 => "ICMP", -// 6 => "TCP", -// 17 => "UDP", -// _ => "Unknown", -// }; - -// Some((src_ip, dst_ip, protocol)) -// } else { -// None -// } -// } + system.cpus() + .iter() + .enumerate() + .for_each(|(id, cpu)| { + let core_info = CoreInfo { + // id, + brand : cpu.brand().to_string(), + name : cpu.name().to_string(), + frequency : cpu.frequency(), + vendor_id : cpu.vendor_id().to_string(), + usage : cpu.cpu_usage(), + }; + // buffer.push(core_info); + buffer.entry(id).or_insert(core_info); + }); - -/// # Fn `get_all_container_metrics` -/// ## for gathering all container (whole system metrics) -/// -/// *input* : `Arc`, `Arc>` -/// -/// *output* : `ContainerMetrics` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `System` object, ref counter to list of processes -/// -/// *depends on* : `TrackingProcess` -/// -#[allow(dead_code)] -async fn get_all_container_metrics(sys: Arc, prcs: Arc>) -> ContainerMetrics { - let metrics = join!( - get_cpu_metrics_container(sys.clone()), - get_ram_metrics_container(sys.clone()), - get_subsystems(prcs.clone()) - ); - ContainerMetrics::new( - &get_container_id().unwrap_or(String::from("unknown")), - metrics.0, - metrics.1, - metrics.2 - ) + Cpu { + global_usage, + usage: buffer + } } -/// # Fn `get_cpu_metrics_container` -/// ## for gathering container cpu metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `System` object -/// -/// *depends on* : - -/// -#[allow(dead_code)] -async fn get_cpu_metrics_container(sys: Arc) -> f32 { - sys.global_cpu_usage() +async fn get_ram_metrics(system: &mut System) -> Ram { + system.refresh_memory(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + Ram { + free_mem : system.free_memory(), + free_swap : system.free_swap(), + total_mem : system.total_memory(), + total_swap : system.total_swap(), + } } -/// # Fn `get_ram_metrics_container` -/// ## for gathering container ram metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `System` object -/// -/// *depends on* : - -/// -#[allow(dead_code)] -async fn get_ram_metrics_container(sys: Arc) -> f32 { - (sys.used_memory() / sys.total_memory()) as f32 * 100.0 -} -// async fn get_mem_metrics_container(sys: Arc) -> f32 { -// sys. -// } - -/// # Fn `get_subsystems` -/// ## for gathering info about container subsystems (processes) -/// -/// *input* : `Arc>` -/// -/// *output* : `Vec` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to list of `TrackingProcess` -/// -/// *depends on* : `TrackingProcess` -/// -#[allow(dead_code)] -async fn get_subsystems(prcs: Arc>) -> Vec { - prcs.iter().map(|process| process.name.clone()).collect() +async fn get_all_disks_metrics() -> Disks { + let disks = DisksList::new_with_refreshed_list(); + let mut buffer = Disks::new(); + disks.list() + .iter() + .for_each(|disk| { + let disk = Disk { + name : disk.name().to_string_lossy().into_owned(), + kind: disk.kind().to_string(), + fs : disk.file_system().to_string_lossy().into_owned(), + mount_point : disk.mount_point().to_string_lossy().into_owned(), + total_space : disk.total_space(), + available_space : disk.available_space(), + is_removable : disk.is_removable(), + is_readonly : disk.is_read_only() + }; + buffer.push(disk); + }); + buffer } -/// # Fn `get_all_metrics_process` -/// ## for gathering all process' metrics -/// -/// *input* : `Arc`, `Arc` -/// -/// *output* : `ProcessMetrics` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : two ref counters to `Process` and `System` -/// -/// *depends on* : - -/// -#[allow(dead_code)] -async fn get_all_metrics_process(proc: Arc, sys: Arc) -> ProcessMetrics { - let metrics = join!( - get_cpu_metrics_process(proc.clone()), - get_ram_metrics_process(proc.clone(), sys.clone()) - ); - ProcessMetrics::new( - proc.name().to_str().unwrap_or("unknown"), - metrics.0, - metrics.1 - ) +async fn get_all_ifaces_metrics() -> Ifaces { + let mut ifaces = Ifaces::new(); + let networks = Networks::new_with_refreshed_list(); + networks.iter() + .for_each(|(iface_name, data)| { + let mac = data.mac_address().to_string(); + let iface = Network { + iname : iface_name.to_owned(), + mac : mac, + recieved : data.received(), + transmitted : data.transmitted(), + total_recieved_bytes : data.total_received(), + total_transmitted_bytes : data.total_transmitted(), + total_recieved_packets : data.total_packets_received(), + total_transmitted_packets : data.total_packets_transmitted(), + errors_on_recieved : data.errors_on_received(), + errors_on_transmitted : data.errors_on_transmitted(), + }; + ifaces.push(iface); + }); + ifaces } -/// # Fn `get_cpu_metrics_process` -/// ## for gathering process cpu metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `Process` object -/// -/// *depends on* : - -/// -async fn get_cpu_metrics_process(proc: Arc) -> f32 { - proc.cpu_usage() -} - -/// # Fn `get_ram_metrics_process` -/// ## for gathering process ram metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `Process` object -/// -/// *depends on* : - -/// -async fn get_ram_metrics_process(proc: Arc, sys: Arc) -> f32 { - (proc.memory() as f64 / sys.total_memory() as f64) as f32 * 100.0 as f32 -} +async fn get_all_processes_metrics(system: &mut System) {} #[cfg(test)] mod metrics_unittets { diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 9e72d4e..76a09f6 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -6,6 +6,7 @@ use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUni use std::collections::HashSet; use tokio::sync::mpsc::Receiver as MpscReciever; use async_trait::async_trait; +use serde::Serialize; pub mod v2 { use log::info; @@ -14,8 +15,8 @@ pub mod v2 { use super::*; - #[derive(Debug)] - struct Pid(i64); + #[derive(Debug, Serialize, Clone, Copy)] + pub struct Pid(i64); impl std::fmt::Display for Pid { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -36,6 +37,9 @@ pub mod v2 { }; Pid(temp.parse::().unwrap_or_else(|_| { -1 })) } + pub fn new_sysinfo_pid(&self) -> sysinfo::Pid { + sysinfo::Pid::from_u32(self.0 as u32) + } } #[derive(Debug)] @@ -71,6 +75,10 @@ pub mod v2 { self } + pub fn get_pid(&self) -> Pid { + self.pid + } + async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) { match trigger { "stay" => { From d6d45fe4aa1b16a8b385d3ce4425690b3f65b05e Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 13 May 2025 16:22:42 +0300 Subject: [PATCH 06/31] +mod version --- noxis-rs/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index 8867489..38380c0 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "noxis-rs" -version = "0.11.27" +version = "0.12.0" edition = "2021" [dependencies] From 895dc345573f10f4526828f75f1975455250f025 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 14 May 2025 11:36:47 +0300 Subject: [PATCH 07/31] + env var for Noxis socket --- noxis-rs/.env.example | 3 ++- noxis-rs/src/main.rs | 12 +++++++----- noxis-rs/src/options/cli_pipeline.rs | 10 +++++++--- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/noxis-rs/.env.example b/noxis-rs/.env.example index c956c65..aa21a94 100644 --- a/noxis-rs/.env.example +++ b/noxis-rs/.env.example @@ -9,4 +9,5 @@ NOXIS_HAGENT_SOCKET_PATH = "/var/run/example/hostagent.sock" NOXIS_LOG_TO = "/var/log/noxis/noxis.log" NOXIS_REMOTE_SERVER_URL = "ip.ip.ip.ip:port" NOXIS_CONFIG_PATH = "./settings.json" -NOXIS_METRICS_MODE = "full" \ No newline at end of file +NOXIS_METRICS_MODE = "full" +NOXIS_SOCKET_PATH = "/path/to/noxis.sock" \ No newline at end of file diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 9a1d0da..7a1ca45 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -20,8 +20,8 @@ use metrics::init_metrics_grubber; #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()>{ - init_metrics_grubber().await; - todo!(); + // init_metrics_grubber().await; + // todo!(); let preboot = Arc::new(PrebootParams::validate()); let _ = setup_logger(); @@ -33,19 +33,21 @@ async fn main() -> anyhow::Result<()>{ let (tx_oneshot, rx_oneshot) = oneshot::channel::(); let mut handler: Vec> = vec![]; - // initilaizing task for config manipulations + // initilaizing task for config manipulations + let preboot_config = preboot.clone(); let config_module = tokio::spawn(async move { let _ = init_config_mechanism( rx_oneshot, tx_brd, - preboot.clone() + preboot_config ).await; }); handler.push(config_module); // initilaizing task for cli manipulation + let preboot_cli = preboot.clone(); let cli_module = tokio::spawn(async move { - if let Err(er) = init_cli_pipeline().await { + if let Err(er) = init_cli_pipeline(preboot_cli).await { error!("CLI pipeline failed due to {}", er) } }); diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index 0c13e22..9464465 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -2,9 +2,12 @@ use log::{error, info}; use tokio::net::{ UnixStream, UnixListener }; use tokio::time::{sleep, Duration}; use std::fs; +use std::sync::Arc; use tokio::io::{ AsyncWriteExt, AsyncReadExt}; use noxis_cli::Cli; +use super::preboot::PrebootParams; + /// # Fn `init_cli_pipeline` /// ## for catching all input requests from CLI /// @@ -18,14 +21,15 @@ use noxis_cli::Cli; /// /// *depends on* : - /// -pub async fn init_cli_pipeline() -> anyhow::Result<()> { - let socket_path = "noxis.sock"; +pub async fn init_cli_pipeline(params: Arc) -> anyhow::Result<()> { + let socket_path = ¶ms.self_socket; let _ = fs::remove_file(socket_path); match UnixListener::bind(socket_path) { Ok(list) => { // TODO: remove `unwrap`s - info!("Listening on {}", socket_path); + info!("Listening on {}", socket_path.display()); + std::env::set_var("NOXIS_SOCKET_PATH", socket_path); loop { match list.accept().await { Ok((socket, _)) => { From 848b321228027622084f7796816f35b13db8a1ae Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 14 May 2025 11:37:04 +0300 Subject: [PATCH 08/31] + Noxis socket param --- noxis-rs/src/options/preboot.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index 245db0c..78463c7 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -3,6 +3,7 @@ //! #[allow(unused_imports)] use anyhow::{Result, Error}; +use log::warn; use std::path::PathBuf; use std::env::var; use dotenv::dotenv; @@ -187,6 +188,7 @@ pub struct PrebootParams { pub remote_server_url : String, pub config : PathBuf, pub metrics: MetricsPrebootParams, + pub self_socket : PathBuf, } /// # implementation for `MetricsPrebootParams` @@ -251,6 +253,16 @@ impl PrebootParams { Err(_) => MetricsPrebootParams::Full, } }, + self_socket : { + match var("NOXIS_SOCKET_PATH") { + Ok(val) => PathBuf::from(val), + Err(_) => { + let default = std::env::current_dir().expect("Crushed on getting current_dir path. Check fs state!"); + warn!("$NOXIS_SOCKET_PATH wans't set. Default value - {}", default.display()); + PathBuf::from(default) + }, + } + }, } } } From afeece915cfd7cbfd01c6c1116b3b1e51619bd47 Mon Sep 17 00:00:00 2001 From: prplV Date: Sun, 18 May 2025 17:44:30 +0300 Subject: [PATCH 09/31] new features during cli rework --- noxis-cli/.env.example | 1 + noxis-cli/Cargo.toml | 1 + noxis-cli/src/cli.rs | 40 ++++++++++++++++++++++++++ noxis-cli/src/cli_error.rs | 2 +- noxis-cli/src/cli_net.rs | 2 +- noxis-cli/src/main.rs | 3 +- noxis-rs/src/options/cli_pipeline.rs | 42 ++++++++++++++++++++++++---- noxis-rs/src/options/preboot.rs | 6 ++-- 8 files changed, 85 insertions(+), 12 deletions(-) create mode 100644 noxis-cli/.env.example diff --git a/noxis-cli/.env.example b/noxis-cli/.env.example new file mode 100644 index 0000000..b0d53a3 --- /dev/null +++ b/noxis-cli/.env.example @@ -0,0 +1 @@ +NOXIS_SOCKET_PATH = "/home/vladislavd/diplom_code/noxis-rs/noxis.sock" \ No newline at end of file diff --git a/noxis-cli/Cargo.toml b/noxis-cli/Cargo.toml index e02d5f8..712b2b6 100644 --- a/noxis-cli/Cargo.toml +++ b/noxis-cli/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0.94" clap = { version = "4.5.22", features = ["derive"] } +dotenv = "0.15.0" serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0.133" thiserror = "2.0.11" diff --git a/noxis-cli/src/cli.rs b/noxis-cli/src/cli.rs index 5a82b64..f5e7672 100644 --- a/noxis-cli/src/cli.rs +++ b/noxis-cli/src/cli.rs @@ -79,7 +79,23 @@ pub enum ConfigAction { about = "To reset all config settings", )] Reset, + #[command( + about = "To get current Noxis configuration", + name = "ls" + )] + Show(EnvConfig), } +#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] +pub struct EnvConfig { + // flag + #[arg( + long = "env", + action, + help = "to read environment vars configuration", + )] + pub is_env : bool, +} + #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] pub struct LocalConfig { @@ -148,4 +164,28 @@ pub enum ProcessAction { about = "To get info about current process's services-dependencies", )] Services, +} + +pub mod metrics_models { + pub enum MetricsMode { + Full, + // system + Cpu, + Memory, + Ram, + Rom, + Network, + // processes + Processes + // Config + } +} + +impl Cli { + pub fn validate_socket(mut self) -> Self { + if let Ok(path) = std::env::var("NOXIS_SOCKET_PATH") { + self.socket = path; + } + self + } } \ No newline at end of file diff --git a/noxis-cli/src/cli_error.rs b/noxis-cli/src/cli_error.rs index d5bae9b..ef9f802 100644 --- a/noxis-cli/src/cli_error.rs +++ b/noxis-cli/src/cli_error.rs @@ -2,7 +2,7 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum NoxisCliError { - #[error("Can't find socket `{0}`. Error : {1}")] + #[error("Can't find socket `{0}`. {1}")] NoxisDaemonMissing(String, String), #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")] PortIsNotWritable, diff --git a/noxis-cli/src/cli_net.rs b/noxis-cli/src/cli_net.rs index a3300ed..6d31d12 100644 --- a/noxis-cli/src/cli_net.rs +++ b/noxis-cli/src/cli_net.rs @@ -25,6 +25,6 @@ pub async fn try_send(cli: Cli) -> Result<()> { .await .map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?; - println!("Received response: {}", String::from_utf8_lossy(&response)); + println!("{}", String::from_utf8_lossy(&response)); Ok(()) } \ No newline at end of file diff --git a/noxis-cli/src/main.rs b/noxis-cli/src/main.rs index 7961b75..2ad8ead 100644 --- a/noxis-cli/src/main.rs +++ b/noxis-cli/src/main.rs @@ -9,7 +9,8 @@ use anyhow::Result; #[tokio::main] async fn main() -> Result<()>{ - let cli = Cli::parse(); + dotenv::dotenv().ok(); + let cli = Cli::parse().validate_socket(); try_send(cli).await?; Ok(()) } diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index 9464465..451b6bb 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -1,4 +1,4 @@ -use log::{error, info}; +use log::{error, info, warn}; use tokio::net::{ UnixStream, UnixListener }; use tokio::time::{sleep, Duration}; use std::fs; @@ -34,7 +34,7 @@ pub async fn init_cli_pipeline(params: Arc) -> anyhow::Result<()> match list.accept().await { Ok((socket, _)) => { // tokio::spawn(); - process_connection(socket).await; + process_connection(socket, params.clone()).await; }, Err(er) => { error!("Cannot poll connection to CLI due to {}", er); @@ -64,7 +64,7 @@ pub async fn init_cli_pipeline(params: Arc) -> anyhow::Result<()> /// /// *depends on* : `tokio::net::TcpStream` /// -async fn process_connection(mut stream: UnixStream) { +async fn process_connection(mut stream: UnixStream, params: Arc) { let mut buf = vec![0; 1024]; match stream.read(&mut buf).await { Ok(0) => { @@ -76,9 +76,15 @@ async fn process_connection(mut stream: UnixStream) { match serde_json::from_slice::(&buf) { Ok(cli) => { info!("Received CLI request: {:?}", cli); - let response = "OK"; - if let Err(e) = stream.write_all(response.as_bytes()).await { - error!("Failed to send response: {}", e); + match process_cli_cmd(cli, params.clone()).await { + Ok(response) => { + if let Err(e) = stream.write_all(response.as_bytes()).await { + error!("Failed to send response: {}", e); + } + }, + Err(er) => { + error!("Can't send response from cli_pipeline: {}", er); + }, } } Err(e) => { @@ -90,3 +96,27 @@ async fn process_connection(mut stream: UnixStream) { } let _ = stream.shutdown().await; } + + +async fn process_cli_cmd(cli : Cli, params: Arc) -> anyhow::Result { + use noxis_cli::{Commands, ConfigAction}; + return match cli.command { + Commands::Config(config) => { + match config.action { + ConfigAction::Show(env ) => { + if env.is_env { + Ok(serde_json::to_string_pretty(params.as_ref())?) + } else { + /* */ + Ok(String::from("Ok")) + } + }, + /* */ + _ => Ok(String::from("Ok")) + } + }, + /* */ + Commands::Status => Ok(String::from("Ok")), + _ => Ok(String::from("Ok")) + } +} diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index 78463c7..21c523e 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -20,9 +20,9 @@ use dotenv::dotenv; /// noxis-rs ... --metrics none /// ``` /// -#[derive(clap::ValueEnum, Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum MetricsPrebootParams { - Full, + Full, System, Processes, Net, @@ -177,7 +177,7 @@ impl std::fmt::Display for MetricsPrebootParams { /// export NOXIS_METRICS_MODE "full" /// ``` /// -#[derive(Debug)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct PrebootParams { pub no_hostagent : bool, pub no_logs: bool, From 92aa6f5b6617a663d5a30a3120e8de1eb82598c9 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 22 May 2025 09:30:18 +0300 Subject: [PATCH 10/31] for diplom adapt --- noxis-rs/src/options/preboot.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index 21c523e..df16a48 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -179,11 +179,11 @@ impl std::fmt::Display for MetricsPrebootParams { /// #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct PrebootParams { - pub no_hostagent : bool, + // pub no_hostagent : bool, pub no_logs: bool, pub refresh_logs : bool, pub no_sub : bool, - pub socket_path : PathBuf, + // pub socket_path : PathBuf, pub log_to : PathBuf, pub remote_server_url : String, pub config : PathBuf, @@ -198,12 +198,12 @@ impl PrebootParams { dotenv().ok(); Self { // bool - no_hostagent : { - match var("NOXIS_NO_HAGENT") { - Ok(_) => true, - Err(_) => false, - } - }, + // no_hostagent : { + // match var("NOXIS_NO_HAGENT") { + // Ok(_) => true, + // Err(_) => false, + // } + // }, no_logs : { match var("NOXIS_NO_LOGS") { Ok(_) => true, @@ -223,12 +223,12 @@ impl PrebootParams { } }, // vals - socket_path : { - match var("NOXIS_HAGENT_SOCKET_PATH") { - Ok(val) => PathBuf::from(val), - Err(_) => PathBuf::from("/var/run/enode/hostagent.sock"), - } - }, + // socket_path : { + // match var("NOXIS_HAGENT_SOCKET_PATH") { + // Ok(val) => PathBuf::from(val), + // Err(_) => PathBuf::from("/var/run/enode/hostagent.sock"), + // } + // }, log_to : { match var("NOXIS_LOG_TO") { Ok(val) => PathBuf::from(val), From 5c94e366eba995cd809286e5d6872e91648add12 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 22 May 2025 09:30:35 +0300 Subject: [PATCH 11/31] get local config using cli --- noxis-rs/src/options/cli_pipeline.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index 451b6bb..f1fbf7b 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -5,6 +5,7 @@ use std::fs; use std::sync::Arc; use tokio::io::{ AsyncWriteExt, AsyncReadExt}; use noxis_cli::Cli; +use super::structs::Processes; use super::preboot::PrebootParams; @@ -21,7 +22,7 @@ use super::preboot::PrebootParams; /// /// *depends on* : - /// -pub async fn init_cli_pipeline(params: Arc) -> anyhow::Result<()> { +pub async fn init_cli_pipeline(params: Arc, config : Arc) -> anyhow::Result<()> { let socket_path = ¶ms.self_socket; let _ = fs::remove_file(socket_path); @@ -33,8 +34,7 @@ pub async fn init_cli_pipeline(params: Arc) -> anyhow::Result<()> loop { match list.accept().await { Ok((socket, _)) => { - // tokio::spawn(); - process_connection(socket, params.clone()).await; + process_connection(socket, params.clone(), config.clone()).await; }, Err(er) => { error!("Cannot poll connection to CLI due to {}", er); @@ -64,7 +64,7 @@ pub async fn init_cli_pipeline(params: Arc) -> anyhow::Result<()> /// /// *depends on* : `tokio::net::TcpStream` /// -async fn process_connection(mut stream: UnixStream, params: Arc) { +async fn process_connection(mut stream: UnixStream, params: Arc, config : Arc) { let mut buf = vec![0; 1024]; match stream.read(&mut buf).await { Ok(0) => { @@ -76,7 +76,7 @@ async fn process_connection(mut stream: UnixStream, params: Arc) match serde_json::from_slice::(&buf) { Ok(cli) => { info!("Received CLI request: {:?}", cli); - match process_cli_cmd(cli, params.clone()).await { + match process_cli_cmd(cli, params.clone(), config).await { Ok(response) => { if let Err(e) = stream.write_all(response.as_bytes()).await { error!("Failed to send response: {}", e); @@ -98,7 +98,7 @@ async fn process_connection(mut stream: UnixStream, params: Arc) } -async fn process_cli_cmd(cli : Cli, params: Arc) -> anyhow::Result { +async fn process_cli_cmd(cli : Cli, params: Arc, global_config : Arc) -> anyhow::Result { use noxis_cli::{Commands, ConfigAction}; return match cli.command { Commands::Config(config) => { @@ -108,7 +108,7 @@ async fn process_cli_cmd(cli : Cli, params: Arc) -> anyhow::Resul Ok(serde_json::to_string_pretty(params.as_ref())?) } else { /* */ - Ok(String::from("Ok")) + Ok(serde_json::to_string_pretty(global_config.as_ref())?) } }, /* */ From df3934452b505fa77abb3053a2a0e37ed06ee569 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 22 May 2025 09:31:18 +0300 Subject: [PATCH 12/31] brd listener for cli - cli ini only after config pull from brd ?? --- noxis-rs/src/main.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 7a1ca45..68a179d 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -29,6 +29,8 @@ async fn main() -> anyhow::Result<()>{ info!("Noxis is configurating..."); // let (tx_brd, mut rx_brd) = broadcast::channel::(1); + // for cli to get config + let mut rx_cli_brd = tx_brd.subscribe(); // cli <-> config let (tx_oneshot, rx_oneshot) = oneshot::channel::(); let mut handler: Vec> = vec![]; @@ -47,7 +49,17 @@ async fn main() -> anyhow::Result<()>{ // initilaizing task for cli manipulation let preboot_cli = preboot.clone(); let cli_module = tokio::spawn(async move { - if let Err(er) = init_cli_pipeline(preboot_cli).await { + let config = { + let mut tick = tokio::time::interval(Duration::from_millis(500)); + loop { + tick.tick().await; + break match rx_cli_brd.try_recv() { + Ok(conf) => conf, + Err(_) => continue, + } + } + }; + if let Err(er) = init_cli_pipeline(preboot_cli, Arc::new(config)).await { error!("CLI pipeline failed due to {}", er) } }); From 5fe95bfcd9d68cb265aeb54585048e5581b459fa Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 22 May 2025 09:51:37 +0300 Subject: [PATCH 13/31] log level setitng added --- noxis-rs/.env.example | 3 ++- noxis-rs/src/options/logger.rs | 25 ++++++++++++++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/noxis-rs/.env.example b/noxis-rs/.env.example index aa21a94..8efbd9b 100644 --- a/noxis-rs/.env.example +++ b/noxis-rs/.env.example @@ -10,4 +10,5 @@ NOXIS_LOG_TO = "/var/log/noxis/noxis.log" NOXIS_REMOTE_SERVER_URL = "ip.ip.ip.ip:port" NOXIS_CONFIG_PATH = "./settings.json" NOXIS_METRICS_MODE = "full" -NOXIS_SOCKET_PATH = "/path/to/noxis.sock" \ No newline at end of file +NOXIS_SOCKET_PATH = "/path/to/noxis.sock" +NOXIS_MAX_LOG_LEVEL = "TRACE" \ No newline at end of file diff --git a/noxis-rs/src/options/logger.rs b/noxis-rs/src/options/logger.rs index 14cd92c..314cbf7 100644 --- a/noxis-rs/src/options/logger.rs +++ b/noxis-rs/src/options/logger.rs @@ -49,7 +49,7 @@ pub fn setup_logger() -> Result<(), crate::options::structs::CustomError> { record.args(), ) }) - .filter(None, LevelFilter::Info) + .filter(None, LevelFilter::from_env()) .target(env_logger::Target::Stdout) // temporary deprecated // .target(env_logger::Target::Pipe(log_target)) @@ -58,6 +58,29 @@ pub fn setup_logger() -> Result<(), crate::options::structs::CustomError> { Ok(()) } +trait FromEnv { + fn from_env() -> LevelFilter; +} + +impl FromEnv for LevelFilter { + fn from_env() -> LevelFilter { + return match std::env::var("NOXIS_MAX_LOG_LEVEL") { + Ok(var) => { + match var.to_ascii_lowercase().trim().as_ref() { + "trace" => LevelFilter::Trace, + "debug" => LevelFilter::Debug, + "info" => LevelFilter::Info, + "error" => LevelFilter::Error, + "warn" => LevelFilter::Warn, + "off" => LevelFilter::Off, + _ => LevelFilter::Info, + } + }, + Err(_) => LevelFilter::Info, + } + } +} + #[cfg(test)] mod logger_tests { use super::*; From a4afd430afbf82203ef5ca4b508b92fc27fa3721 Mon Sep 17 00:00:00 2001 From: prplV Date: Fri, 23 May 2025 15:17:03 +0300 Subject: [PATCH 14/31] cli work with config --- noxis-rs/src/main.rs | 14 ++---- noxis-rs/src/options/cli_pipeline.rs | 75 ++++++++++++++++++++++++---- noxis-rs/src/options/config.rs | 2 +- noxis-rs/src/options/structs.rs | 3 ++ 4 files changed, 74 insertions(+), 20 deletions(-) diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 68a179d..3f1c6fd 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -1,28 +1,20 @@ mod options; mod utils; -use clap::Parser; use log::{error, info}; -use options::config::*; use options::logger::setup_logger; use options::signals::set_valid_destructor; use options::structs::Processes; use options::cli_pipeline::init_cli_pipeline; use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc; -use utils::*; use options::preboot::PrebootParams; use tokio::sync::{broadcast, oneshot}; use options::config::v2::init_config_mechanism; use utils::v2::init_monitoring; -use metrics::init_metrics_grubber; #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()>{ - // init_metrics_grubber().await; - // todo!(); - let preboot = Arc::new(PrebootParams::validate()); let _ = setup_logger(); @@ -59,7 +51,11 @@ async fn main() -> anyhow::Result<()>{ } } }; - if let Err(er) = init_cli_pipeline(preboot_cli, Arc::new(config)).await { + if let Err(er) = init_cli_pipeline( + preboot_cli, + Arc::new(config), + tx_oneshot + ).await { error!("CLI pipeline failed due to {}", er) } }); diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index f1fbf7b..f711ac9 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -1,5 +1,6 @@ use log::{error, info, warn}; use tokio::net::{ UnixStream, UnixListener }; +use tokio::sync::{Mutex, OnceCell}; use tokio::time::{sleep, Duration}; use std::fs; use std::sync::Arc; @@ -9,6 +10,9 @@ use super::structs::Processes; use super::preboot::PrebootParams; +type ConfigGateway = tokio::sync::oneshot::Sender; +type ProcessedConfigGateway = Arc>>; + /// # Fn `init_cli_pipeline` /// ## for catching all input requests from CLI /// @@ -22,10 +26,20 @@ use super::preboot::PrebootParams; /// /// *depends on* : - /// -pub async fn init_cli_pipeline(params: Arc, config : Arc) -> anyhow::Result<()> { +pub async fn init_cli_pipeline( + params: Arc, + config : Arc, + config_gateway : ConfigGateway, +) -> anyhow::Result<()> { let socket_path = ¶ms.self_socket; let _ = fs::remove_file(socket_path); + let config_gateway = Arc::new( + Mutex::new( + OnceCell::new_with(Some(config_gateway)) + ) + ); + match UnixListener::bind(socket_path) { Ok(list) => { // TODO: remove `unwrap`s @@ -34,7 +48,13 @@ pub async fn init_cli_pipeline(params: Arc, config : Arc { - process_connection(socket, params.clone(), config.clone()).await; + // ??? maybe errors on async work with data transfering between modules + let params = params.clone(); + let config = config.clone(); + let config_gateway = config_gateway.clone(); + tokio::spawn(async move { + process_connection(socket, params.clone(), config.clone(), config_gateway.clone()).await; + }); }, Err(er) => { error!("Cannot poll connection to CLI due to {}", er); @@ -64,7 +84,7 @@ pub async fn init_cli_pipeline(params: Arc, config : Arc, config : Arc) { +async fn process_connection(mut stream: UnixStream, params: Arc, config : Arc, cfg_gateway : ProcessedConfigGateway) { let mut buf = vec![0; 1024]; match stream.read(&mut buf).await { Ok(0) => { @@ -76,15 +96,18 @@ async fn process_connection(mut stream: UnixStream, params: Arc, match serde_json::from_slice::(&buf) { Ok(cli) => { info!("Received CLI request: {:?}", cli); - match process_cli_cmd(cli, params.clone(), config).await { + let response = match process_cli_cmd(cli, params.clone(), config, cfg_gateway.clone()).await { Ok(response) => { - if let Err(e) = stream.write_all(response.as_bytes()).await { - error!("Failed to send response: {}", e); - } + response }, Err(er) => { - error!("Can't send response from cli_pipeline: {}", er); + let error_msg = format!("Error: {}", er); + error!("{}", &error_msg); + error_msg }, + }; + if let Err(e) = stream.write_all(response.as_bytes()).await { + error!("Failed to send response: {}", e); } } Err(e) => { @@ -98,7 +121,7 @@ async fn process_connection(mut stream: UnixStream, params: Arc, } -async fn process_cli_cmd(cli : Cli, params: Arc, global_config : Arc) -> anyhow::Result { +async fn process_cli_cmd(cli : Cli, params: Arc, global_config : Arc, cfg_gateway: ProcessedConfigGateway) -> anyhow::Result { use noxis_cli::{Commands, ConfigAction}; return match cli.command { Commands::Config(config) => { @@ -111,8 +134,40 @@ async fn process_cli_cmd(cli : Cli, params: Arc, global_config : Ok(serde_json::to_string_pretty(global_config.as_ref())?) } }, + ConfigAction::Reset => { + Err(anyhow::Error::msg("It's temporarly forbidden to reset current config using CLI-util")) + }, + ConfigAction::Local(cfg) => { + if cfg.is_json { + /* */ + let new_config = serde_json::from_str::(&cfg.config)?; + let new_version = new_config.get_version().to_string(); + + use super::{config::config_comparing, structs::ConfigActuality}; + + return match config_comparing(&global_config, &new_config) { + ConfigActuality::Remote => { + let cfg_gateway = cfg_gateway.clone(); + tokio::spawn(async move { + let mut lock = cfg_gateway.lock().await; + match lock.take() { + Some(channel) => { + let _ = channel.send(new_config); + }, + None => error!("Cannot update confif due to channel sender loss"), + } + }); + Ok(format!("Ok. Saving and reloading with version {}", new_version)) + }, + _ => Err(anyhow::Error::msg(format!("Local config (version: {}) is more actual", global_config.get_version()))), + } + } else { + Err(anyhow::Error::msg("It's temporarly forbidden to set config in non-json mode")) + } + }, + ConfigAction::Remote => {Ok(params.remote_server_url.clone())}, /* */ - _ => Ok(String::from("Ok")) + _ => Err(anyhow::Error::msg("Unrecognized command from CLI")) } }, /* */ diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 00e3e73..11350cc 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -780,7 +780,7 @@ pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc ConfigActuality { +pub fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { if local.is_default() { return ConfigActuality::Remote; } diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 3423d25..51855d5 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -173,6 +173,9 @@ impl Processes { pub fn is_default(&self) -> bool { self.date_of_creation.is_empty() } + pub fn get_version(&self) -> &str { + &self.date_of_creation + } } /// # Struct for the 2nd level in json conf file From 3f98fd7f24e168298686e4ee2c898a74fd6fa740 Mon Sep 17 00:00:00 2001 From: prplV Date: Fri, 23 May 2025 16:16:03 +0300 Subject: [PATCH 15/31] prcs update --- noxis-rs/src/options/cli_pipeline.rs | 2 +- noxis-rs/src/utils/prcs.rs | 44 +++++++++++++++++++--------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index f711ac9..a976a19 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -167,7 +167,7 @@ async fn process_cli_cmd(cli : Cli, params: Arc, global_config : }, ConfigAction::Remote => {Ok(params.remote_server_url.clone())}, /* */ - _ => Err(anyhow::Error::msg("Unrecognized command from CLI")) + // _ => Err(anyhow::Error::msg("Unrecognized command from CLI")) } }, /* */ diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 76a09f6..4ed600b 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -10,6 +10,7 @@ use serde::Serialize; pub mod v2 { use log::info; + use tokio::time::sleep; use crate::options::structs::DependencyType; use std::path::Path; @@ -119,6 +120,7 @@ pub mod v2 { "restart" => { info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); let _ = restart_process(&self.name, &self.bin).await; + sleep(Duration::from_millis(100)).await; self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); info!("{}: New PID - {}", self.name, self.pid); }, @@ -126,6 +128,26 @@ pub mod v2 { } tokio::time::sleep(Duration::from_micros(100)).await; } + pub async fn stop_by_user_call(&mut self) -> anyhow::Result<()> { + terminate_process(&self.name).await?; + self.state = ProcessState::StoppedByCli; + Ok(()) + } + pub async fn freeze_by_user_call(&mut self) -> anyhow::Result<()> { + freeze_process(&self.name).await?; + self.state = ProcessState::HoldingByCli; + Ok(()) + } + pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> { + start_process(&self.name, &self.bin).await?; + self.state = ProcessState::Pending; + Ok(()) + } + pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> { + unfreeze_process(&self.name).await?; + self.state = ProcessState::Pending; + Ok(()) + } } #[async_trait] @@ -298,14 +320,11 @@ pub async fn is_frozen(name: &str) -> bool { /// /// *depends on* : - /// -pub async fn terminate_process(name: &str) { +pub async fn terminate_process(name: &str) -> anyhow::Result<()> { let _ = Command::new("pkill") - .arg(name) - .output() - .unwrap_or_else(|_| { - error!("Failed to execute command 'pkill'"); - std::process::exit(101); - }); + .arg(name) + .output()?; + Ok(()) } /// # Fn `terminate_process` @@ -321,14 +340,11 @@ pub async fn terminate_process(name: &str) { /// /// *depends on* : - /// -pub async fn freeze_process(name: &str) { +pub async fn freeze_process(name: &str) -> anyhow::Result<()> { let _ = Command::new("pkill") .args(["-STOP", name]) - .output() - .unwrap_or_else(|_| { - error!("Failed to freeze process"); - std::process::exit(101); - }); + .output()?; + Ok(()) } /// # Fn `unfreeze_process` @@ -365,7 +381,7 @@ pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> { /// *depends on* : fn `start_process`, fn `terminate_process` /// pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> { - terminate_process(name).await; + terminate_process(name).await?; tokio::time::sleep(Duration::from_millis(100)).await; start_process(name, path).await } From 54d2b1aaf712d9e44ca6e8277eb3522bd335c969 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 26 May 2025 14:32:58 +0300 Subject: [PATCH 16/31] serv + config fix --- noxis-rs/settings.json | 4 ++-- noxis-rs/src/options/config.rs | 8 ++----- noxis-rs/src/utils/prcs.rs | 4 ++-- noxis-rs/src/utils/services.rs | 43 +++++++++++++++++++++++++++------- 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index b27d9c7..4dae157 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -21,11 +21,11 @@ "port": 443, "triggers": { "wait": 10, - "onLost": "restart" + "onLost": "stop" } } ] } } ] -} +} \ No newline at end of file diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 11350cc..3a461e1 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -189,7 +189,7 @@ pub mod v2 { }, Ok(_) => { info!("Successfully subscribed to {} pubsub channel", channel_name); - let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3))); + let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(1))); loop { if let Ok(msg) = pub_sub.get_message() { // dbg!("ok on get message"); @@ -325,10 +325,6 @@ pub mod v2 { if !events.is_empty() { warn!("Local config file was overwritten. Discarding changes ..."); need_to_export_config = true; - // events - // .iter() - // .any(|event| *event == EventMask::DELETE_SELF) - // .then(|| need_to_recreate_watcher = true); } } } @@ -686,7 +682,7 @@ fn get_connection_watcher(client: &Client) -> Connection { /// fn restart_main_thread() -> std::io::Result<()> { let current_exe = env::current_exe()?; - Command::new(current_exe).exec(); + let _ = Command::new(current_exe).exec(); Ok(()) } diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 4ed600b..23a712c 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -88,7 +88,7 @@ pub mod v2 { "stop" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); - terminate_process(&self.name).await; + let _ = terminate_process(&self.name).await; self.state = ProcessState::Stopped; self.pid = Pid::new(); } @@ -96,7 +96,7 @@ pub mod v2 { "user-stop" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); - terminate_process(&self.name).await; + let _ = terminate_process(&self.name).await; self.state = ProcessState::StoppedByCli; self.pid = Pid::new(); } diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index a381cb1..33737a3 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -95,10 +95,37 @@ pub mod v2 { self.event_registrator.entry(proc_name).or_insert((trigger, sender)); } async fn check_state(&self) -> anyhow::Result<()> { - let mut addrs = self.access_url.to_socket_addrs()?; - if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { - return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))) + let url = self.access_url.clone(); + let resolve_future = tokio::task::spawn_blocking(move || { + url.to_socket_addrs() + }); + let addrs: Vec<_> = match tokio::time::timeout(Duration::from_secs(1), resolve_future).await { + Ok(Ok(addrs)) => addrs?.collect(), + Ok(Err(er)) => return Err(er.into()), + Err(_) => return Err(anyhow::Error::msg("DNS resolution timeout")), + }; + + if addrs.is_empty() { + return Err(anyhow::Error::msg("No addresses resolved")); } + + let tasks: Vec<_> = addrs.into_iter().map(|addr| async move { + match tokio::time::timeout(Duration::from_secs(1), tokio::net::TcpStream::connect(&addr)).await { + Ok(Ok(_)) => Some(addr), + _ => None, + } + }).collect(); + let mut any_success = false; + for task in futures::future::join_all(tasks).await { + if task.is_some() { + any_success = true; + break; + } + } + if !any_success { + return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))); + } + Ok(()) } async fn trigger_on(&mut self) { @@ -123,7 +150,6 @@ pub mod v2 { let timer = tokio::time::Instant::now(); let mut attempt: u32 = 1; let access_url = Arc::new(self.access_url.clone()); - // let event_registrator = &mut self.event_registrator; if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async { // let access_url = access_url.clone(); @@ -133,15 +159,16 @@ pub mod v2 { attempt += 1; let state_check_result = self.check_state().await; - + if state_check_result.is_ok() { info!("Connection to {} is `OK` now", &access_url); - self.state = ServiceState::Ok; + self.state = ServiceState::Ok; break; } else { let now = timer.elapsed(); + let iterator = self.config.iter() - .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now) + .filter(|(&wait, _)| tokio::time::Duration::from_secs(wait as u64) <= now) .flat_map(|(_, a)| a.iter().cloned()) .collect::>>(); @@ -178,7 +205,7 @@ pub mod v2 { self.trigger_on().await; }, (ServiceState::Ok, Err(_)) => { - warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); + warn!("Unreachable for connection service `{}`. Initializing reconnect mechanism ...", &self.access_url); self.state = ServiceState::Unavailable; self.trigger_on().await; }, From bd5e21fce7dd87074b356387229ef4a5256de8d9 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 28 May 2025 14:50:43 +0300 Subject: [PATCH 17/31] file check bug fixed --- noxis-rs/src/utils/files.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index da87b0f..2cfa44f 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -102,14 +102,16 @@ } match &mut self.watcher { Some(notify) => { - let mut buffer = [0; 1024]; - if let Ok(mut notif_events) = notify.read_events(&mut buffer) { - // notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);}); - // todo!(); - if let (recreate_watcher, true) = ( - notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF), - notif_events.any(|mask| mask.mask == EventMask::MODIFY) - ) { + let mut buffer = [0; 128]; + if let Ok(notif_events) = notify.read_events(&mut buffer) { + let (need_to_recreate, was_modifired) = notif_events.fold((false, false), |(a, b), mask| { + ( + a || mask.mask == EventMask::DELETE_SELF, + b || mask.mask == EventMask::MODIFY, + ) + }); + + if let (recreate_watcher, true) = (need_to_recreate, was_modifired) { warn!("File {} ({}) was changed", self.name, &self.path); if recreate_watcher { self.watcher = match create_watcher(&self.name, &self.path) { From 3147c73006631d461b948743623b37ea7da60e69 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 28 May 2025 15:06:28 +0300 Subject: [PATCH 18/31] PID bug fixed --- noxis-rs/src/utils/prcs.rs | 79 ++++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 25 deletions(-) diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 23a712c..6c7c173 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -17,7 +17,7 @@ pub mod v2 { use super::*; #[derive(Debug, Serialize, Clone, Copy)] - pub struct Pid(i64); + pub struct Pid(u32); impl std::fmt::Display for Pid { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -27,16 +27,16 @@ pub mod v2 { impl Pid { fn new() -> Self { - Pid(-1) + Pid(0) } fn new_from_output(pid: Option) -> Self { let temp = { match pid { Some(pid) => String::from_utf8_lossy(pid.stdout.trim_ascii()).into_owned(), - None => return Pid(-1), + None => return Pid(0), } }; - Pid(temp.parse::().unwrap_or_else(|_| { -1 })) + Pid(temp.parse::().unwrap_or_else(|_| { 0 })) } pub fn new_sysinfo_pid(&self) -> sysinfo::Pid { sysinfo::Pid::from_u32(self.0 as u32) @@ -88,41 +88,67 @@ pub mod v2 { "stop" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); - let _ = terminate_process(&self.name).await; - self.state = ProcessState::Stopped; - self.pid = Pid::new(); + match terminate_process(&self.name).await { + Ok(_) => { + self.state = ProcessState::Stopped; + self.pid = Pid::new(); + }, + Err(er) => { + error!("Cannot stop process {} : {}", self.name, er); + }, + } } }, "user-stop" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); - let _ = terminate_process(&self.name).await; - self.state = ProcessState::StoppedByCli; - self.pid = Pid::new(); + match terminate_process(&self.name).await { + Ok(_) => { + self.state = ProcessState::StoppedByCli; + self.pid = Pid::new(); + }, + Err(er) => { + error!("Cannot forcefully stop process {} : {}", self.name, er); + }, + } } }, "user-hold" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name); - freeze_process(&self.name).await; - self.state = ProcessState::HoldingByCli; - self.pid = Pid::new(); + match freeze_process(&self.name).await { + Ok(_) => { + self.state = ProcessState::HoldingByCli; + self.pid = Pid::new(); + }, + Err(er) => { + error!("Cannot forcefully freeze process {} : {}", self.name, er); + }, + } } }, "hold" => { if !is_frozen(&self.name).await { info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); - freeze_process(&self.name).await; - self.state = ProcessState::Holding; - self.pid = Pid::new(); + match freeze_process(&self.name).await { + Ok(_) => { + self.state = ProcessState::Holding; + self.pid = Pid::new(); + }, + Err(er) => { + error!("Cannot freeze process {} : {}", self.name, er); + }, + } } }, "restart" => { info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); - let _ = restart_process(&self.name, &self.bin).await; + let pid = restart_process(&self.name, &self.bin).await; sleep(Duration::from_millis(100)).await; - self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); - info!("{}: New PID - {}", self.name, self.pid); + if let Ok(pid) = pid { + self.pid = Pid(pid); + info!("{}: New PID - {}", self.name, self.pid); + } }, _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), } @@ -131,6 +157,7 @@ pub mod v2 { pub async fn stop_by_user_call(&mut self) -> anyhow::Result<()> { terminate_process(&self.name).await?; self.state = ProcessState::StoppedByCli; + self.pid = Pid::new(); Ok(()) } pub async fn freeze_by_user_call(&mut self) -> anyhow::Result<()> { @@ -139,8 +166,9 @@ pub mod v2 { Ok(()) } pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> { - start_process(&self.name, &self.bin).await?; + let pid = start_process(&self.name, &self.bin).await?; self.state = ProcessState::Pending; + self.pid = Pid(pid); Ok(()) } pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> { @@ -380,7 +408,7 @@ pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> { /// /// *depends on* : fn `start_process`, fn `terminate_process` /// -pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> { +pub async fn restart_process(name: &str, path: &str) -> anyhow::Result { terminate_process(name).await?; tokio::time::sleep(Duration::from_millis(100)).await; start_process(name, path).await @@ -399,18 +427,19 @@ pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> { /// /// *depends on* : - /// -pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> { +pub async fn start_process(name: &str, path: &str) -> anyhow::Result { // let runsh = format!("{} {}", "exec", path); let mut command = Command::new(path); // command.arg(path); match command.spawn() { - Ok(_) => { + Ok(child) => { + let pid = child.id(); warn!("Process {} is running now!", name); - Ok(()) + Ok(pid) } Err(er) => { - Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er))) + Err(anyhow::Error::msg(format!("Cannot start process {} : {}", name, er))) } } } From ac0c3cb8c16f8c5b5a7e7cf0ac760f129c3c2785 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 28 May 2025 16:42:50 +0300 Subject: [PATCH 19/31] constructor inlining --- noxis-rs/src/utils/files.rs | 2 ++ noxis-rs/src/utils/prcs.rs | 3 ++- noxis-rs/src/utils/services.rs | 5 +++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index 2cfa44f..4d022b6 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -42,6 +42,7 @@ } impl FilesController { + #[inline(always)] pub fn new(name: &str, triggers: EventHandlers) -> FilesController { let name: Arc = Arc::from(name); Self { @@ -53,6 +54,7 @@ code_name : name.clone(), } } + #[inline(always)] pub fn with_path(mut self, path: impl AsRef) -> anyhow::Result { self.path = path.as_ref().to_string_lossy().into_owned(); self.watcher = { diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 6c7c173..7999930 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -61,6 +61,7 @@ pub mod v2 { } impl ProcessesController { + #[inline(always)] pub fn new(name: &str, event_reader: MpscReciever) -> ProcessesController { ProcessesController { name : Arc::from(name), @@ -71,11 +72,11 @@ pub mod v2 { negative_events : HashSet::new(), } } + #[inline(always)] pub fn with_exe(mut self, bin: impl AsRef) -> ProcessesController { self.bin = bin.as_ref().to_string_lossy().into_owned(); self } - pub fn get_pid(&self) -> Pid { self.pid } diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index 33737a3..7885eb4 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -42,6 +42,7 @@ pub mod v2 { } impl ServicesController { + #[inline(always)] pub fn new() -> ServicesController { ServicesController { name : String::new(), @@ -51,6 +52,7 @@ pub mod v2 { event_registrator : EventHandlers::new(), } } + #[inline(always)] pub fn with_access_name( mut self, hostname: &str, @@ -60,7 +62,7 @@ pub mod v2 { self.access_url = Arc::from(access_url); self } - + #[inline(always)] pub fn with_params( mut self, conn_queue: ConnectionQueue, @@ -70,7 +72,6 @@ pub mod v2 { self.event_registrator = event_reg; self } - pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String { format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) } From 75940cb187e11be2eba420d96c3aebc87af79f8c Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 28 May 2025 17:28:13 +0300 Subject: [PATCH 20/31] pid logic fixed --- noxis-rs/src/utils/prcs.rs | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 7999930..996e46e 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -120,7 +120,7 @@ pub mod v2 { match freeze_process(&self.name).await { Ok(_) => { self.state = ProcessState::HoldingByCli; - self.pid = Pid::new(); + // self.pid = Pid::new(); }, Err(er) => { error!("Cannot forcefully freeze process {} : {}", self.name, er); @@ -134,7 +134,7 @@ pub mod v2 { match freeze_process(&self.name).await { Ok(_) => { self.state = ProcessState::Holding; - self.pid = Pid::new(); + // self.pid = Pid::new(); }, Err(er) => { error!("Cannot freeze process {} : {}", self.name, er); @@ -177,6 +177,11 @@ pub mod v2 { self.state = ProcessState::Pending; Ok(()) } + pub async fn restart_by_user_call(&mut self) -> anyhow::Result<()> { + let pid = restart_process(&self.name, &self.bin).await?; + self.pid = Pid(pid); + Ok(()) + } } #[async_trait] @@ -190,18 +195,22 @@ pub mod v2 { error!("Cannot unfreeze process {} : {}", self.name, er); } else { self.state = ProcessState::Pending; - self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); - info!("{}: New PID - {}", self.name, self.pid); + // self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); + // info!("{}: New PID - {}", self.name, self.pid); + info!("Process {} was unfreezed", &self.name); } }, ProcessState::Stopped => { info!("No negative dependecies events on {} process. Starting ...", self.name); - if let Err(er) = start_process(&self.name, &self.bin).await { - error!("Cannot start process {} : {}", self.name, er); - } else { - self.state = ProcessState::Pending; - self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); - info!("{}: New PID - {}", self.name, self.pid); + match start_process(&self.name, &self.bin).await { + Ok(pid) => { + self.state = ProcessState::Pending; + self.pid = Pid(pid); + info!("{}: New PID - {}", self.name, self.pid); + }, + Err(er) => { + error!("Cannot start process {} : {}", self.name, er); + }, } }, _ => {}, From 81df3f8435982e29d985dc13e6f76d8064c66a5b Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 28 May 2025 18:23:51 +0300 Subject: [PATCH 21/31] service bug --- noxis-rs/settings.json | 2 +- noxis-rs/src/utils/prcs.rs | 5 +- noxis-rs/src/utils/services.rs | 174 ++++++--------------------------- 3 files changed, 33 insertions(+), 148 deletions(-) diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index 4dae157..62d45f2 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -20,7 +20,7 @@ "hostname": "ya.ru", "port": 443, "triggers": { - "wait": 10, + "wait": 5, "onLost": "stop" } } diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 996e46e..3214edd 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -91,6 +91,7 @@ pub mod v2 { info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); match terminate_process(&self.name).await { Ok(_) => { + info!("Process {} was stopped ...", &self.name); self.state = ProcessState::Stopped; self.pid = Pid::new(); }, @@ -105,6 +106,7 @@ pub mod v2 { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); match terminate_process(&self.name).await { Ok(_) => { + info!("Process {} was forcefully stopped ...", &self.name); self.state = ProcessState::StoppedByCli; self.pid = Pid::new(); }, @@ -119,6 +121,7 @@ pub mod v2 { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name); match freeze_process(&self.name).await { Ok(_) => { + info!("Process {} was forcefully frozen ...", &self.name); self.state = ProcessState::HoldingByCli; // self.pid = Pid::new(); }, @@ -133,8 +136,8 @@ pub mod v2 { info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); match freeze_process(&self.name).await { Ok(_) => { + info!("Process {} was frozen ...", &self.name); self.state = ProcessState::Holding; - // self.pid = Pid::new(); }, Err(er) => { error!("Cannot freeze process {} : {}", self.name, er); diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index 7885eb4..0d568e9 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -5,8 +5,11 @@ use std::sync::Arc; use tokio::time::Duration; use tokio::sync::mpsc::Sender as Sender; use async_trait::async_trait; +use std::pin::Pin; +use futures::future::Future; pub mod v2 { + use futures::FutureExt; use log::info; use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState}; @@ -72,6 +75,7 @@ pub mod v2 { self.event_registrator = event_reg; self } + pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String { format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) } @@ -111,7 +115,7 @@ pub mod v2 { } let tasks: Vec<_> = addrs.into_iter().map(|addr| async move { - match tokio::time::timeout(Duration::from_secs(1), tokio::net::TcpStream::connect(&addr)).await { + match tokio::time::timeout(Duration::from_secs(2), tokio::net::TcpStream::connect(&addr)).await { Ok(Ok(_)) => Some(addr), _ => None, } @@ -132,11 +136,16 @@ pub mod v2 { async fn trigger_on(&mut self) { match self.state { ServiceState::Ok => { - let _ = self.event_registrator - .iter() - .map(|(_, (_, el))| async { - let _ = el.send(Events::Positive(self.access_url.clone())).await; - }); + let futures : Vec + Send>>> = self.event_registrator.iter() + .map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt))) + .map(|(prc, (serv, sender_opt))| async move { + info!("Notifying process {} ...", prc); + let _ = sender_opt.send(Events::Positive(serv.clone())); + }) + .map(|fut| fut.boxed()) + .collect(); + + futures::future::join_all(futures).await; }, ServiceState::Unavailable => { // looped check and notifying @@ -160,10 +169,20 @@ pub mod v2 { attempt += 1; let state_check_result = self.check_state().await; - + if state_check_result.is_ok() { info!("Connection to {} is `OK` now", &access_url); - self.state = ServiceState::Ok; + self.state = ServiceState::Ok; + let futures : Vec + Send>>> = self.event_registrator.iter() + .map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt))) + .map(|(prc, (serv, sender_opt))| async move { + info!("Notifying process {} ...", prc); + let _ = sender_opt.send(Events::Positive(serv.clone())); + }) + .map(|fut| fut.boxed()) + .collect(); + + futures::future::join_all(futures).await; break; } else { let now = timer.elapsed(); @@ -178,7 +197,7 @@ pub mod v2 { info!("Trying to notify process `{}` ...", &proc_name); let sender_opt = self.event_registrator.get(&name) .map(|(trigger, sender)| - (trigger.to_service_negative_event(name.clone()), sender) + (trigger.to_service_negative_event(self.access_url.clone()), sender) ); if let Some((tr, tx)) = sender_opt { @@ -217,144 +236,7 @@ pub mod v2 { } } -/// # Fn `service_handler` -/// ## function to realize mechanism of current process' dep services monitoring -/// -/// *input* : `&str`, `&Vec`, `Arc>` -/// -/// *output* : () -/// -/// *initiator* : fn `utils::running_handler` -/// -/// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer -/// -/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` -/// -// pub async fn service_handler( -// name: &str, -// services: &Vec, -// tx: Arc>, -// ) -> Result<(), CustomError> { -// // println!("service daemon on {}", name); -// for serv in services { -// if check_service(&serv.hostname, &serv.port).await.is_err() { -// if !is_active(name).await || is_frozen(name).await { -// return Err(CustomError::Fatal); -// } -// error!( -// "Service {}:{} is unreachable for process {}", -// &serv.hostname, &serv.port, &name -// ); -// match serv.triggers.on_lost.as_str() { -// "stay" => { -// tx.send(4).await.unwrap(); -// continue; -// } -// "stop" => { -// if looped_service_connecting(name, serv).await.is_err() { -// tx.send(5).await.unwrap(); -// tokio::task::yield_now().await; -// return Err(CustomError::Fatal); -// } -// } -// "hold" => { -// // if is_frozen(name).await { -// // return Err(CustomError::Fatal); -// // } -// if looped_service_connecting(name, serv).await.is_err() { -// tx.send(6).await.unwrap(); -// tokio::task::yield_now().await; -// return Err(CustomError::Fatal); -// } -// } -// _ => { -// tx.send(101).await.unwrap(); -// return Err(CustomError::Fatal); -// } -// } -// } -// } -// tokio::time::sleep(Duration::from_millis(100)).await; -// Ok(()) -// } -/// # Fn `looped_service_connecting` -/// ## for service's state check in loop (with delay and restriction of attempts) -/// -/// *input* : `&str`, `&Services` -/// -/// *output* : Ok(()) if service now available | Err(er) if still not -/// -/// *initiator* : fn `service_handler` -/// -/// *managing* : process name, current service struct -/// -/// *depends on* : fn `check_service` -/// -// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { -// if serv.triggers.wait == 0 { -// loop { -// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; -// warn!( -// "Attempting to connect from {} process to {}:{}", -// &name, &serv.hostname, &serv.port -// ); -// match check_service(&serv.hostname, &serv.port).await { -// Ok(_) => { -// log::info!( -// "Successfully connected to {} from {} process!", -// &serv.hostname, -// &name -// ); -// break; -// } -// Err(_) => { -// tokio::task::yield_now().await; -// } -// } -// } -// Ok(()) -// } else { -// let start = Instant::now(); -// while start.elapsed().as_secs() < serv.triggers.wait.into() { -// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; -// warn!( -// "Attempting to connect from {} process to {}:{}", -// &name, &serv.hostname, &serv.port -// ); -// match check_service(&serv.hostname, &serv.port).await { -// Ok(_) => { -// log::info!( -// "Successfully connected to {} from {} process!", -// &serv.hostname, -// &name -// ); -// return Ok(()); -// } -// Err(_) => { -// tokio::task::yield_now().await; -// } -// } -// } -// Err(CustomError::Fatal) -// } -// } - -/// # Fn `check_service` -/// ## for check current service's availiability -/// -/// *input* : `&str`, `&u32` -/// -/// *output* : Ok(()) if service now available | Err(er) if still not -/// -/// *initiator* : fn `service_handler`, fn `looped_service_connecting` -/// -/// *managing* : hostname, port -/// -/// *depends on* : - -/// -// ! have to be rewritten -// todo: rewrite use async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { let addr = format!("{}:{}", hostname, port); From 08e99385b91673266f3574a4871485ebc6e87cf8 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 16:59:12 +0300 Subject: [PATCH 22/31] force stop and freeze wrapping --- noxis-rs/src/utils/prcs.rs | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 3214edd..4747204 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -191,20 +191,20 @@ pub mod v2 { impl ProcessUnit for ProcessesController { async fn process(&mut self) { if self.negative_events.len() == 0 { - match self.state { - ProcessState::Holding => { - info!("No negative dependecies events on {} process. Unfreezing ...", self.name); + let conditions = (is_active(&self.name).await, is_frozen(&self.name).await); + let state = &self.state; + match (state, conditions) { + (ProcessState::Holding, (_, _)) => { + info!("No negative dependecies events on {} frozen process. Unfreezing ...", self.name); if let Err(er) = unfreeze_process(&self.name).await { error!("Cannot unfreeze process {} : {}", self.name, er); } else { self.state = ProcessState::Pending; - // self.pid = Pid::new_from_output(get_pid(self.name.as_ref()).await); - // info!("{}: New PID - {}", self.name, self.pid); info!("Process {} was unfreezed", &self.name); } }, - ProcessState::Stopped => { - info!("No negative dependecies events on {} process. Starting ...", self.name); + (ProcessState::Stopped, (_, _)) => { + info!("No negative dependecies events on stopped {} process. Starting ...", self.name); match start_process(&self.name, &self.bin).await { Ok(pid) => { self.state = ProcessState::Pending; @@ -216,6 +216,28 @@ pub mod v2 { }, } }, + (ProcessState::Pending, (false, false)) => { + info!("{} process was impermissibly stopped. Starting ...", self.name); + match start_process(&self.name, &self.bin).await { + Ok(pid) => { + self.state = ProcessState::Pending; + self.pid = Pid(pid); + info!("{}: New PID - {}", self.name, self.pid); + }, + Err(er) => { + error!("Cannot start process {} : {}", self.name, er); + }, + } + }, + (ProcessState::Pending, (true, true)) => { + info!("No negative dependecies events on {} process. Unfreezing ...", self.name); + if let Err(er) = unfreeze_process(&self.name).await { + error!("Cannot unfreeze process {} : {}", self.name, er); + } else { + self.state = ProcessState::Pending; + info!("Process {} was unfreezed", &self.name); + } + }, _ => {}, } } From 19b3477560c5927f4f60de104b6a9d4d9d3363a9 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 17:15:57 +0300 Subject: [PATCH 23/31] service handler bug fixed --- noxis-rs/settings.json | 2 +- noxis-rs/src/utils/services.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index 62d45f2..263600a 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -20,7 +20,7 @@ "hostname": "ya.ru", "port": 443, "triggers": { - "wait": 5, + "wait": 2, "onLost": "stop" } } diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index 0d568e9..cdee768 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -140,7 +140,7 @@ pub mod v2 { .map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt))) .map(|(prc, (serv, sender_opt))| async move { info!("Notifying process {} ...", prc); - let _ = sender_opt.send(Events::Positive(serv.clone())); + let _ = sender_opt.send(Events::Positive(serv.clone())).await; }) .map(|fut| fut.boxed()) .collect(); From 3d0b7766ac0364493ea3c909b4507fa29d997c3f Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 17:24:05 +0300 Subject: [PATCH 24/31] utils ref + lazy static --- noxis-rs/Cargo.toml | 1 + noxis-rs/src/utils.rs | 255 ++---------------------------------------- 2 files changed, 10 insertions(+), 246 deletions(-) diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index 38380c0..73ed76b 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -21,3 +21,4 @@ dotenv = "0.15.0" futures = "0.3.31" async-trait = "0.1.88" crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } +lazy_static = "1.5.0" diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index 03513db..2ef0d08 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -4,31 +4,23 @@ pub mod metrics; pub mod prcs; pub mod services; -// TODO : saving current flags state - -use crate::options::structs::{CustomError, TrackingProcess, Processes}; -// use files::create_watcher; -// use files::file_handler; -// use inotify::Inotify; -use log::{error, warn, info}; -use prcs::{ - freeze_process, is_active, is_frozen, restart_process, terminate_process, - unfreeze_process, -}; -// use services::service_handler; +use crate::options::structs::Processes; +use log::{error, info}; use std::process::Command; use std::sync::Arc; -// use tokio::join; use tokio::sync::mpsc; use tokio::time::Duration; -// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; -// controllers import use prcs::v2::ProcessesController; use files::v2::FilesController; use services::v2::ServicesController; use async_trait::async_trait; +use lazy_static::lazy_static; -const GET_ID_CMD: &str = "hostname"; +lazy_static! { + static ref GET_ID_CMD : &'static str = "hostname"; +} + +// const GET_ID_CMD: &str = "hostname"; pub mod v2 { use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque}; @@ -187,237 +179,8 @@ pub mod v2 { supervisor.process().await; Ok(()) } - - - // async fn generate_controllers<'a>(config: Processes) -> (HashSet>, HashSet>, HashSet>) { - // let mut prcs: HashSet> = HashSet::new(); - // let mut files: HashSet> = HashSet::new(); - // let mut services: HashSet> = HashSet::new(); - // for prc in config.processes { - // let (rx, tx) = mpsc::channel::>(10); - // // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path); - // let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path); - // let a = new_prc.process().await; - - // } - // (prcs, files, services) - // } - // spawn prc check with semaphore check - async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) } - - // spawn file check with semaphore check - async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) } - - // spawn service check with semaphore check - async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) } } -/// # Fn `run_daemons` -/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel -/// -/// *input* : `Arc`, `Arc>`, `&mut mpsc::Receiver`, -/// -/// *output* : () -/// -/// *initiator* : main thread -/// -/// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader -/// -/// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher` -/// -/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process -/// -// pub async fn run_daemons( -// proc: Arc, -// tx: Arc>, -// rx: &mut mpsc::Receiver, -// ) { -// // creating watchers + ---buffers--- -// let mut watchers: Vec = vec![]; -// for file in proc.dependencies.files.clone().into_iter() { -// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { -// watchers.push(watcher); -// } else { -// let _ = tx.send(121).await; -// } -// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); -// } -// let watchers_clone: Arc>> = -// Arc::new(tokio::sync::Mutex::new(watchers)); - -// loop { -// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); -// tokio::select! { -// _ = run_hand => continue, -// _val = rx.recv() => { -// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { -// return; -// } -// }, -// } -// tokio::task::yield_now().await; -// } -// } - -async fn process_protocol_symbol(proc: Arc, val: u8) -> Result<(), CustomError>{ - match val { - // 1 - File-dependency handling error -> terminating (after waiting) - 1 => { - if is_active(&proc.name).await { - error!("File-dependency handling error: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(500)).await; - } - // return; - }, - // 2 - File-dependency handling error -> holding (after waiting) - 2 => { - if !is_frozen(&proc.name).await { - error!("File-dependency handling error: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - }, - // 3 - Running process error - 3 => { - error!("Error due to starting {} process", &proc.name); - return Err(CustomError::Fatal) - }, - // 4 - Timeout of waiting service-dependency -> staying (after waiting) - 4 => { - // warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name); - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // 5 - Timeout of waiting service-dependency -> terminating (after waiting) - 5 => { - if is_active(&proc.name).await { - error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(500)).await; - } - }, - // 6 - Timeout of waiting service-dependency -> holding (after waiting) - 6 => { - // println!("holding {}-{}", proc.name, is_active(&proc.name).await); - if !is_frozen(&proc.name).await { - error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_secs(1)).await; - } - }, - // // 7 - File-dependency change -> terminating (after check) - 7 => { - error!("File-dependency warning (file changed). Terminating {} process...", &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - return Err(CustomError::Fatal) - }, - // // 8 - File-dependency change -> restarting (after check) - 8 => { - warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name); - let _ = restart_process(&proc.name, &proc.path).await; - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // // 9 - File-dependency change -> staying (after check) - 9 => { - warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); - tokio::time::sleep(Duration::from_millis(100)).await; - }, - - // 10 - Process unfreaze call via file handler (or service handler) - 10 | 11 => { - if is_frozen(&proc.name).await { - warn!("Unfreezing process {} call...", &proc.name); - unfreeze_process(&proc.name).await; - } - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // 11 - Process unfreaze call via service handler - // 11 => { - // if is_frozen(&proc.name).await { - // warn!("Unfreezing process {} call...", &proc.name); - // unfreeze_process(&proc.name).await; - // } - // tokio::time::sleep(Duration::from_millis(100)).await; - // }, - // 101 - Impermissible trigger values in JSON - 101 => { - error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", &proc.name); - if is_active(&proc.name).await { - terminate_process(&proc.name).await; - } - return Err(CustomError::Fatal) - }, - // - // 121 - Cannot create valid watcher for file dependency - // todo : think about valid situation - 121 => { - error!("Cannot create valid watcher for file dependency. Terminating {} process...", &proc.name); - let _ = terminate_process(&proc.name).await; - return Err(CustomError::Fatal) - }, - // 111 - global thread termination with killing current child in a face - // of a current process - 111 => { - warn!("Terminating {}'s child processes...", &proc.name); - match is_active(&proc.name).await { - true => { - terminate_process(&proc.name).await; - }, - false => { - log::info!("Process {} is already terminated!", proc.name); - }, - } - }, - _ => {}, - } - Ok(()) -} -// check process status daemon -/// # Fn `run_daemons` -/// ## func to async exec subjobs of checking process, services and files states -/// -/// *input* : `Arc`, `Arc>`, `Arc>>` -/// -/// *output* : () -/// -/// *initiator* : fn `run_daemons` -/// -/// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers -/// -/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` -/// -// pub async fn running_handler( -// prc: Arc, -// tx: Arc>, -// watchers: Arc>>, -// ) { -// // services and files check (once) -// let files_check = file_handler( -// &prc.name, -// &prc.dependencies.files, -// tx.clone(), -// watchers.clone(), -// ); -// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); - -// let res = join!(files_check, services_check); -// // if inactive -> spawn checks -> active is true -// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { -// if start_process(&prc.name, &prc.path).await.is_err() { -// tx.send(3).await.unwrap(); -// return; -// } -// } -// // if frozen -> spawn checks -> unfreeze is true -// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { -// tx.send(10).await.unwrap(); -// return; -// } -// // tokio::time::sleep(Duration::from_millis(100)).await; -// tokio::task::yield_now().await; -// } - // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' /// # Fn `get_container_id` /// ## for getting container id used in logs @@ -433,7 +196,7 @@ async fn process_protocol_symbol(proc: Arc, val: u8) -> Result< /// *depends on* : - /// pub fn get_container_id() -> Option { - match Command::new(GET_ID_CMD).output() { + match Command::new(*GET_ID_CMD).output() { Ok(output) => { if !output.status.success() { return None; From b456c7aa18aaf35b8fa8c775b1e213e2e49bef5e Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 17:25:51 +0300 Subject: [PATCH 25/31] ref + reset lint warnigs --- noxis-rs/src/utils/prcs.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 4747204..01e292e 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -29,15 +29,7 @@ pub mod v2 { fn new() -> Self { Pid(0) } - fn new_from_output(pid: Option) -> Self { - let temp = { - match pid { - Some(pid) => String::from_utf8_lossy(pid.stdout.trim_ascii()).into_owned(), - None => return Pid(0), - } - }; - Pid(temp.parse::().unwrap_or_else(|_| { 0 })) - } + #[allow(unused)] pub fn new_sysinfo_pid(&self) -> sysinfo::Pid { sysinfo::Pid::from_u32(self.0 as u32) } @@ -77,6 +69,7 @@ pub mod v2 { self.bin = bin.as_ref().to_string_lossy().into_owned(); self } + #[allow(unused)] pub fn get_pid(&self) -> Pid { self.pid } @@ -158,28 +151,33 @@ pub mod v2 { } tokio::time::sleep(Duration::from_micros(100)).await; } + #[allow(unused)] pub async fn stop_by_user_call(&mut self) -> anyhow::Result<()> { terminate_process(&self.name).await?; self.state = ProcessState::StoppedByCli; self.pid = Pid::new(); Ok(()) } + #[allow(unused)] pub async fn freeze_by_user_call(&mut self) -> anyhow::Result<()> { freeze_process(&self.name).await?; self.state = ProcessState::HoldingByCli; Ok(()) } + #[allow(unused)] pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> { let pid = start_process(&self.name, &self.bin).await?; self.state = ProcessState::Pending; self.pid = Pid(pid); Ok(()) } + #[allow(unused)] pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> { unfreeze_process(&self.name).await?; self.state = ProcessState::Pending; Ok(()) } + #[allow(unused)] pub async fn restart_by_user_call(&mut self) -> anyhow::Result<()> { let pid = restart_process(&self.name, &self.bin).await?; self.pid = Pid(pid); From 1c651c93cedaf1ee8e35b5d630932fe07002c6b9 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 17:33:21 +0300 Subject: [PATCH 26/31] ref files --- noxis-rs/src/utils/files.rs | 127 +++--------------------------------- 1 file changed, 9 insertions(+), 118 deletions(-) diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index 4d022b6..ad3b713 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -1,16 +1,12 @@ - use crate::options::structs::{CustomError, Files}; - use super::prcs::{is_active, is_frozen}; - use inotify::{EventMask, Inotify, WatchMask}; - use std::borrow::BorrowMut; - use std::path::Path; - use std::sync::Arc; - use tokio::sync::mpsc; - use tokio::sync::mpsc::Sender as Sender; - use tokio::time::Duration; - use crate::options::structs::Events; - use async_trait::async_trait; +use crate::options::structs::CustomError; +use inotify::{EventMask, Inotify, WatchMask}; +use std::path::Path; +use std::sync::Arc; +use tokio::sync::mpsc::Sender as Sender; +use crate::options::structs::Events; +use async_trait::async_trait; - pub mod v2 { +pub mod v2 { use log::{error, info, warn}; use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit}; use super::*; @@ -92,14 +88,10 @@ #[async_trait] impl ProcessUnit for FilesController { async fn process(&mut self) { - // polling file check - // 1) existing check - // dbg!(&self); if let Ok(_) = check_file(&self.name, &self.path).await { if let FileState::NotFound = self.state { info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name); self.state = FileState::Ok; - // reseting negative outcome in prc self.trigger_on(None).await; } match &mut self.watcher { @@ -112,7 +104,6 @@ b || mask.mask == EventMask::MODIFY, ) }); - if let (recreate_watcher, true) = (need_to_recreate, was_modifired) { warn!("File {} ({}) was changed", self.name, &self.path); if recreate_watcher { @@ -144,10 +135,9 @@ return; } self.trigger_on(None).await; - // 2) change check } } - } +} /// # Fn `create_watcher` /// ## for creating watcher on file's delete | update events @@ -169,105 +159,6 @@ Ok(inotify) } - /// # Fn `create_watcher` - /// ## for managing processes by checking dep files' states - /// - /// *input* : `&str`, `&[Files]`, `Arc>`, `Arc>>` - /// - /// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check - /// - /// *initiator* : fn `utils::running_handler` - /// - /// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc>`, mut list of file watchers`Arc>>` - /// - /// *depends on* : Files - /// - pub async fn file_handler( - name: &str, - files: &[Files], - tx: Arc>, - watchers: Arc>>, - ) -> anyhow::Result<()> { - for (i, file) in files.iter().enumerate() { - // let src = format!("{}{}", file.src, file.filename); - if check_file(&file.filename, &file.src).await.is_err() { - if !is_active(name).await || is_frozen(name).await { - return Err(anyhow::Error::msg("Process is frozen or stopped")); - } - match file.triggers.on_delete.as_str() { - "stay" => { - tx.send(9).await.unwrap(); - continue; - } - "stop" => { - if is_active(name).await { - tx.send(1).await.unwrap(); - } - return Err(anyhow::Error::msg("Process was stopped")); - } - "hold" => { - if is_active(name).await { - tx.send(2).await.unwrap(); - return Err(anyhow::Error::msg("Process was frozen")); - } - } - _ => { - tokio::time::sleep(Duration::from_millis(50)).await; - tx.send(101).await.unwrap(); - return Err(anyhow::Error::msg("Impermissible character or word in file trigger")); - } - } - } else if is_active(name).await && !is_frozen(name).await { - let watchers = watchers.clone(); - // println!("mutex: {:?}", watchers); - let mut buffer = [0; 128]; - let mut mutex_guard = watchers.lock().await; - if let Some(notify) = mutex_guard.get_mut(i) { - let events = notify.read_events(&mut buffer); - // println!("{:?}", events); - if events.is_ok() { - let events: Vec = events - .unwrap() - .map(|mask| mask.mask) - .filter(|mask| { - *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF - }) - .collect(); - for event in events { - if let EventMask::DELETE_SELF = event { - // ! warning (DELETE_SELF event) ! - // println!("! warning (DELETE_SELF event) !"); - // * watcher recreation after dealing with file recreation mechanism in text editors - let mutex = notify.borrow_mut(); - - // *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); - if let Ok(watcher) = create_watcher(&file.filename, &file.src) { - *mutex = watcher; - } - } - match file.triggers.on_change.as_str() { - "stop" => { - let _ = tx.send(7).await; - } - "restart" => { - let _ = tx.send(8).await; - } - "stay" => { - let _ = tx.send(9).await; - } - _ => { - let _ = tx.send(101).await; - } - } - } - } - } - } - } - tokio::task::yield_now().await; - Ok(()) - } - /// # Fn `check_file` /// ## for checking existance of current file /// From e0720a5f5406cf1de2e94f7adc475a66c0d26ba5 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 17:35:31 +0300 Subject: [PATCH 27/31] services ref --- noxis-rs/src/utils/services.rs | 37 +++++++++------------------------- 1 file changed, 10 insertions(+), 27 deletions(-) diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index cdee768..0d90921 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -1,6 +1,5 @@ -use crate::options::structs::CustomError; use log::{error, warn}; -use std::net::{TcpStream, ToSocketAddrs}; +use std::net::ToSocketAddrs; use std::sync::Arc; use tokio::time::Duration; use tokio::sync::mpsc::Sender as Sender; @@ -236,31 +235,15 @@ pub mod v2 { } } - -async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { - let addr = format!("{}:{}", hostname, port); - - match addr.to_socket_addrs() { - Ok(mut addrs) => { - if addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { - Ok(()) - } else { - Err(CustomError::Fatal) - } - } - Err(_) => Err(CustomError::Fatal), - } -} - #[cfg(test)] mod service_unittests { - use super::check_service; - #[tokio::test] - async fn check_available_service() { - assert!(check_service("ya.ru", &443).await.is_ok()); - } - #[tokio::test] - async fn check_unavailable_service() { - assert!(check_service("unavailable.service", &1111).await.is_err()); - } + // use super::check_service; + // #[tokio::test] + // async fn check_available_service() { + // assert!(check_service("ya.ru", &443).await.is_ok()); + // } + // #[tokio::test] + // async fn check_unavailable_service() { + // assert!(check_service("unavailable.service", &1111).await.is_err()); + // } } From 5f63459e4f069d1552e3b13dfe1731d89dadcaaf Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 17:37:03 +0300 Subject: [PATCH 28/31] config ref --- noxis-rs/src/options/config.rs | 343 --------------------------------- 1 file changed, 343 deletions(-) diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 3a461e1..54bafce 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -414,259 +414,6 @@ fn load_processes(json_filename: &str) -> Option { None } -/// # Fn `get_actual_config` -/// ## for getting actual Monitor's config from local and remote storages -/// -/// *input* : - -/// -/// *output* : `None` on fatal error in mechanisms | `Some(conf)` on finish reading and parsing -/// -/// *initiator* : main thread -/// -/// *managing* : - -/// -/// *depends on* : struct `Processes` -/// -pub async fn get_actual_config(params : Arc) -> Option { - // * if no local conf -> loop and +inf getting conf from redis server - // * if local conf -> once getting conf from redis server - let config_path = params.config.to_str().unwrap_or_else(|| { - error!("Invalid character in config file. Config path was set to default"); - "settings.json" - }); - info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url); - match load_processes(config_path) { - Some(local_conf) => { - info!( - "Found local configuration, version - {}", - &local_conf.date_of_creation - ); - if !params.no_sub { - if let Some(remote_conf) = - // TODO : rework with pubsub mech - once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url)) - { - return match config_comparing(&local_conf, &remote_conf) { - ConfigActuality::Local => { - info!("Local config is actual"); - Some(local_conf) - } - ConfigActuality::Remote => { - info!("Pulled config is more actual. Saving changes!"); - if save_new_config(&remote_conf, config_path).is_err() { - error!("Saving changes process failed due to unexpected error...") - } - Some(remote_conf) - } - }; - } - } - Some(local_conf) - } - None => { - warn!("No local valid conf was found. Trying to pull remote one..."); - if !params.no_sub { - let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url))); - if let Some(conf) = get_remote_conf_watcher(&mut conn).await { - info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); - let _ = save_new_config(&conf, config_path); - return Some(conf); - } - } - None - } - } -} - -/// # Fn `get_remote_conf_watcher` -/// ## for infinitive pulling remote config -/// -/// *input* : `&mut Connection` -/// -/// *output* : `None` on fatal error | `Some(conf)` on succesfull pulling -/// -/// *initiator* : fn `get_actual_config` -/// -/// *managing* : mut ref `Connection` object -/// -/// *depends on* : struct `Processes` -/// -async fn get_remote_conf_watcher(conn : &mut Connection) -> Option { - let mut conn = conn.as_pubsub(); - let cont = crate::utils::get_container_id(); - loop { - match cont { - Some(ref cont) => { - let cont = cont.trim(); - if conn.subscribe(cont).is_err() { - // todo : delay - continue; - } - match conn.get_message() { - Ok(msg) => { - let msg: Result = msg.get_payload(); - if let Ok(payload) = msg { - if let Some(remote) = parse_extern_config(&payload) { - return Some(remote) - } - else { - error!("Pulled invalid config, cannot start. Waiting for remote conf..."); - } - } else { - error!("Cannot get Redis message payload. Waiting for remote conf..."); - } - // todo : delay - continue; - }, - Err(_) => { - // todo : delay - continue; - }, - } - }, - None => { - error!("Cannot get container id. Returning"); - break - }, - } - } - None -} - -/// # Fn `get_remote_conf_watcher` -/// ## for trying to pull remote config -/// -/// > only for situation when local isn't None (no need to fck redis server) -/// -/// *input* : `&str` -/// -/// *output* : `None` on empty pubsub or error | `Some(conf)` on succesfull pulling -/// -/// *initiator* : fn `get_actual_config` -/// -/// *managing* : &str of Redis Server credentials -/// -/// *depends on* : struct `Processes` -/// -fn once_get_remote_configuration(serv_info: &str) -> Option { - let cont = crate::utils::get_container_id(); - match Client::open(serv_info) { - Ok(client) => { - match client.get_connection() { - Ok(mut conn) => { - let mut conn = conn.as_pubsub(); - match conn.subscribe(cont) { - Ok(_) => { - if conn.set_read_timeout(Some(Duration::from_millis(100))).is_err() { - error!("Cannot set reading pubsub timeout and pull remote config"); - return None; - } - match conn.get_message() { - Ok(msg) => { - info!("Pulled config from Redis Server"); - let get_payload: Result = msg.get_payload(); - match get_payload { - Ok(payload) => { - let remote = parse_extern_config(&payload); - if remote.is_none() { - error!("Pulled config is invalid. Check it in Redis Server"); - } - remote - }, - Err(_) => { - error!("Cannot extract payload from new message. Check Redis Server state"); - None - }, - } - }, - Err(_) => { - None - }, - } - }, - Err(_) => { - error!("Redis subscription process failed. Check Redis configuration!"); - None - } - } - } - Err(_) => { - error!("Redis connection attempt is failed. Check Redis configuration!"); - None - } - } - } - Err(_) => { - error!("Redis-Client opening attempt is failed. Check network configuration!"); - None - } - } -} - -// ! watchers - -/// # Fn `open_watcher` -/// ## for infinitive opening Redis client -/// -/// > only for situation when local isn't None (no need to fck redis server) -/// -/// *input* : `Option` -/// -/// *output* : redis::Client on successful opening client -/// -/// *initiator* : fn `get_actual_config` -/// -/// *managing* : &str of Redis Server credentials -/// -/// *depends on* : struct `redis::Client` -/// -fn open_watcher(serv_info: &str) -> Client { - loop { - match Client::open(serv_info) { - Ok(redis) => { - info!("Successfully opened Redis-Client"); - return redis; - } - Err(_) => { - error!("Redis-Client opening attempt is failed. Check network configuration! Retrying..."); - std::thread::sleep(Duration::from_secs(4)); - } - } - } -} - -/// # Fn `get_connection_watcher` -/// ## for infinitive establishing Redis connection on existing client -/// -/// > only for situation when local isn't None (no need to fck redis server) -/// -/// *input* : `&Client` -/// -/// *output* : `Connection` -/// -/// *initiator* : fn `get_actual_config` -/// -/// *managing* : &Client for opening connection -/// -/// *depends on* : struct `redis::Connection` -/// -fn get_connection_watcher(client: &Client) -> Connection { - loop { - match client.get_connection() { - Ok(conn) => { - info!("Successfully got Redis connection object"); - return conn; - } - Err(_) => { - error!( - "Redis connection attempt is failed. Check Redis configuration! Retrying..." - ); - std::thread::sleep(Duration::from_secs(4)); - } - } - } -} - /// # Fn `restart_main_thread` /// ## for restart monitor with new config /// @@ -686,83 +433,6 @@ fn restart_main_thread() -> std::io::Result<()> { Ok(()) } -/// # Fn `subscribe_config_stream` -/// ## for subscribe on changes, pulling to Redis pubsub to get more actual config -/// -/// *input* : `Arc` -/// -/// *output* : `Ok(())` on end of work | `Err(er)` on error with subscribing mechanism -/// -/// *initiator* : fn `subscribe_config_stream` -/// -/// *managing* : `Arc` to compare old config with new pulled -/// -/// *depends on* : `Processes` -/// -pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc) -> Result<(), CustomError> { - let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); - - if params.no_sub { - return Err(CustomError::Fatal); - } - if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) { - if let Ok(mut conn) = client.get_connection() { - match crate::utils::get_container_id() { - Some(channel_name) => { - let channel_name = channel_name.trim(); - let mut pubsub = conn.as_pubsub(); - if pubsub.subscribe(&channel_name).is_ok() { - info!("Runner subscribed on config update publishing in channel {}", &channel_name); - loop { - if let Ok(msg) = pubsub.get_message() { - let get_remote_config: Result = msg.get_payload(); - match get_remote_config { - Ok(payload) => { - if let Some(remote_config) = parse_extern_config(&payload) { - match config_comparing(&actual_prcs, &remote_config) { - ConfigActuality::Remote => { - warn!("Pulled config is actual. Saving and restarting..."); - if save_new_config(&remote_config, config_path).is_err() { - error!("Error with saving new config to {}. Stopping sub mechanism...", config_path); - return Err(CustomError::Fatal); - } - if restart_main_thread().is_err() { - error!("Error with restarting Runner. Stopping sub mechanism..."); - return Err(CustomError::Fatal); - } - } - _ => { - warn!("Pulled new config. Current config is more actual ..."); - continue - }, - } - } - else { - error!("Invalid conig was pulled"); - } - }, - Err(_) => { - error!("Cannot extract new config from message"); - break; - }, - } - } - sleep(Duration::from_secs(30)).await; - } - } else { - error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name); - } - }, - None => { - error!("Cannot get channel name"); - } - } - } - } - error!("Error with subscribing Redis stream on update. Working only with selected config..."); - Err(CustomError::Fatal) -} - /// # Fn `config_comparing` /// ## for compare old and new configs /// @@ -789,14 +459,6 @@ pub fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActualit } } -// ! TEMPORARILY DEPRECATED ! -// fn native_date_from_millis(mls: &str) -> Option> { -// match mls.parse::(){ -// Ok(val) => return chrono::DateTime::from_timestamp_millis(val), -// Err(_) => return None, -// } -// } - /// # Fn `save_new_config` /// ## mechanism for saving new config in local storage /// @@ -885,11 +547,6 @@ mod config_unittests { assert_eq!(config_comparing(&a, &b), ConfigActuality::Remote); } - // TODO : strange output - // #[test] - // fn get_actual_config_mechanism() { - // assert!(get_actual_config().is_some()) - // } #[test] fn save_config() { let a = Processes { From 0d9020d3bf565c6c4e8865bc1ffdc935a4763872 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 17:37:21 +0300 Subject: [PATCH 29/31] config ref --- noxis-rs/src/options/cli_pipeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index a976a19..199a9ba 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -1,4 +1,4 @@ -use log::{error, info, warn}; +use log::{error, info}; use tokio::net::{ UnixStream, UnixListener }; use tokio::sync::{Mutex, OnceCell}; use tokio::time::{sleep, Duration}; From 9133ce5de7bc601ac0e09f67d852eb8d086e38b0 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 17:38:39 +0300 Subject: [PATCH 30/31] signals ref --- noxis-rs/src/options/signals.rs | 1 - noxis-rs/src/utils/metrics.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/noxis-rs/src/options/signals.rs b/noxis-rs/src/options/signals.rs index f840510..2a45850 100644 --- a/noxis-rs/src/options/signals.rs +++ b/noxis-rs/src/options/signals.rs @@ -1,4 +1,3 @@ -use super::structs::CustomError; use std::sync::Arc; use tokio::io; use tokio::sync::mpsc; diff --git a/noxis-rs/src/utils/metrics.rs b/noxis-rs/src/utils/metrics.rs index 47b733f..aa7774d 100644 --- a/noxis-rs/src/utils/metrics.rs +++ b/noxis-rs/src/utils/metrics.rs @@ -2,7 +2,7 @@ // cpu load, ram/rom load and net activity // use std::sync::Mutex; -use std::{collections::BTreeMap, net, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc}; use crate::options::structs::{ProcessState, TrackingProcess}; use sysinfo::{System, Disks as DisksList, Networks}; use crate::options::structs::Dependencies; From 5a5b7dc3158b3f2a2a3c485a6750e2f5ef1c3008 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 29 May 2025 17:41:42 +0300 Subject: [PATCH 31/31] prcs bug with leaking process fixed) --- noxis-rs/src/utils/prcs.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 01e292e..dfb6c7e 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -520,6 +520,7 @@ mod process_unittests { let res1 = start_process("freeze-check", "./tests/examples/freeze-check").await; assert!(res1.is_ok()); assert!(!is_frozen("freeze-check").await); + let _ = terminate_process("freeze-check").await; } #[tokio::test] async fn pidof_active_process() {