monitor/noxis-rs/src/options/cli_pipeline.rs

89 lines
2.6 KiB
Rust

use log::{error, info};
use tokio::net::{ UnixStream, UnixListener };
use tokio::time::{sleep, Duration};
use std::fs;
use tokio::io::{ AsyncWriteExt, AsyncReadExt};
use noxis_cli::Cli;
/// # Fn `init_cli_pipeline`
/// ## for catching all input requests from CLI
///
/// *input* : -
///
/// *output* : `anyhow::Result<()>` to wrap errors
///
/// *initiator* : fn `main`
///
/// *managing* : `TcpListener` object to handle requests
///
/// *depends on* : -
///
pub async fn init_cli_pipeline() -> anyhow::Result<()> {
let socket_path = "noxis.sock";
let _ = fs::remove_file(socket_path);
match UnixListener::bind(socket_path) {
Ok(list) => {
// TODO: remove `unwrap`s
info!("Listening on {}", socket_path);
loop {
match list.accept().await {
Ok((socket, _)) => {
// tokio::spawn();
process_connection(socket).await;
},
Err(er) => {
error!("Cannot poll connection to CLI due to {}", er);
sleep(Duration::from_millis(300)).await;
},
}
}
// Ok(())
},
Err(er) => {
error!("Failed to open UnixListener for CLI");
Err(er.into())
},
}
}
/// # Fn `process_connection`
/// ## for processing input CLI requests
///
/// *input* : mut stream: `TcpStream`
///
/// *output* : -
///
/// *initiator* : fn `init_cli_pipeline`
///
/// *managing* : mutable object of `TcpStream`
///
/// *depends on* : `tokio::net::TcpStream`
///
async fn process_connection(mut stream: UnixStream) {
let mut buf = vec![0; 1024];
match stream.read(&mut buf).await {
Ok(0) => {
info!("Client disconnected ");
},
Ok(n) => {
buf.truncate(n);
info!("CLI have sent {} bytes", n);
match serde_json::from_slice::<Cli>(&buf) {
Ok(cli) => {
info!("Received CLI request: {:?}", cli);
let response = "OK";
if let Err(e) = stream.write_all(response.as_bytes()).await {
error!("Failed to send response: {}", e);
}
}
Err(e) => {
error!("Failed to parse CLI request: {}", e);
}
}
},
Err(e) => error!("Failed to read from socket: {}", e),
}
let _ = stream.shutdown().await;
}