diff --git a/Cargo.lock b/Cargo.lock index 2706e0d..0b7e9be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +# It is not intended for manual editing version = 3 [[package]] diff --git a/settings.json b/settings.json index ff72861..da6fa64 100644 --- a/settings.json +++ b/settings.json @@ -1,6 +1,6 @@ { - "dateOfCreation": "1721381809103", - "configServer" : "localhost", + "dateOfCreation": "1721381809104", + "configServer": "localhost", "processes": [ { "name": "temp-process", @@ -31,3 +31,4 @@ } ] } + diff --git a/src/options/config.rs b/src/options/config.rs index 3cc8e64..cce619d 100644 --- a/src/options/config.rs +++ b/src/options/config.rs @@ -1,6 +1,6 @@ use crate::options::structs::*; use log::{error, info, warn}; -use redis::{Client, Commands, Connection, RedisResult}; +use redis::{Client, Commands, Connection, PubSub, RedisResult}; use std::fs::OpenOptions; use std::io::Write; use std::os::unix::process::CommandExt; @@ -204,25 +204,52 @@ fn restart_main_thread() -> std::io::Result<()> { pub async fn subscribe_config_stream(actual_prcs: Arc) -> Result<(), CustomError> { if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) { if let Ok(mut conn) = client.get_connection() { - info!("Runner subscribed on config update"); - loop { - tokio::time::sleep(Duration::from_secs(30)).await; - if let Some(prcs) = get_remote_config(&mut conn) { - match config_comparing(&actual_prcs, &prcs) { - ConfigActuality::Remote => { - info!("New config was pulled. Saving and restarting..."); - if save_new_config(&prcs, CONFIG_PATH).is_err() { - error!("Error with saving new config to {}", &CONFIG_PATH); - return Err(CustomError::Fatal); - } - if restart_main_thread().is_err() { - error!("Error with restarting Runner. Stopping sub mechanism..."); - return Err(CustomError::Fatal); + match crate::utils::get_container_id() { + Some(channel_name) => { + let channel_name = channel_name.trim(); + let mut pubsub = conn.as_pubsub(); + if pubsub.subscribe(&channel_name).is_ok() { + info!("Runner subscribed on config update publishing in channel {}", &channel_name); + loop { + if let Ok(msg) = pubsub.get_message() { + info!("New config was pulled from Redis Server"); + let get_remote_config: Result = msg.get_payload(); + match get_remote_config { + Ok(payload) => { + if let Some(remote_config) = parse_extern_config(&payload) { + match config_comparing(&actual_prcs, &remote_config) { + ConfigActuality::Remote => { + warn!("Pulled config is actual. Saving and restarting..."); + if save_new_config(&remote_config, CONFIG_PATH).is_err() { + error!("Error with saving new config to {}. Stopping sub mechanism...", &CONFIG_PATH); + return Err(CustomError::Fatal); + } + if restart_main_thread().is_err() { + error!("Error with restarting Runner. Stopping sub mechanism..."); + return Err(CustomError::Fatal); + } + } + _ => continue, + } + } + else { + error!("Invalid conig was pulled"); + } + }, + Err(_) => { + error!("Cannot extract new config from message"); + break; + }, + } } + tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; } - _ => continue, + } else { + error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name); } - return Ok(()); + }, + None => { + error!("Cannot get channel name"); } } } diff --git a/src/utils/hagent.rs b/src/utils/hagent.rs index 9ae224b..39c464c 100644 --- a/src/utils/hagent.rs +++ b/src/utils/hagent.rs @@ -1,11 +1,10 @@ // module needed to check host-agent health condition and to communicate with it -use crate::options::structs::CustomError; use tokio::net::UnixStream; // // code will be here // async fn open_unix_socket() -> Result { - let socket = UnixStream::connect("/var/run/runner-rs.sock").await?; + let socket = UnixStream::connect("/var/run/enode/hostagent.sock=").await?; Ok(socket) }