diff --git a/Cargo.toml b/Cargo.toml index 5b477a0..645c6e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ resolver = "2" members = [ "noxis-rs", - "noxis-cli", + "noxis-cli", "noxis-proxy", ] [profile.dev] diff --git a/noxis-proxy/.env.example b/noxis-proxy/.env.example new file mode 100644 index 0000000..e67257a --- /dev/null +++ b/noxis-proxy/.env.example @@ -0,0 +1,2 @@ +NOXIS_SOCKET_PATH = "/path/to/noxis.sock" +NOXIS_PROXY_PORT = "numport" \ No newline at end of file diff --git a/noxis-proxy/Cargo.toml b/noxis-proxy/Cargo.toml new file mode 100644 index 0000000..75dd591 --- /dev/null +++ b/noxis-proxy/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "noxis-proxy" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0.98" +axum = { version = "0.8.4", features = ["ws"] } +dotenv = "0.15.0" +tokio = { version = "1.45.1", features = ["full"] } +tracing = "0.1.41" +tracing-subscriber = "0.3.19" diff --git a/noxis-proxy/src/main.rs b/noxis-proxy/src/main.rs new file mode 100644 index 0000000..22e4237 --- /dev/null +++ b/noxis-proxy/src/main.rs @@ -0,0 +1,96 @@ +use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + State, + }, + response::IntoResponse, + routing::get, + Router, +}; +use std::{ + path::PathBuf, str::FromStr, +}; +use tokio::net::UnixStream; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +#[derive(Clone)] +struct AppState { + socket_path: PathBuf, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + dotenv::dotenv().ok(); + + tracing_subscriber::fmt() + .with_max_level(tracing::Level::from_str(&std::env::var("NOXIS_LOG_LEVEL").unwrap_or_else(|_| String::from("INFO"))).unwrap_or_else(|_| tracing::Level::INFO)) + .with_writer(std::io::stdout) + .compact() + .init(); + + let app_state = AppState { + socket_path : PathBuf::new().join(std::env::var("NOXIS_SOCKET_PATH").unwrap_or_else(|_| String::from("./noxis.sock"))) + }; + let app = Router::new() + .route("/ws", get(ws_handler)) + .route("/hello", get(hello)) + .with_state(app_state); + + let bind = format!("0.0.0.0:{}", std::env::var("NOXIS_PROXY_PORT").unwrap_or_else(|_| String::from("3000"))); + + tracing::info!("Serving on {}", &bind); + + let listener = tokio::net::TcpListener::bind(bind) + .await?; + axum::serve(listener, app).await?; + Ok(()) +} + +async fn ws_handler( + ws: WebSocketUpgrade, + State(state): State, +) -> impl IntoResponse { + tracing::info!("New WebSocket connection"); + ws.on_upgrade(|socket| handle_socket(socket, state)) +} + +async fn hello( + State(_state): State, +) -> impl IntoResponse { + String::from("HELLO") +} + +async fn handle_socket(mut ws: WebSocket, state: AppState) { + tracing::info!("handle websocket"); + + let ws_receiver = tokio::spawn(async move { + while let Some(Ok(msg)) = ws.recv().await { + let mut unix_socket = match UnixStream::connect(&state.socket_path).await { + Ok(socket) => socket, + Err(e) => { + eprintln!("Failed to connect to Unix socket: {}", e); + let _ = ws.send(Message::Text("ERROR: Unix socket connection failed".into())).await; + return; + } + }; + if let Message::Text(text) = msg { + if let Err(e) = unix_socket.write_all(text.as_bytes()).await { + eprintln!("Failed to write to Unix socket: {}", e); + break; + } + let mut buf = Vec::new(); + match unix_socket.read_to_end(&mut buf).await { + Ok(n) if n > 0 => { + let response = String::from_utf8_lossy(&buf[..n]); + if ws.send(Message::Text(response.into_owned().into())).await.is_err() { + break; + } + } + Ok(_) | Err(_) => break, + } + } + } + }); + + let _ = ws_receiver.await; +} diff --git a/noxis-rs/.env.example b/noxis-rs/.env.example index 8efbd9b..1909961 100644 --- a/noxis-rs/.env.example +++ b/noxis-rs/.env.example @@ -11,4 +11,6 @@ NOXIS_REMOTE_SERVER_URL = "ip.ip.ip.ip:port" NOXIS_CONFIG_PATH = "./settings.json" NOXIS_METRICS_MODE = "full" NOXIS_SOCKET_PATH = "/path/to/noxis.sock" +NOXIS_BACKUP_FOLDER = "/path/to/backups/folder" + NOXIS_MAX_LOG_LEVEL = "TRACE" \ No newline at end of file diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index 73ed76b..086e0f1 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -22,3 +22,4 @@ futures = "0.3.31" async-trait = "0.1.88" crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } lazy_static = "1.5.0" +ulid = "1.2.1" diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index 263600a..7c74d01 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -11,7 +11,35 @@ "src": "./tests/examples/", "triggers": { "onDelete": "stop", - "onChange": "restart" + "onChange": "restart", + "doRestore" : true + } + }, + { + "filename": "none.json", + "src": "./tests/examples/", + "triggers": { + "onDelete": "stop", + "onChange": "restart", + "doRestore" : false + } + }, + { + "filename": "invalid_config.json", + "src": "./tests/examples/", + "triggers": { + "onDelete": "stop", + "onChange": "restart", + "doRestore" : false + } + }, + { + "filename": "save-conf.json", + "src": "./tests/examples/", + "triggers": { + "onDelete": "stop", + "onChange": "restart", + "doRestore" : true } } ], @@ -23,6 +51,14 @@ "wait": 2, "onLost": "stop" } + }, + { + "hostname": "8.8.8.8", + "port": 443, + "triggers": { + "wait": 2, + "onLost": "stop" + } } ] } diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 5e92b4f..c39cbef 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -108,6 +108,7 @@ async fn main() -> anyhow::Result<()> { handler.push(ctrlc); let tx_bus = tx_to_bus.clone(); + let preboot_cli = preboot.clone(); let monitoring = tokio::spawn(async move { let config = { let mut tick = tokio::time::interval(Duration::from_millis(500)); @@ -119,7 +120,7 @@ async fn main() -> anyhow::Result<()> { }; } }; - if let Err(er) = init_monitoring(config, rx_to_supervisor, tx_bus).await { + if let Err(er) = init_monitoring(config, preboot_cli, rx_to_supervisor, tx_bus).await { error!("Monitoring mod failed due to {}", er); } }); diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index 47a9256..22eab0b 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -189,6 +189,7 @@ pub struct PrebootParams { pub config: PathBuf, pub metrics: MetricsPrebootParams, pub self_socket: PathBuf, + pub backup_folder: PathBuf, } /// # implementation for `MetricsPrebootParams` @@ -267,6 +268,29 @@ impl PrebootParams { } } }, + backup_folder: { + match var("NOXIS_BACKUP_FOLDER") { + Ok(val) => { + let path = PathBuf::from(val); + if path.exists() && path.is_dir() { + path + } else { + PathBuf::from(std::env::current_dir() + .expect("Crushed on getting current_dir path. Check fs state!") + ) + } + }, + Err(_) => { + let default = std::env::current_dir() + .expect("Crushed on getting current_dir path. Check fs state!"); + warn!( + "$NOXIS_BACKUP_FOLDER wans't set. Default value - {}", + default.display() + ); + PathBuf::from(default) + } + } + }, } } } diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 0d710e2..46c3c23 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -470,6 +470,8 @@ pub struct FileTriggers { pub on_delete: String, #[serde(rename = "onChange")] pub on_change: String, + #[serde(rename = "doRestore")] + pub do_restore: bool, } /// # Metrics struct diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index 86f29ab..385e527 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -26,6 +26,7 @@ lazy_static! { pub mod v2 { use super::*; + use crate::options::preboot::PrebootParams; use crate::utils::metrics::processes::{ProcessesAll, ProcessesQuery}; use crate::{ options::structs::{ @@ -65,10 +66,14 @@ pub mod v2 { bus: (bus_reciever, bus_sender), } } - pub async fn with_config(mut self, config: Processes) -> Supervisor { + pub async fn with_config( + mut self, + config: Processes, + preboot : Arc + ) -> Supervisor { self.config = Arc::from(config); let _ = self.config.processes.iter().for_each(|prc| { - let (rx, tx) = mpsc::channel::(10); + let (rx, tx) = mpsc::channel::(100); let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); if !self.prcs.contains(&temp) { self.prcs.push_back(temp); @@ -84,8 +89,21 @@ pub mod v2 { }; hm.insert(proc_name.clone(), (triggers, rx.clone())); + let backup_file = { + if file.triggers.do_restore { + use ulid::Ulid; + format!("{}{}.bak", { + let path = preboot.backup_folder.to_string_lossy(); + if path.ends_with("/") { path.to_string() } + else { format!("{}/", path) } + }, Ulid::new()) + } else { + String::new() + } + }; + let tempfile = - FilesController::new(&file.filename.as_str(), hm).with_path(&file.src); + FilesController::new(&file.filename.as_str(), hm).with_path(&file.src, backup_file); if let Ok(file) = tempfile { if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) { @@ -164,6 +182,7 @@ pub mod v2 { name: file.filename.to_string(), path: file.src.to_string(), status: file_cont.get_state(), + backup_file : file_cont.get_backup_file(), triggers: file.triggers.to_owned(), }); } @@ -351,11 +370,12 @@ pub mod v2 { pub async fn init_monitoring( config: Processes, + preboot : Arc, bus_reciever: BusReciever, bus_sender: BusSender, ) -> anyhow::Result<()> { let mut supervisor = Supervisor::new(bus_reciever, bus_sender) - .with_config(config) + .with_config(config, preboot) .await; info!("Monitoring: {} ", &supervisor.get_stats()); supervisor.process().await; diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index 9309cc1..dcbf93e 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -29,6 +29,7 @@ pub mod v2 { name: Arc, path: String, code_name: Arc, + backup_file : String, state: FileState, watcher: Option, triggers: EventHandlers, @@ -51,10 +52,11 @@ pub mod v2 { watcher: None, triggers, code_name: name.clone(), + backup_file: String::new(), } } #[inline(always)] - pub fn with_path(mut self, path: impl AsRef) -> anyhow::Result { + pub fn with_path(mut self, path: impl AsRef, backup : String) -> anyhow::Result { self.path = path.as_ref().to_string_lossy().into_owned(); self.watcher = { match create_watcher(&self.name, &self.path) { @@ -69,6 +71,11 @@ pub mod v2 { } }; self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name)); + self.backup_file = backup; + match create_backup(&self.code_name, &self.backup_file) { + Ok(_) => info!("Backup file for {} was created ({})", &self.code_name, &self.backup_file), + Err(er) => warn!("{}. Ignoring ...", er), + } Ok(self) } pub fn add_event(&mut self, file_controller: FilesController) { @@ -97,6 +104,9 @@ pub mod v2 { pub fn get_code_name(&self) -> Arc { self.code_name.clone() } + pub fn get_backup_file(&self) -> String { + self.backup_file.to_string() + } } #[async_trait] impl ProcessUnit for FilesController { @@ -120,9 +130,24 @@ pub mod v2 { a || mask.mask == EventMask::DELETE_SELF, b || mask.mask == EventMask::MODIFY, ) - }); - if let (recreate_watcher, true) = (need_to_recreate, was_modifired) { - warn!("File {} ({}) was changed", self.name, &self.path); + } + ); + if self.backup_file.is_empty() { + + } else { + + } + if let (mut recreate_watcher, true) = (need_to_recreate, was_modifired) { + if self.backup_file.is_empty() { + warn!("File {} ({}) was changed", self.name, &self.path); + self.trigger_on(Some(FileTriggerType::OnChange)).await; + } else { + recreate_watcher = true; + match restore_file(&self.code_name, &self.backup_file).await { + Ok(_) => info!("File {} was successfully restored", &self.code_name), + Err(er) => error!("Cannot restore file {} : {}", &self.code_name, er), + } + } if recreate_watcher { self.watcher = match create_watcher(&self.name, &self.path) { Ok(notifier) => Some(notifier), @@ -135,21 +160,42 @@ pub mod v2 { } } } - self.trigger_on(Some(FileTriggerType::OnChange)).await; - return; } } } - None => { /* DEAD END */ } + None => return, } } else { if let FileState::Ok = self.state { - warn!( + if self.backup_file.is_empty() { + warn!( "File {} ({}) was not found in determined scope", self.name, &self.path - ); - self.state = FileState::NotFound; - self.trigger_on(Some(FileTriggerType::OnDelete)).await; + ); + self.state = FileState::NotFound; + self.trigger_on(Some(FileTriggerType::OnDelete)).await; + } else { + warn!( + "File {} ({}) was not found in determined scope. Restoring from backup-file ...", + self.name, &self.path + ); + match restore_file(&self.code_name, &self.backup_file).await { + Err(er) => error!("Cannot restore file {} : {}", &self.code_name, er), + Ok(_) => { + info!("File {} was successfully restored", &self.code_name); + self.watcher = match create_watcher(&self.name, &self.path) { + Ok(notifier) => Some(notifier), + Err(er) => { + error!( + "Failed to recreate watcher for {} ({}) : {}", + self.name, &self.path, er + ); + None + } + } + }, + } + } } return; } @@ -158,6 +204,18 @@ pub mod v2 { } } +pub fn create_backup(target: &str, backup: &str) -> anyhow::Result { + return if !backup.is_empty() { + Ok(std::fs::copy(target, backup)?) + } else { + Err(anyhow::Error::msg(format!("No need to create backup-file for {}", target))) + } +} + +pub async fn restore_file(target: &str, backup: &str) -> anyhow::Result { + Ok(tokio::fs::copy(backup, target).await?) +} + /// # Fn `create_watcher` /// ## for creating watcher on file's delete | update events /// diff --git a/noxis-rs/src/utils/metrics.rs b/noxis-rs/src/utils/metrics.rs index 07c77d8..9147938 100644 --- a/noxis-rs/src/utils/metrics.rs +++ b/noxis-rs/src/utils/metrics.rs @@ -326,6 +326,7 @@ pub mod processes { pub name: String, pub path: String, pub status: FileState, + pub backup_file : String, pub triggers: FileTriggers, } #[derive(Debug, serde::Serialize)] @@ -496,6 +497,8 @@ pub struct ProcessExtended { name: String, status: ProcessState, pid: Pid, + start_time: String, + duration: String, dependencies: processes::deps::Dependencies, cpu_usage: f32, ram_usage: u64, @@ -509,10 +512,31 @@ impl ProcessExtended { system.refresh_processes(sysinfo::ProcessesToUpdate::All, true); return if let Some(prc) = system.process(proc.pid.new_sysinfo_pid()) { let disk_usage = prc.disk_usage(); + let duration = chrono::Duration::new(prc.run_time() as i64, 0); + let start_time = chrono::DateTime::from_timestamp(prc.start_time() as i64, 0); Self { name: proc.name, status: proc.state, pid: proc.pid, + start_time : { + match start_time { + Some(date) => date.to_string(), + None => String::new() + } + }, + duration: { + match duration { + Some(duration) => { + format!("{}:{}:{}:{}", + duration.num_days(), + duration.num_hours(), + duration.num_minutes(), + duration.num_seconds() + ) + }, + None => String::new() + } + }, dependencies: proc.dependencies, cpu_usage: prc.cpu_usage(), ram_usage: prc.memory(), @@ -525,6 +549,8 @@ impl ProcessExtended { name: proc.name, status: proc.state, pid: proc.pid, + start_time : String::new(), + duration: String::new(), dependencies: proc.dependencies, cpu_usage: 0.0, ram_usage: 0, diff --git a/noxis-rs/tests/examples/none.json b/noxis-rs/tests/examples/none.json index e6c4897..224fb83 100644 --- a/noxis-rs/tests/examples/none.json +++ b/noxis-rs/tests/examples/none.json @@ -1,5 +1,7 @@ { "dateOfCreation": "1", - "configServer": "", + "processes": [] +} +, "processes": [] } diff --git a/noxis-rs/tests/examples/save-conf.json b/noxis-rs/tests/examples/save-conf.json index e6c4897..224fb83 100644 --- a/noxis-rs/tests/examples/save-conf.json +++ b/noxis-rs/tests/examples/save-conf.json @@ -1,5 +1,7 @@ { "dateOfCreation": "1", - "configServer": "", + "processes": [] +} +, "processes": [] } diff --git a/noxis-rs/tests/examples/settings.json b/noxis-rs/tests/examples/settings.json index 8c80261..324275d 100644 --- a/noxis-rs/tests/examples/settings.json +++ b/noxis-rs/tests/examples/settings.json @@ -1,6 +1,5 @@ { "dateOfCreation": "1721381809103", - "configServer" : "localhost", "processes": [ { "name": "temp-process", @@ -12,7 +11,8 @@ "src": "/home/vladislav/web/runner-rs/examples/", "triggers": { "onDelete": "hold", - "onChange": "stop" + "onChange": "stop", + "doRestore" : true } } ],