From 535398e58a006c23bf8b330be719c185ba38d7d0 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 12 Nov 2024 16:14:11 +0300 Subject: [PATCH] fixed all config error and questions --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/options/config.rs | 74 ++++++++++++++++++++----------------------- src/utils/hagent.rs | 5 ++- 4 files changed, 39 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc70f37..a9efa2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,7 +618,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "runner-rs" -version = "0.9.24" +version = "0.9.25" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 1216761..67d68e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "runner-rs" -version = "0.9.24" +version = "0.9.25" edition = "2021" [profile.dev] diff --git a/src/options/config.rs b/src/options/config.rs index af233c5..7643fdd 100644 --- a/src/options/config.rs +++ b/src/options/config.rs @@ -1,6 +1,6 @@ use crate::options::structs::*; use log::{error, info, warn}; -use redis::{Client, Commands, Connection, RedisResult}; +use redis::{Client, Connection}; use std::fs::OpenOptions; use std::io::Write; use std::os::unix::process::CommandExt; @@ -11,7 +11,6 @@ use tokio::time::Duration; const CONFIG_PATH: &str = "settings.json"; -type Res = RedisResult)>>>; // 4ever sync fn load_processes(json_filename: &str) -> Option { if let Ok(res) = fs::read_to_string(json_filename) { @@ -109,49 +108,46 @@ async fn get_remote_conf_watcher(conn : &mut Connection) -> Option { // ! once iter exec // ! only for situation when local isn't None (no need to fck redis server) fn once_get_remote_configuration(serv_info: &str) -> Option { + let cont = crate::utils::get_container_id(); match Client::open(serv_info) { Ok(client) => { match client.get_connection() { Ok(mut conn) => { - if let Ok(len) = conn.xlen::<&str, usize>("config_stream") { - if len == 0 { - warn!("No configuration in DB yet"); - None - } else { - let conf: Res = conn.xrevrange_count("config_stream", "+", "-", 1); - let config: &Vec<(String, Vec<(String, String)>)>; - - if conf.is_ok() { - // guaranteed and safe unwrapping - let conf = conf.unwrap(); - config = &conf[0]; - if config.is_empty() { - error!( - "Empty config was pulled. Check stream and configs state!" - ); - return None; - } - match parse_extern_config(&config[0].1[0].1) { - Some(prcs) => { - info!( - "Config {} was pulled from Redis-Server", - &prcs.date_of_creation - ); - Some(prcs) - } - None => { - error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); - None - } - } - } else { - error!("Configuration pulling from Redis stream failed. Check stream state!"); - None + let mut conn = conn.as_pubsub(); + match conn.subscribe(cont) { + Ok(_) => { + if conn.set_read_timeout(Some(Duration::from_millis(100))).is_err() { + error!("Cannot set reading pubsub timeout and pull remote config"); + return None; } + match conn.get_message() { + Ok(msg) => { + info!("Pulled config from Redis Server"); + let get_payload: Result = msg.get_payload(); + match get_payload { + Ok(payload) => { + let remote = parse_extern_config(&payload); + if remote.is_none() { + error!("Pulled config is invalid. Check it in Redis Server"); + } + return remote; + }, + Err(_) => { + error!("Cannot extract payload from new message. Check Redis Server state"); + return None; + }, + } + }, + Err(_) => { + warn!("Cannot get config from Redis Server. Empty channel"); + return None; + }, + } + }, + Err(_) => { + error!("Redis subscription process failed. Check Redis configuration!"); + return None; } - } else { - error!("Cannot find config_stream. Check Redis-stream accessibility!"); - None } } Err(_) => { diff --git a/src/utils/hagent.rs b/src/utils/hagent.rs index 3f02098..6e5910f 100644 --- a/src/utils/hagent.rs +++ b/src/utils/hagent.rs @@ -1,8 +1,7 @@ // module needed to check host-agent health condition and to communicate with it +/// asdasdasds use tokio::{io::Interest, net::UnixStream}; -// -// code will be here -// + async fn open_unix_socket() -> Result { let socket = UnixStream::connect("/var/run/enode/hostagent.sock").await?; Ok(socket)