config ref
parent
e0720a5f54
commit
5f63459e4f
|
|
@ -414,259 +414,6 @@ fn load_processes(json_filename: &str) -> Option<Processes> {
|
|||
None
|
||||
}
|
||||
|
||||
/// # Fn `get_actual_config`
|
||||
/// ## for getting actual Monitor's config from local and remote storages
|
||||
///
|
||||
/// *input* : -
|
||||
///
|
||||
/// *output* : `None` on fatal error in mechanisms | `Some(conf)` on finish reading and parsing
|
||||
///
|
||||
/// *initiator* : main thread
|
||||
///
|
||||
/// *managing* : -
|
||||
///
|
||||
/// *depends on* : struct `Processes`
|
||||
///
|
||||
pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes> {
|
||||
// * if no local conf -> loop and +inf getting conf from redis server
|
||||
// * if local conf -> once getting conf from redis server
|
||||
let config_path = params.config.to_str().unwrap_or_else(|| {
|
||||
error!("Invalid character in config file. Config path was set to default");
|
||||
"settings.json"
|
||||
});
|
||||
info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url);
|
||||
match load_processes(config_path) {
|
||||
Some(local_conf) => {
|
||||
info!(
|
||||
"Found local configuration, version - {}",
|
||||
&local_conf.date_of_creation
|
||||
);
|
||||
if !params.no_sub {
|
||||
if let Some(remote_conf) =
|
||||
// TODO : rework with pubsub mech
|
||||
once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url))
|
||||
{
|
||||
return match config_comparing(&local_conf, &remote_conf) {
|
||||
ConfigActuality::Local => {
|
||||
info!("Local config is actual");
|
||||
Some(local_conf)
|
||||
}
|
||||
ConfigActuality::Remote => {
|
||||
info!("Pulled config is more actual. Saving changes!");
|
||||
if save_new_config(&remote_conf, config_path).is_err() {
|
||||
error!("Saving changes process failed due to unexpected error...")
|
||||
}
|
||||
Some(remote_conf)
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Some(local_conf)
|
||||
}
|
||||
None => {
|
||||
warn!("No local valid conf was found. Trying to pull remote one...");
|
||||
if !params.no_sub {
|
||||
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url)));
|
||||
if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
|
||||
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
|
||||
let _ = save_new_config(&conf, config_path);
|
||||
return Some(conf);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `get_remote_conf_watcher`
|
||||
/// ## for infinitive pulling remote config
|
||||
///
|
||||
/// *input* : `&mut Connection`
|
||||
///
|
||||
/// *output* : `None` on fatal error | `Some(conf)` on succesfull pulling
|
||||
///
|
||||
/// *initiator* : fn `get_actual_config`
|
||||
///
|
||||
/// *managing* : mut ref `Connection` object
|
||||
///
|
||||
/// *depends on* : struct `Processes`
|
||||
///
|
||||
async fn get_remote_conf_watcher(conn : &mut Connection) -> Option<Processes> {
|
||||
let mut conn = conn.as_pubsub();
|
||||
let cont = crate::utils::get_container_id();
|
||||
loop {
|
||||
match cont {
|
||||
Some(ref cont) => {
|
||||
let cont = cont.trim();
|
||||
if conn.subscribe(cont).is_err() {
|
||||
// todo : delay
|
||||
continue;
|
||||
}
|
||||
match conn.get_message() {
|
||||
Ok(msg) => {
|
||||
let msg: Result<String, redis::RedisError> = msg.get_payload();
|
||||
if let Ok(payload) = msg {
|
||||
if let Some(remote) = parse_extern_config(&payload) {
|
||||
return Some(remote)
|
||||
}
|
||||
else {
|
||||
error!("Pulled invalid config, cannot start. Waiting for remote conf...");
|
||||
}
|
||||
} else {
|
||||
error!("Cannot get Redis message payload. Waiting for remote conf...");
|
||||
}
|
||||
// todo : delay
|
||||
continue;
|
||||
},
|
||||
Err(_) => {
|
||||
// todo : delay
|
||||
continue;
|
||||
},
|
||||
}
|
||||
},
|
||||
None => {
|
||||
error!("Cannot get container id. Returning");
|
||||
break
|
||||
},
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// # Fn `get_remote_conf_watcher`
|
||||
/// ## for trying to pull remote config
|
||||
///
|
||||
/// > only for situation when local isn't None (no need to fck redis server)
|
||||
///
|
||||
/// *input* : `&str`
|
||||
///
|
||||
/// *output* : `None` on empty pubsub or error | `Some(conf)` on succesfull pulling
|
||||
///
|
||||
/// *initiator* : fn `get_actual_config`
|
||||
///
|
||||
/// *managing* : &str of Redis Server credentials
|
||||
///
|
||||
/// *depends on* : struct `Processes`
|
||||
///
|
||||
fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
|
||||
let cont = crate::utils::get_container_id();
|
||||
match Client::open(serv_info) {
|
||||
Ok(client) => {
|
||||
match client.get_connection() {
|
||||
Ok(mut conn) => {
|
||||
let mut conn = conn.as_pubsub();
|
||||
match conn.subscribe(cont) {
|
||||
Ok(_) => {
|
||||
if conn.set_read_timeout(Some(Duration::from_millis(100))).is_err() {
|
||||
error!("Cannot set reading pubsub timeout and pull remote config");
|
||||
return None;
|
||||
}
|
||||
match conn.get_message() {
|
||||
Ok(msg) => {
|
||||
info!("Pulled config from Redis Server");
|
||||
let get_payload: Result<String, redis::RedisError> = msg.get_payload();
|
||||
match get_payload {
|
||||
Ok(payload) => {
|
||||
let remote = parse_extern_config(&payload);
|
||||
if remote.is_none() {
|
||||
error!("Pulled config is invalid. Check it in Redis Server");
|
||||
}
|
||||
remote
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Cannot extract payload from new message. Check Redis Server state");
|
||||
None
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
None
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Redis subscription process failed. Check Redis configuration!");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Redis connection attempt is failed. Check Redis configuration!");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Redis-Client opening attempt is failed. Check network configuration!");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ! watchers
|
||||
|
||||
/// # Fn `open_watcher`
|
||||
/// ## for infinitive opening Redis client
|
||||
///
|
||||
/// > only for situation when local isn't None (no need to fck redis server)
|
||||
///
|
||||
/// *input* : `Option<Processes>`
|
||||
///
|
||||
/// *output* : redis::Client on successful opening client
|
||||
///
|
||||
/// *initiator* : fn `get_actual_config`
|
||||
///
|
||||
/// *managing* : &str of Redis Server credentials
|
||||
///
|
||||
/// *depends on* : struct `redis::Client`
|
||||
///
|
||||
fn open_watcher(serv_info: &str) -> Client {
|
||||
loop {
|
||||
match Client::open(serv_info) {
|
||||
Ok(redis) => {
|
||||
info!("Successfully opened Redis-Client");
|
||||
return redis;
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Redis-Client opening attempt is failed. Check network configuration! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `get_connection_watcher`
|
||||
/// ## for infinitive establishing Redis connection on existing client
|
||||
///
|
||||
/// > only for situation when local isn't None (no need to fck redis server)
|
||||
///
|
||||
/// *input* : `&Client`
|
||||
///
|
||||
/// *output* : `Connection`
|
||||
///
|
||||
/// *initiator* : fn `get_actual_config`
|
||||
///
|
||||
/// *managing* : &Client for opening connection
|
||||
///
|
||||
/// *depends on* : struct `redis::Connection`
|
||||
///
|
||||
fn get_connection_watcher(client: &Client) -> Connection {
|
||||
loop {
|
||||
match client.get_connection() {
|
||||
Ok(conn) => {
|
||||
info!("Successfully got Redis connection object");
|
||||
return conn;
|
||||
}
|
||||
Err(_) => {
|
||||
error!(
|
||||
"Redis connection attempt is failed. Check Redis configuration! Retrying..."
|
||||
);
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// # Fn `restart_main_thread`
|
||||
/// ## for restart monitor with new config
|
||||
///
|
||||
|
|
@ -686,83 +433,6 @@ fn restart_main_thread() -> std::io::Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// # Fn `subscribe_config_stream`
|
||||
/// ## for subscribe on changes, pulling to Redis pubsub to get more actual config
|
||||
///
|
||||
/// *input* : `Arc<Processes>`
|
||||
///
|
||||
/// *output* : `Ok(())` on end of work | `Err(er)` on error with subscribing mechanism
|
||||
///
|
||||
/// *initiator* : fn `subscribe_config_stream`
|
||||
///
|
||||
/// *managing* : `Arc<Processes>` to compare old config with new pulled
|
||||
///
|
||||
/// *depends on* : `Processes`
|
||||
///
|
||||
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
|
||||
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
|
||||
|
||||
if params.no_sub {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) {
|
||||
if let Ok(mut conn) = client.get_connection() {
|
||||
match crate::utils::get_container_id() {
|
||||
Some(channel_name) => {
|
||||
let channel_name = channel_name.trim();
|
||||
let mut pubsub = conn.as_pubsub();
|
||||
if pubsub.subscribe(&channel_name).is_ok() {
|
||||
info!("Runner subscribed on config update publishing in channel {}", &channel_name);
|
||||
loop {
|
||||
if let Ok(msg) = pubsub.get_message() {
|
||||
let get_remote_config: Result<String, redis::RedisError> = msg.get_payload();
|
||||
match get_remote_config {
|
||||
Ok(payload) => {
|
||||
if let Some(remote_config) = parse_extern_config(&payload) {
|
||||
match config_comparing(&actual_prcs, &remote_config) {
|
||||
ConfigActuality::Remote => {
|
||||
warn!("Pulled config is actual. Saving and restarting...");
|
||||
if save_new_config(&remote_config, config_path).is_err() {
|
||||
error!("Error with saving new config to {}. Stopping sub mechanism...", config_path);
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
if restart_main_thread().is_err() {
|
||||
error!("Error with restarting Runner. Stopping sub mechanism...");
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
warn!("Pulled new config. Current config is more actual ...");
|
||||
continue
|
||||
},
|
||||
}
|
||||
}
|
||||
else {
|
||||
error!("Invalid conig was pulled");
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Cannot extract new config from message");
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
sleep(Duration::from_secs(30)).await;
|
||||
}
|
||||
} else {
|
||||
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
error!("Cannot get channel name");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("Error with subscribing Redis stream on update. Working only with selected config...");
|
||||
Err(CustomError::Fatal)
|
||||
}
|
||||
|
||||
/// # Fn `config_comparing`
|
||||
/// ## for compare old and new configs
|
||||
///
|
||||
|
|
@ -789,14 +459,6 @@ pub fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActualit
|
|||
}
|
||||
}
|
||||
|
||||
// ! TEMPORARILY DEPRECATED !
|
||||
// fn native_date_from_millis(mls: &str) -> Option<chrono::DateTime<Utc>> {
|
||||
// match mls.parse::<i64>(){
|
||||
// Ok(val) => return chrono::DateTime::from_timestamp_millis(val),
|
||||
// Err(_) => return None,
|
||||
// }
|
||||
// }
|
||||
|
||||
/// # Fn `save_new_config`
|
||||
/// ## mechanism for saving new config in local storage
|
||||
///
|
||||
|
|
@ -885,11 +547,6 @@ mod config_unittests {
|
|||
|
||||
assert_eq!(config_comparing(&a, &b), ConfigActuality::Remote);
|
||||
}
|
||||
// TODO : strange output
|
||||
// #[test]
|
||||
// fn get_actual_config_mechanism() {
|
||||
// assert!(get_actual_config().is_some())
|
||||
// }
|
||||
#[test]
|
||||
fn save_config() {
|
||||
let a = Processes {
|
||||
|
|
|
|||
Loading…
Reference in New Issue