From 6a9491d3b7aa0557eb59fd6e85bd570aa52e1f53 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 11 Nov 2024 10:34:45 +0300 Subject: [PATCH 1/5] init commit --- src/utils/metrics.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs index 221ca5f..4de54ba 100644 --- a/src/utils/metrics.rs +++ b/src/utils/metrics.rs @@ -2,15 +2,14 @@ // cpu load, ram/rom load and net activity use std::sync::Mutex; -use std::{str::FromStr, sync::Arc}; +use std::sync::Arc; use crate::options::structs::TrackingProcess; -use sysinfo::{Pid, Process, System}; +use sysinfo::{Process, System}; use tokio::join; -use crate::utils::prcs::get_pid; use crate::options::structs::{ProcessMetrics, ContainerMetrics, PacketInfo}; -use pcap::{Device, Capture, Active}; -use std::net::Ipv4Addr; -use anyhow::{Result, Ok}; +// use pcap::{Device, Capture, Active}; +// use std::net::Ipv4Addr; +// use anyhow::{Result, Ok}; type PacketBuffer = Arc>>; From 4b897c00637990a60becb6fbaed82a9c9c2aca5e Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 12 Nov 2024 12:11:04 +0300 Subject: [PATCH 2/5] fixed metrics struct --- src/options/structs.rs | 18 +++++++++++------- src/utils/hagent.rs | 34 +++++++++++++++++++++++++++++++--- src/utils/metrics.rs | 14 ++++++++++++-- 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/src/options/structs.rs b/src/options/structs.rs index 55020c1..1aedb94 100644 --- a/src/options/structs.rs +++ b/src/options/structs.rs @@ -105,15 +105,17 @@ impl Metrics { /// #[derive(Debug, Clone)] pub struct ContainerMetrics { - pub cpu_load : f32, - pub ram_load : f32, + container_id : String, + cpu_load : f32, + ram_load : f32, // pub net_activity : ??? - pub processes : Vec, + processes : Vec, } impl ContainerMetrics { - pub fn new(cpu: f32, ram: f32, subsystems: Vec,) -> Self{ + pub fn new(container_id : &str, cpu: f32, ram: f32, subsystems: Vec,) -> Self{ ContainerMetrics { + container_id : String::from(container_id), cpu_load : cpu, ram_load : ram, processes : subsystems, @@ -125,13 +127,15 @@ impl ContainerMetrics { /// #[derive(Debug, Clone)] pub struct ProcessMetrics { - pub cpu_load : f32, - pub ram_load : f32, + process_name : String, + cpu_load : f32, + ram_load : f32, } impl ProcessMetrics { - pub fn new(cpu: f32, ram: f32) -> Self { + pub fn new(process_name :&str, cpu: f32, ram: f32) -> Self { ProcessMetrics { + process_name : String::from(process_name), cpu_load : cpu, ram_load : ram, } diff --git a/src/utils/hagent.rs b/src/utils/hagent.rs index 39c464c..4fd8d75 100644 --- a/src/utils/hagent.rs +++ b/src/utils/hagent.rs @@ -1,13 +1,38 @@ // module needed to check host-agent health condition and to communicate with it -use tokio::net::UnixStream; +use tokio::{io::Interest, net::UnixStream}; // // code will be here // async fn open_unix_socket() -> Result { - let socket = UnixStream::connect("/var/run/enode/hostagent.sock=").await?; + let socket = UnixStream::connect("/var/run/enode/hostagent.sock").await?; Ok(socket) } +async fn ha_healthcheck(socket: &UnixStream) -> Result<(), std::io::Error >{ + socket.ready(Interest::WRITABLE).await?; + if socket.writable().await.is_ok() { + if let Err(er) = socket.try_write(b"Hello HAgent") { + return Err(er); + } + } else { + return Err(std::io::ErrorKind::WouldBlock.into()); + } + Ok(()) +} + + +async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), std::io::Error > { + socket.ready(Interest::WRITABLE).await?; + if socket.writable().await.is_ok() { + if let Err(er) = socket.try_write(data.as_bytes()) { + return Err(er); + } + } else { + return Err(std::io::ErrorKind::WouldBlock.into()); + } + Ok(()) +} + #[cfg(test)] mod hagent_unittets { use super::*; @@ -15,7 +40,10 @@ mod hagent_unittets { // maybe bool : true -> alive, false -> dead // simple request on api async fn hagent_healthcheck() { - assert!(true); + let sock = open_unix_socket().await; + assert!(sock.is_ok()); + let sock = sock.unwrap(); + assert!(ha_healthcheck(&sock).await.is_ok()); } #[tokio::test] // Result diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs index 4de54ba..7e8e663 100644 --- a/src/utils/metrics.rs +++ b/src/utils/metrics.rs @@ -7,6 +7,7 @@ use crate::options::structs::TrackingProcess; use sysinfo::{Process, System}; use tokio::join; use crate::options::structs::{ProcessMetrics, ContainerMetrics, PacketInfo}; +use crate::utils::get_container_id; // use pcap::{Device, Capture, Active}; // use std::net::Ipv4Addr; // use anyhow::{Result, Ok}; @@ -83,7 +84,12 @@ async fn get_all_container_metrics(sys: Arc, prcs: Arc) -> f32 { sys.global_cpu_usage() @@ -106,7 +112,11 @@ async fn get_all_metrics_process(proc: Arc, sys: Arc) -> Proces get_cpu_metrics_process(proc.clone()), get_ram_metrics_process(proc.clone(), sys.clone()) ); - ProcessMetrics::new(metrics.0, metrics.1) + ProcessMetrics::new( + proc.name().to_str().unwrap_or("unknown"), + metrics.0, + metrics.1 + ) } async fn get_cpu_metrics_process(proc: Arc) -> f32 { proc.cpu_usage() From 7677fa1f55d63f421cbed6a6cba4fe4fe67b26ee Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 12 Nov 2024 12:28:46 +0300 Subject: [PATCH 3/5] fixed hagent work --- src/options/structs.rs | 10 +++++----- src/utils/hagent.rs | 18 ++++++++++++++++-- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/options/structs.rs b/src/options/structs.rs index 1aedb94..d0ea23d 100644 --- a/src/options/structs.rs +++ b/src/options/structs.rs @@ -84,7 +84,7 @@ pub struct FIleTriggers { /// /// -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize,)] pub struct Metrics { pub container_metrics : ContainerMetrics, pub processes_metrics : Vec, @@ -103,7 +103,7 @@ impl Metrics { /// /// -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct ContainerMetrics { container_id : String, cpu_load : f32, @@ -125,9 +125,9 @@ impl ContainerMetrics { /// /// -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct ProcessMetrics { - process_name : String, + pub process_name : String, cpu_load : f32, ram_load : f32, } @@ -142,7 +142,7 @@ impl ProcessMetrics { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct PacketInfo { protocol : String, dst_ip : Ipv4Addr, diff --git a/src/utils/hagent.rs b/src/utils/hagent.rs index 4fd8d75..c74cb9e 100644 --- a/src/utils/hagent.rs +++ b/src/utils/hagent.rs @@ -35,6 +35,10 @@ async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), std::io::Er #[cfg(test)] mod hagent_unittets { + use log::info; + + use crate::utils::metrics; + use super::*; #[tokio::test] // maybe bool : true -> alive, false -> dead @@ -46,10 +50,20 @@ mod hagent_unittets { assert!(ha_healthcheck(&sock).await.is_ok()); } #[tokio::test] - // Result + // --Result // one-shot func async fn send_metrics_to_hagent() { - assert!(true); + let procm = crate::options::structs::ProcessMetrics::new("test-prc", 15.0, 5.0); + let contm = crate::options::structs::ContainerMetrics::new("test", 32.0, 12.0, vec![procm.process_name.clone()]); + let metrics = crate::options::structs::Metrics::new(contm, vec![procm]); + let metrics = &serde_json::to_string_pretty(&metrics).unwrap(); + + let sock = open_unix_socket().await; + assert!(sock.is_ok()); + let sock = sock.unwrap(); + assert!(ha_healthcheck(&sock).await.is_ok()); + assert!(ha_send_data(&sock, &metrics).await.is_ok()); + } #[tokio::test] async fn open_unixsocket_test() { From 9fb57ba3a7718b69f4e601e00be6ef318065dc9c Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 12 Nov 2024 15:24:19 +0300 Subject: [PATCH 4/5] fixed no local config mech --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/main.rs | 2 +- src/options/config.rs | 103 ++++++++++++++++++++++-------------------- src/utils/hagent.rs | 12 ++--- 5 files changed, 62 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 426fa8a..bc70f37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,7 +618,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "runner-rs" -version = "0.5.5" +version = "0.9.24" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 5432f63..1216761 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "runner-rs" -version = "0.5.5" +version = "0.9.24" edition = "2021" [profile.dev] diff --git a/src/main.rs b/src/main.rs index 33fcc5a..6f161d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,7 @@ async fn main() { // setting up redis connection \ // then conf checks to choose the most actual \ - let processes: Processes = get_actual_config().unwrap_or_else(|| { + let processes: Processes = get_actual_config().await.unwrap_or_else(|| { error!("No actual configuration for runner. Stopping..."); std::process::exit(101); }); diff --git a/src/options/config.rs b/src/options/config.rs index c678655..af233c5 100644 --- a/src/options/config.rs +++ b/src/options/config.rs @@ -22,9 +22,9 @@ fn load_processes(json_filename: &str) -> Option { None } -pub fn get_actual_config() -> Option { - // * if no conf -> loop and +inf getting conf from redis server - // let mut local = load_processes(&CONFIG_PATH); +pub async fn get_actual_config() -> Option { + // * if no local conf -> loop and +inf getting conf from redis server + // * if local conf -> once getting conf from redis server match load_processes(CONFIG_PATH) { Some(local_conf) => { info!( @@ -32,6 +32,7 @@ pub fn get_actual_config() -> Option { &local_conf.date_of_creation ); if let Some(remote_conf) = + // TODO : rework with pubsub mech once_get_remote_configuration(&format!("redis://{}/", local_conf.config_server)) { return match config_comparing(&local_conf, &remote_conf) { @@ -51,15 +52,60 @@ pub fn get_actual_config() -> Option { Some(local_conf) } None => { - // ? ? OUTSTANDING CONSTRUCTION ? + warn!("No local valid conf was found. Trying to pull remote one..."); let mut conn = get_connection_watcher(&open_watcher("redis://localhost/")); - get_stream_info_watcher(&mut conn); - let remote_config = invalid_config_watcher(&mut conn); - let _ = save_new_config(&remote_config, CONFIG_PATH); - Some(remote_config) + let remote_config = get_remote_conf_watcher(&mut conn).await; + if let Some(conf) = remote_config { + info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); + let _ = save_new_config(&conf, CONFIG_PATH); + return Some(conf); + } + None } } } + +async fn get_remote_conf_watcher(conn : &mut Connection) -> Option { + let mut conn = conn.as_pubsub(); + let cont = crate::utils::get_container_id(); + loop { + match cont { + Some(ref cont) => { + let cont = cont.trim(); + if conn.subscribe(cont).is_err() { + // todo : delay + continue; + } + match conn.get_message() { + Ok(msg) => { + let msg: Result = msg.get_payload(); + if let Ok(payload) = msg { + if let Some(remote) = parse_extern_config(&payload) { + return Some(remote) + } + else { + error!("Pulled invalid config, cannot start. Waiting for remote conf..."); + } + } else { + error!("Cannot get Redis message payload. Waiting for remote conf..."); + } + // todo : delay + continue; + }, + Err(_) => { + // todo : delay + continue; + }, + } + }, + None => { + error!("Cannot get container id. Returning"); + break + }, + } + } + None +} // ! once iter exec // ! only for situation when local isn't None (no need to fck redis server) fn once_get_remote_configuration(serv_info: &str) -> Option { @@ -154,47 +200,6 @@ fn get_connection_watcher(client: &Client) -> Connection { } } } -fn get_stream_info_watcher(conn: &mut Connection) { - loop { - if let Ok(val) = conn.xlen::<&str, usize>("config_stream") { - if val != 0 { - info!("Redis stream is able and not empty now"); - return; - } - } - error!("Configuration pulling from Redis stream failed. Check stream state! Retrying..."); - std::thread::sleep(Duration::from_secs(4)); - } -} -fn invalid_config_watcher(conn: &mut Connection) -> Processes { - loop { - if let Some(prcs) = get_remote_config(conn) { - info!( - "Got new config from Redis-Server, version - {}", - &prcs.date_of_creation - ); - return prcs; - } - error!("Got INVALID configuration. Update config! Retrying..."); - std::thread::sleep(Duration::from_secs(4)); - } -} - -// ! end of watchers - -fn get_remote_config(conn: &mut Connection) -> Option { - let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1); - if let Ok(result) = res { - if result.is_empty() { - return None; - } - let config = &result[0]; - if !config.is_empty() { - return parse_extern_config(&config[0].1[0].1); - } - } - None -} fn restart_main_thread() -> std::io::Result<()> { let current_exe = env::current_exe()?; diff --git a/src/utils/hagent.rs b/src/utils/hagent.rs index c74cb9e..3f02098 100644 --- a/src/utils/hagent.rs +++ b/src/utils/hagent.rs @@ -35,10 +35,6 @@ async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), std::io::Er #[cfg(test)] mod hagent_unittets { - use log::info; - - use crate::utils::metrics; - use super::*; #[tokio::test] // maybe bool : true -> alive, false -> dead @@ -53,9 +49,11 @@ mod hagent_unittets { // --Result // one-shot func async fn send_metrics_to_hagent() { - let procm = crate::options::structs::ProcessMetrics::new("test-prc", 15.0, 5.0); - let contm = crate::options::structs::ContainerMetrics::new("test", 32.0, 12.0, vec![procm.process_name.clone()]); - let metrics = crate::options::structs::Metrics::new(contm, vec![procm]); + use crate::options::structs::{ProcessMetrics, ContainerMetrics, Metrics}; + + let procm = ProcessMetrics::new("test-prc", 15.0, 5.0); + let contm = ContainerMetrics::new("test", 32.0, 12.0, vec![procm.process_name.clone()]); + let metrics = Metrics::new(contm, vec![procm]); let metrics = &serde_json::to_string_pretty(&metrics).unwrap(); let sock = open_unix_socket().await; From 535398e58a006c23bf8b330be719c185ba38d7d0 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 12 Nov 2024 16:14:11 +0300 Subject: [PATCH 5/5] fixed all config error and questions --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/options/config.rs | 74 ++++++++++++++++++++----------------------- src/utils/hagent.rs | 5 ++- 4 files changed, 39 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc70f37..a9efa2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,7 +618,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "runner-rs" -version = "0.9.24" +version = "0.9.25" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 1216761..67d68e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "runner-rs" -version = "0.9.24" +version = "0.9.25" edition = "2021" [profile.dev] diff --git a/src/options/config.rs b/src/options/config.rs index af233c5..7643fdd 100644 --- a/src/options/config.rs +++ b/src/options/config.rs @@ -1,6 +1,6 @@ use crate::options::structs::*; use log::{error, info, warn}; -use redis::{Client, Commands, Connection, RedisResult}; +use redis::{Client, Connection}; use std::fs::OpenOptions; use std::io::Write; use std::os::unix::process::CommandExt; @@ -11,7 +11,6 @@ use tokio::time::Duration; const CONFIG_PATH: &str = "settings.json"; -type Res = RedisResult)>>>; // 4ever sync fn load_processes(json_filename: &str) -> Option { if let Ok(res) = fs::read_to_string(json_filename) { @@ -109,49 +108,46 @@ async fn get_remote_conf_watcher(conn : &mut Connection) -> Option { // ! once iter exec // ! only for situation when local isn't None (no need to fck redis server) fn once_get_remote_configuration(serv_info: &str) -> Option { + let cont = crate::utils::get_container_id(); match Client::open(serv_info) { Ok(client) => { match client.get_connection() { Ok(mut conn) => { - if let Ok(len) = conn.xlen::<&str, usize>("config_stream") { - if len == 0 { - warn!("No configuration in DB yet"); - None - } else { - let conf: Res = conn.xrevrange_count("config_stream", "+", "-", 1); - let config: &Vec<(String, Vec<(String, String)>)>; - - if conf.is_ok() { - // guaranteed and safe unwrapping - let conf = conf.unwrap(); - config = &conf[0]; - if config.is_empty() { - error!( - "Empty config was pulled. Check stream and configs state!" - ); - return None; - } - match parse_extern_config(&config[0].1[0].1) { - Some(prcs) => { - info!( - "Config {} was pulled from Redis-Server", - &prcs.date_of_creation - ); - Some(prcs) - } - None => { - error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); - None - } - } - } else { - error!("Configuration pulling from Redis stream failed. Check stream state!"); - None + let mut conn = conn.as_pubsub(); + match conn.subscribe(cont) { + Ok(_) => { + if conn.set_read_timeout(Some(Duration::from_millis(100))).is_err() { + error!("Cannot set reading pubsub timeout and pull remote config"); + return None; } + match conn.get_message() { + Ok(msg) => { + info!("Pulled config from Redis Server"); + let get_payload: Result = msg.get_payload(); + match get_payload { + Ok(payload) => { + let remote = parse_extern_config(&payload); + if remote.is_none() { + error!("Pulled config is invalid. Check it in Redis Server"); + } + return remote; + }, + Err(_) => { + error!("Cannot extract payload from new message. Check Redis Server state"); + return None; + }, + } + }, + Err(_) => { + warn!("Cannot get config from Redis Server. Empty channel"); + return None; + }, + } + }, + Err(_) => { + error!("Redis subscription process failed. Check Redis configuration!"); + return None; } - } else { - error!("Cannot find config_stream. Check Redis-stream accessibility!"); - None } } Err(_) => { diff --git a/src/utils/hagent.rs b/src/utils/hagent.rs index 3f02098..6e5910f 100644 --- a/src/utils/hagent.rs +++ b/src/utils/hagent.rs @@ -1,8 +1,7 @@ // module needed to check host-agent health condition and to communicate with it +/// asdasdasds use tokio::{io::Interest, net::UnixStream}; -// -// code will be here -// + async fn open_unix_socket() -> Result { let socket = UnixStream::connect("/var/run/enode/hostagent.sock").await?; Ok(socket)