diff --git a/noxis-cli/src/cli_error.rs b/noxis-cli/src/cli_error.rs index 7589738..d5bae9b 100644 --- a/noxis-cli/src/cli_error.rs +++ b/noxis-cli/src/cli_error.rs @@ -2,14 +2,14 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum NoxisCliError { - #[error("Can't find socket `{0}`. Noxis-rs daemon is disabled or can't be accessed using Unix-Socket")] - NoxisDaemonMissing(String), + #[error("Can't find socket `{0}`. Error : {1}")] + NoxisDaemonMissing(String, String), #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")] PortIsNotWritable, #[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")] CliPromptCanNotBeSent, #[error("Can't parse CLI struct and send as byte stream")] ToStringCliParsingParsing, - #[error("Can't read Noxis response")] - CliResponseReadError + #[error("Can't read Noxis response due to {0}")] + CliResponseReadError(String) } \ No newline at end of file diff --git a/noxis-cli/src/cli_net.rs b/noxis-cli/src/cli_net.rs index 7bb0178..a3300ed 100644 --- a/noxis-cli/src/cli_net.rs +++ b/noxis-cli/src/cli_net.rs @@ -6,7 +6,7 @@ use super::Cli; use super::cli_error::NoxisCliError; async fn create_us_stream(cli: &Cli) -> Result { - Ok(UnixStream::connect(&cli.socket).await.map_err(|_| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string()))?) + Ok(UnixStream::connect(&cli.socket).await.map_err(|er| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string(), er.to_string()))?) } pub async fn try_send(cli: Cli) -> Result<()> { @@ -16,14 +16,14 @@ pub async fn try_send(cli: Cli) -> Result<()> { let msg = serde_json::to_vec(&cli) .map_err(|_| NoxisCliError::ToStringCliParsingParsing)?; - stream.try_write(&msg) + stream.write_all(&msg) + .await .map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?; - stream.shutdown().await?; - - let mut response = Vec::new(); - stream.read(&mut response).await - .map_err(|_| NoxisCliError::CliResponseReadError)?; + let mut response = [0; 1024]; + stream.read(&mut response) + .await + .map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?; println!("Received response: {}", String::from_utf8_lossy(&response)); Ok(()) diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index 0745074..86cf9bd 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -11,7 +11,7 @@ env_logger = "0.11.3" inotify = "0.10.2" log = "0.4.22" pcap = "2.2.0" -redis = "0.25.4" +redis = "0.29.2" serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.118" sysinfo = "0.32.0" diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 951eadf..7341bac 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -16,7 +16,7 @@ use options::preboot::PrebootParams; use tokio::sync::{broadcast, oneshot}; use options::config::v2::init_config_mechanism; -#[tokio::main(flavor = "multi_thread")] +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()>{ let preboot = Arc::new(PrebootParams::parse().validate()?); @@ -36,12 +36,22 @@ async fn main() -> anyhow::Result<()>{ ).await; }); handler.push(config_module); - + let cli_module = tokio::spawn(async move { - let _ = init_cli_pipeline().await; + if let Err(er) = init_cli_pipeline().await { + error!("CLI pipeline failed due to {}", er) + } }); handler.push(cli_module); + let ctrlc = tokio::spawn(async move { + if let Err(er) = set_valid_destructor(vec![].into()).await { + error!("CTRLC mod failed!"); + } + std::process::exit(0); + }); + handler.push(ctrlc); + for i in handler { let _ = i.await; } diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index 1c85dae..c31df72 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -1,8 +1,8 @@ -use log::{error, info}; +use log::{error, info, warn}; use tokio::net::{ UnixStream, UnixListener }; use anyhow::Result as DynResult; use tokio::time::{sleep, Duration}; -use std::fs; +use std::{fs, io::{Read, Write}, os::fd::AsFd, path::Path}; use tokio::io::{ AsyncWriteExt, AsyncReadExt}; use noxis_cli::Cli; @@ -20,24 +20,24 @@ use noxis_cli::Cli; /// *depends on* : - /// pub async fn init_cli_pipeline() -> DynResult<()> { - let socket_path = "noxis-rs.sock"; + let socket_path = "noxis.sock"; let _ = fs::remove_file(socket_path); match UnixListener::bind(socket_path) { Ok(list) => { // TODO: remove `unwrap`s - info!("Listening on {}", &list.local_addr()?.as_pathname().unwrap().display()); + info!("Listening on {}", socket_path); loop { match list.accept().await { - Ok((socket, addr)) => { - info!("CLI connection from {}", addr.as_pathname().unwrap().display()); + Ok((socket, _)) => { + // tokio::spawn(); process_connection(socket).await; }, - Err(er) => error!("Cannot poll connection to CLI due to {}", er), + Err(er) => { + error!("Cannot poll connection to CLI due to {}", er); + sleep(Duration::from_millis(300)).await; + }, } - dbg!(1); - sleep(Duration::from_millis(300)).await; - } // Ok(()) }, @@ -62,9 +62,14 @@ pub async fn init_cli_pipeline() -> DynResult<()> { /// *depends on* : `tokio::net::TcpStream` /// async fn process_connection(mut stream: UnixStream) { - let mut buf = Vec::new(); + let mut buf = vec![0; 1024]; match stream.read(&mut buf).await { - Ok(_) => { + Ok(0) => { + info!("Client disconnected "); + }, + Ok(n) => { + buf.truncate(n); + info!("CLI have sent {} bytes", n); match serde_json::from_slice::(&buf) { Ok(cli) => { info!("Received CLI request: {:?}", cli); @@ -77,10 +82,8 @@ async fn process_connection(mut stream: UnixStream) { error!("Failed to parse CLI request: {}", e); } } - } - Err(e) => { - error!("Failed to read from socket: {}", e); - } + }, + Err(e) => error!("Failed to read from socket: {}", e), } let _ = stream.shutdown().await; } diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index a1f3e56..9c2e69f 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -51,34 +51,34 @@ pub mod v2 { .unwrap_or("settings.json"); // future to init work with local config - let lc_future = tokio::spawn(async move { + let lc_future = tokio::spawn( // let params = params.clone(); local_config_reciever( params_clone, rx_pb_lc, rx_cli_lc, Arc::new(brd_tx) - ).await - }); + ) + ); // dbg!("before pb"); // future to init work with pub sub mechanism - let pubsub_future = tokio::spawn(async move { + let pubsub_future = tokio::spawn( pubsub_config_reciever( tx_pb_lc, params.clone(), local_config_brd_reciever - ).await - }); + ) + ); // dbg!("before cli"); // future to catch new configs from cli pipeline - let cli_future = tokio::spawn(async move { + let cli_future = tokio::spawn( from_cli_config_reciever( cli_oneshot, tx_cli_lc - ).await + ) - }); + ); // let _ = lc_future.await; // dbg!("before select"); tokio::select! { @@ -120,10 +120,10 @@ pub mod v2 { }, cli_config_option = cli_future => { match cli_config_option { - Err(_) => error!("Cli pulling new config mechanism crushed, restarting ..."), + Err(_) => error!("CLI pulling new config mechanism crushed, restarting ..."), Ok(option_config) => { match option_config { - None => error!("Cli pulling new config mechanism crushed, restarting ..."), + None => error!("CLI pulling new config mechanism crushed, restarting ..."), Some(config) => { info!("New config was pulled from CLI, saving and restarting ..."); let _ = save_new_config(&config, lc_path); @@ -163,88 +163,78 @@ pub mod v2 { ) -> anyhow::Result<()>{ /*...*/ // dbg!("start of pb"); - sleep(Duration::from_secs(1)).await; - let mut tx_brd_local = tx_brd_local; - let mut local_config = Processes::default(); - - for retry in 1..=5 { - if !tx_brd_local.is_empty() { - match tx_brd_local.recv().await { - Ok(lc) => local_config = lc, - Err(er) => { - error!("Cannot get imported local config due to {}", &er); - return Err(anyhow::Error::msg( - format!("Cannot get imported local config due to {}", er)) - ) - } - } + let local_config = if !tx_brd_local.is_empty() { + tx_brd_local.recv().await? + } else { + // Processes::default() + let mut tick = tokio::time::interval(Duration::from_millis(500)); + loop { + tick.tick().await; + break match tx_brd_local.recv().await { + Ok(conf) => conf, + Err(_) => continue, + }; } - match get_redis_connection(&local_config.config_server).await { - Some(mut conn) => { - // - let mut pub_sub = conn.as_pubsub(); - let channel_name = get_container_id().unwrap_or(String::from("default")); - let channel_name = channel_name.trim(); - match pub_sub.subscribe(channel_name) { - Err(er) => { - error!("Cannot subscribe pubsub channel due to {}", &er); - return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) - }, - Ok(_) => { - info!("Successfully subscribed to {} pubsub channel", channel_name); - loop { - // pubsub check - if let Ok(msg) = pub_sub.get_message() { - // dbg!("ok on get message"); - let payload : Result = msg.get_payload(); - match payload { - Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), - Ok(payload) => { - if let Some(remote) = parse_extern_config(&payload) { - match config_comparing(&local_config, &remote) { - ConfigActuality::Local => { - warn!("Pulled new config from Redis channel, it's outdated. Ignoring ..."); - }, - ConfigActuality::Remote => { - info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation); - // to stop watching local config file mechanism - let _ = local_conf_tx.send(true); - let config_path = params.config.to_str().unwrap_or("settings.json"); - - if save_new_config(&remote, &config_path).is_err() { - error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path); - return Err(anyhow::Error::msg( - format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path) - )) - } - return Ok(()); - }, - } + }; + match get_redis_connection(&local_config.config_server).await { + Some(mut conn) => { + let mut pub_sub = conn.as_pubsub(); + let channel_name = get_container_id().unwrap_or(String::from("default")); + let channel_name = channel_name.trim(); + match pub_sub.subscribe(channel_name) { + Err(er) => { + error!("Cannot subscribe pubsub channel due to {}", &er); + return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) + }, + Ok(_) => { + info!("Successfully subscribed to {} pubsub channel", channel_name); + let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3))); + loop { + if let Ok(msg) = pub_sub.get_message() { + // dbg!("ok on get message"); + let payload : Result = msg.get_payload(); + match payload { + Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), + Ok(payload) => { + if let Some(remote) = parse_extern_config(&payload) { + match config_comparing(&local_config, &remote) { + ConfigActuality::Local => { + warn!("Pulled new config from Redis channel, it's outdated. Ignoring ..."); + }, + ConfigActuality::Remote => { + info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation); + // to stop watching local config file mechanism + let _ = local_conf_tx.send(true); + let config_path = params.config.to_str().unwrap_or("settings.json"); + + if save_new_config(&remote, &config_path).is_err() { + error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path); + return Err(anyhow::Error::msg( + format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path) + )) + } + return Ok(()); + }, } - else { - warn!("Invalid config was pulled from Redis channel") - } - }, - } + } + else { + warn!("Invalid config was pulled from Redis channel") + } + }, } - // delay - // dbg!("before sleep pubsub"); - sleep(Duration::from_millis(500)).await; } - }, - } - }, - None => { - warn!("Cannot validly connect Redis connection. Blocking task for 20 secs and restarting tries (attempt {})", retry); - sleep(Duration::from_secs(20)).await; + // delay + tokio::task::yield_now().await; + } + }, } + }, + None => { + sleep(Duration::from_secs(20)).await; } } - error!("End of retries. Stopping pubsub..."); - return Err(anyhow::Error::msg( - format!("End of retries. Stopping pubsub...") - )) + Ok(()) } // @@ -256,7 +246,7 @@ pub mod v2 { /*...*/ ) -> anyhow::Result<()> { /*...*/ - // borrowing as mut + // shadowing as mut let mut pubsub_oneshot = pubsub_oneshot; let mut cli_oneshot = cli_oneshot; // fill with default empty config, mut to change later @@ -357,6 +347,7 @@ pub mod v2 { } } sleep(Duration::from_millis(300)).await; + // tokio::task::yield_now().await; } }, Err(_) => { @@ -374,14 +365,20 @@ pub mod v2 { ) -> Option { /* match awaits til channel*/ // dbg!("start of cli"); - match cli_oneshot.await { - Ok(config_from_cli) => { - info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation); - let _ = to_local_tx.send(true); - Some(config_from_cli) - }, - _ => None, + loop { + if !cli_oneshot.is_empty() { + match cli_oneshot.await { + Ok(config_from_cli) => { + info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation); + let _ = to_local_tx.send(true); + return Some(config_from_cli) + }, + _ => return None, + } + } + sleep(Duration::from_millis(300)).await; } + } async fn export_saved_config_data_locally(