preboot changed + config setting up

feature/configv2
prplV 2025-02-03 12:12:18 +03:00
parent b425b17d25
commit e9b6abefdf
4 changed files with 139 additions and 102 deletions

View File

@ -25,73 +25,73 @@ async fn main() -> anyhow::Result<()>{
// setting up redis connection \ // setting up redis connection \
// then conf checks to choose the most actual \ // then conf checks to choose the most actual \
let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| { // let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
error!("No actual configuration for runner. Stopping..."); // error!("No actual configuration for runner. Stopping...");
std::process::exit(1); // std::process::exit(1);
}); // });
//
info!( // info!(
"Current runner configuration: {}", // "Current runner configuration: {}",
&processes.date_of_creation // &processes.date_of_creation
); // );
info!("Runner is ready. Initializing..."); // info!("Runner is ready. Initializing...");
//
if processes.processes.is_empty() { // if processes.processes.is_empty() {
error!("Processes list is null, runner-rs initialization is stopped"); // error!("Processes list is null, runner-rs initialization is stopped");
return Err(Error::msg("Empty processes segment in config")); // return Err(Error::msg("Empty processes segment in config"));
} // }
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![]; // let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// is in need to send to the signals handler thread // // is in need to send to the signals handler thread
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![]; // let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
//
for proc in processes.processes.iter() { // for proc in processes.processes.iter() {
info!( // info!(
"Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", // "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
proc.name, // proc.name,
proc.path, // proc.path,
proc.dependencies.files.len(), // proc.dependencies.files.len(),
proc.dependencies.services.len() // proc.dependencies.services.len()
); // );
//
// creating msg channel // // creating msg channel
// can or should be executed in new thread // // can or should be executed in new thread
let (tx, mut rx) = mpsc::channel::<u8>(1); // let (tx, mut rx) = mpsc::channel::<u8>(1);
let proc = Arc::new(proc.clone()); // let proc = Arc::new(proc.clone());
let tx = Arc::new(tx.clone()); // let tx = Arc::new(tx.clone());
//
senders.push(Arc::clone(&tx.clone())); // senders.push(Arc::clone(&tx.clone()));
//
let event = tokio::spawn(async move { // let event = tokio::spawn(async move {
run_daemons(proc.clone(), tx.clone(), &mut rx).await; // run_daemons(proc.clone(), tx.clone(), &mut rx).await;
}); // });
handler.push(event); // handler.push(event);
} // }
//
// destructor addition // // destructor addition
handler.push(tokio::spawn(async move { // handler.push(tokio::spawn(async move {
if set_valid_destructor(Arc::new(senders)).await.is_err() { // if set_valid_destructor(Arc::new(senders)).await.is_err() {
error!("Linux signals handler creation failed. Terminating main thread..."); // error!("Linux signals handler creation failed. Terminating main thread...");
return; // return;
} // }
//
tokio::time::sleep(Duration::from_millis(200)).await; // tokio::time::sleep(Duration::from_millis(200)).await;
info!("End of job. Terminating main thread..."); // info!("End of job. Terminating main thread...");
std::process::exit(0); // std::process::exit(0);
})); // }));
//
// remote config update subscription // // remote config update subscription
handler.push(tokio::spawn(async move { // handler.push(tokio::spawn(async move {
let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await; // let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
})); // }));
//
// cli pipeline // // cli pipeline
handler.push(tokio::spawn(async move { // handler.push(tokio::spawn(async move {
let _ = init_cli_pipeline().await; // let _ = init_cli_pipeline().await;
})); // }));
//
for i in handler { // for i in handler {
let _ = i.await; // let _ = i.await;
} // }
Ok(()) Ok(())
} }

View File

@ -1,6 +1,6 @@
use super::structs::*; use super::structs::*;
use log::{error, info, warn}; use log::{error, info, warn};
use redis::{Client, Connection}; use redis::{Client, Connection, PubSub};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Write; use std::io::Write;
use std::os::unix::process::CommandExt; use std::os::unix::process::CommandExt;
@ -9,9 +9,47 @@ use std::sync::Arc;
use std::{env, fs}; use std::{env, fs};
use super::preboot::PrebootParams; use super::preboot::PrebootParams;
use tokio::time::{Duration, sleep}; use tokio::time::{Duration, sleep};
// use redis::PubSub;
use tokio::sync::oneshot::Receiver;
// const CONFIG_PATH: &str = "settings.json"; // const CONFIG_PATH: &str = "settings.json";
pub mod v2 {
use super::*;
pub async fn init_config_mechanism(cli_oneshot: Arc<Receiver<Processes>> /*...*/) { /* local + pubsub + cli oneshot check */ }
pub async fn get_pubsub<'a>(params: Arc<PrebootParams>) -> Option<PubSub<'a>> {
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
if params.no_sub || params.no_sub {
return None;
}
if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) {
if let Ok(mut conn) = client.get_connection() {
match crate::utils::get_container_id() {
Some(channel_name) => {
let channel_name = channel_name.trim();
let mut pubsub = conn.as_pubsub();
if pubsub.subscribe(&channel_name).is_ok() {
} else {
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
}
},
None => {
error!("Cannot get channel name");
}
}
}
}
error!("Error with subscribing Redis stream on update. Working only with selected config...");
None
}
pub async fn get_local_config_watcher(/*...*/) { /*...*/ }
//
pub async fn cli_config_reciever(cli_oneshot: Arc<Receiver<Processes>>) { /*...*/ }
}
/// # Fn `load_processes` /// # Fn `load_processes`
/// ## for reading and parsing *local* storing config /// ## for reading and parsing *local* storing config
/// ///
@ -54,14 +92,14 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
error!("Invalid character in config file. Config path was set to default"); error!("Invalid character in config file. Config path was set to default");
"settings.json" "settings.json"
}); });
info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url); info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url);
match load_processes(config_path) { match load_processes(config_path) {
Some(local_conf) => { Some(local_conf) => {
info!( info!(
"Found local configuration, version - {}", "Found local configuration, version - {}",
&local_conf.date_of_creation &local_conf.date_of_creation
); );
if !params.no_remote_config { if !params.no_sub {
if let Some(remote_conf) = if let Some(remote_conf) =
// TODO : rework with pubsub mech // TODO : rework with pubsub mech
once_get_remote_configuration(&format!("redis://{}/", &params.remote_server_url)) once_get_remote_configuration(&format!("redis://{}/", &params.remote_server_url))
@ -85,7 +123,7 @@ pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes>
} }
None => { None => {
warn!("No local valid conf was found. Trying to pull remote one..."); warn!("No local valid conf was found. Trying to pull remote one...");
if !params.no_remote_config { if !params.no_sub {
let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", &params.remote_server_url))); let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", &params.remote_server_url)));
if let Some(conf) = get_remote_conf_watcher(&mut conn).await { if let Some(conf) = get_remote_conf_watcher(&mut conn).await {
info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation);
@ -322,7 +360,7 @@ fn restart_main_thread() -> std::io::Result<()> {
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> { pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); let config_path = params.config.to_str().unwrap_or_else(|| "settings.json");
if params.no_sub || params.no_remote_config { if params.no_sub || params.no_sub {
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) { if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) {

View File

@ -13,7 +13,7 @@ enum EnvVars {
NoxisNoHagent, NoxisNoHagent,
NoxisNoLogs, NoxisNoLogs,
NoxisRefreshLogs, NoxisRefreshLogs,
NoxisNoRemoteConfig, // NoxisNoRemoteConfig,
NoxisNoConfigSub, NoxisNoConfigSub,
NoxisSocketPath, NoxisSocketPath,
NoxisLogTo, NoxisLogTo,
@ -29,7 +29,7 @@ impl std::fmt::Display for EnvVars {
EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"), EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"),
EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"), EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"),
EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"), EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"),
EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"), // EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"),
EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"), EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"),
EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"), EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"),
EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"), EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"),
@ -48,7 +48,7 @@ impl<'a> EnvVars {
EnvVars::NoxisNoHagent => "false", EnvVars::NoxisNoHagent => "false",
EnvVars::NoxisNoLogs => "false", EnvVars::NoxisNoLogs => "false",
EnvVars::NoxisRefreshLogs => "false", EnvVars::NoxisRefreshLogs => "false",
EnvVars::NoxisNoRemoteConfig => "false", // EnvVars::NoxisNoRemoteConfig => "false",
EnvVars::NoxisNoConfigSub => "false", EnvVars::NoxisNoConfigSub => "false",
EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock", EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock",
EnvVars::NoxisLogTo => "./", EnvVars::NoxisLogTo => "./",
@ -77,7 +77,7 @@ impl<'a> EnvVars {
Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string()); Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string());
Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string()); Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string());
Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string()); Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string());
Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string()); // Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string());
Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string()); Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string());
Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap()); Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap());
Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap()); Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap());
@ -147,12 +147,6 @@ impl std::fmt::Display for MetricsPrebootParams {
/// noxis-rs ... --refresh-logs ... /// noxis-rs ... --refresh-logs ...
/// ``` /// ```
/// ///
/// `--no-remote-config` - to disable work with Redis as config producer
/// ### usage :
/// ``` bash
/// noxis-rs ... --no-remote-config ...
/// ```
///
/// `--no-sub` - to disable Redis subscribtion mechanism /// `--no-sub` - to disable Redis subscribtion mechanism
/// ### usage : /// ### usage :
/// ``` bash /// ``` bash
@ -212,17 +206,18 @@ pub struct PrebootParams {
help="To clear logs directory" help="To clear logs directory"
)] )]
pub refresh_logs : bool, pub refresh_logs : bool,
#[arg( // #[arg(
long = "no-remote-config", // long = "no-remote-config",
action, // action,
help="To disable work with remote config server", // help="To disable work with remote config server",
conflicts_with="no_sub")] // conflicts_with="no_sub")]
pub no_remote_config : bool, // pub no_remote_config : bool,
#[arg( #[arg(
long = "no-sub", long = "no-sub",
action, action,
help="To disable subscription mechanism", help="To disable Redis subscription mechanism",
conflicts_with="no_remote_config")] )]
// conflicts_with="no_remote_config"
pub no_sub : bool, pub no_sub : bool,
// params (socket_path, log_to, remote_server_url, config) // params (socket_path, log_to, remote_server_url, config)
@ -243,7 +238,7 @@ pub struct PrebootParams {
#[arg( #[arg(
long = "remote-server-url", long = "remote-server-url",
default_value="localhost", default_value="localhost",
conflicts_with="no_remote_config", conflicts_with="no_sub",
help = "To set url of remote config server using in remote config pulling mechanism" help = "To set url of remote config server using in remote config pulling mechanism"
)] )]
pub remote_server_url : String, pub remote_server_url : String,
@ -288,15 +283,17 @@ impl PrebootParams {
// existing log dir // existing log dir
if !self.log_to.exists() && !self.no_logs { if !self.log_to.exists() && !self.no_logs {
eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default"); eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default");
self.refresh_logs = false;
self.log_to = PathBuf::from("./"); self.log_to = PathBuf::from("./");
// return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start")); // return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
} }
// existing sock file // existing sock file
if !self.config.exists() { if !self.config.exists() {
eprintln!("Error: Invalid character in config file. Config path was set to default"); eprintln!("Error: Invalid character in config file. Config path was set to default");
let config = PathBuf::from("/etc/settings.json"); // TODO : ??? wtf is going with 2 paths
if !config.exists() && self.no_remote_config { let config = PathBuf::from("/etc/enode/noxis/settings.json");
return Err(Error::msg("Noxis cannot run without config. Create local config or enable remote-config mechanism")); if !config.exists() && self.no_sub {
return Err(Error::msg("Noxis cannot run without config. Create local config or enable pubsub mechanism"));
} }
self.config = PathBuf::from("settings.json"); self.config = PathBuf::from("settings.json");
// return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start")); // return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
@ -353,20 +350,20 @@ mod preboot_unitests{
"runner-rs", "runner-rs",
"--no-sub", "--no-sub",
"--remote-server-url", "redis://127.0.0.1" "--remote-server-url", "redis://127.0.0.1"
]).is_ok())
}
#[test]
fn parsing_config_invalid_args_noremote_nosub() {
assert!(PrebootParams::try_parse_from(vec![
"runner-rs",
"--no-remote-config", "--no-sub"
]).is_err()) ]).is_err())
} }
// #[test]
// fn parsing_config_invalid_args_noremote_nosub() {
// assert!(PrebootParams::try_parse_from(vec![
// "runner-rs",
// "--no-remote-config", "--no-sub"
// ]).is_err())
// }
#[test] #[test]
fn parsing_config_invalid_args_noremote_remoteurl() { fn parsing_config_invalid_args_noremote_remoteurl() {
assert!(PrebootParams::try_parse_from(vec![ assert!(PrebootParams::try_parse_from(vec![
"runner-rs", "runner-rs",
"--no-remote-config", "--no-sub",
"--remote-server-url", "redis://127.0.0.1" "--remote-server-url", "redis://127.0.0.1"
]).is_err()) ]).is_err())
} }

View File

@ -40,6 +40,8 @@ pub struct Processes {
pub processes: Vec<TrackingProcess>, pub processes: Vec<TrackingProcess>,
} }
/// # Struct for the 2nd level in json conf file /// # Struct for the 2nd level in json conf file
/// ## for each process to contain info, such as name, path and dependencies /// ## for each process to contain info, such as name, path and dependencies
/// ///