fixed no local config mech

pull/5/head
prplV 2024-11-12 15:24:19 +03:00
parent 7677fa1f55
commit 9fb57ba3a7
5 changed files with 62 additions and 59 deletions

2
Cargo.lock generated
View File

@ -618,7 +618,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
[[package]]
name = "runner-rs"
version = "0.5.5"
version = "0.9.24"
dependencies = [
"anyhow",
"chrono",

View File

@ -1,6 +1,6 @@
[package]
name = "runner-rs"
version = "0.5.5"
version = "0.9.24"
edition = "2021"
[profile.dev]

View File

@ -19,7 +19,7 @@ async fn main() {
// setting up redis connection \
// then conf checks to choose the most actual \
let processes: Processes = get_actual_config().unwrap_or_else(|| {
let processes: Processes = get_actual_config().await.unwrap_or_else(|| {
error!("No actual configuration for runner. Stopping...");
std::process::exit(101);
});

View File

@ -22,9 +22,9 @@ fn load_processes(json_filename: &str) -> Option<Processes> {
None
}
pub fn get_actual_config() -> Option<Processes> {
// * if no conf -> loop and +inf getting conf from redis server
// let mut local = load_processes(&CONFIG_PATH);
pub async fn get_actual_config() -> Option<Processes> {
// * if no local conf -> loop and +inf getting conf from redis server
// * if local conf -> once getting conf from redis server
match load_processes(CONFIG_PATH) {
Some(local_conf) => {
info!(
@ -32,6 +32,7 @@ pub fn get_actual_config() -> Option<Processes> {
&local_conf.date_of_creation
);
if let Some(remote_conf) =
// TODO : rework with pubsub mech
once_get_remote_configuration(&format!("redis://{}/", local_conf.config_server))
{
return match config_comparing(&local_conf, &remote_conf) {
@ -51,15 +52,60 @@ pub fn get_actual_config() -> Option<Processes> {
Some(local_conf)
}
None => {
// ? ? OUTSTANDING CONSTRUCTION ?
warn!("No local valid conf was found. Trying to pull remote one...");
let mut conn = get_connection_watcher(&open_watcher("redis://localhost/"));
get_stream_info_watcher(&mut conn);
let remote_config = invalid_config_watcher(&mut conn);
let _ = save_new_config(&remote_config, CONFIG_PATH);
Some(remote_config)
let remote_config = get_remote_conf_watcher(&mut conn).await;
if let Some(conf) = remote_config {
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
let _ = save_new_config(&conf, CONFIG_PATH);
return Some(conf);
}
None
}
}
}
async fn get_remote_conf_watcher(conn : &mut Connection) -> Option<Processes> {
let mut conn = conn.as_pubsub();
let cont = crate::utils::get_container_id();
loop {
match cont {
Some(ref cont) => {
let cont = cont.trim();
if conn.subscribe(cont).is_err() {
// todo : delay
continue;
}
match conn.get_message() {
Ok(msg) => {
let msg: Result<String, redis::RedisError> = msg.get_payload();
if let Ok(payload) = msg {
if let Some(remote) = parse_extern_config(&payload) {
return Some(remote)
}
else {
error!("Pulled invalid config, cannot start. Waiting for remote conf...");
}
} else {
error!("Cannot get Redis message payload. Waiting for remote conf...");
}
// todo : delay
continue;
},
Err(_) => {
// todo : delay
continue;
},
}
},
None => {
error!("Cannot get container id. Returning");
break
},
}
}
None
}
// ! once iter exec
// ! only for situation when local isn't None (no need to fck redis server)
fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
@ -154,47 +200,6 @@ fn get_connection_watcher(client: &Client) -> Connection {
}
}
}
fn get_stream_info_watcher(conn: &mut Connection) {
loop {
if let Ok(val) = conn.xlen::<&str, usize>("config_stream") {
if val != 0 {
info!("Redis stream is able and not empty now");
return;
}
}
error!("Configuration pulling from Redis stream failed. Check stream state! Retrying...");
std::thread::sleep(Duration::from_secs(4));
}
}
fn invalid_config_watcher(conn: &mut Connection) -> Processes {
loop {
if let Some(prcs) = get_remote_config(conn) {
info!(
"Got new config from Redis-Server, version - {}",
&prcs.date_of_creation
);
return prcs;
}
error!("Got INVALID configuration. Update config! Retrying...");
std::thread::sleep(Duration::from_secs(4));
}
}
// ! end of watchers
fn get_remote_config(conn: &mut Connection) -> Option<Processes> {
let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
if let Ok(result) = res {
if result.is_empty() {
return None;
}
let config = &result[0];
if !config.is_empty() {
return parse_extern_config(&config[0].1[0].1);
}
}
None
}
fn restart_main_thread() -> std::io::Result<()> {
let current_exe = env::current_exe()?;

View File

@ -35,10 +35,6 @@ async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), std::io::Er
#[cfg(test)]
mod hagent_unittets {
use log::info;
use crate::utils::metrics;
use super::*;
#[tokio::test]
// maybe bool : true -> alive, false -> dead
@ -53,9 +49,11 @@ mod hagent_unittets {
// --Result<maybe Response>
// one-shot func
async fn send_metrics_to_hagent() {
let procm = crate::options::structs::ProcessMetrics::new("test-prc", 15.0, 5.0);
let contm = crate::options::structs::ContainerMetrics::new("test", 32.0, 12.0, vec![procm.process_name.clone()]);
let metrics = crate::options::structs::Metrics::new(contm, vec![procm]);
use crate::options::structs::{ProcessMetrics, ContainerMetrics, Metrics};
let procm = ProcessMetrics::new("test-prc", 15.0, 5.0);
let contm = ContainerMetrics::new("test", 32.0, 12.0, vec![procm.process_name.clone()]);
let metrics = Metrics::new(contm, vec![procm]);
let metrics = &serde_json::to_string_pretty(&metrics).unwrap();
let sock = open_unix_socket().await;