Compare commits
No commits in common. "163887d42c6f1414951ab9b9d74442967c10fb86" and "064611823ab1f81fa6319d20487cc9ceeb6711b5" have entirely different histories.
163887d42c
...
064611823a
|
|
@ -4,4 +4,3 @@
|
||||||
Cargo.lock
|
Cargo.lock
|
||||||
hagent_test.sock
|
hagent_test.sock
|
||||||
release
|
release
|
||||||
*.sock
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"dateOfCreation": "1721381809112",
|
"dateOfCreation": "1721381809110",
|
||||||
"configServer": "192.168.2.37",
|
"configServer": "localhost",
|
||||||
"processes": [
|
"processes": [
|
||||||
{
|
{
|
||||||
"name": "temp-process",
|
"name": "temp-process",
|
||||||
|
|
|
||||||
|
|
@ -36,12 +36,8 @@ async fn main() -> anyhow::Result<()>{
|
||||||
preboot.clone()
|
preboot.clone()
|
||||||
).await;
|
).await;
|
||||||
});
|
});
|
||||||
handler.push(config_module);
|
|
||||||
|
|
||||||
let cli_module = tokio::spawn(async move {
|
handler.push(config_module);
|
||||||
let _ = init_cli_pipeline().await;
|
|
||||||
});
|
|
||||||
handler.push(cli_module);
|
|
||||||
|
|
||||||
for i in handler {
|
for i in handler {
|
||||||
let _ = i.await;
|
let _ = i.await;
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,10 @@
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use tokio::net::{TcpListener, TcpStream, UnixStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use anyhow::{Result as DynResult, Error};
|
use anyhow::{Result as DynResult, Error};
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use std::{borrow::BorrowMut, fs, net::{IpAddr, Ipv4Addr}};
|
use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}};
|
||||||
// use std::io::BufReader;
|
// use std::io::BufReader;
|
||||||
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
|
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
|
||||||
use tokio::{io::AsyncReadExt, net::UnixListener};
|
|
||||||
use noxis_cli::Cli;
|
use noxis_cli::Cli;
|
||||||
use serde_json::from_str;
|
use serde_json::from_str;
|
||||||
|
|
||||||
|
|
@ -24,20 +23,21 @@ use serde_json::from_str;
|
||||||
///
|
///
|
||||||
pub async fn init_cli_pipeline() -> DynResult<()> {
|
pub async fn init_cli_pipeline() -> DynResult<()> {
|
||||||
match init_listener().await {
|
match init_listener().await {
|
||||||
Ok(list) => {
|
Some(list) => {
|
||||||
info!("Successfully opened UnixListener for CLI");
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok((socket, _)) = list.accept().await {
|
if let Ok((socket, addr)) = list.accept().await {
|
||||||
|
// isolation
|
||||||
|
if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() {
|
||||||
|
warn!("Declined attempt to connect TCP-socket from {}", addr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
process_connection(socket).await;
|
process_connection(socket).await;
|
||||||
}
|
}
|
||||||
sleep(Duration::from_millis(500)).await;
|
sleep(Duration::from_millis(500)).await;
|
||||||
}
|
}
|
||||||
// Ok(())
|
// Ok(())
|
||||||
},
|
},
|
||||||
Err(er) => {
|
None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use"))
|
||||||
error!("Failed to open UnixListener for CLI");
|
|
||||||
Err(er)
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -54,20 +54,17 @@ pub async fn init_cli_pipeline() -> DynResult<()> {
|
||||||
///
|
///
|
||||||
/// *depends on* : `tokio::net::TcpListener`
|
/// *depends on* : `tokio::net::TcpListener`
|
||||||
///
|
///
|
||||||
async fn init_listener() -> anyhow::Result<UnixListener> {
|
async fn init_listener() -> Option<TcpListener> {
|
||||||
// match TcpListener::bind("127.0.0.1:7753").await {
|
match TcpListener::bind("127.0.0.1:7753").await {
|
||||||
// Ok(listener) => {
|
Ok(listener) => {
|
||||||
// info!("Runner is listening localhost:7753");
|
info!("Runner is listening localhost:7753");
|
||||||
// Some(listener)
|
Some(listener)
|
||||||
// },
|
},
|
||||||
// Err(_) => {
|
Err(_) => {
|
||||||
// error!("Cannot create TCP listener for CLI");
|
error!("Cannot create TCP listener for CLI");
|
||||||
// None
|
None
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
let socket_path = "noxis-rs";
|
|
||||||
let _ = fs::remove_file(socket_path);
|
|
||||||
Ok(UnixListener::bind(socket_path)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # Fn `process_connection`
|
/// # Fn `process_connection`
|
||||||
|
|
@ -83,10 +80,11 @@ async fn init_listener() -> anyhow::Result<UnixListener> {
|
||||||
///
|
///
|
||||||
/// *depends on* : `tokio::net::TcpStream`
|
/// *depends on* : `tokio::net::TcpStream`
|
||||||
///
|
///
|
||||||
async fn process_connection(mut stream: UnixStream) {
|
async fn process_connection(mut stream: TcpStream) {
|
||||||
let buf_reader = BufReader::new(stream.borrow_mut());
|
let buf_reader = BufReader::new(stream.borrow_mut());
|
||||||
let mut rqst = buf_reader.lines();
|
let mut rqst = buf_reader.lines();
|
||||||
|
|
||||||
|
|
||||||
while let Ok(Some(line)) = rqst.next_line().await {
|
while let Ok(Some(line)) = rqst.next_line().await {
|
||||||
if line.is_empty() {
|
if line.is_empty() {
|
||||||
break
|
break
|
||||||
|
|
@ -103,6 +101,6 @@ async fn process_connection(mut stream: UnixStream) {
|
||||||
println!("{}", line);
|
println!("{}", line);
|
||||||
}
|
}
|
||||||
|
|
||||||
let response = "OK";
|
let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!";
|
||||||
stream.write_all(response.as_bytes()).await.unwrap();
|
stream.write_all(response.as_bytes()).await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -119,6 +119,12 @@ pub mod v2 {
|
||||||
let _ = restart_main_thread();
|
let _ = restart_main_thread();
|
||||||
},
|
},
|
||||||
cli_config_option = cli_future => {
|
cli_config_option = cli_future => {
|
||||||
|
// match cli_config_option {
|
||||||
|
// Some(config) => {},
|
||||||
|
// None => {
|
||||||
|
// error!("Cli pulling new config mechanism crushed, restarting ...")
|
||||||
|
// },
|
||||||
|
// }
|
||||||
match cli_config_option {
|
match cli_config_option {
|
||||||
Err(_) => error!("Cli pulling new config mechanism crushed, restarting ..."),
|
Err(_) => error!("Cli pulling new config mechanism crushed, restarting ..."),
|
||||||
Ok(option_config) => {
|
Ok(option_config) => {
|
||||||
|
|
@ -138,20 +144,22 @@ pub mod v2 {
|
||||||
// TODO! futures + select! [OK]
|
// TODO! futures + select! [OK]
|
||||||
// TODO! tests config
|
// TODO! tests config
|
||||||
}
|
}
|
||||||
pub async fn get_redis_connection(params: &str) -> Option<Connection> {
|
pub async fn get_redis_connection(params: Arc<PrebootParams>) -> Option<Connection> {
|
||||||
for i in 1..=3 {
|
if params.no_sub {
|
||||||
let redis_url = format!("redis://{}/", params);
|
return None;
|
||||||
info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i);
|
}
|
||||||
if let Ok(client) = Client::open(redis_url) {
|
let mut connection_delay: u64 = 1;
|
||||||
|
loop {
|
||||||
|
if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) {
|
||||||
if let Ok(conn) = client.get_connection() {
|
if let Ok(conn) = client.get_connection() {
|
||||||
info!("Successfully opened Redis connection");
|
info!("Successfully opened Redis connection");
|
||||||
return Some(conn);
|
return Some(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
error!("Error with subscribing Redis stream on update. Retrying in 5 secs...");
|
error!("Error with subscribing Redis stream on update. Retrying in {} secs...", connection_delay);
|
||||||
sleep(Duration::from_secs(5)).await;
|
sleep(Duration::from_secs(connection_delay)).await;
|
||||||
|
connection_delay *= 2;
|
||||||
}
|
}
|
||||||
None
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// loop checking redis pubsub
|
// loop checking redis pubsub
|
||||||
|
|
@ -163,24 +171,9 @@ pub mod v2 {
|
||||||
) -> anyhow::Result<()>{
|
) -> anyhow::Result<()>{
|
||||||
/*...*/
|
/*...*/
|
||||||
// dbg!("start of pb");
|
// dbg!("start of pb");
|
||||||
sleep(Duration::from_secs(1)).await;
|
|
||||||
|
|
||||||
let mut tx_brd_local = tx_brd_local;
|
let mut tx_brd_local = tx_brd_local;
|
||||||
let mut local_config = Processes::default();
|
let mut _local_config = Processes::default();
|
||||||
|
return match get_redis_connection(params.clone()).await {
|
||||||
for retry in 1..=5 {
|
|
||||||
if !tx_brd_local.is_empty() {
|
|
||||||
match tx_brd_local.recv().await {
|
|
||||||
Ok(lc) => local_config = lc,
|
|
||||||
Err(er) => {
|
|
||||||
error!("Cannot get imported local config due to {}", &er);
|
|
||||||
return Err(anyhow::Error::msg(
|
|
||||||
format!("Cannot get imported local config due to {}", er))
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
match get_redis_connection(&local_config.config_server).await {
|
|
||||||
Some(mut conn) => {
|
Some(mut conn) => {
|
||||||
//
|
//
|
||||||
let mut pub_sub = conn.as_pubsub();
|
let mut pub_sub = conn.as_pubsub();
|
||||||
|
|
@ -189,11 +182,26 @@ pub mod v2 {
|
||||||
match pub_sub.subscribe(channel_name) {
|
match pub_sub.subscribe(channel_name) {
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
error!("Cannot subscribe pubsub channel due to {}", &er);
|
error!("Cannot subscribe pubsub channel due to {}", &er);
|
||||||
return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
|
Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
|
||||||
},
|
},
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
info!("Successfully subscribed to {} pubsub channel", channel_name);
|
info!("Successfully subscribed to {} pubsub channel", channel_name);
|
||||||
loop {
|
loop {
|
||||||
|
// brd check
|
||||||
|
// if let Ok(new_lc) = tx_brd_local.recv().await {
|
||||||
|
|
||||||
|
// }
|
||||||
|
if !tx_brd_local.is_empty() {
|
||||||
|
match tx_brd_local.recv().await {
|
||||||
|
Ok(lc) => _local_config = lc,
|
||||||
|
Err(er) => {
|
||||||
|
error!("Cannot get imported local config due to {}", &er);
|
||||||
|
return Err(anyhow::Error::msg(
|
||||||
|
format!("Cannot get imported local config due to {}", er))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// pubsub check
|
// pubsub check
|
||||||
if let Ok(msg) = pub_sub.get_message() {
|
if let Ok(msg) = pub_sub.get_message() {
|
||||||
// dbg!("ok on get message");
|
// dbg!("ok on get message");
|
||||||
|
|
@ -202,7 +210,7 @@ pub mod v2 {
|
||||||
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
|
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
|
||||||
Ok(payload) => {
|
Ok(payload) => {
|
||||||
if let Some(remote) = parse_extern_config(&payload) {
|
if let Some(remote) = parse_extern_config(&payload) {
|
||||||
match config_comparing(&local_config, &remote) {
|
match config_comparing(&_local_config, &remote) {
|
||||||
ConfigActuality::Local => {
|
ConfigActuality::Local => {
|
||||||
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
|
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
|
||||||
},
|
},
|
||||||
|
|
@ -235,17 +243,9 @@ pub mod v2 {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
None => {
|
None => Err(anyhow::Error::msg("Cannot create Redis connection"))
|
||||||
warn!("Cannot validly connect Redis connection. Blocking task for 20 secs and restarting tries (attempt {})", retry);
|
|
||||||
sleep(Duration::from_secs(20)).await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
error!("End of retries. Stopping pubsub...");
|
|
||||||
return Err(anyhow::Error::msg(
|
|
||||||
format!("End of retries. Stopping pubsub...")
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
//
|
||||||
async fn local_config_reciever(
|
async fn local_config_reciever(
|
||||||
|
|
@ -373,7 +373,7 @@ pub mod v2 {
|
||||||
to_local_tx: OneShotSender<bool>
|
to_local_tx: OneShotSender<bool>
|
||||||
) -> Option<Processes> {
|
) -> Option<Processes> {
|
||||||
/* match awaits til channel*/
|
/* match awaits til channel*/
|
||||||
// dbg!("start of cli");
|
dbg!("start of cli");
|
||||||
match cli_oneshot.await {
|
match cli_oneshot.await {
|
||||||
Ok(config_from_cli) => {
|
Ok(config_from_cli) => {
|
||||||
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
|
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue