monitor/src/options/config.rs

517 lines
18 KiB
Rust

use crate::options::structs::*;
use log::{error, info, warn};
use redis::{Client, Connection};
use std::fs::OpenOptions;
use std::io::Write;
use std::os::unix::process::CommandExt;
use std::process::Command;
use std::sync::Arc;
use std::{env, fs};
use tokio::time::Duration;
/// Path of the local config file. It is handed to `load_processes` for reading and to
/// `save_new_config` for writing; being relative, it is resolved against the process's
/// current working directory.
const CONFIG_PATH: &str = "settings.json";
/// # Fn `load_processes`
/// ## reads the *local* config file and deserializes it into `Processes`
///
/// *input* : `&str` - path of the JSON config file
///
/// *output* : `Some(conf)` when the file was read and parsed | `None` when the file cannot be read or its content is not a valid `Processes` JSON
///
/// *initiator* : fn `get_actual_config`
///
/// *managing* : conf file name in `&str` format
///
/// *depends on* : struct `Processes`
///
fn load_processes(json_filename: &str) -> Option<Processes> {
    // Read errors and parse errors are both collapsed into `None`; the caller only
    // needs to know whether a usable local config exists.
    let raw = fs::read_to_string(json_filename).ok()?;
    serde_json::from_str::<Processes>(&raw).ok()
}
/// # Fn `get_actual_config`
/// ## picks the config the Monitor should start with, using local and remote storages
///
/// * no valid local conf -> connects to `redis://localhost/` and waits until a valid
///   remote conf is published, then stores it locally
/// * valid local conf -> makes a single attempt to pull a remote conf from the server
///   named in the local conf and keeps whichever one `config_comparing` prefers
///
/// *input* : -
///
/// *output* : `Some(conf)` with the selected config | `None` when no local conf exists and the remote watcher gave up
///
/// *initiator* : main thread (not visible in this file)
///
/// *managing* : -
///
/// *depends on* : struct `Processes`
///
pub async fn get_actual_config() -> Option<Processes> {
    let local_conf = match load_processes(CONFIG_PATH) {
        Some(conf) => conf,
        None => {
            warn!("No local valid conf was found. Trying to pull remote one...");
            let mut conn = get_connection_watcher(&open_watcher("redis://localhost/"));
            let pulled = get_remote_conf_watcher(&mut conn).await;
            return match pulled {
                Some(conf) => {
                    info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
                    // Best effort: the pulled config is used even if it cannot be persisted
                    let _ = save_new_config(&conf, CONFIG_PATH);
                    Some(conf)
                }
                None => None,
            };
        }
    };

    info!(
        "Found local configuration, version - {}",
        &local_conf.date_of_creation
    );

    // TODO : rework with pubsub mech
    let server_url = format!("redis://{}/", local_conf.config_server);
    let remote_conf = match once_get_remote_configuration(&server_url) {
        Some(conf) => conf,
        // Nothing was pulled - keep working with the local config
        None => return Some(local_conf),
    };

    match config_comparing(&local_conf, &remote_conf) {
        ConfigActuality::Local => {
            info!("Local config is actual");
            Some(local_conf)
        }
        ConfigActuality::Remote => {
            info!("Pulled config is more actual. Saving changes!");
            // A failed save is only logged; the remote config is still returned
            if save_new_config(&remote_conf, CONFIG_PATH).is_err() {
                error!("Saving changes process failed due to unexpected error...")
            }
            Some(remote_conf)
        }
    }
}
/// # Fn `get_remote_conf_watcher`
/// ## for infinitive pulling remote config
///
/// *input* : `&mut Connection`
///
/// *output* : `None` on fatal error | `Some(conf)` on succesfull pulling
///
/// *initiator* : fn `get_actual_config`
///
/// *managing* : mut ref `Connection` object
///
/// *depends on* : struct `Processes`
///
async fn get_remote_conf_watcher(conn : &mut Connection) -> Option<Processes> {
let mut conn = conn.as_pubsub();
let cont = crate::utils::get_container_id();
loop {
match cont {
Some(ref cont) => {
let cont = cont.trim();
if conn.subscribe(cont).is_err() {
// todo : delay
continue;
}
match conn.get_message() {
Ok(msg) => {
let msg: Result<String, redis::RedisError> = msg.get_payload();
if let Ok(payload) = msg {
if let Some(remote) = parse_extern_config(&payload) {
return Some(remote)
}
else {
error!("Pulled invalid config, cannot start. Waiting for remote conf...");
}
} else {
error!("Cannot get Redis message payload. Waiting for remote conf...");
}
// todo : delay
continue;
},
Err(_) => {
// todo : delay
continue;
},
}
},
None => {
error!("Cannot get container id. Returning");
break
},
}
}
None
}
/// # Fn `get_remote_conf_watcher`
/// ## for trying to pull remote config
///
/// > only for situation when local isn't None (no need to fck redis server)
///
/// *input* : `&str`
///
/// *output* : `None` on empty pubsub or error | `Some(conf)` on succesfull pulling
///
/// *initiator* : fn `get_actual_config`
///
/// *managing* : &str of Redis Server credentials
///
/// *depends on* : struct `Processes`
///
fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
let cont = crate::utils::get_container_id();
match Client::open(serv_info) {
Ok(client) => {
match client.get_connection() {
Ok(mut conn) => {
let mut conn = conn.as_pubsub();
match conn.subscribe(cont) {
Ok(_) => {
if conn.set_read_timeout(Some(Duration::from_millis(100))).is_err() {
error!("Cannot set reading pubsub timeout and pull remote config");
return None;
}
match conn.get_message() {
Ok(msg) => {
info!("Pulled config from Redis Server");
let get_payload: Result<String, redis::RedisError> = msg.get_payload();
match get_payload {
Ok(payload) => {
let remote = parse_extern_config(&payload);
if remote.is_none() {
error!("Pulled config is invalid. Check it in Redis Server");
}
return remote;
},
Err(_) => {
error!("Cannot extract payload from new message. Check Redis Server state");
return None;
},
}
},
Err(_) => {
warn!("Cannot get config from Redis Server. Empty channel");
return None;
},
}
},
Err(_) => {
error!("Redis subscription process failed. Check Redis configuration!");
return None;
}
}
}
Err(_) => {
error!("Redis connection attempt is failed. Check Redis configuration!");
None
}
}
}
Err(_) => {
error!("Redis-Client opening attempt is failed. Check network configuration!");
None
}
}
}
// ! watchers
/// # Fn `open_watcher`
/// ## keeps trying to open a Redis client until it succeeds
///
/// Blocks the calling thread: every failed attempt is followed by a 4 second
/// `std::thread::sleep` before the next one.
///
/// *input* : `&str` - Redis server URL
///
/// *output* : `redis::Client` on successful opening client
///
/// *initiator* : fn `get_actual_config`
///
/// *managing* : &str of Redis Server credentials
///
/// *depends on* : struct `redis::Client`
///
fn open_watcher(serv_info: &str) -> Client {
    loop {
        if let Ok(redis) = Client::open(serv_info) {
            info!("Successfully opened Redis-Client");
            return redis;
        }
        error!("Redis-Client opening attempt is failed. Check network configuration! Retrying...");
        std::thread::sleep(Duration::from_secs(4));
    }
}
/// # Fn `get_connection_watcher`
/// ## for infinitive establishing Redis connection on existing client
///
/// > only for situation when local isn't None (no need to fck redis server)
///
/// *input* : `&Client`
///
/// *output* : `Connection`
///
/// *initiator* : fn `get_actual_config`
///
/// *managing* : &Client for opening connection
///
/// *depends on* : struct `redis::Connection`
///
fn get_connection_watcher(client: &Client) -> Connection {
loop {
match client.get_connection() {
Ok(conn) => {
info!("Successfully got Redis connection object");
return conn;
}
Err(_) => {
error!(
"Redis connection attempt is failed. Check Redis configuration! Retrying..."
);
std::thread::sleep(Duration::from_secs(4));
}
}
}
}
/// # Fn `restart_main_thread`
/// ## for restarting the monitor so it picks up the new config
///
/// Replaces the current process image with a fresh run of the current executable
/// via `exec`. On success this function never returns; it returns only when the
/// restart could not be performed.
///
/// *input* : -
///
/// *output* : does not return on a successful restart | `Err(er)` when the executable path cannot be resolved or `exec` fails
///
/// *initiator* : fn `subscribe_config_stream`
///
/// *managing* : -
///
/// *depends on* : -
///
fn restart_main_thread() -> std::io::Result<()> {
    let current_exe = env::current_exe()?;
    // `exec` only comes back when it failed, and its return value is that failure.
    // Propagate it instead of reporting a restart that never happened.
    // NOTE(review): the new process is started without the original command-line
    // arguments - confirm the monitor does not rely on any.
    Err(Command::new(current_exe).exec())
}
/// # Fn `subscribe_config_stream`
/// ## for subscribe on changes, pulling to Redis pubsub to get more actual config
///
/// *input* : `Arc<Processes>`
///
/// *output* : `Ok(())` on end of work | `Err(er)` on error with subscribing mechanism
///
/// *initiator* : fn `subscribe_config_stream`
///
/// *managing* : `Arc<Processes>` to compare old config with new pulled
///
/// *depends on* : `Processes`
///
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(), CustomError> {
if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) {
if let Ok(mut conn) = client.get_connection() {
match crate::utils::get_container_id() {
Some(channel_name) => {
let channel_name = channel_name.trim();
let mut pubsub = conn.as_pubsub();
if pubsub.subscribe(&channel_name).is_ok() {
info!("Runner subscribed on config update publishing in channel {}", &channel_name);
loop {
if let Ok(msg) = pubsub.get_message() {
info!("New config was pulled from Redis Server");
let get_remote_config: Result<String, redis::RedisError> = msg.get_payload();
match get_remote_config {
Ok(payload) => {
if let Some(remote_config) = parse_extern_config(&payload) {
match config_comparing(&actual_prcs, &remote_config) {
ConfigActuality::Remote => {
warn!("Pulled config is actual. Saving and restarting...");
if save_new_config(&remote_config, CONFIG_PATH).is_err() {
error!("Error with saving new config to {}. Stopping sub mechanism...", &CONFIG_PATH);
return Err(CustomError::Fatal);
}
if restart_main_thread().is_err() {
error!("Error with restarting Runner. Stopping sub mechanism...");
return Err(CustomError::Fatal);
}
}
_ => continue,
}
}
else {
error!("Invalid conig was pulled");
}
},
Err(_) => {
error!("Cannot extract new config from message");
break;
},
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
}
} else {
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
}
},
None => {
error!("Cannot get channel name");
}
}
}
}
error!("Error with subscribing Redis stream on update. Working only with selected config...");
Err(CustomError::Fatal)
}
/// # Fn `config_comparing`
/// ## for compare old and new configs
///
/// *input* : local: `&Processes`, remote: `&Processes`
///
/// *output* : `ConfigActuality::Local` or `ConfigActuality::Remote`
///
/// *initiator* : fn `subscribe_config_stream`, fn `get_actual_config`
///
/// *managing* : two objects `&Processes`
///
/// *depends on* : `Processes`, `ConfigActuality`
///
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
let local_date: u64 = local.date_of_creation.parse().unwrap();
let remote_date: u64 = remote.date_of_creation.parse().unwrap();
match local_date.cmp(&remote_date) {
std::cmp::Ordering::Equal | std::cmp::Ordering::Greater => ConfigActuality::Local,
std::cmp::Ordering::Less => ConfigActuality::Remote,
}
}
// ! TEMPORARILY DEPRECATED !
// fn native_date_from_millis(mls: &str) -> Option<chrono::DateTime<Utc>> {
// match mls.parse::<i64>(){
// Ok(val) => return chrono::DateTime::from_timestamp_millis(val),
// Err(_) => return None,
// }
// }
/// # Fn `save_new_config`
/// ## mechanism for saving a new config in local storage
///
/// Serializes the config as pretty-printed JSON and writes it (plus a trailing newline)
/// to `config_file`. The file is created when missing and truncated when present, so a
/// shorter config can never leave the tail of a longer previous one behind.
///
/// *input* : `&Processes`, `&str`
///
/// *output* : `Ok(())` on successful saving | `Err(CustomError::Fatal)` on serialization or fs error
///
/// *initiator* : fn `subscribe_config_stream`, fn `get_actual_config`
///
/// *managing* : new config object: `&Processes` and config file name: `&str`
///
/// *depends on* : `Processes`
///
fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> {
    let serialized = serde_json::to_string_pretty(config).map_err(|_| CustomError::Fatal)?;

    // `truncate(true)` is essential: writing from offset 0 without it keeps any bytes of
    // the old file beyond the new content and produces invalid JSON.
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(config_file)
        .map_err(|_| CustomError::Fatal)?;

    writeln!(file, "{}", serialized).map_err(|_| CustomError::Fatal)
}
/// # Fn `parse_extern_config`
/// ## deserializes a JSON string into `Processes`
///
/// *input* : `&str`
///
/// *output* : `Some(Processes)` with the parsed config | `None` when the string is not a valid `Processes` JSON
///
/// *initiator* : fn `subscribe_config_stream`, fn `once_get_remote_configuration`, fn `get_remote_conf_watcher`
///
/// *managing* : unparsed config `&str`
///
/// *depends on* : `Processes`
///
fn parse_extern_config(json_string: &str) -> Option<Processes> {
    // The parse error itself is dropped; callers log their own message on `None`
    serde_json::from_str::<Processes>(json_string).ok()
}
// unit tests
#[cfg(test)]
mod config_unittests {
    use super::*;

    /// Builds a minimal config that carries only a version stamp.
    fn conf_with_date(date: &str) -> Processes {
        Processes {
            date_of_creation: date.to_string(),
            config_server: String::new(),
            processes: Vec::new(),
        }
    }

    #[test]
    fn parsing_valid_conf() {
        let loaded = load_processes("tests/examples/settings.json");
        assert!(loaded.is_some());
    }

    #[test]
    fn parsing_invalid_conf() {
        let loaded = load_processes("tests/examples/invalid_config.json");
        assert!(loaded.is_none());
    }

    #[test]
    fn configuration_comparing() {
        // older config plays the local role, newer one the remote role
        let local = conf_with_date("1");
        let remote = conf_with_date("2");
        assert_eq!(config_comparing(&local, &remote), ConfigActuality::Remote);
    }

    // TODO : strange output
    // NOTE: `get_actual_config` is an `async fn`, so this disabled test would also
    // need an async runtime to await it before `is_some()` can be checked.
    // #[test]
    // fn get_actual_config_mechanism() {
    //     assert!(get_actual_config().is_some())
    // }

    #[test]
    fn save_config() {
        let conf = conf_with_date("1");
        assert!(save_new_config(&conf, "tests/examples/save-conf.json").is_ok());
    }

    #[test]
    fn save_to_zero_file() {
        let conf = conf_with_date("1");
        assert!(save_new_config(&conf, "tests/examples/none.json").is_ok());
    }
}