Compare commits

...

5 Commits

Author SHA1 Message Date
prplV dbb49de09c preboot unitetst + refactor 2024-12-03 15:25:24 +03:00
prplV 74e3b3ab31 no warnings 2024-12-03 13:11:19 +03:00
prplV 8a47ffe851 prcs refactor 2024-12-03 13:09:59 +03:00
prplV 345abde741 no metrics warnings 2024-12-03 13:09:02 +03:00
prplV ba395543e4 no warnings 2024-12-03 13:06:17 +03:00
8 changed files with 162 additions and 70 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "runner-rs" name = "runner-rs"
version = "0.9.25" version = "0.10.9"
edition = "2021" edition = "2021"
[profile.dev] [profile.dev]
@ -9,7 +9,6 @@ debug = true
[profile.test] [profile.test]
debug = false debug = false
[dependencies] [dependencies]
anyhow = "1.0.93" anyhow = "1.0.93"
chrono = "0.4.38" chrono = "0.4.38"

View File

@ -4,12 +4,12 @@
"processes": [ "processes": [
{ {
"name": "temp-process", "name": "temp-process",
"path": "/usr/src/kii/temp-process", "path": "./temp-process",
"dependencies": { "dependencies": {
"files": [ "files": [
{ {
"filename": "dep-file", "filename": "dep-file",
"src": "/usr/src/kii/tests/examples/", "src": "./tests/examples/",
"triggers": { "triggers": {
"onDelete": "stop", "onDelete": "stop",
"onChange": "stay" "onChange": "stay"

View File

@ -6,19 +6,17 @@ use log::{error, info};
use options::config::*; use options::config::*;
use options::logger::setup_logger; use options::logger::setup_logger;
use options::signals::set_valid_destructor; use options::signals::set_valid_destructor;
use options::structs::*; use options::structs::Processes;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use utils::*; use utils::*;
use options::preboot::*; #[allow(unused_imports)]
use options::preboot::PrebootParams;
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() { async fn main() {
use options::structs::PrebootParams; let preboot = PrebootParams::parse().validate();
let a = PrebootParams::parse();
println!("{:?}", a);
let _ = setup_logger(); let _ = setup_logger();

View File

@ -1,10 +1,142 @@
// module to handle pre-boot params of the monitor // module to handle pre-boot params of the monitor
use super::structs::PrebootParams; use anyhow::{Result, Ok};
use clap::Parser;
#[derive(clap::ValueEnum, Debug, Clone)]
enum MetricsPrebootParams {
Full,
System,
Processes,
Net,
None,
}
impl std::fmt::Display for MetricsPrebootParams {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
MetricsPrebootParams::Full => write!(f, "full"),
MetricsPrebootParams::System => write!(f, "system"),
MetricsPrebootParams::Processes => write!(f, "processes"),
MetricsPrebootParams::Net => write!(f, "net"),
MetricsPrebootParams::None => write!(f, "none"),
}
}
}
#[derive(Debug, Parser)]
pub struct PrebootParams {
// actions
#[arg(long = "no-hagent", action, conflicts_with="socket_path", help="To disable work with host-agent")]
pub no_hostagent : bool,
#[arg(long = "no-logs", action, conflicts_with="log_to", help="To disable logs")]
pub no_logs: bool,
#[arg(long = "refresh-logs", action, conflicts_with="no_logs", help="To clear logs directory")]
pub refresh_logs : bool,
#[arg(long = "no-remote-config", action, help="To disable work with remote config server", conflicts_with="no_sub")]
pub no_remote_config : bool,
#[arg(long = "no-sub", action, help="To disable subscription mechanism", conflicts_with="no_remote_config")]
pub no_sub : bool,
// params (socket_path, log_to, remote_server_url, config)
#[arg(long = "socket-path", default_value=None, conflicts_with="no_hostagent", help="To set .sock file's path used in communication with host-agent")]
socket_path : Option<String>,
#[arg(long = "log-to", default_value=None, conflicts_with="no_logs", help="To set a path to logs directory")]
log_to : Option<String>,
#[arg(long = "remote-server-url", default_value="redis://localhost", conflicts_with="no_remote_config", help = "To set url of remote config server using in remote config pulling mechanism")]
remote_server_url : String,
#[arg(long = "config", short, default_value="settings.json", help="To set local config file path")]
config : String,
// value enum params (metrics)
#[arg(long = "metrics", short, default_value_t=MetricsPrebootParams::Full, help="To set metrics grubbing mode")]
metrics: MetricsPrebootParams,
}
impl PrebootParams {
pub fn validate(self) -> Result<Self> {
// existing sock file
// existing log dir
// existing sock file
// existing sock file
Ok(self)
}
}
// unit tests of preboot params parsing mech
#[cfg(test)]
mod preboot_unitests{
use super::*;
#[test]
// unit tests of prebot params parsing mech fn parsing_zero_args() {
assert!(PrebootParams::try_parse_from(vec!["runner-rs"]).is_ok())
}
#[test]
fn parsing_hagent_valid_args() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--socket-path", "/path/to/socket"
]).is_ok())
}
#[test]
fn parsing_hagent_invalid_args() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--socket-path", "/path/to/socket",
"--no-hagent"
]).is_err())
}
#[test]
fn parsing_log_valid_args() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--log-to", "/path/to/log/dir"
]).is_ok())
}
#[test]
fn parsing_log_invalid_args() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--log-to /path/to/log/dir",
"--no-logs"
]).is_err())
}
#[test]
fn parsing_config_valid_args() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--no-sub",
"--remote-server-url", "redis://127.0.0.1"
]).is_ok())
}
#[test]
fn parsing_config_invalid_args_noremote_nosub() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--no-remote-config", "--no-sub"
]).is_err())
}
#[test]
fn parsing_config_invalid_args_noremote_remoteurl() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--no-remote-config",
"--remote-server-url", "redis://127.0.0.1"
]).is_err())
}
#[test]
fn parsing_metrics_args_using_value_enum() {
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "full"]).is_ok());
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "system"]).is_ok());
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "processes"]).is_ok());
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "net"]).is_ok());
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "none"]).is_ok());
assert!(PrebootParams::try_parse_from(vec!["runner-rs", "--metrics", "unusual_value"]).is_err());
}
}

View File

@ -2,63 +2,12 @@
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use clap::Parser;
/// # an Error enum (next will be deleted and replaced) /// # an Error enum (next will be deleted and replaced)
pub enum CustomError { pub enum CustomError {
Fatal, Fatal,
} }
#[derive(clap::ValueEnum, Debug, Clone)]
enum MetricsPrebootParams {
Full,
System,
Processes,
Net,
None,
}
impl std::fmt::Display for MetricsPrebootParams {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
MetricsPrebootParams::Full => write!(f, "full"),
MetricsPrebootParams::System => write!(f, "system"),
MetricsPrebootParams::Processes => write!(f, "processes"),
MetricsPrebootParams::Net => write!(f, "net"),
MetricsPrebootParams::None => write!(f, "none"),
}
}
}
#[derive(Debug, Parser)]
pub struct PrebootParams {
// actions
#[arg(long = "no-hagent", action, conflicts_with="socket_path", help="To disable work with host-agent")]
no_hostagent : bool,
#[arg(long = "no-logs", action, conflicts_with="log_to", help="To disable logs")]
no_logs: bool,
#[arg(long = "refresh-logs", action, conflicts_with="no_logs", help="To clear logs directory")]
refresh_logs : bool,
#[arg(long = "no-remote-config", action, help="To disable work with remote config server", conflicts_with="no_sub")]
no_remote_config : bool,
#[arg(long = "no-sub", action, help="To disable subscription mechanism", conflicts_with="no_remote_config")]
no_sub : bool,
// params (socket_path, log_to, remote_server_url, config)
#[arg(long = "socket-path", default_value=None, conflicts_with="no_hostagent", help="To set .sock file's path used in communication with host-agent")]
socket_path : Option<String>,
#[arg(long = "log-to", default_value=None, conflicts_with="no_logs", help="To set a path to logs directory")]
log_to : Option<String>,
#[arg(long = "remote-server-url", default_value="redis://localhost", conflicts_with="no_remote_config", help = "To set url of remote config server using in remote config pulling mechanism")]
remote_server_url : String,
#[arg(long = "config", short, default_value="settings.json", help="To set local config file path")]
config : String,
// value enum params (metrics)
#[arg(long = "metrics", short, default_value_t=MetricsPrebootParams::Full, help="To set metrics grubbing mode")]
metrics: MetricsPrebootParams,
}
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum ConfigActuality { pub enum ConfigActuality {
Local, Local,

View File

@ -1,6 +1,10 @@
//
// module needed to check host-agent health condition and to communicate with it // module needed to check host-agent health condition and to communicate with it
//
use tokio::{io::Interest, net::UnixStream}; use tokio::{io::Interest, net::UnixStream};
use anyhow::{Ok, Result, Error}; use anyhow::{Ok, Result, Error};
// to kill lint bug
#[allow(unused_imports)]
use tokio::net::UnixListener; use tokio::net::UnixListener;
/// # Fn `open_unix_socket` /// # Fn `open_unix_socket`
@ -16,6 +20,7 @@ use tokio::net::UnixListener;
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
#[allow(dead_code)]
async fn open_unix_socket(sock_path: &str) -> Result<UnixStream, std::io::Error> { async fn open_unix_socket(sock_path: &str) -> Result<UnixStream, std::io::Error> {
// "/var/run/enode/hostagent.sock" // "/var/run/enode/hostagent.sock"
UnixStream::connect(sock_path).await UnixStream::connect(sock_path).await
@ -34,6 +39,7 @@ async fn open_unix_socket(sock_path: &str) -> Result<UnixStream, std::io::Error>
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
#[allow(dead_code)]
async fn ha_healthcheck(socket: &UnixStream) -> Result<(), Error> { async fn ha_healthcheck(socket: &UnixStream) -> Result<(), Error> {
socket.ready(Interest::WRITABLE).await?; socket.ready(Interest::WRITABLE).await?;
socket.writable().await?; socket.writable().await?;
@ -54,6 +60,7 @@ async fn ha_healthcheck(socket: &UnixStream) -> Result<(), Error> {
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
#[allow(dead_code)]
async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), Error > { async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), Error > {
socket.ready(Interest::WRITABLE).await?; socket.ready(Interest::WRITABLE).await?;
socket.writable().await?; socket.writable().await?;
@ -74,10 +81,10 @@ mod hagent_unittets {
// maybe bool : true -> alive, false -> dead // maybe bool : true -> alive, false -> dead
// simple request on api // simple request on api
async fn hagent_healthcheck() { async fn hagent_healthcheck() {
let mut list = init_listener().await; let _ = init_listener().await;
let sock = open_unix_socket(TEST_SOCKET).await; let sock = open_unix_socket(TEST_SOCKET).await;
assert!(sock.is_ok()); assert!(sock.is_ok());
let mut sock = sock.unwrap(); let sock = sock.unwrap();
assert!(ha_healthcheck(&sock).await.is_ok()); assert!(ha_healthcheck(&sock).await.is_ok());
} }
#[tokio::test] #[tokio::test]
@ -91,17 +98,17 @@ mod hagent_unittets {
let metrics = Metrics::new(contm, vec![procm]); let metrics = Metrics::new(contm, vec![procm]);
let metrics = &serde_json::to_string_pretty(&metrics).unwrap(); let metrics = &serde_json::to_string_pretty(&metrics).unwrap();
let mut list = init_listener().await; let _ = init_listener().await;
let sock = open_unix_socket(TEST_SOCKET).await; let sock = open_unix_socket(TEST_SOCKET).await;
assert!(sock.is_ok()); assert!(sock.is_ok());
let mut sock = sock.unwrap(); let sock = sock.unwrap();
assert!(ha_healthcheck(&sock).await.is_ok()); assert!(ha_healthcheck(&sock).await.is_ok());
assert!(ha_send_data(&sock, &metrics).await.is_ok()); assert!(ha_send_data(&sock, &metrics).await.is_ok());
} }
#[tokio::test] #[tokio::test]
async fn open_unixsocket_test() { async fn open_unixsocket_test() {
let mut list = init_listener().await; let _ = init_listener().await;
assert!(open_unix_socket(TEST_SOCKET).await.is_ok()); assert!(open_unix_socket(TEST_SOCKET).await.is_ok());
} }
} }

View File

@ -27,6 +27,7 @@ use crate::utils::get_container_id;
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
#[allow(dead_code)]
pub async fn init_metrics_grubber() { pub async fn init_metrics_grubber() {
let mut system = System::new(); let mut system = System::new();
// let mut buffer: Vec<PacketInfo> = vec![]; // let mut buffer: Vec<PacketInfo> = vec![];
@ -39,6 +40,8 @@ pub async fn init_metrics_grubber() {
// let _ = capture_packets(shared_buf.clone()).await; // let _ = capture_packets(shared_buf.clone()).await;
} }
#[allow(dead_code)]
#[allow(unused_variables)]
async fn gather_metrics(proc: Arc<Process>) { async fn gather_metrics(proc: Arc<Process>) {
} }
@ -92,6 +95,7 @@ async fn gather_metrics(proc: Arc<Process>) {
/// ///
/// *depends on* : `TrackingProcess` /// *depends on* : `TrackingProcess`
/// ///
#[allow(dead_code)]
async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProcess>>) -> ContainerMetrics { async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProcess>>) -> ContainerMetrics {
let metrics = join!( let metrics = join!(
get_cpu_metrics_container(sys.clone()), get_cpu_metrics_container(sys.clone()),
@ -119,6 +123,7 @@ async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProce
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
#[allow(dead_code)]
async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 { async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 {
sys.global_cpu_usage() sys.global_cpu_usage()
} }
@ -136,6 +141,7 @@ async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 {
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
#[allow(dead_code)]
async fn get_ram_metrics_container(sys: Arc<System>) -> f32 { async fn get_ram_metrics_container(sys: Arc<System>) -> f32 {
(sys.used_memory() / sys.total_memory()) as f32 * 100.0 (sys.used_memory() / sys.total_memory()) as f32 * 100.0
} }
@ -156,6 +162,7 @@ async fn get_ram_metrics_container(sys: Arc<System>) -> f32 {
/// ///
/// *depends on* : `TrackingProcess` /// *depends on* : `TrackingProcess`
/// ///
#[allow(dead_code)]
async fn get_subsystems(prcs: Arc<Vec<TrackingProcess>>) -> Vec<String> { async fn get_subsystems(prcs: Arc<Vec<TrackingProcess>>) -> Vec<String> {
prcs.iter().map(|process| process.name.clone()).collect() prcs.iter().map(|process| process.name.clone()).collect()
} }
@ -173,6 +180,7 @@ async fn get_subsystems(prcs: Arc<Vec<TrackingProcess>>) -> Vec<String> {
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
#[allow(dead_code)]
async fn get_all_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> ProcessMetrics { async fn get_all_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> ProcessMetrics {
let metrics = join!( let metrics = join!(
get_cpu_metrics_process(proc.clone()), get_cpu_metrics_process(proc.clone()),

View File

@ -223,7 +223,6 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
#[cfg(test)] #[cfg(test)]
mod process_unittests { mod process_unittests {
use std::io::Write;
use super::*; use super::*;
// 1 full cycle - start -> restart -> stop // 1 full cycle - start -> restart -> stop
// 2 full cycle - start -> freeze -> unfreze -> stop // 2 full cycle - start -> freeze -> unfreze -> stop