Compare commits

..

No commits in common. "985ddc6065c61746bc7ace6ad032e7bfe0904801" and "6e006dc66566a9d0c8d2a2cd05a6a0e94e2526b5" have entirely different histories.

7 changed files with 117 additions and 170 deletions

2
Cargo.lock generated
View File

@ -618,7 +618,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
[[package]] [[package]]
name = "runner-rs" name = "runner-rs"
version = "0.9.25" version = "0.5.5"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "runner-rs" name = "runner-rs"
version = "0.9.25" version = "0.5.5"
edition = "2021" edition = "2021"
[profile.dev] [profile.dev]

View File

@ -19,7 +19,7 @@ async fn main() {
// setting up redis connection \ // setting up redis connection \
// then conf checks to choose the most actual \ // then conf checks to choose the most actual \
let processes: Processes = get_actual_config().await.unwrap_or_else(|| { let processes: Processes = get_actual_config().unwrap_or_else(|| {
error!("No actual configuration for runner. Stopping..."); error!("No actual configuration for runner. Stopping...");
std::process::exit(101); std::process::exit(101);
}); });

View File

@ -1,6 +1,6 @@
use crate::options::structs::*; use crate::options::structs::*;
use log::{error, info, warn}; use log::{error, info, warn};
use redis::{Client, Connection}; use redis::{Client, Commands, Connection, RedisResult};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Write; use std::io::Write;
use std::os::unix::process::CommandExt; use std::os::unix::process::CommandExt;
@ -11,6 +11,7 @@ use tokio::time::Duration;
const CONFIG_PATH: &str = "settings.json"; const CONFIG_PATH: &str = "settings.json";
type Res = RedisResult<Vec<Vec<(String, Vec<(String, String)>)>>>;
// 4ever sync // 4ever sync
fn load_processes(json_filename: &str) -> Option<Processes> { fn load_processes(json_filename: &str) -> Option<Processes> {
if let Ok(res) = fs::read_to_string(json_filename) { if let Ok(res) = fs::read_to_string(json_filename) {
@ -21,9 +22,9 @@ fn load_processes(json_filename: &str) -> Option<Processes> {
None None
} }
pub async fn get_actual_config() -> Option<Processes> { pub fn get_actual_config() -> Option<Processes> {
// * if no local conf -> loop and +inf getting conf from redis server // * if no conf -> loop and +inf getting conf from redis server
// * if local conf -> once getting conf from redis server // let mut local = load_processes(&CONFIG_PATH);
match load_processes(CONFIG_PATH) { match load_processes(CONFIG_PATH) {
Some(local_conf) => { Some(local_conf) => {
info!( info!(
@ -31,7 +32,6 @@ pub async fn get_actual_config() -> Option<Processes> {
&local_conf.date_of_creation &local_conf.date_of_creation
); );
if let Some(remote_conf) = if let Some(remote_conf) =
// TODO : rework with pubsub mech
once_get_remote_configuration(&format!("redis://{}/", local_conf.config_server)) once_get_remote_configuration(&format!("redis://{}/", local_conf.config_server))
{ {
return match config_comparing(&local_conf, &remote_conf) { return match config_comparing(&local_conf, &remote_conf) {
@ -51,103 +51,61 @@ pub async fn get_actual_config() -> Option<Processes> {
Some(local_conf) Some(local_conf)
} }
None => { None => {
warn!("No local valid conf was found. Trying to pull remote one..."); // ? ? OUTSTANDING CONSTRUCTION ?
let mut conn = get_connection_watcher(&open_watcher("redis://localhost/")); let mut conn = get_connection_watcher(&open_watcher("redis://localhost/"));
let remote_config = get_remote_conf_watcher(&mut conn).await; get_stream_info_watcher(&mut conn);
if let Some(conf) = remote_config { let remote_config = invalid_config_watcher(&mut conn);
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); let _ = save_new_config(&remote_config, CONFIG_PATH);
let _ = save_new_config(&conf, CONFIG_PATH); Some(remote_config)
return Some(conf);
}
None
} }
} }
} }
async fn get_remote_conf_watcher(conn : &mut Connection) -> Option<Processes> {
let mut conn = conn.as_pubsub();
let cont = crate::utils::get_container_id();
loop {
match cont {
Some(ref cont) => {
let cont = cont.trim();
if conn.subscribe(cont).is_err() {
// todo : delay
continue;
}
match conn.get_message() {
Ok(msg) => {
let msg: Result<String, redis::RedisError> = msg.get_payload();
if let Ok(payload) = msg {
if let Some(remote) = parse_extern_config(&payload) {
return Some(remote)
}
else {
error!("Pulled invalid config, cannot start. Waiting for remote conf...");
}
} else {
error!("Cannot get Redis message payload. Waiting for remote conf...");
}
// todo : delay
continue;
},
Err(_) => {
// todo : delay
continue;
},
}
},
None => {
error!("Cannot get container id. Returning");
break
},
}
}
None
}
// ! once iter exec // ! once iter exec
// ! only for situation when local isn't None (no need to fck redis server) // ! only for situation when local isn't None (no need to fck redis server)
fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> { fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
let cont = crate::utils::get_container_id();
match Client::open(serv_info) { match Client::open(serv_info) {
Ok(client) => { Ok(client) => {
match client.get_connection() { match client.get_connection() {
Ok(mut conn) => { Ok(mut conn) => {
let mut conn = conn.as_pubsub(); if let Ok(len) = conn.xlen::<&str, usize>("config_stream") {
match conn.subscribe(cont) { if len == 0 {
Ok(_) => { warn!("No configuration in DB yet");
if conn.set_read_timeout(Some(Duration::from_millis(100))).is_err() { None
error!("Cannot set reading pubsub timeout and pull remote config"); } else {
return None; let conf: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
} let config: &Vec<(String, Vec<(String, String)>)>;
match conn.get_message() {
Ok(msg) => { if conf.is_ok() {
info!("Pulled config from Redis Server"); // guaranteed and safe unwrapping
let get_payload: Result<String, redis::RedisError> = msg.get_payload(); let conf = conf.unwrap();
match get_payload { config = &conf[0];
Ok(payload) => { if config.is_empty() {
let remote = parse_extern_config(&payload); error!(
if remote.is_none() { "Empty config was pulled. Check stream and configs state!"
error!("Pulled config is invalid. Check it in Redis Server"); );
}
return remote;
},
Err(_) => {
error!("Cannot extract payload from new message. Check Redis Server state");
return None;
},
}
},
Err(_) => {
warn!("Cannot get config from Redis Server. Empty channel");
return None; return None;
}, }
match parse_extern_config(&config[0].1[0].1) {
Some(prcs) => {
info!(
"Config {} was pulled from Redis-Server",
&prcs.date_of_creation
);
Some(prcs)
}
None => {
error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!");
None
}
}
} else {
error!("Configuration pulling from Redis stream failed. Check stream state!");
None
} }
},
Err(_) => {
error!("Redis subscription process failed. Check Redis configuration!");
return None;
} }
} else {
error!("Cannot find config_stream. Check Redis-stream accessibility!");
None
} }
} }
Err(_) => { Err(_) => {
@ -196,6 +154,47 @@ fn get_connection_watcher(client: &Client) -> Connection {
} }
} }
} }
fn get_stream_info_watcher(conn: &mut Connection) {
loop {
if let Ok(val) = conn.xlen::<&str, usize>("config_stream") {
if val != 0 {
info!("Redis stream is able and not empty now");
return;
}
}
error!("Configuration pulling from Redis stream failed. Check stream state! Retrying...");
std::thread::sleep(Duration::from_secs(4));
}
}
fn invalid_config_watcher(conn: &mut Connection) -> Processes {
loop {
if let Some(prcs) = get_remote_config(conn) {
info!(
"Got new config from Redis-Server, version - {}",
&prcs.date_of_creation
);
return prcs;
}
error!("Got INVALID configuration. Update config! Retrying...");
std::thread::sleep(Duration::from_secs(4));
}
}
// ! end of watchers
fn get_remote_config(conn: &mut Connection) -> Option<Processes> {
let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
if let Ok(result) = res {
if result.is_empty() {
return None;
}
let config = &result[0];
if !config.is_empty() {
return parse_extern_config(&config[0].1[0].1);
}
}
None
}
fn restart_main_thread() -> std::io::Result<()> { fn restart_main_thread() -> std::io::Result<()> {
let current_exe = env::current_exe()?; let current_exe = env::current_exe()?;

View File

@ -84,7 +84,7 @@ pub struct FIleTriggers {
/// ///
/// ///
#[derive(Debug, Clone, Serialize,)] #[derive(Debug, Clone)]
pub struct Metrics { pub struct Metrics {
pub container_metrics : ContainerMetrics, pub container_metrics : ContainerMetrics,
pub processes_metrics : Vec<ProcessMetrics>, pub processes_metrics : Vec<ProcessMetrics>,
@ -103,19 +103,17 @@ impl Metrics {
/// ///
/// ///
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone)]
pub struct ContainerMetrics { pub struct ContainerMetrics {
container_id : String, pub cpu_load : f32,
cpu_load : f32, pub ram_load : f32,
ram_load : f32,
// pub net_activity : ??? // pub net_activity : ???
processes : Vec<String>, pub processes : Vec<String>,
} }
impl ContainerMetrics { impl ContainerMetrics {
pub fn new(container_id : &str, cpu: f32, ram: f32, subsystems: Vec<String>,) -> Self{ pub fn new(cpu: f32, ram: f32, subsystems: Vec<String>,) -> Self{
ContainerMetrics { ContainerMetrics {
container_id : String::from(container_id),
cpu_load : cpu, cpu_load : cpu,
ram_load : ram, ram_load : ram,
processes : subsystems, processes : subsystems,
@ -125,24 +123,22 @@ impl ContainerMetrics {
/// ///
/// ///
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone)]
pub struct ProcessMetrics { pub struct ProcessMetrics {
pub process_name : String, pub cpu_load : f32,
cpu_load : f32, pub ram_load : f32,
ram_load : f32,
} }
impl ProcessMetrics { impl ProcessMetrics {
pub fn new(process_name :&str, cpu: f32, ram: f32) -> Self { pub fn new(cpu: f32, ram: f32) -> Self {
ProcessMetrics { ProcessMetrics {
process_name : String::from(process_name),
cpu_load : cpu, cpu_load : cpu,
ram_load : ram, ram_load : ram,
} }
} }
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone)]
pub struct PacketInfo { pub struct PacketInfo {
protocol : String, protocol : String,
dst_ip : Ipv4Addr, dst_ip : Ipv4Addr,

View File

@ -1,37 +1,13 @@
// module needed to check host-agent health condition and to communicate with it // module needed to check host-agent health condition and to communicate with it
/// asdasdasds use tokio::net::UnixStream;
use tokio::{io::Interest, net::UnixStream}; //
// code will be here
//
async fn open_unix_socket() -> Result<UnixStream, std::io::Error> { async fn open_unix_socket() -> Result<UnixStream, std::io::Error> {
let socket = UnixStream::connect("/var/run/enode/hostagent.sock").await?; let socket = UnixStream::connect("/var/run/enode/hostagent.sock=").await?;
Ok(socket) Ok(socket)
} }
async fn ha_healthcheck(socket: &UnixStream) -> Result<(), std::io::Error >{
socket.ready(Interest::WRITABLE).await?;
if socket.writable().await.is_ok() {
if let Err(er) = socket.try_write(b"Hello HAgent") {
return Err(er);
}
} else {
return Err(std::io::ErrorKind::WouldBlock.into());
}
Ok(())
}
async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), std::io::Error > {
socket.ready(Interest::WRITABLE).await?;
if socket.writable().await.is_ok() {
if let Err(er) = socket.try_write(data.as_bytes()) {
return Err(er);
}
} else {
return Err(std::io::ErrorKind::WouldBlock.into());
}
Ok(())
}
#[cfg(test)] #[cfg(test)]
mod hagent_unittets { mod hagent_unittets {
use super::*; use super::*;
@ -39,28 +15,13 @@ mod hagent_unittets {
// maybe bool : true -> alive, false -> dead // maybe bool : true -> alive, false -> dead
// simple request on api // simple request on api
async fn hagent_healthcheck() { async fn hagent_healthcheck() {
let sock = open_unix_socket().await; assert!(true);
assert!(sock.is_ok());
let sock = sock.unwrap();
assert!(ha_healthcheck(&sock).await.is_ok());
} }
#[tokio::test] #[tokio::test]
// --Result<maybe Response> // Result<maybe Response>
// one-shot func // one-shot func
async fn send_metrics_to_hagent() { async fn send_metrics_to_hagent() {
use crate::options::structs::{ProcessMetrics, ContainerMetrics, Metrics}; assert!(true);
let procm = ProcessMetrics::new("test-prc", 15.0, 5.0);
let contm = ContainerMetrics::new("test", 32.0, 12.0, vec![procm.process_name.clone()]);
let metrics = Metrics::new(contm, vec![procm]);
let metrics = &serde_json::to_string_pretty(&metrics).unwrap();
let sock = open_unix_socket().await;
assert!(sock.is_ok());
let sock = sock.unwrap();
assert!(ha_healthcheck(&sock).await.is_ok());
assert!(ha_send_data(&sock, &metrics).await.is_ok());
} }
#[tokio::test] #[tokio::test]
async fn open_unixsocket_test() { async fn open_unixsocket_test() {

View File

@ -2,15 +2,15 @@
// cpu load, ram/rom load and net activity // cpu load, ram/rom load and net activity
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::Arc; use std::{str::FromStr, sync::Arc};
use crate::options::structs::TrackingProcess; use crate::options::structs::TrackingProcess;
use sysinfo::{Process, System}; use sysinfo::{Pid, Process, System};
use tokio::join; use tokio::join;
use crate::utils::prcs::get_pid;
use crate::options::structs::{ProcessMetrics, ContainerMetrics, PacketInfo}; use crate::options::structs::{ProcessMetrics, ContainerMetrics, PacketInfo};
use crate::utils::get_container_id; use pcap::{Device, Capture, Active};
// use pcap::{Device, Capture, Active}; use std::net::Ipv4Addr;
// use std::net::Ipv4Addr; use anyhow::{Result, Ok};
// use anyhow::{Result, Ok};
type PacketBuffer = Arc<Mutex<Vec<PacketInfo>>>; type PacketBuffer = Arc<Mutex<Vec<PacketInfo>>>;
@ -84,12 +84,7 @@ async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProce
get_ram_metrics_container(sys.clone()), get_ram_metrics_container(sys.clone()),
get_subsystems(prcs.clone()) get_subsystems(prcs.clone())
); );
ContainerMetrics::new( ContainerMetrics::new(metrics.0, metrics.1, metrics.2)
&get_container_id().unwrap_or(String::from("unknown")),
metrics.0,
metrics.1,
metrics.2
)
} }
async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 { async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 {
sys.global_cpu_usage() sys.global_cpu_usage()
@ -112,11 +107,7 @@ async fn get_all_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> Proces
get_cpu_metrics_process(proc.clone()), get_cpu_metrics_process(proc.clone()),
get_ram_metrics_process(proc.clone(), sys.clone()) get_ram_metrics_process(proc.clone(), sys.clone())
); );
ProcessMetrics::new( ProcessMetrics::new(metrics.0, metrics.1)
proc.name().to_str().unwrap_or("unknown"),
metrics.0,
metrics.1
)
} }
async fn get_cpu_metrics_process(proc: Arc<Process>) -> f32 { async fn get_cpu_metrics_process(proc: Arc<Process>) -> f32 {
proc.cpu_usage() proc.cpu_usage()