Merge pull request #15 from prplV/feature/prplV/prc-stat-time

Feature/prpl v/prc stat time
migrate
Vladislav Drozdov 2025-06-05 16:59:21 +03:00 committed by GitHub
commit 0634ecd0d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 306 additions and 22 deletions

View File

@ -2,7 +2,7 @@
resolver = "2" resolver = "2"
members = [ members = [
"noxis-rs", "noxis-rs",
"noxis-cli", "noxis-cli", "noxis-proxy",
] ]
[profile.dev] [profile.dev]

2
noxis-proxy/.env.example Normal file
View File

@ -0,0 +1,2 @@
NOXIS_SOCKET_PATH = "/path/to/noxis.sock"
NOXIS_PROXY_PORT = "numport"

12
noxis-proxy/Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
[package]
name = "noxis-proxy"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0.98"
axum = { version = "0.8.4", features = ["ws"] }
dotenv = "0.15.0"
tokio = { version = "1.45.1", features = ["full"] }
tracing = "0.1.41"
tracing-subscriber = "0.3.19"

96
noxis-proxy/src/main.rs Normal file
View File

@ -0,0 +1,96 @@
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
response::IntoResponse,
routing::get,
Router,
};
use std::{
path::PathBuf, str::FromStr,
};
use tokio::net::UnixStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Clone)]
struct AppState {
socket_path: PathBuf,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::from_str(&std::env::var("NOXIS_LOG_LEVEL").unwrap_or_else(|_| String::from("INFO"))).unwrap_or_else(|_| tracing::Level::INFO))
.with_writer(std::io::stdout)
.compact()
.init();
let app_state = AppState {
socket_path : PathBuf::new().join(std::env::var("NOXIS_SOCKET_PATH").unwrap_or_else(|_| String::from("./noxis.sock")))
};
let app = Router::new()
.route("/ws", get(ws_handler))
.route("/hello", get(hello))
.with_state(app_state);
let bind = format!("0.0.0.0:{}", std::env::var("NOXIS_PROXY_PORT").unwrap_or_else(|_| String::from("3000")));
tracing::info!("Serving on {}", &bind);
let listener = tokio::net::TcpListener::bind(bind)
.await?;
axum::serve(listener, app).await?;
Ok(())
}
async fn ws_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
) -> impl IntoResponse {
tracing::info!("New WebSocket connection");
ws.on_upgrade(|socket| handle_socket(socket, state))
}
async fn hello(
State(_state): State<AppState>,
) -> impl IntoResponse {
String::from("HELLO")
}
async fn handle_socket(mut ws: WebSocket, state: AppState) {
tracing::info!("handle websocket");
let ws_receiver = tokio::spawn(async move {
while let Some(Ok(msg)) = ws.recv().await {
let mut unix_socket = match UnixStream::connect(&state.socket_path).await {
Ok(socket) => socket,
Err(e) => {
eprintln!("Failed to connect to Unix socket: {}", e);
let _ = ws.send(Message::Text("ERROR: Unix socket connection failed".into())).await;
return;
}
};
if let Message::Text(text) = msg {
if let Err(e) = unix_socket.write_all(text.as_bytes()).await {
eprintln!("Failed to write to Unix socket: {}", e);
break;
}
let mut buf = Vec::new();
match unix_socket.read_to_end(&mut buf).await {
Ok(n) if n > 0 => {
let response = String::from_utf8_lossy(&buf[..n]);
if ws.send(Message::Text(response.into_owned().into())).await.is_err() {
break;
}
}
Ok(_) | Err(_) => break,
}
}
}
});
let _ = ws_receiver.await;
}

View File

@ -11,4 +11,6 @@ NOXIS_REMOTE_SERVER_URL = "ip.ip.ip.ip:port"
NOXIS_CONFIG_PATH = "./settings.json" NOXIS_CONFIG_PATH = "./settings.json"
NOXIS_METRICS_MODE = "full" NOXIS_METRICS_MODE = "full"
NOXIS_SOCKET_PATH = "/path/to/noxis.sock" NOXIS_SOCKET_PATH = "/path/to/noxis.sock"
NOXIS_BACKUP_FOLDER = "/path/to/backups/folder"
NOXIS_MAX_LOG_LEVEL = "TRACE" NOXIS_MAX_LOG_LEVEL = "TRACE"

View File

@ -22,3 +22,4 @@ futures = "0.3.31"
async-trait = "0.1.88" async-trait = "0.1.88"
crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] }
lazy_static = "1.5.0" lazy_static = "1.5.0"
ulid = "1.2.1"

View File

@ -11,7 +11,35 @@
"src": "./tests/examples/", "src": "./tests/examples/",
"triggers": { "triggers": {
"onDelete": "stop", "onDelete": "stop",
"onChange": "restart" "onChange": "restart",
"doRestore" : true
}
},
{
"filename": "none.json",
"src": "./tests/examples/",
"triggers": {
"onDelete": "stop",
"onChange": "restart",
"doRestore" : false
}
},
{
"filename": "invalid_config.json",
"src": "./tests/examples/",
"triggers": {
"onDelete": "stop",
"onChange": "restart",
"doRestore" : false
}
},
{
"filename": "save-conf.json",
"src": "./tests/examples/",
"triggers": {
"onDelete": "stop",
"onChange": "restart",
"doRestore" : true
} }
} }
], ],
@ -23,6 +51,14 @@
"wait": 2, "wait": 2,
"onLost": "stop" "onLost": "stop"
} }
},
{
"hostname": "8.8.8.8",
"port": 443,
"triggers": {
"wait": 2,
"onLost": "stop"
}
} }
] ]
} }

View File

@ -108,6 +108,7 @@ async fn main() -> anyhow::Result<()> {
handler.push(ctrlc); handler.push(ctrlc);
let tx_bus = tx_to_bus.clone(); let tx_bus = tx_to_bus.clone();
let preboot_cli = preboot.clone();
let monitoring = tokio::spawn(async move { let monitoring = tokio::spawn(async move {
let config = { let config = {
let mut tick = tokio::time::interval(Duration::from_millis(500)); let mut tick = tokio::time::interval(Duration::from_millis(500));
@ -119,7 +120,7 @@ async fn main() -> anyhow::Result<()> {
}; };
} }
}; };
if let Err(er) = init_monitoring(config, rx_to_supervisor, tx_bus).await { if let Err(er) = init_monitoring(config, preboot_cli, rx_to_supervisor, tx_bus).await {
error!("Monitoring mod failed due to {}", er); error!("Monitoring mod failed due to {}", er);
} }
}); });

View File

@ -189,6 +189,7 @@ pub struct PrebootParams {
pub config: PathBuf, pub config: PathBuf,
pub metrics: MetricsPrebootParams, pub metrics: MetricsPrebootParams,
pub self_socket: PathBuf, pub self_socket: PathBuf,
pub backup_folder: PathBuf,
} }
/// # implementation for `MetricsPrebootParams` /// # implementation for `MetricsPrebootParams`
@ -267,6 +268,29 @@ impl PrebootParams {
} }
} }
}, },
backup_folder: {
match var("NOXIS_BACKUP_FOLDER") {
Ok(val) => {
let path = PathBuf::from(val);
if path.exists() && path.is_dir() {
path
} else {
PathBuf::from(std::env::current_dir()
.expect("Crushed on getting current_dir path. Check fs state!")
)
}
},
Err(_) => {
let default = std::env::current_dir()
.expect("Crushed on getting current_dir path. Check fs state!");
warn!(
"$NOXIS_BACKUP_FOLDER wans't set. Default value - {}",
default.display()
);
PathBuf::from(default)
}
}
},
} }
} }
} }

View File

@ -470,6 +470,8 @@ pub struct FileTriggers {
pub on_delete: String, pub on_delete: String,
#[serde(rename = "onChange")] #[serde(rename = "onChange")]
pub on_change: String, pub on_change: String,
#[serde(rename = "doRestore")]
pub do_restore: bool,
} }
/// # Metrics struct /// # Metrics struct

View File

@ -26,6 +26,7 @@ lazy_static! {
pub mod v2 { pub mod v2 {
use super::*; use super::*;
use crate::options::preboot::PrebootParams;
use crate::utils::metrics::processes::{ProcessesAll, ProcessesQuery}; use crate::utils::metrics::processes::{ProcessesAll, ProcessesQuery};
use crate::{ use crate::{
options::structs::{ options::structs::{
@ -65,10 +66,14 @@ pub mod v2 {
bus: (bus_reciever, bus_sender), bus: (bus_reciever, bus_sender),
} }
} }
pub async fn with_config(mut self, config: Processes) -> Supervisor { pub async fn with_config(
mut self,
config: Processes,
preboot : Arc<PrebootParams>
) -> Supervisor {
self.config = Arc::from(config); self.config = Arc::from(config);
let _ = self.config.processes.iter().for_each(|prc| { let _ = self.config.processes.iter().for_each(|prc| {
let (rx, tx) = mpsc::channel::<Events>(10); let (rx, tx) = mpsc::channel::<Events>(100);
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
if !self.prcs.contains(&temp) { if !self.prcs.contains(&temp) {
self.prcs.push_back(temp); self.prcs.push_back(temp);
@ -84,8 +89,21 @@ pub mod v2 {
}; };
hm.insert(proc_name.clone(), (triggers, rx.clone())); hm.insert(proc_name.clone(), (triggers, rx.clone()));
let backup_file = {
if file.triggers.do_restore {
use ulid::Ulid;
format!("{}{}.bak", {
let path = preboot.backup_folder.to_string_lossy();
if path.ends_with("/") { path.to_string() }
else { format!("{}/", path) }
}, Ulid::new())
} else {
String::new()
}
};
let tempfile = let tempfile =
FilesController::new(&file.filename.as_str(), hm).with_path(&file.src); FilesController::new(&file.filename.as_str(), hm).with_path(&file.src, backup_file);
if let Ok(file) = tempfile { if let Ok(file) = tempfile {
if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) { if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) {
@ -164,6 +182,7 @@ pub mod v2 {
name: file.filename.to_string(), name: file.filename.to_string(),
path: file.src.to_string(), path: file.src.to_string(),
status: file_cont.get_state(), status: file_cont.get_state(),
backup_file : file_cont.get_backup_file(),
triggers: file.triggers.to_owned(), triggers: file.triggers.to_owned(),
}); });
} }
@ -351,11 +370,12 @@ pub mod v2 {
pub async fn init_monitoring( pub async fn init_monitoring(
config: Processes, config: Processes,
preboot : Arc<PrebootParams>,
bus_reciever: BusReciever, bus_reciever: BusReciever,
bus_sender: BusSender, bus_sender: BusSender,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut supervisor = Supervisor::new(bus_reciever, bus_sender) let mut supervisor = Supervisor::new(bus_reciever, bus_sender)
.with_config(config) .with_config(config, preboot)
.await; .await;
info!("Monitoring: {} ", &supervisor.get_stats()); info!("Monitoring: {} ", &supervisor.get_stats());
supervisor.process().await; supervisor.process().await;

View File

@ -29,6 +29,7 @@ pub mod v2 {
name: Arc<str>, name: Arc<str>,
path: String, path: String,
code_name: Arc<str>, code_name: Arc<str>,
backup_file : String,
state: FileState, state: FileState,
watcher: Option<Inotify>, watcher: Option<Inotify>,
triggers: EventHandlers, triggers: EventHandlers,
@ -51,10 +52,11 @@ pub mod v2 {
watcher: None, watcher: None,
triggers, triggers,
code_name: name.clone(), code_name: name.clone(),
backup_file: String::new(),
} }
} }
#[inline(always)] #[inline(always)]
pub fn with_path(mut self, path: impl AsRef<Path>) -> anyhow::Result<FilesController> { pub fn with_path(mut self, path: impl AsRef<Path>, backup : String) -> anyhow::Result<FilesController> {
self.path = path.as_ref().to_string_lossy().into_owned(); self.path = path.as_ref().to_string_lossy().into_owned();
self.watcher = { self.watcher = {
match create_watcher(&self.name, &self.path) { match create_watcher(&self.name, &self.path) {
@ -69,6 +71,11 @@ pub mod v2 {
} }
}; };
self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name)); self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name));
self.backup_file = backup;
match create_backup(&self.code_name, &self.backup_file) {
Ok(_) => info!("Backup file for {} was created ({})", &self.code_name, &self.backup_file),
Err(er) => warn!("{}. Ignoring ...", er),
}
Ok(self) Ok(self)
} }
pub fn add_event(&mut self, file_controller: FilesController) { pub fn add_event(&mut self, file_controller: FilesController) {
@ -97,6 +104,9 @@ pub mod v2 {
pub fn get_code_name(&self) -> Arc<str> { pub fn get_code_name(&self) -> Arc<str> {
self.code_name.clone() self.code_name.clone()
} }
pub fn get_backup_file(&self) -> String {
self.backup_file.to_string()
}
} }
#[async_trait] #[async_trait]
impl ProcessUnit for FilesController { impl ProcessUnit for FilesController {
@ -120,9 +130,24 @@ pub mod v2 {
a || mask.mask == EventMask::DELETE_SELF, a || mask.mask == EventMask::DELETE_SELF,
b || mask.mask == EventMask::MODIFY, b || mask.mask == EventMask::MODIFY,
) )
}); }
if let (recreate_watcher, true) = (need_to_recreate, was_modifired) { );
if self.backup_file.is_empty() {
} else {
}
if let (mut recreate_watcher, true) = (need_to_recreate, was_modifired) {
if self.backup_file.is_empty() {
warn!("File {} ({}) was changed", self.name, &self.path); warn!("File {} ({}) was changed", self.name, &self.path);
self.trigger_on(Some(FileTriggerType::OnChange)).await;
} else {
recreate_watcher = true;
match restore_file(&self.code_name, &self.backup_file).await {
Ok(_) => info!("File {} was successfully restored", &self.code_name),
Err(er) => error!("Cannot restore file {} : {}", &self.code_name, er),
}
}
if recreate_watcher { if recreate_watcher {
self.watcher = match create_watcher(&self.name, &self.path) { self.watcher = match create_watcher(&self.name, &self.path) {
Ok(notifier) => Some(notifier), Ok(notifier) => Some(notifier),
@ -135,21 +160,42 @@ pub mod v2 {
} }
} }
} }
self.trigger_on(Some(FileTriggerType::OnChange)).await;
return;
} }
} }
} }
None => { /* DEAD END */ } None => return,
} }
} else { } else {
if let FileState::Ok = self.state { if let FileState::Ok = self.state {
if self.backup_file.is_empty() {
warn!( warn!(
"File {} ({}) was not found in determined scope", "File {} ({}) was not found in determined scope",
self.name, &self.path self.name, &self.path
); );
self.state = FileState::NotFound; self.state = FileState::NotFound;
self.trigger_on(Some(FileTriggerType::OnDelete)).await; self.trigger_on(Some(FileTriggerType::OnDelete)).await;
} else {
warn!(
"File {} ({}) was not found in determined scope. Restoring from backup-file ...",
self.name, &self.path
);
match restore_file(&self.code_name, &self.backup_file).await {
Err(er) => error!("Cannot restore file {} : {}", &self.code_name, er),
Ok(_) => {
info!("File {} was successfully restored", &self.code_name);
self.watcher = match create_watcher(&self.name, &self.path) {
Ok(notifier) => Some(notifier),
Err(er) => {
error!(
"Failed to recreate watcher for {} ({}) : {}",
self.name, &self.path, er
);
None
}
}
},
}
}
} }
return; return;
} }
@ -158,6 +204,18 @@ pub mod v2 {
} }
} }
pub fn create_backup(target: &str, backup: &str) -> anyhow::Result<u64> {
return if !backup.is_empty() {
Ok(std::fs::copy(target, backup)?)
} else {
Err(anyhow::Error::msg(format!("No need to create backup-file for {}", target)))
}
}
pub async fn restore_file(target: &str, backup: &str) -> anyhow::Result<u64> {
Ok(tokio::fs::copy(backup, target).await?)
}
/// # Fn `create_watcher` /// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events /// ## for creating watcher on file's delete | update events
/// ///

View File

@ -326,6 +326,7 @@ pub mod processes {
pub name: String, pub name: String,
pub path: String, pub path: String,
pub status: FileState, pub status: FileState,
pub backup_file : String,
pub triggers: FileTriggers, pub triggers: FileTriggers,
} }
#[derive(Debug, serde::Serialize)] #[derive(Debug, serde::Serialize)]
@ -496,6 +497,8 @@ pub struct ProcessExtended {
name: String, name: String,
status: ProcessState, status: ProcessState,
pid: Pid, pid: Pid,
start_time: String,
duration: String,
dependencies: processes::deps::Dependencies, dependencies: processes::deps::Dependencies,
cpu_usage: f32, cpu_usage: f32,
ram_usage: u64, ram_usage: u64,
@ -509,10 +512,31 @@ impl ProcessExtended {
system.refresh_processes(sysinfo::ProcessesToUpdate::All, true); system.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
return if let Some(prc) = system.process(proc.pid.new_sysinfo_pid()) { return if let Some(prc) = system.process(proc.pid.new_sysinfo_pid()) {
let disk_usage = prc.disk_usage(); let disk_usage = prc.disk_usage();
let duration = chrono::Duration::new(prc.run_time() as i64, 0);
let start_time = chrono::DateTime::from_timestamp(prc.start_time() as i64, 0);
Self { Self {
name: proc.name, name: proc.name,
status: proc.state, status: proc.state,
pid: proc.pid, pid: proc.pid,
start_time : {
match start_time {
Some(date) => date.to_string(),
None => String::new()
}
},
duration: {
match duration {
Some(duration) => {
format!("{}:{}:{}:{}",
duration.num_days(),
duration.num_hours(),
duration.num_minutes(),
duration.num_seconds()
)
},
None => String::new()
}
},
dependencies: proc.dependencies, dependencies: proc.dependencies,
cpu_usage: prc.cpu_usage(), cpu_usage: prc.cpu_usage(),
ram_usage: prc.memory(), ram_usage: prc.memory(),
@ -525,6 +549,8 @@ impl ProcessExtended {
name: proc.name, name: proc.name,
status: proc.state, status: proc.state,
pid: proc.pid, pid: proc.pid,
start_time : String::new(),
duration: String::new(),
dependencies: proc.dependencies, dependencies: proc.dependencies,
cpu_usage: 0.0, cpu_usage: 0.0,
ram_usage: 0, ram_usage: 0,

View File

@ -1,5 +1,7 @@
{ {
"dateOfCreation": "1", "dateOfCreation": "1",
"configServer": "", "processes": []
}
,
"processes": [] "processes": []
} }

View File

@ -1,5 +1,7 @@
{ {
"dateOfCreation": "1", "dateOfCreation": "1",
"configServer": "", "processes": []
}
,
"processes": [] "processes": []
} }

View File

@ -1,6 +1,5 @@
{ {
"dateOfCreation": "1721381809103", "dateOfCreation": "1721381809103",
"configServer" : "localhost",
"processes": [ "processes": [
{ {
"name": "temp-process", "name": "temp-process",
@ -12,7 +11,8 @@
"src": "/home/vladislav/web/runner-rs/examples/", "src": "/home/vladislav/web/runner-rs/examples/",
"triggers": { "triggers": {
"onDelete": "hold", "onDelete": "hold",
"onChange": "stop" "onChange": "stop",
"doRestore" : true
} }
} }
], ],