From e5437fb877586614a9106e6a2eaf745942b5230a Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 13:04:09 +0300 Subject: [PATCH 01/10] docs: structs --- src/options/structs.rs | 153 +++++++++++++++++++++++++++++++++++++---- src/utils/metrics.rs | 29 ++++---- 2 files changed, 156 insertions(+), 26 deletions(-) diff --git a/src/options/structs.rs b/src/options/structs.rs index d0ea23d..8e3ac3f 100644 --- a/src/options/structs.rs +++ b/src/options/structs.rs @@ -11,8 +11,20 @@ pub enum ConfigActuality { Remote, } -/// # struct for the 1st level in json conf file +/// # Struct for the 1st level in json conf file +/// ## for storing main config data +/// /// > (needed in serialization and deserialization) +/// +/// *depends on* : `TrackingProcess` +/// +/// ``` +/// { +/// -> "dateOfCreation": "1721381809104", +/// -> "configServer": "localhost", +/// -> "processes": [ +/// { ... +/// ``` #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Processes { // #[serde(rename="id")] @@ -25,8 +37,24 @@ pub struct Processes { pub processes: Vec, } -/// # struct for each process to contain info, such as name, path and dependencies +/// # Struct for the 2nd level in json conf file +/// ## for each process to contain info, such as name, path and dependencies +/// /// > (needed in serialization and deserialization) +/// +/// *depends on* : `Dependencies` +/// +/// ``` +/// ... +/// "processes": [ +/// -> { +/// -> "name": "temp-process", +/// -> "path": "/home/user/monitor/runner-rs/temp-process", +/// -> "dependencies": { ... } +/// -> }, ... +/// ] +/// ... +/// ``` #[derive(Debug, Serialize, Deserialize, Clone)] pub struct TrackingProcess { pub name: String, @@ -34,8 +62,22 @@ pub struct TrackingProcess { pub dependencies: Dependencies, } -/// # struct for processes' dependencies including files and services +/// # Struct for the 3d level in json conf file +/// ## for processes' dependencies including files and services +/// /// > (needed in serialization and deserialization) +/// +/// *depends on* : `Files`, `Services` +/// +/// ``` +/// ... +/// "path": "/home/user/monitor/runner-rs/temp-process", +/// -> "dependencies": { +/// -> "files": [ ... ], +/// -> "services": [ ... ] +/// -> } +/// ... +/// ``` #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Dependencies { #[serde(default)] @@ -44,17 +86,49 @@ pub struct Dependencies { pub services: Vec, } -/// # struct for containing file object with its triggers to manipulate in daemons +/// # Struct for the 4th level in json conf file +/// ## for containing file object with its triggers to manipulate in daemons +/// /// > (needed in serialization and deserialization) +/// +/// *depends on* : `FileTriggers` +/// +/// ``` +/// ... +/// "files": [ +/// -> { +/// -> "filename": "dep-file", +/// -> "src": "/home/user/monitor/runner-rs/tests/examples/", +/// -> "triggers": { ... } +/// -> } , +/// ... +/// ], ... +/// ``` #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Files { pub filename: String, pub src: String, - pub triggers: FIleTriggers, + pub triggers: FileTriggers, } -/// # struct for containing service object with its triggers to manipulate in daemons +/// # Struct for the 4th level in json conf file +/// ## for containing service object with its triggers to manipulate in daemons +/// /// > (needed in serialization and deserialization) +/// +/// *depends on* : `ServiceTriggers` +/// +/// ``` +/// ... +/// "services": [ +/// -> { +/// -> "hostname" : "ya.ru", +/// -> "port" : 443, +/// -> "triggers": { ... } +/// -> } , +/// ... +/// ], ... +/// ``` #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Services { pub hostname: String, @@ -62,8 +136,23 @@ pub struct Services { pub triggers: ServiceTriggers, } -/// # struct for instancing each service's policies such as on lost or time to wait till reachable +/// # Struct for the 5th level in json conf file +/// ## for instancing each service's policies such as on lost or time to wait till reachable +/// /// > (needed in serialization and deserialization) +/// +/// *depends on* : - +/// +/// ``` +/// ... +/// "port": 443, +/// -> "triggers": { +/// -> "wait": 10, +/// -> "delay": 2, +/// -> "onLost": "hold" +/// -> } +/// ... +/// ``` #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ServiceTriggers { pub wait: u32, @@ -72,24 +161,44 @@ pub struct ServiceTriggers { pub on_lost: String, } -/// # struct for instancing each file's policies such as on-delete or onupdate events +/// # Struct for the 5th level in json conf file +/// ## for instancing each file's policies such as on-delete or onupdate events +/// /// > (needed in serialization and deserialization) +/// +/// *depends on* : - +/// +/// ``` +/// ... +/// "src": "/home/user/monitor/runner-rs/tests/examples/", +/// -> "triggers": { +/// -> "onDelete": "stop", +/// -> "onChange": "stay" +/// -> } +/// ... +/// ``` #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct FIleTriggers { +pub struct FileTriggers { #[serde(rename = "onDelete")] pub on_delete: String, #[serde(rename = "onChange")] pub on_change: String, } -/// -/// +/// # Metrics struct +/// ## for gathering all system metrics (from container + each process) +/// +/// > (needed in hagent communication, `?...?`) +/// +/// *depends on* : `ContainerMetrics`, `ProcessMetrics` +/// #[derive(Debug, Clone, Serialize,)] pub struct Metrics { pub container_metrics : ContainerMetrics, pub processes_metrics : Vec, // pub net_metrics : Vec, } +/// ## Metrics struct's constructor impl Metrics { pub fn new(cm: ContainerMetrics, prm: Vec) -> Self { Metrics { @@ -101,7 +210,12 @@ impl Metrics { } +/// # Container metrics struct +/// ## for gathering all container metrics /// +/// > (needed in gathering metrics) +/// +/// *depends on* : - /// #[derive(Debug, Clone, Serialize)] pub struct ContainerMetrics { @@ -111,7 +225,7 @@ pub struct ContainerMetrics { // pub net_activity : ??? processes : Vec, } - +/// ## Container struct's constructor impl ContainerMetrics { pub fn new(container_id : &str, cpu: f32, ram: f32, subsystems: Vec,) -> Self{ ContainerMetrics { @@ -123,7 +237,12 @@ impl ContainerMetrics { } } +/// # Process metrics struct +/// ## for gathering each process's all metrics /// +/// > (needed in gathering metrics) +/// +/// *depends on* : - /// #[derive(Debug, Clone, Serialize)] pub struct ProcessMetrics { @@ -131,7 +250,7 @@ pub struct ProcessMetrics { cpu_load : f32, ram_load : f32, } - +/// ## Process struct's constructor impl ProcessMetrics { pub fn new(process_name :&str, cpu: f32, ram: f32) -> Self { ProcessMetrics { @@ -142,6 +261,13 @@ impl ProcessMetrics { } } +/// # Packet info struct +/// ## for gathering info about container's net activity +/// +/// > (needed in gathering metrics) +/// +/// *depends on* : - +/// #[derive(Debug, Clone, Serialize)] pub struct PacketInfo { protocol : String, @@ -149,6 +275,7 @@ pub struct PacketInfo { src_ip : Ipv4Addr, size : usize, } +/// ## PacketInfo's constructor impl PacketInfo { pub fn new(prt: String, dest: Ipv4Addr, src: Ipv4Addr, size_of_packet: usize) -> Self { PacketInfo { diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs index 7e8e663..9b736d4 100644 --- a/src/utils/metrics.rs +++ b/src/utils/metrics.rs @@ -1,29 +1,32 @@ // submodule needed to get metrics such as // cpu load, ram/rom load and net activity -use std::sync::Mutex; +// use std::sync::Mutex; use std::sync::Arc; use crate::options::structs::TrackingProcess; use sysinfo::{Process, System}; use tokio::join; -use crate::options::structs::{ProcessMetrics, ContainerMetrics, PacketInfo}; +use crate::options::structs::{ProcessMetrics, ContainerMetrics}; use crate::utils::get_container_id; // use pcap::{Device, Capture, Active}; // use std::net::Ipv4Addr; // use anyhow::{Result, Ok}; -type PacketBuffer = Arc>>; +// type PacketBuffer = Arc>>; -// init_metrics_grubber - fn for initializing -// loop for unstoppable grubbing metrics. -// -// input : vec of processes -// output : Err if it cant create grubbers | Ok on finish -// initiator : main thread -// managing : object of unix-socket reader -// depends on : network activity -// !! FOR PROCESS !! -// pub async fn init_metrics_grubber(prcs: Arc>) { +/// # Fn init_metrics_grubber +/// ## for initializing process of unstoppable grubbing metrics. +/// +/// `input` : vec of processes +/// +/// `output` : Err if it cant create grubbers | Ok on finish +/// +/// `initiator` : main thread +/// +/// `managing` : object of unix-socket reader +/// +/// `depends on` : network activity +/// pub async fn init_metrics_grubber() { let mut system = System::new(); // let mut buffer: Vec = vec![]; From a03fbc41b09b62a278cea3026dc078c6a92a0baf Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 13:24:21 +0300 Subject: [PATCH 02/10] docs: signals --- src/options/signals.rs | 42 ++++++++++++++++++++++++++++++++++++++++++ src/utils/metrics.rs | 10 +++++----- 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/src/options/signals.rs b/src/options/signals.rs index 7cdc614..c107c74 100644 --- a/src/options/signals.rs +++ b/src/options/signals.rs @@ -9,6 +9,19 @@ use tokio::{ type SendersVec = Arc>>>; +/// # Fn set_valid_destructor +/// ## for initializing process of unstoppable grubbing metrics. +/// +/// *input* : `Result<()>` +/// +/// *output* : `Err` if it cant create signals listeners | `Ok` on returning Monitor +/// +/// *initiator* : main thread +/// +/// *managing* : `Arc>>>` +/// +/// *depends on* : Sig, Signals +/// pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError> { let (mut int, mut term, mut stop) = ( Sig::new(Signals::Sigint, senders.clone()), @@ -23,16 +36,30 @@ pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError } Ok(()) } +/// # Enum Signals +/// ## for instancing each managed system signals (such as SIGINT) +/// +/// > (element needed in Sig constructor's signature) +/// +/// *depends on* : - enum Signals { Sigint, Sigterm, Sigstop, } + +/// # Struct Signals +/// ## for instancing each managed system signals (such as SIGINT) +/// +/// > (needed to construct system signals listener) +/// +/// *depends on* : Signals struct Sig { signal: Signal, sig_type: Signals, senders: SendersVec, } +/// ## default Sig's constructor impl Sig { fn new(signal_type: Signals, sends: SendersVec) -> Self { Sig { @@ -42,6 +69,9 @@ impl Sig { } } } +/// ## trait Display realization for returning String-name of signal +/// +/// > (needed in logs) impl std::fmt::Display for Signals { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { @@ -51,6 +81,7 @@ impl std::fmt::Display for Signals { } } } +/// ## associated func to init signals listener impl Signals { fn get_signal(&self) -> io::Result { match self { @@ -60,9 +91,20 @@ impl Signals { } } } +/// # Trait SigPostProcessing +/// ## to handle post-processing jobs after getting system signal +/// +/// ## > (needed in signals post-processing) +/// trait SigPostProcessing { async fn post_processing(&mut self) -> io::Result<()>; } + +/// # Trait SigPostProcessing realization for Sig struct +/// ## to deinitialize Monitor correctly after getting signal +/// +/// ## > (needed in signals post-processing) +/// impl SigPostProcessing for Sig { async fn post_processing(&mut self) -> io::Result<()> { // manipulations ... diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs index 9b736d4..65a2e7b 100644 --- a/src/utils/metrics.rs +++ b/src/utils/metrics.rs @@ -17,15 +17,15 @@ use crate::utils::get_container_id; /// # Fn init_metrics_grubber /// ## for initializing process of unstoppable grubbing metrics. /// -/// `input` : vec of processes +/// *input* : `Result<()>` /// -/// `output` : Err if it cant create grubbers | Ok on finish +/// *output* : `Err` if it cant create grubbers | `Ok` on finish /// -/// `initiator` : main thread +/// *initiator* : main thread /// -/// `managing` : object of unix-socket reader +/// *managing* : object of unix-socket reader /// -/// `depends on` : network activity +/// *depends on* : network activity /// pub async fn init_metrics_grubber() { let mut system = System::new(); From b07a0e1212ac4b65a6de4ca967c28017398c3192 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 13:25:39 +0300 Subject: [PATCH 03/10] docs: logger --- src/options/logger.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/options/logger.rs b/src/options/logger.rs index ce44420..47ba570 100644 --- a/src/options/logger.rs +++ b/src/options/logger.rs @@ -13,6 +13,19 @@ use crate::utils::get_container_id; // file_writer: BufWriter, // } +/// # Fn `setup_logger` +/// ## for initializing process of unstoppable grubbing metrics. +/// +/// *input* : `Result<()>` +/// +/// *output* : `Err` if it cant create logger | `Ok` after logger initialing +/// +/// *initiator* : main thread +/// +/// *managing* : - +/// +/// *depends on* : - +/// pub fn setup_logger() -> Result<(), crate::options::structs::CustomError> { // if Command::new("sh").args(["-c", "mkdir logs"]).output().is_err() { // println!("Error: Cannot init logs directory"); From 08ca2483e1a3ad11ae05fd313002d88f04caf85f Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 14:11:04 +0300 Subject: [PATCH 04/10] docs: config --- src/options/config.rs | 154 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 151 insertions(+), 3 deletions(-) diff --git a/src/options/config.rs b/src/options/config.rs index 7643fdd..5f865ea 100644 --- a/src/options/config.rs +++ b/src/options/config.rs @@ -11,7 +11,19 @@ use tokio::time::Duration; const CONFIG_PATH: &str = "settings.json"; -// 4ever sync +/// # Fn `load_processes` +/// ## for reading and parsing *local* storing config +/// +/// *input* : `&str` +/// +/// *output* : `None` if local conf file doesn't exist or invalid | `Some(conf)` on finish reading and parsing +/// +/// *initiator* : func `get_actual_config` +/// +/// *managing* : conf file name in `&str` format +/// +/// *depends on* : struct `Processes` +/// fn load_processes(json_filename: &str) -> Option { if let Ok(res) = fs::read_to_string(json_filename) { if let Ok(conf) = serde_json::from_str::(&res) { @@ -21,6 +33,19 @@ fn load_processes(json_filename: &str) -> Option { None } +/// # Fn `get_actual_config` +/// ## for getting actual Monitor's config from local and remote storages +/// +/// *input* : - +/// +/// *output* : `None` on fatal error in mechanisms | `Some(conf)` on finish reading and parsing +/// +/// *initiator* : main thread +/// +/// *managing* : - +/// +/// *depends on* : struct `Processes` +/// pub async fn get_actual_config() -> Option { // * if no local conf -> loop and +inf getting conf from redis server // * if local conf -> once getting conf from redis server @@ -64,6 +89,19 @@ pub async fn get_actual_config() -> Option { } } +/// # Fn `get_remote_conf_watcher` +/// ## for infinitive pulling remote config +/// +/// *input* : `&mut Connection` +/// +/// *output* : `None` on fatal error | `Some(conf)` on succesfull pulling +/// +/// *initiator* : fn `get_actual_config` +/// +/// *managing* : mut ref `Connection` object +/// +/// *depends on* : struct `Processes` +/// async fn get_remote_conf_watcher(conn : &mut Connection) -> Option { let mut conn = conn.as_pubsub(); let cont = crate::utils::get_container_id(); @@ -105,8 +143,22 @@ async fn get_remote_conf_watcher(conn : &mut Connection) -> Option { } None } -// ! once iter exec -// ! only for situation when local isn't None (no need to fck redis server) + +/// # Fn `get_remote_conf_watcher` +/// ## for trying to pull remote config +/// +/// > only for situation when local isn't None (no need to fck redis server) +/// +/// *input* : `&str` +/// +/// *output* : `None` on empty pubsub or error | `Some(conf)` on succesfull pulling +/// +/// *initiator* : fn `get_actual_config` +/// +/// *managing* : &str of Redis Server credentials +/// +/// *depends on* : struct `Processes` +/// fn once_get_remote_configuration(serv_info: &str) -> Option { let cont = crate::utils::get_container_id(); match Client::open(serv_info) { @@ -165,6 +217,21 @@ fn once_get_remote_configuration(serv_info: &str) -> Option { // ! watchers +/// # Fn `open_watcher` +/// ## for infinitive opening Redis client +/// +/// > only for situation when local isn't None (no need to fck redis server) +/// +/// *input* : `Option` +/// +/// *output* : redis::Client on successful opening client +/// +/// *initiator* : fn `get_actual_config` +/// +/// *managing* : &str of Redis Server credentials +/// +/// *depends on* : struct `redis::Client` +/// fn open_watcher(serv_info: &str) -> Client { loop { match Client::open(serv_info) { @@ -180,6 +247,21 @@ fn open_watcher(serv_info: &str) -> Client { } } +/// # Fn `get_connection_watcher` +/// ## for infinitive establishing Redis connection on existing client +/// +/// > only for situation when local isn't None (no need to fck redis server) +/// +/// *input* : `&Client` +/// +/// *output* : `Connection` +/// +/// *initiator* : fn `get_actual_config` +/// +/// *managing* : &Client for opening connection +/// +/// *depends on* : struct `redis::Connection` +/// fn get_connection_watcher(client: &Client) -> Connection { loop { match client.get_connection() { @@ -197,11 +279,38 @@ fn get_connection_watcher(client: &Client) -> Connection { } } +/// # Fn `restart_main_thread` +/// ## for restart monitor with new config +/// +/// *input* : - +/// +/// *output* : `Ok(())` on valid restart | `Err(er)` on error +/// +/// *initiator* : fn `subscribe_config_stream` +/// +/// *managing* : - +/// +/// *depends on* : - +/// fn restart_main_thread() -> std::io::Result<()> { let current_exe = env::current_exe()?; Command::new(current_exe).exec(); Ok(()) } + +/// # Fn `subscribe_config_stream` +/// ## for subscribe on changes, pulling to Redis pubsub to get more actual config +/// +/// *input* : `Arc` +/// +/// *output* : `Ok(())` on end of work | `Err(er)` on error with subscribing mechanism +/// +/// *initiator* : fn `subscribe_config_stream` +/// +/// *managing* : `Arc` to compare old config with new pulled +/// +/// *depends on* : `Processes` +/// pub async fn subscribe_config_stream(actual_prcs: Arc) -> Result<(), CustomError> { if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) { if let Ok(mut conn) = client.get_connection() { @@ -259,6 +368,19 @@ pub async fn subscribe_config_stream(actual_prcs: Arc) -> Result<(), Err(CustomError::Fatal) } +/// # Fn `config_comparing` +/// ## for compare old and new configs +/// +/// *input* : local: `&Processes`, remote: `&Processes` +/// +/// *output* : `ConfigActuality::Local` or `ConfigActuality::Remote` +/// +/// *initiator* : fn `subscribe_config_stream`, fn `get_actual_config` +/// +/// *managing* : two objects `&Processes` +/// +/// *depends on* : `Processes`, `ConfigActuality` +/// fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { let local_date: u64 = local.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap(); @@ -277,6 +399,19 @@ fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { // } // } +/// # Fn `save_new_config` +/// ## mechanism for saving new config in local storage +/// +/// *input* : `&Processes`, `&str` +/// +/// *output* : `Ok(())` on succesfull saving | Err(er) on fs error +/// +/// *initiator* : fn `subscribe_config_stream`, fn `get_actual_config` +/// +/// *managing* : new config object: `&Processes` and config file name: `&str` +/// +/// *depends on* : `Processes` +/// fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> { match serde_json::to_string_pretty(&config) { // Ok(st) => match fs::write(config_file, st) { @@ -305,6 +440,19 @@ fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomEr } } +/// # Fn `parse_extern_config` +/// ## for parsing &str to Processes +/// +/// *input* : `&str` +/// +/// *output* : parsed config in Some(Processes) | None on error with parsing +/// +/// *initiator* : fn `subscribe_config_stream`, fn `once_get_remote_configuration`, fn `get_remote_conf` +/// +/// *managing* : unparsed config `&str` +/// +/// *depends on* : `Processes` +/// fn parse_extern_config(json_string: &str) -> Option { if let Ok(des) = serde_json::from_str::(json_string) { return Some(des); From d58f140a29ce6fc1b385202cceff23969d0825e0 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 14:42:31 +0300 Subject: [PATCH 05/10] docs: files --- src/utils/files.rs | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/utils/files.rs b/src/utils/files.rs index 4221da9..6293c7b 100644 --- a/src/utils/files.rs +++ b/src/utils/files.rs @@ -7,6 +7,19 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::Duration; +/// # Fn `create_watcher` +/// ## for creating watcher on file's delete | update events +/// +/// *input* : `&str`, `&str` +/// +/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction +/// +/// *initiator* : fn `file_handler`, fn `utils::run_daemons` +/// +/// *managing* : current file's name: &str, path in local storage to current file: &str +/// +/// *depends on* : - +/// pub async fn create_watcher(filename: &str, path: &str) -> Result { let src = format!("{}{}", path, filename); let inotify: Inotify = Inotify::init()?; @@ -14,6 +27,19 @@ pub async fn create_watcher(filename: &str, path: &str) -> Result>`, `Arc>>` +/// +/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check +/// +/// *initiator* : fn `utils::running_handler` +/// +/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc>`, mut list of file watchers`Arc>>` +/// +/// *depends on* : Files +/// pub async fn file_handler( name: &str, files: &[Files], @@ -97,6 +123,19 @@ pub async fn file_handler( Ok(()) } +/// # Fn `check_file` +/// ## for checking existance of current file +/// +/// *input* : `&str`, `&str` +/// +/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error +/// +/// *initiator* : fn `file_handler` +/// +/// *managing* : current file's name: `&str` and current file's path in local storage: `&str` +/// +/// *depends on* : network activity +/// pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { let arc_name = Arc::new(filename.to_string()); let arc_path = Arc::new(path.to_string()); From 15bb446aa413362347d091bd5e963fb494f0bba4 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 14:48:24 +0300 Subject: [PATCH 06/10] docs: hagent --- src/utils/hagent.rs | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/src/utils/hagent.rs b/src/utils/hagent.rs index 6e5910f..c794538 100644 --- a/src/utils/hagent.rs +++ b/src/utils/hagent.rs @@ -1,12 +1,37 @@ // module needed to check host-agent health condition and to communicate with it -/// asdasdasds use tokio::{io::Interest, net::UnixStream}; +/// # Fn `open_unix_socket` +/// ## opening unix-socket for host-agent communication +/// +/// *input* : - +/// +/// *output* : `Ok(socket)` if socket was successfully opened | `Err(er)` if not +/// +/// *initiator* : main thread `(??)` +/// +/// *managing* : - +/// +/// *depends on* : - +/// async fn open_unix_socket() -> Result { let socket = UnixStream::connect("/var/run/enode/hostagent.sock").await?; Ok(socket) } +/// # Fn `ha_healthcheck` +/// ## for checking host-agent state +/// +/// *input* : `&UnixStream` +/// +/// *output* : `Ok(()))` if host-agent is running | `Err(er)` if not +/// +/// *initiator* : main thread `(??)` +/// +/// *managing* : ref on unix-socket object +/// +/// *depends on* : - +/// async fn ha_healthcheck(socket: &UnixStream) -> Result<(), std::io::Error >{ socket.ready(Interest::WRITABLE).await?; if socket.writable().await.is_ok() { @@ -19,7 +44,19 @@ async fn ha_healthcheck(socket: &UnixStream) -> Result<(), std::io::Error >{ Ok(()) } - +/// # Fn `ha_healthcheck` +/// ## for sending data to host-agent using unix-socket +/// +/// *input* : `&UnixStream`, `&str` +/// +/// *output* : `Ok(()))` if data was sent| `Err(er)` if not +/// +/// *initiator* : main thread `(??)` +/// +/// *managing* : socket: `&UnixStream`, data: `&str` +/// +/// *depends on* : - +/// async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), std::io::Error > { socket.ready(Interest::WRITABLE).await?; if socket.writable().await.is_ok() { From 174779bfcbc317696d6ea0497a1d187c361d9fcd Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 15:02:38 +0300 Subject: [PATCH 07/10] docs: metrics (tmp) --- src/utils/metrics.rs | 110 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 100 insertions(+), 10 deletions(-) diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs index 65a2e7b..923106a 100644 --- a/src/utils/metrics.rs +++ b/src/utils/metrics.rs @@ -14,18 +14,18 @@ use crate::utils::get_container_id; // type PacketBuffer = Arc>>; -/// # Fn init_metrics_grubber +/// # Fn `init_metrics_grubber` /// ## for initializing process of unstoppable grubbing metrics. /// -/// *input* : `Result<()>` +/// *input* : `Arc>` ?? /// /// *output* : `Err` if it cant create grubbers | `Ok` on finish /// -/// *initiator* : main thread +/// *initiator* : main thread ?? /// /// *managing* : object of unix-socket reader /// -/// *depends on* : network activity +/// *depends on* : - /// pub async fn init_metrics_grubber() { let mut system = System::new(); @@ -79,8 +79,19 @@ async fn gather_metrics(proc: Arc) { // } -// !!! -// for container (whole system metrics) +/// # Fn `get_all_container_metrics` +/// ## for gathering all container (whole system metrics) +/// +/// *input* : `Arc`, `Arc>` +/// +/// *output* : `ContainerMetrics` +/// +/// *initiator* : main thread ?? +/// +/// *managing* : ref counter to `System` object, ref counter to list of processes +/// +/// *depends on* : `TrackingProcess` +/// async fn get_all_container_metrics(sys: Arc, prcs: Arc>) -> ContainerMetrics { let metrics = join!( get_cpu_metrics_container(sys.clone()), @@ -94,22 +105,74 @@ async fn get_all_container_metrics(sys: Arc, prcs: Arc` +/// +/// *output* : `f32` +/// +/// *initiator* : main thread ?? +/// +/// *managing* : ref counter to `System` object +/// +/// *depends on* : - +/// async fn get_cpu_metrics_container(sys: Arc) -> f32 { sys.global_cpu_usage() } + +/// # Fn `get_ram_metrics_container` +/// ## for gathering container ram metrics +/// +/// *input* : `Arc` +/// +/// *output* : `f32` +/// +/// *initiator* : main thread ?? +/// +/// *managing* : ref counter to `System` object +/// +/// *depends on* : - +/// async fn get_ram_metrics_container(sys: Arc) -> f32 { (sys.used_memory() / sys.total_memory()) as f32 * 100.0 } // async fn get_mem_metrics_container(sys: Arc) -> f32 { // sys. // } + +/// # Fn `get_subsystems` +/// ## for gathering info about container subsystems (processes) +/// +/// *input* : `Arc>` +/// +/// *output* : `Vec` +/// +/// *initiator* : main thread ?? +/// +/// *managing* : ref counter to list of `TrackingProcess` +/// +/// *depends on* : `TrackingProcess` +/// async fn get_subsystems(prcs: Arc>) -> Vec { prcs.iter().map(|process| process.name.clone()).collect() } -// !!! -// for process (process metrics) -// % +/// # Fn `get_all_metrics_process` +/// ## for gathering all process' metrics +/// +/// *input* : `Arc`, `Arc` +/// +/// *output* : `ProcessMetrics` +/// +/// *initiator* : main thread ?? +/// +/// *managing* : two ref counters to `Process` and `System` +/// +/// *depends on* : - +/// async fn get_all_metrics_process(proc: Arc, sys: Arc) -> ProcessMetrics { let metrics = join!( get_cpu_metrics_process(proc.clone()), @@ -121,10 +184,37 @@ async fn get_all_metrics_process(proc: Arc, sys: Arc) -> Proces metrics.1 ) } + +/// # Fn `get_cpu_metrics_process` +/// ## for gathering process cpu metrics +/// +/// *input* : `Arc` +/// +/// *output* : `f32` +/// +/// *initiator* : main thread ?? +/// +/// *managing* : ref counter to `Process` object +/// +/// *depends on* : - +/// async fn get_cpu_metrics_process(proc: Arc) -> f32 { proc.cpu_usage() } -// % + +/// # Fn `get_ram_metrics_process` +/// ## for gathering process ram metrics +/// +/// *input* : `Arc` +/// +/// *output* : `f32` +/// +/// *initiator* : main thread ?? +/// +/// *managing* : ref counter to `Process` object +/// +/// *depends on* : - +/// async fn get_ram_metrics_process(proc: Arc, sys: Arc) -> f32 { (proc.memory() as f64 / sys.total_memory() as f64) as f32 * 100.0 as f32 } From c14a6f943b2d43c3be89011a75d9f5153824e9ff Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 15:15:39 +0300 Subject: [PATCH 08/10] docs: prcs --- src/utils/prcs.rs | 112 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 108 insertions(+), 4 deletions(-) diff --git a/src/utils/prcs.rs b/src/utils/prcs.rs index f76a68e..8cc865b 100644 --- a/src/utils/prcs.rs +++ b/src/utils/prcs.rs @@ -1,10 +1,22 @@ use crate::options::structs::CustomError; use log::{error, warn}; -use std::io; use std::process::{Command, Output}; use std::sync::Arc; use tokio::time::Duration; +/// # Fn `get_pid` +/// ## for initializing process of unstoppable grubbing metrics. +/// +/// *input* : `&str` +/// +/// *output* : `None` if cant get process PID | `Some(Output)` on success +/// +/// *initiator* : fn `is_frozen`, fn `utils::files::file_handler`, fn `utils::files::service_handler` +/// +/// *managing* : process name +/// +/// *depends on* : - +/// pub async fn get_pid(name: &str) -> Option { let name = Arc::new(name.to_string()); let res = @@ -20,8 +32,20 @@ pub async fn get_pid(name: &str) -> Option { None } } -// ! can be with bug !!! -// * APPROVED + +/// # Fn `is_active` +/// ## for checking process's activity state +/// +/// *input* : `&str` +/// +/// *output* : `true` if process running | `false` if not +/// +/// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler` +/// +/// *managing* : process name +/// +/// *depends on* : - +/// pub async fn is_active(name: &str) -> bool { let arc_name = Arc::new(name.to_string()); tokio::task::spawn_blocking(move || { @@ -38,7 +62,19 @@ pub async fn is_active(name: &str) -> bool { .unwrap() } -// T is for stopped processes +/// # Fn `is_frozen` +/// ## for checking process's hibernation state +/// +/// *input* : `&str` +/// +/// *output* : `true` if process is frozen | `false` if not +/// +/// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler` +/// +/// *managing* : process name +/// +/// *depends on* : fn `get_pid` +/// pub async fn is_frozen(name: &str) -> bool { let temp: Output; if let Some(output) = get_pid(name).await { @@ -66,6 +102,20 @@ pub async fn is_frozen(name: &str) -> bool { .unwrap() } } + +/// # Fn `terminate_process` +/// ## for stop current process +/// +/// *input* : `&str` +/// +/// *output* : - +/// +/// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler`, fn `utils::run_daemons` +/// +/// *managing* : process name +/// +/// *depends on* : - +/// pub async fn terminate_process(name: &str) { let _ = Command::new("pkill") .arg(name) @@ -76,6 +126,19 @@ pub async fn terminate_process(name: &str) { }); } +/// # Fn `terminate_process` +/// ## for freeze/hibernate current process +/// +/// *input* : `&str` +/// +/// *output* : - +/// +/// *initiator* : fn `utils::run_daemons` +/// +/// *managing* : process name +/// +/// *depends on* : - +/// pub async fn freeze_process(name: &str) { let _ = Command::new("pkill") .args(["-STOP", name]) @@ -85,6 +148,20 @@ pub async fn freeze_process(name: &str) { std::process::exit(101); }); } + +/// # Fn `unfreeze_process` +/// ## for unfreeze/hibernate current process +/// +/// *input* : `&str` +/// +/// *output* : - +/// +/// *initiator* : fn `utils::run_daemons` +/// +/// *managing* : process name +/// +/// *depends on* : - +/// pub async fn unfreeze_process(name: &str) { let _ = Command::new("pkill") .args(["-CONT", name]) @@ -94,12 +171,39 @@ pub async fn unfreeze_process(name: &str) { std::process::exit(101); }); } + +/// # Fn `restart_process` +/// ## for restarting current process +/// +/// *input* : `&str`, &str +/// +/// *output* : - +/// +/// *initiator* : fn `utils::run_daemons` +/// +/// *managing* : process name and path to its exec file +/// +/// *depends on* : fn `start_process`, fn `terminate_process` +/// pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> { terminate_process(name).await; tokio::time::sleep(Duration::from_millis(100)).await; start_process(name, path).await } +/// # Fn `start_process` +/// ## for starting current process +/// +/// *input* : `&str`, &str +/// +/// *output* : - +/// +/// *initiator* : fn `restart_process` +/// +/// *managing* : process name and path to its exec file +/// +/// *depends on* : - +/// pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { // let runsh = format!("{} {}", "exec", path); let mut command = Command::new(path); From adb3858ee4fdf00c8ec28cc70604f121e3dea6d5 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 15:29:00 +0300 Subject: [PATCH 09/10] docs: services + prcs fix --- src/utils/prcs.rs | 10 +++++----- src/utils/services.rs | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/utils/prcs.rs b/src/utils/prcs.rs index 8cc865b..7c718ab 100644 --- a/src/utils/prcs.rs +++ b/src/utils/prcs.rs @@ -108,7 +108,7 @@ pub async fn is_frozen(name: &str) -> bool { /// /// *input* : `&str` /// -/// *output* : - +/// *output* : () /// /// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler`, fn `utils::run_daemons` /// @@ -131,7 +131,7 @@ pub async fn terminate_process(name: &str) { /// /// *input* : `&str` /// -/// *output* : - +/// *output* : () /// /// *initiator* : fn `utils::run_daemons` /// @@ -154,7 +154,7 @@ pub async fn freeze_process(name: &str) { /// /// *input* : `&str` /// -/// *output* : - +/// *output* : () /// /// *initiator* : fn `utils::run_daemons` /// @@ -177,7 +177,7 @@ pub async fn unfreeze_process(name: &str) { /// /// *input* : `&str`, &str /// -/// *output* : - +/// *output* : () /// /// *initiator* : fn `utils::run_daemons` /// @@ -196,7 +196,7 @@ pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> /// /// *input* : `&str`, &str /// -/// *output* : - +/// *output* : () /// /// *initiator* : fn `restart_process` /// diff --git a/src/utils/services.rs b/src/utils/services.rs index 234be50..d89ee6b 100644 --- a/src/utils/services.rs +++ b/src/utils/services.rs @@ -6,6 +6,19 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; +/// # Fn `service_handler` +/// ## function to realize mechanism of current process' dep services monitoring +/// +/// *input* : `&str`, `&Vec`, `Arc>` +/// +/// *output* : () +/// +/// *initiator* : fn `utils::running_handler` +/// +/// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer +/// +/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` +/// pub async fn service_handler( name: &str, services: &Vec, @@ -54,6 +67,19 @@ pub async fn service_handler( Ok(()) } +/// # Fn `looped_service_connecting` +/// ## for service's state check in loop (with delay and restriction of attempts) +/// +/// *input* : `&str`, `&Services` +/// +/// *output* : Ok(()) if service now available | Err(er) if still not +/// +/// *initiator* : fn `service_handler` +/// +/// *managing* : process name, current service struct +/// +/// *depends on* : fn `check_service` +/// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { if serv.triggers.wait == 0 { loop { @@ -103,6 +129,19 @@ async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), Cu } } +/// # Fn `check_service` +/// ## for check current service's availiability +/// +/// *input* : `&str`, `&u32` +/// +/// *output* : Ok(()) if service now available | Err(er) if still not +/// +/// *initiator* : fn `service_handler`, fn `looped_service_connecting` +/// +/// *managing* : hostname, port +/// +/// *depends on* : - +/// // ! have to be rewritten // todo: rewrite use async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { From e89d1be8e487ec5474fd45596de467de579549de Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 13 Nov 2024 15:42:39 +0300 Subject: [PATCH 10/10] docs: utils --- src/utils.rs | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/src/utils.rs b/src/utils.rs index 9c7e92a..6d6c334 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -22,9 +22,21 @@ use tokio::time::Duration; const GET_ID_CMD: &str = "hostname"; -/// # async func to run 3 main daemons (now it's more like tree-form than classical 0.1.0 form ) -/// > hint : give mpsc with capacity 1 to jump over potential errors during running process -/// > ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") ** +/// # Fn `run_daemons` +/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel +/// +/// *input* : `Arc`, `Arc>`, `&mut mpsc::Receiver`, +/// +/// *output* : () +/// +/// *initiator* : main thread +/// +/// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader +/// +/// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher` +/// +/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process +/// pub async fn run_daemons( proc: Arc, tx: Arc>, @@ -167,6 +179,19 @@ pub async fn run_daemons( tokio::task::yield_now().await; } // check process status daemon +/// # Fn `run_daemons` +/// ## func to async exec subjobs of checking process, services and files states +/// +/// *input* : `Arc`, `Arc>`, `Arc>>` +/// +/// *output* : () +/// +/// *initiator* : fn `run_daemons` +/// +/// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers +/// +/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` +/// pub async fn running_handler( prc: Arc, tx: Arc>, @@ -199,6 +224,19 @@ pub async fn running_handler( } // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' +/// # Fn `get_container_id` +/// ## for getting container id used in logs +/// +/// *input* : - +/// +/// *output* : Some(String) if cont-id was grubbed | None - if not +/// +/// *initiator* : fn `options::logger::setup_logger` +/// +/// *managing* : - +/// +/// *depends on* : - +/// pub fn get_container_id() -> Option { match Command::new(GET_ID_CMD).output() { Ok(output) => {