big async optimization

feature/configv2
prplV 2025-03-31 09:46:22 -04:00
parent 27e79ce731
commit 011c479550
6 changed files with 134 additions and 124 deletions

View File

@ -2,14 +2,14 @@ use thiserror::Error;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum NoxisCliError { pub enum NoxisCliError {
#[error("Can't find socket `{0}`. Noxis-rs daemon is disabled or can't be accessed using Unix-Socket")] #[error("Can't find socket `{0}`. Error : {1}")]
NoxisDaemonMissing(String), NoxisDaemonMissing(String, String),
#[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")] #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")]
PortIsNotWritable, PortIsNotWritable,
#[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")] #[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")]
CliPromptCanNotBeSent, CliPromptCanNotBeSent,
#[error("Can't parse CLI struct and send as byte stream")] #[error("Can't parse CLI struct and send as byte stream")]
ToStringCliParsingParsing, ToStringCliParsingParsing,
#[error("Can't read Noxis response")] #[error("Can't read Noxis response due to {0}")]
CliResponseReadError CliResponseReadError(String)
} }

View File

@ -6,7 +6,7 @@ use super::Cli;
use super::cli_error::NoxisCliError; use super::cli_error::NoxisCliError;
async fn create_us_stream(cli: &Cli) -> Result<UnixStream> { async fn create_us_stream(cli: &Cli) -> Result<UnixStream> {
Ok(UnixStream::connect(&cli.socket).await.map_err(|_| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string()))?) Ok(UnixStream::connect(&cli.socket).await.map_err(|er| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string(), er.to_string()))?)
} }
pub async fn try_send(cli: Cli) -> Result<()> { pub async fn try_send(cli: Cli) -> Result<()> {
@ -16,14 +16,14 @@ pub async fn try_send(cli: Cli) -> Result<()> {
let msg = serde_json::to_vec(&cli) let msg = serde_json::to_vec(&cli)
.map_err(|_| NoxisCliError::ToStringCliParsingParsing)?; .map_err(|_| NoxisCliError::ToStringCliParsingParsing)?;
stream.try_write(&msg) stream.write_all(&msg)
.await
.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?; .map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
stream.shutdown().await?; let mut response = [0; 1024];
stream.read(&mut response)
let mut response = Vec::new(); .await
stream.read(&mut response).await .map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?;
.map_err(|_| NoxisCliError::CliResponseReadError)?;
println!("Received response: {}", String::from_utf8_lossy(&response)); println!("Received response: {}", String::from_utf8_lossy(&response));
Ok(()) Ok(())

View File

@ -11,7 +11,7 @@ env_logger = "0.11.3"
inotify = "0.10.2" inotify = "0.10.2"
log = "0.4.22" log = "0.4.22"
pcap = "2.2.0" pcap = "2.2.0"
redis = "0.25.4" redis = "0.29.2"
serde = { version = "1.0.203", features = ["derive"] } serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.118" serde_json = "1.0.118"
sysinfo = "0.32.0" sysinfo = "0.32.0"

View File

@ -16,7 +16,7 @@ use options::preboot::PrebootParams;
use tokio::sync::{broadcast, oneshot}; use tokio::sync::{broadcast, oneshot};
use options::config::v2::init_config_mechanism; use options::config::v2::init_config_mechanism;
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()>{ async fn main() -> anyhow::Result<()>{
let preboot = Arc::new(PrebootParams::parse().validate()?); let preboot = Arc::new(PrebootParams::parse().validate()?);
@ -38,10 +38,20 @@ async fn main() -> anyhow::Result<()>{
handler.push(config_module); handler.push(config_module);
let cli_module = tokio::spawn(async move { let cli_module = tokio::spawn(async move {
let _ = init_cli_pipeline().await; if let Err(er) = init_cli_pipeline().await {
error!("CLI pipeline failed due to {}", er)
}
}); });
handler.push(cli_module); handler.push(cli_module);
let ctrlc = tokio::spawn(async move {
if let Err(er) = set_valid_destructor(vec![].into()).await {
error!("CTRLC mod failed!");
}
std::process::exit(0);
});
handler.push(ctrlc);
for i in handler { for i in handler {
let _ = i.await; let _ = i.await;
} }

View File

@ -1,8 +1,8 @@
use log::{error, info}; use log::{error, info, warn};
use tokio::net::{ UnixStream, UnixListener }; use tokio::net::{ UnixStream, UnixListener };
use anyhow::Result as DynResult; use anyhow::Result as DynResult;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use std::fs; use std::{fs, io::{Read, Write}, os::fd::AsFd, path::Path};
use tokio::io::{ AsyncWriteExt, AsyncReadExt}; use tokio::io::{ AsyncWriteExt, AsyncReadExt};
use noxis_cli::Cli; use noxis_cli::Cli;
@ -20,24 +20,24 @@ use noxis_cli::Cli;
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn init_cli_pipeline() -> DynResult<()> { pub async fn init_cli_pipeline() -> DynResult<()> {
let socket_path = "noxis-rs.sock"; let socket_path = "noxis.sock";
let _ = fs::remove_file(socket_path); let _ = fs::remove_file(socket_path);
match UnixListener::bind(socket_path) { match UnixListener::bind(socket_path) {
Ok(list) => { Ok(list) => {
// TODO: remove `unwrap`s // TODO: remove `unwrap`s
info!("Listening on {}", &list.local_addr()?.as_pathname().unwrap().display()); info!("Listening on {}", socket_path);
loop { loop {
match list.accept().await { match list.accept().await {
Ok((socket, addr)) => { Ok((socket, _)) => {
info!("CLI connection from {}", addr.as_pathname().unwrap().display()); // tokio::spawn();
process_connection(socket).await; process_connection(socket).await;
}, },
Err(er) => error!("Cannot poll connection to CLI due to {}", er), Err(er) => {
error!("Cannot poll connection to CLI due to {}", er);
sleep(Duration::from_millis(300)).await;
},
} }
dbg!(1);
sleep(Duration::from_millis(300)).await;
} }
// Ok(()) // Ok(())
}, },
@ -62,9 +62,14 @@ pub async fn init_cli_pipeline() -> DynResult<()> {
/// *depends on* : `tokio::net::TcpStream` /// *depends on* : `tokio::net::TcpStream`
/// ///
async fn process_connection(mut stream: UnixStream) { async fn process_connection(mut stream: UnixStream) {
let mut buf = Vec::new(); let mut buf = vec![0; 1024];
match stream.read(&mut buf).await { match stream.read(&mut buf).await {
Ok(_) => { Ok(0) => {
info!("Client disconnected ");
},
Ok(n) => {
buf.truncate(n);
info!("CLI have sent {} bytes", n);
match serde_json::from_slice::<Cli>(&buf) { match serde_json::from_slice::<Cli>(&buf) {
Ok(cli) => { Ok(cli) => {
info!("Received CLI request: {:?}", cli); info!("Received CLI request: {:?}", cli);
@ -77,10 +82,8 @@ async fn process_connection(mut stream: UnixStream) {
error!("Failed to parse CLI request: {}", e); error!("Failed to parse CLI request: {}", e);
} }
} }
} },
Err(e) => { Err(e) => error!("Failed to read from socket: {}", e),
error!("Failed to read from socket: {}", e);
}
} }
let _ = stream.shutdown().await; let _ = stream.shutdown().await;
} }

View File

@ -51,34 +51,34 @@ pub mod v2 {
.unwrap_or("settings.json"); .unwrap_or("settings.json");
// future to init work with local config // future to init work with local config
let lc_future = tokio::spawn(async move { let lc_future = tokio::spawn(
// let params = params.clone(); // let params = params.clone();
local_config_reciever( local_config_reciever(
params_clone, params_clone,
rx_pb_lc, rx_pb_lc,
rx_cli_lc, rx_cli_lc,
Arc::new(brd_tx) Arc::new(brd_tx)
).await )
}); );
// dbg!("before pb"); // dbg!("before pb");
// future to init work with pub sub mechanism // future to init work with pub sub mechanism
let pubsub_future = tokio::spawn(async move { let pubsub_future = tokio::spawn(
pubsub_config_reciever( pubsub_config_reciever(
tx_pb_lc, tx_pb_lc,
params.clone(), params.clone(),
local_config_brd_reciever local_config_brd_reciever
).await )
}); );
// dbg!("before cli"); // dbg!("before cli");
// future to catch new configs from cli pipeline // future to catch new configs from cli pipeline
let cli_future = tokio::spawn(async move { let cli_future = tokio::spawn(
from_cli_config_reciever( from_cli_config_reciever(
cli_oneshot, cli_oneshot,
tx_cli_lc tx_cli_lc
).await )
}); );
// let _ = lc_future.await; // let _ = lc_future.await;
// dbg!("before select"); // dbg!("before select");
tokio::select! { tokio::select! {
@ -120,10 +120,10 @@ pub mod v2 {
}, },
cli_config_option = cli_future => { cli_config_option = cli_future => {
match cli_config_option { match cli_config_option {
Err(_) => error!("Cli pulling new config mechanism crushed, restarting ..."), Err(_) => error!("CLI pulling new config mechanism crushed, restarting ..."),
Ok(option_config) => { Ok(option_config) => {
match option_config { match option_config {
None => error!("Cli pulling new config mechanism crushed, restarting ..."), None => error!("CLI pulling new config mechanism crushed, restarting ..."),
Some(config) => { Some(config) => {
info!("New config was pulled from CLI, saving and restarting ..."); info!("New config was pulled from CLI, saving and restarting ...");
let _ = save_new_config(&config, lc_path); let _ = save_new_config(&config, lc_path);
@ -163,88 +163,78 @@ pub mod v2 {
) -> anyhow::Result<()>{ ) -> anyhow::Result<()>{
/*...*/ /*...*/
// dbg!("start of pb"); // dbg!("start of pb");
sleep(Duration::from_secs(1)).await;
let mut tx_brd_local = tx_brd_local; let mut tx_brd_local = tx_brd_local;
let mut local_config = Processes::default(); let local_config = if !tx_brd_local.is_empty() {
tx_brd_local.recv().await?
for retry in 1..=5 { } else {
if !tx_brd_local.is_empty() { // Processes::default()
match tx_brd_local.recv().await { let mut tick = tokio::time::interval(Duration::from_millis(500));
Ok(lc) => local_config = lc, loop {
Err(er) => { tick.tick().await;
error!("Cannot get imported local config due to {}", &er); break match tx_brd_local.recv().await {
return Err(anyhow::Error::msg( Ok(conf) => conf,
format!("Cannot get imported local config due to {}", er)) Err(_) => continue,
) };
}
}
} }
match get_redis_connection(&local_config.config_server).await { };
Some(mut conn) => { match get_redis_connection(&local_config.config_server).await {
// Some(mut conn) => {
let mut pub_sub = conn.as_pubsub(); let mut pub_sub = conn.as_pubsub();
let channel_name = get_container_id().unwrap_or(String::from("default")); let channel_name = get_container_id().unwrap_or(String::from("default"));
let channel_name = channel_name.trim(); let channel_name = channel_name.trim();
match pub_sub.subscribe(channel_name) { match pub_sub.subscribe(channel_name) {
Err(er) => { Err(er) => {
error!("Cannot subscribe pubsub channel due to {}", &er); error!("Cannot subscribe pubsub channel due to {}", &er);
return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
}, },
Ok(_) => { Ok(_) => {
info!("Successfully subscribed to {} pubsub channel", channel_name); info!("Successfully subscribed to {} pubsub channel", channel_name);
loop { let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3)));
// pubsub check loop {
if let Ok(msg) = pub_sub.get_message() { if let Ok(msg) = pub_sub.get_message() {
// dbg!("ok on get message"); // dbg!("ok on get message");
let payload : Result<String, _> = msg.get_payload(); let payload : Result<String, _> = msg.get_payload();
match payload { match payload {
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
Ok(payload) => { Ok(payload) => {
if let Some(remote) = parse_extern_config(&payload) { if let Some(remote) = parse_extern_config(&payload) {
match config_comparing(&local_config, &remote) { match config_comparing(&local_config, &remote) {
ConfigActuality::Local => { ConfigActuality::Local => {
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ..."); warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
}, },
ConfigActuality::Remote => { ConfigActuality::Remote => {
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation); info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
// to stop watching local config file mechanism // to stop watching local config file mechanism
let _ = local_conf_tx.send(true); let _ = local_conf_tx.send(true);
let config_path = params.config.to_str().unwrap_or("settings.json"); let config_path = params.config.to_str().unwrap_or("settings.json");
if save_new_config(&remote, &config_path).is_err() { if save_new_config(&remote, &config_path).is_err() {
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path); error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
return Err(anyhow::Error::msg( return Err(anyhow::Error::msg(
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path) format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
)) ))
} }
return Ok(()); return Ok(());
}, },
}
} }
else { }
warn!("Invalid config was pulled from Redis channel") else {
} warn!("Invalid config was pulled from Redis channel")
}, }
} },
} }
// delay
// dbg!("before sleep pubsub");
sleep(Duration::from_millis(500)).await;
} }
}, // delay
} tokio::task::yield_now().await;
}, }
None => { },
warn!("Cannot validly connect Redis connection. Blocking task for 20 secs and restarting tries (attempt {})", retry);
sleep(Duration::from_secs(20)).await;
} }
},
None => {
sleep(Duration::from_secs(20)).await;
} }
} }
error!("End of retries. Stopping pubsub..."); Ok(())
return Err(anyhow::Error::msg(
format!("End of retries. Stopping pubsub...")
))
} }
// //
@ -256,7 +246,7 @@ pub mod v2 {
/*...*/ /*...*/
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
/*...*/ /*...*/
// borrowing as mut // shadowing as mut
let mut pubsub_oneshot = pubsub_oneshot; let mut pubsub_oneshot = pubsub_oneshot;
let mut cli_oneshot = cli_oneshot; let mut cli_oneshot = cli_oneshot;
// fill with default empty config, mut to change later // fill with default empty config, mut to change later
@ -357,6 +347,7 @@ pub mod v2 {
} }
} }
sleep(Duration::from_millis(300)).await; sleep(Duration::from_millis(300)).await;
// tokio::task::yield_now().await;
} }
}, },
Err(_) => { Err(_) => {
@ -374,14 +365,20 @@ pub mod v2 {
) -> Option<Processes> { ) -> Option<Processes> {
/* match awaits til channel*/ /* match awaits til channel*/
// dbg!("start of cli"); // dbg!("start of cli");
match cli_oneshot.await { loop {
Ok(config_from_cli) => { if !cli_oneshot.is_empty() {
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation); match cli_oneshot.await {
let _ = to_local_tx.send(true); Ok(config_from_cli) => {
Some(config_from_cli) info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
}, let _ = to_local_tx.send(true);
_ => None, return Some(config_from_cli)
},
_ => return None,
}
}
sleep(Duration::from_millis(300)).await;
} }
} }
async fn export_saved_config_data_locally( async fn export_saved_config_data_locally(