Compare commits

...

2 Commits

Author SHA1 Message Date
prplV 163887d42c tcp -> unixsocket 2025-03-27 10:46:22 -04:00
prplV 35a21da431 reding config lc+pubsub rework 2025-03-27 10:33:37 -04:00
5 changed files with 120 additions and 113 deletions

1
.gitignore vendored
View File

@ -4,3 +4,4 @@
Cargo.lock Cargo.lock
hagent_test.sock hagent_test.sock
release release
*.sock

View File

@ -1,6 +1,6 @@
{ {
"dateOfCreation": "1721381809110", "dateOfCreation": "1721381809112",
"configServer": "localhost", "configServer": "192.168.2.37",
"processes": [ "processes": [
{ {
"name": "temp-process", "name": "temp-process",

View File

@ -36,9 +36,13 @@ async fn main() -> anyhow::Result<()>{
preboot.clone() preboot.clone()
).await; ).await;
}); });
handler.push(config_module); handler.push(config_module);
let cli_module = tokio::spawn(async move {
let _ = init_cli_pipeline().await;
});
handler.push(cli_module);
for i in handler { for i in handler {
let _ = i.await; let _ = i.await;
} }

View File

@ -1,10 +1,11 @@
use log::{error, info, warn}; use log::{error, info, warn};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream, UnixStream};
use anyhow::{Result as DynResult, Error}; use anyhow::{Result as DynResult, Error};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}}; use std::{borrow::BorrowMut, fs, net::{IpAddr, Ipv4Addr}};
// use std::io::BufReader; // use std::io::BufReader;
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt}; use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
use tokio::{io::AsyncReadExt, net::UnixListener};
use noxis_cli::Cli; use noxis_cli::Cli;
use serde_json::from_str; use serde_json::from_str;
@ -23,21 +24,20 @@ use serde_json::from_str;
/// ///
pub async fn init_cli_pipeline() -> DynResult<()> { pub async fn init_cli_pipeline() -> DynResult<()> {
match init_listener().await { match init_listener().await {
Some(list) => { Ok(list) => {
info!("Successfully opened UnixListener for CLI");
loop { loop {
if let Ok((socket, addr)) = list.accept().await { if let Ok((socket, _)) = list.accept().await {
// isolation
if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() {
warn!("Declined attempt to connect TCP-socket from {}", addr);
continue;
}
process_connection(socket).await; process_connection(socket).await;
} }
sleep(Duration::from_millis(500)).await; sleep(Duration::from_millis(500)).await;
} }
// Ok(()) // Ok(())
}, },
None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use")) Err(er) => {
error!("Failed to open UnixListener for CLI");
Err(er)
},
} }
} }
@ -54,17 +54,20 @@ pub async fn init_cli_pipeline() -> DynResult<()> {
/// ///
/// *depends on* : `tokio::net::TcpListener` /// *depends on* : `tokio::net::TcpListener`
/// ///
async fn init_listener() -> Option<TcpListener> { async fn init_listener() -> anyhow::Result<UnixListener> {
match TcpListener::bind("127.0.0.1:7753").await { // match TcpListener::bind("127.0.0.1:7753").await {
Ok(listener) => { // Ok(listener) => {
info!("Runner is listening localhost:7753"); // info!("Runner is listening localhost:7753");
Some(listener) // Some(listener)
}, // },
Err(_) => { // Err(_) => {
error!("Cannot create TCP listener for CLI"); // error!("Cannot create TCP listener for CLI");
None // None
} // }
} // }
let socket_path = "noxis-rs";
let _ = fs::remove_file(socket_path);
Ok(UnixListener::bind(socket_path)?)
} }
/// # Fn `process_connection` /// # Fn `process_connection`
@ -80,11 +83,10 @@ async fn init_listener() -> Option<TcpListener> {
/// ///
/// *depends on* : `tokio::net::TcpStream` /// *depends on* : `tokio::net::TcpStream`
/// ///
async fn process_connection(mut stream: TcpStream) { async fn process_connection(mut stream: UnixStream) {
let buf_reader = BufReader::new(stream.borrow_mut()); let buf_reader = BufReader::new(stream.borrow_mut());
let mut rqst = buf_reader.lines(); let mut rqst = buf_reader.lines();
while let Ok(Some(line)) = rqst.next_line().await { while let Ok(Some(line)) = rqst.next_line().await {
if line.is_empty() { if line.is_empty() {
break break
@ -101,6 +103,6 @@ async fn process_connection(mut stream: TcpStream) {
println!("{}", line); println!("{}", line);
} }
let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!"; let response = "OK";
stream.write_all(response.as_bytes()).await.unwrap(); stream.write_all(response.as_bytes()).await.unwrap();
} }

View File

@ -119,12 +119,6 @@ pub mod v2 {
let _ = restart_main_thread(); let _ = restart_main_thread();
}, },
cli_config_option = cli_future => { cli_config_option = cli_future => {
// match cli_config_option {
// Some(config) => {},
// None => {
// error!("Cli pulling new config mechanism crushed, restarting ...")
// },
// }
match cli_config_option { match cli_config_option {
Err(_) => error!("Cli pulling new config mechanism crushed, restarting ..."), Err(_) => error!("Cli pulling new config mechanism crushed, restarting ..."),
Ok(option_config) => { Ok(option_config) => {
@ -144,22 +138,20 @@ pub mod v2 {
// TODO! futures + select! [OK] // TODO! futures + select! [OK]
// TODO! tests config // TODO! tests config
} }
pub async fn get_redis_connection(params: Arc<PrebootParams>) -> Option<Connection> { pub async fn get_redis_connection(params: &str) -> Option<Connection> {
if params.no_sub { for i in 1..=3 {
return None; let redis_url = format!("redis://{}/", params);
} info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i);
let mut connection_delay: u64 = 1; if let Ok(client) = Client::open(redis_url) {
loop {
if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) {
if let Ok(conn) = client.get_connection() { if let Ok(conn) = client.get_connection() {
info!("Successfully opened Redis connection"); info!("Successfully opened Redis connection");
return Some(conn); return Some(conn);
} }
} }
error!("Error with subscribing Redis stream on update. Retrying in {} secs...", connection_delay); error!("Error with subscribing Redis stream on update. Retrying in 5 secs...");
sleep(Duration::from_secs(connection_delay)).await; sleep(Duration::from_secs(5)).await;
connection_delay *= 2;
} }
None
} }
// loop checking redis pubsub // loop checking redis pubsub
@ -171,80 +163,88 @@ pub mod v2 {
) -> anyhow::Result<()>{ ) -> anyhow::Result<()>{
/*...*/ /*...*/
// dbg!("start of pb"); // dbg!("start of pb");
let mut tx_brd_local = tx_brd_local; sleep(Duration::from_secs(1)).await;
let mut _local_config = Processes::default();
return match get_redis_connection(params.clone()).await {
Some(mut conn) => {
//
let mut pub_sub = conn.as_pubsub();
let channel_name = get_container_id().unwrap_or(String::from("default"));
let channel_name = channel_name.trim();
match pub_sub.subscribe(channel_name) {
Err(er) => {
error!("Cannot subscribe pubsub channel due to {}", &er);
Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
},
Ok(_) => {
info!("Successfully subscribed to {} pubsub channel", channel_name);
loop {
// brd check
// if let Ok(new_lc) = tx_brd_local.recv().await {
// } let mut tx_brd_local = tx_brd_local;
if !tx_brd_local.is_empty() { let mut local_config = Processes::default();
match tx_brd_local.recv().await {
Ok(lc) => _local_config = lc, for retry in 1..=5 {
Err(er) => { if !tx_brd_local.is_empty() {
error!("Cannot get imported local config due to {}", &er); match tx_brd_local.recv().await {
return Err(anyhow::Error::msg( Ok(lc) => local_config = lc,
format!("Cannot get imported local config due to {}", er)) Err(er) => {
) error!("Cannot get imported local config due to {}", &er);
return Err(anyhow::Error::msg(
format!("Cannot get imported local config due to {}", er))
)
}
}
}
match get_redis_connection(&local_config.config_server).await {
Some(mut conn) => {
//
let mut pub_sub = conn.as_pubsub();
let channel_name = get_container_id().unwrap_or(String::from("default"));
let channel_name = channel_name.trim();
match pub_sub.subscribe(channel_name) {
Err(er) => {
error!("Cannot subscribe pubsub channel due to {}", &er);
return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
},
Ok(_) => {
info!("Successfully subscribed to {} pubsub channel", channel_name);
loop {
// pubsub check
if let Ok(msg) = pub_sub.get_message() {
// dbg!("ok on get message");
let payload : Result<String, _> = msg.get_payload();
match payload {
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
Ok(payload) => {
if let Some(remote) = parse_extern_config(&payload) {
match config_comparing(&local_config, &remote) {
ConfigActuality::Local => {
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
},
ConfigActuality::Remote => {
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
// to stop watching local config file mechanism
let _ = local_conf_tx.send(true);
let config_path = params.config.to_str().unwrap_or("settings.json");
if save_new_config(&remote, &config_path).is_err() {
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
return Err(anyhow::Error::msg(
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
))
}
return Ok(());
},
}
}
else {
warn!("Invalid config was pulled from Redis channel")
}
},
} }
} }
// delay
// dbg!("before sleep pubsub");
sleep(Duration::from_millis(500)).await;
} }
// pubsub check },
if let Ok(msg) = pub_sub.get_message() { }
// dbg!("ok on get message"); },
let payload : Result<String, _> = msg.get_payload(); None => {
match payload { warn!("Cannot validly connect Redis connection. Blocking task for 20 secs and restarting tries (attempt {})", retry);
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), sleep(Duration::from_secs(20)).await;
Ok(payload) => {
if let Some(remote) = parse_extern_config(&payload) {
match config_comparing(&_local_config, &remote) {
ConfigActuality::Local => {
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
},
ConfigActuality::Remote => {
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
// to stop watching local config file mechanism
let _ = local_conf_tx.send(true);
let config_path = params.config.to_str().unwrap_or("settings.json");
if save_new_config(&remote, &config_path).is_err() {
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
return Err(anyhow::Error::msg(
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
))
}
return Ok(());
},
}
}
else {
warn!("Invalid config was pulled from Redis channel")
}
},
}
}
// delay
// dbg!("before sleep pubsub");
sleep(Duration::from_millis(500)).await;
}
},
} }
}, }
None => Err(anyhow::Error::msg("Cannot create Redis connection"))
} }
error!("End of retries. Stopping pubsub...");
return Err(anyhow::Error::msg(
format!("End of retries. Stopping pubsub...")
))
} }
// //
@ -373,7 +373,7 @@ pub mod v2 {
to_local_tx: OneShotSender<bool> to_local_tx: OneShotSender<bool>
) -> Option<Processes> { ) -> Option<Processes> {
/* match awaits til channel*/ /* match awaits til channel*/
dbg!("start of cli"); // dbg!("start of cli");
match cli_oneshot.await { match cli_oneshot.await {
Ok(config_from_cli) => { Ok(config_from_cli) => {
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation); info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);