config subscribe mech rework

pull/9/head
prplV 2024-11-08 10:49:37 +03:00
parent 52431ffd6f
commit 0a4241e356
4 changed files with 49 additions and 21 deletions

1
Cargo.lock generated
View File

@ -1,5 +1,6 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
# It is not intended for manual editing
version = 3 version = 3
[[package]] [[package]]

View File

@ -1,6 +1,6 @@
{ {
"dateOfCreation": "1721381809103", "dateOfCreation": "1721381809104",
"configServer" : "localhost", "configServer": "localhost",
"processes": [ "processes": [
{ {
"name": "temp-process", "name": "temp-process",
@ -31,3 +31,4 @@
} }
] ]
} }

View File

@ -1,6 +1,6 @@
use crate::options::structs::*; use crate::options::structs::*;
use log::{error, info, warn}; use log::{error, info, warn};
use redis::{Client, Commands, Connection, RedisResult}; use redis::{Client, Commands, Connection, PubSub, RedisResult};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Write; use std::io::Write;
use std::os::unix::process::CommandExt; use std::os::unix::process::CommandExt;
@ -204,15 +204,24 @@ fn restart_main_thread() -> std::io::Result<()> {
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(), CustomError> { pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(), CustomError> {
if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) { if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) {
if let Ok(mut conn) = client.get_connection() { if let Ok(mut conn) = client.get_connection() {
info!("Runner subscribed on config update"); match crate::utils::get_container_id() {
Some(channel_name) => {
let channel_name = channel_name.trim();
let mut pubsub = conn.as_pubsub();
if pubsub.subscribe(&channel_name).is_ok() {
info!("Runner subscribed on config update publishing in channel {}", &channel_name);
loop { loop {
tokio::time::sleep(Duration::from_secs(30)).await; if let Ok(msg) = pubsub.get_message() {
if let Some(prcs) = get_remote_config(&mut conn) { info!("New config was pulled from Redis Server");
match config_comparing(&actual_prcs, &prcs) { let get_remote_config: Result<String, redis::RedisError> = msg.get_payload();
match get_remote_config {
Ok(payload) => {
if let Some(remote_config) = parse_extern_config(&payload) {
match config_comparing(&actual_prcs, &remote_config) {
ConfigActuality::Remote => { ConfigActuality::Remote => {
info!("New config was pulled. Saving and restarting..."); warn!("Pulled config is actual. Saving and restarting...");
if save_new_config(&prcs, CONFIG_PATH).is_err() { if save_new_config(&remote_config, CONFIG_PATH).is_err() {
error!("Error with saving new config to {}", &CONFIG_PATH); error!("Error with saving new config to {}. Stopping sub mechanism...", &CONFIG_PATH);
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
if restart_main_thread().is_err() { if restart_main_thread().is_err() {
@ -222,7 +231,25 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(),
} }
_ => continue, _ => continue,
} }
return Ok(()); }
else {
error!("Invalid conig was pulled");
}
},
Err(_) => {
error!("Cannot extract new config from message");
break;
},
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
}
} else {
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
}
},
None => {
error!("Cannot get channel name");
} }
} }
} }

View File

@ -1,11 +1,10 @@
// module needed to check host-agent health condition and to communicate with it // module needed to check host-agent health condition and to communicate with it
use crate::options::structs::CustomError;
use tokio::net::UnixStream; use tokio::net::UnixStream;
// //
// code will be here // code will be here
// //
async fn open_unix_socket() -> Result<UnixStream, std::io::Error> { async fn open_unix_socket() -> Result<UnixStream, std::io::Error> {
let socket = UnixStream::connect("/var/run/runner-rs.sock").await?; let socket = UnixStream::connect("/var/run/enode/hostagent.sock=").await?;
Ok(socket) Ok(socket)
} }