Compare commits

..

2 Commits

Author SHA1 Message Date
prplV da3d8cd129 config preboot adj 2024-12-11 13:38:29 +03:00
prplV e7817a97b6 () -> anyhow::result<()> 2024-12-11 13:11:15 +03:00
3 changed files with 57 additions and 48 deletions

View File

@ -1,6 +1,7 @@
mod options; mod options;
mod utils; mod utils;
use anyhow::Error;
use clap::Parser; use clap::Parser;
use log::{error, info}; use log::{error, info};
use options::config::*; use options::config::*;
@ -17,12 +18,13 @@ use utils::*;
use options::preboot::PrebootParams; use options::preboot::PrebootParams;
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() { async fn main() -> anyhow::Result<()>{
let preboot = PrebootParams::parse().validate(); let preboot = Arc::new(PrebootParams::parse().validate()?);
if let Err(_) = preboot { // if let Err(_) = preboot {
return; // return;
} // }
// let preboot = Arc::new(preboot);
let _ = setup_logger(); let _ = setup_logger();
@ -30,9 +32,9 @@ async fn main() {
// setting up redis connection \ // setting up redis connection \
// then conf checks to choose the most actual \ // then conf checks to choose the most actual \
let processes: Processes = get_actual_config().await.unwrap_or_else(|| { let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| {
error!("No actual configuration for runner. Stopping..."); error!("No actual configuration for runner. Stopping...");
std::process::exit(101); std::process::exit(1);
}); });
info!( info!(
@ -43,7 +45,7 @@ async fn main() {
if processes.processes.is_empty() { if processes.processes.is_empty() {
error!("Processes list is null, runner-rs initialization is stopped"); error!("Processes list is null, runner-rs initialization is stopped");
return; return Err(Error::msg("Empty processes segment in config"));
} }
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![]; let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// is in need to send to the signals handler thread // is in need to send to the signals handler thread
@ -86,7 +88,7 @@ async fn main() {
// remote config update subscription // remote config update subscription
handler.push(tokio::spawn(async move { handler.push(tokio::spawn(async move {
let _ = subscribe_config_stream(Arc::new(processes)).await; let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await;
})); }));
// cli pipeline // cli pipeline
@ -97,7 +99,7 @@ async fn main() {
for i in handler { for i in handler {
let _ = i.await; let _ = i.await;
} }
return; Ok(())
} }
// todo: integration tests // todo: integration tests

View File

@ -8,6 +8,7 @@ use std::process::Command;
use std::sync::Arc; use std::sync::Arc;
use std::{env, fs}; use std::{env, fs};
use tokio::time::Duration; use tokio::time::Duration;
use super::preboot::PrebootParams;
const CONFIG_PATH: &str = "settings.json"; const CONFIG_PATH: &str = "settings.json";
@ -46,18 +47,21 @@ fn load_processes(json_filename: &str) -> Option<Processes> {
/// ///
/// *depends on* : struct `Processes` /// *depends on* : struct `Processes`
/// ///
pub async fn get_actual_config() -> Option<Processes> { pub async fn get_actual_config(params : Arc<PrebootParams>) -> Option<Processes> {
// * if no local conf -> loop and +inf getting conf from redis server // * if no local conf -> loop and +inf getting conf from redis server
// * if local conf -> once getting conf from redis server // * if local conf -> once getting conf from redis server
match load_processes(CONFIG_PATH) { let config_path = params.config.to_str()?;
info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url);
match load_processes(config_path) {
Some(local_conf) => { Some(local_conf) => {
info!( info!(
"Found local configuration, version - {}", "Found local configuration, version - {}",
&local_conf.date_of_creation &local_conf.date_of_creation
); );
if !params.no_remote_config {
if let Some(remote_conf) = if let Some(remote_conf) =
// TODO : rework with pubsub mech // TODO : rework with pubsub mech
once_get_remote_configuration(&format!("redis://{}/", local_conf.config_server)) once_get_remote_configuration(&format!("redis://{}/", &params.remote_server_url))
{ {
return match config_comparing(&local_conf, &remote_conf) { return match config_comparing(&local_conf, &remote_conf) {
ConfigActuality::Local => { ConfigActuality::Local => {
@ -73,10 +77,12 @@ pub async fn get_actual_config() -> Option<Processes> {
} }
}; };
} }
}
Some(local_conf) Some(local_conf)
} }
None => { None => {
warn!("No local valid conf was found. Trying to pull remote one..."); warn!("No local valid conf was found. Trying to pull remote one...");
if !params.no_remote_config {
let mut conn = get_connection_watcher(&open_watcher("redis://localhost/")); let mut conn = get_connection_watcher(&open_watcher("redis://localhost/"));
let remote_config = get_remote_conf_watcher(&mut conn).await; let remote_config = get_remote_conf_watcher(&mut conn).await;
if let Some(conf) = remote_config { if let Some(conf) = remote_config {
@ -84,6 +90,7 @@ pub async fn get_actual_config() -> Option<Processes> {
let _ = save_new_config(&conf, CONFIG_PATH); let _ = save_new_config(&conf, CONFIG_PATH);
return Some(conf); return Some(conf);
} }
}
None None
} }
} }
@ -311,7 +318,10 @@ fn restart_main_thread() -> std::io::Result<()> {
/// ///
/// *depends on* : `Processes` /// *depends on* : `Processes`
/// ///
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(), CustomError> { pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<PrebootParams>) -> Result<(), CustomError> {
if !(params.no_sub && params.no_remote_config) {
return Err(CustomError::Fatal);
}
if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) { if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) {
if let Ok(mut conn) = client.get_connection() { if let Ok(mut conn) = client.get_connection() {
match crate::utils::get_container_id() { match crate::utils::get_container_id() {

View File

@ -4,7 +4,7 @@ use clap::Parser;
use std::path::PathBuf; use std::path::PathBuf;
#[derive(clap::ValueEnum, Debug, Clone)] #[derive(clap::ValueEnum, Debug, Clone)]
enum MetricsPrebootParams { pub enum MetricsPrebootParams {
Full, Full,
System, System,
Processes, Processes,
@ -68,28 +68,28 @@ pub struct PrebootParams {
conflicts_with="no_hostagent", conflicts_with="no_hostagent",
help="To set .sock file's path used in communication with host-agent" help="To set .sock file's path used in communication with host-agent"
)] )]
socket_path : PathBuf, pub socket_path : PathBuf,
#[arg( #[arg(
long = "log-to", long = "log-to",
default_value="./", default_value="./",
conflicts_with="no_logs", conflicts_with="no_logs",
help="To set a path to logs directory" help="To set a path to logs directory"
)] )]
log_to : PathBuf, pub log_to : PathBuf,
#[arg( #[arg(
long = "remote-server-url", long = "remote-server-url",
default_value="redis://localhost", default_value="localhost",
conflicts_with="no_remote_config", conflicts_with="no_remote_config",
help = "To set url of remote config server using in remote config pulling mechanism" help = "To set url of remote config server using in remote config pulling mechanism"
)] )]
remote_server_url : String, pub remote_server_url : String,
#[arg( #[arg(
long = "config", long = "config",
short, short,
default_value="settings.json", default_value="settings.json",
help="To set local config file path" help="To set local config file path"
)] )]
config : PathBuf, pub config : PathBuf,
// value enum params (metrics) // value enum params (metrics)
#[arg( #[arg(
@ -98,24 +98,21 @@ pub struct PrebootParams {
default_value_t=MetricsPrebootParams::Full, default_value_t=MetricsPrebootParams::Full,
help="To set metrics grubbing mode" help="To set metrics grubbing mode"
)] )]
metrics: MetricsPrebootParams, pub metrics: MetricsPrebootParams,
} }
impl PrebootParams { impl PrebootParams {
pub fn validate(self) -> Result<Self> { pub fn validate(self) -> Result<Self> {
if !self.socket_path.exists() { if !self.socket_path.exists() {
eprintln!("Socket-file {} doesn't exist. Cannot start", &self.socket_path.display()); return Err(Error::msg("Socket-file not found or Noxis can't read it. Cannot start"));
return Err(Error::msg("Socket-file Not Found"));
} }
// existing log dir // existing log dir
if !self.log_to.exists() { if !self.log_to.exists() {
eprintln!("Log directory {} doesn't exist", &self.log_to.display()); return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start"));
return Err(Error::msg("Log Directory Not Found. Cannot start"));
} }
// existing sock file // existing sock file
if !self.config.exists() { if !self.config.exists() {
eprintln!("Local config file {} doesn't exist", &self.config.display()); return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start"));
return Err(Error::msg("Local Config Not Found. Cannot start"));
} }
// redis server check // redis server check
Ok(self) Ok(self)