From 81df3f8435982e29d985dc13e6f76d8064c66a5b Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 28 May 2025 18:23:51 +0300 Subject: [PATCH] service bug --- noxis-rs/settings.json | 2 +- noxis-rs/src/utils/prcs.rs | 5 +- noxis-rs/src/utils/services.rs | 174 ++++++--------------------------- 3 files changed, 33 insertions(+), 148 deletions(-) diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index 4dae157..62d45f2 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -20,7 +20,7 @@ "hostname": "ya.ru", "port": 443, "triggers": { - "wait": 10, + "wait": 5, "onLost": "stop" } } diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 996e46e..3214edd 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -91,6 +91,7 @@ pub mod v2 { info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); match terminate_process(&self.name).await { Ok(_) => { + info!("Process {} was stopped ...", &self.name); self.state = ProcessState::Stopped; self.pid = Pid::new(); }, @@ -105,6 +106,7 @@ pub mod v2 { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); match terminate_process(&self.name).await { Ok(_) => { + info!("Process {} was forcefully stopped ...", &self.name); self.state = ProcessState::StoppedByCli; self.pid = Pid::new(); }, @@ -119,6 +121,7 @@ pub mod v2 { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name); match freeze_process(&self.name).await { Ok(_) => { + info!("Process {} was forcefully frozen ...", &self.name); self.state = ProcessState::HoldingByCli; // self.pid = Pid::new(); }, @@ -133,8 +136,8 @@ pub mod v2 { info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); match freeze_process(&self.name).await { Ok(_) => { + info!("Process {} was frozen ...", &self.name); self.state = ProcessState::Holding; - // self.pid = Pid::new(); }, Err(er) => { error!("Cannot freeze process {} : {}", self.name, er); diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index 7885eb4..0d568e9 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -5,8 +5,11 @@ use std::sync::Arc; use tokio::time::Duration; use tokio::sync::mpsc::Sender as Sender; use async_trait::async_trait; +use std::pin::Pin; +use futures::future::Future; pub mod v2 { + use futures::FutureExt; use log::info; use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState}; @@ -72,6 +75,7 @@ pub mod v2 { self.event_registrator = event_reg; self } + pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String { format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) } @@ -111,7 +115,7 @@ pub mod v2 { } let tasks: Vec<_> = addrs.into_iter().map(|addr| async move { - match tokio::time::timeout(Duration::from_secs(1), tokio::net::TcpStream::connect(&addr)).await { + match tokio::time::timeout(Duration::from_secs(2), tokio::net::TcpStream::connect(&addr)).await { Ok(Ok(_)) => Some(addr), _ => None, } @@ -132,11 +136,16 @@ pub mod v2 { async fn trigger_on(&mut self) { match self.state { ServiceState::Ok => { - let _ = self.event_registrator - .iter() - .map(|(_, (_, el))| async { - let _ = el.send(Events::Positive(self.access_url.clone())).await; - }); + let futures : Vec + Send>>> = self.event_registrator.iter() + .map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt))) + .map(|(prc, (serv, sender_opt))| async move { + info!("Notifying process {} ...", prc); + let _ = sender_opt.send(Events::Positive(serv.clone())); + }) + .map(|fut| fut.boxed()) + .collect(); + + futures::future::join_all(futures).await; }, ServiceState::Unavailable => { // looped check and notifying @@ -160,10 +169,20 @@ pub mod v2 { attempt += 1; let state_check_result = self.check_state().await; - + if state_check_result.is_ok() { info!("Connection to {} is `OK` now", &access_url); - self.state = ServiceState::Ok; + self.state = ServiceState::Ok; + let futures : Vec + Send>>> = self.event_registrator.iter() + .map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt))) + .map(|(prc, (serv, sender_opt))| async move { + info!("Notifying process {} ...", prc); + let _ = sender_opt.send(Events::Positive(serv.clone())); + }) + .map(|fut| fut.boxed()) + .collect(); + + futures::future::join_all(futures).await; break; } else { let now = timer.elapsed(); @@ -178,7 +197,7 @@ pub mod v2 { info!("Trying to notify process `{}` ...", &proc_name); let sender_opt = self.event_registrator.get(&name) .map(|(trigger, sender)| - (trigger.to_service_negative_event(name.clone()), sender) + (trigger.to_service_negative_event(self.access_url.clone()), sender) ); if let Some((tr, tx)) = sender_opt { @@ -217,144 +236,7 @@ pub mod v2 { } } -/// # Fn `service_handler` -/// ## function to realize mechanism of current process' dep services monitoring -/// -/// *input* : `&str`, `&Vec`, `Arc>` -/// -/// *output* : () -/// -/// *initiator* : fn `utils::running_handler` -/// -/// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer -/// -/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` -/// -// pub async fn service_handler( -// name: &str, -// services: &Vec, -// tx: Arc>, -// ) -> Result<(), CustomError> { -// // println!("service daemon on {}", name); -// for serv in services { -// if check_service(&serv.hostname, &serv.port).await.is_err() { -// if !is_active(name).await || is_frozen(name).await { -// return Err(CustomError::Fatal); -// } -// error!( -// "Service {}:{} is unreachable for process {}", -// &serv.hostname, &serv.port, &name -// ); -// match serv.triggers.on_lost.as_str() { -// "stay" => { -// tx.send(4).await.unwrap(); -// continue; -// } -// "stop" => { -// if looped_service_connecting(name, serv).await.is_err() { -// tx.send(5).await.unwrap(); -// tokio::task::yield_now().await; -// return Err(CustomError::Fatal); -// } -// } -// "hold" => { -// // if is_frozen(name).await { -// // return Err(CustomError::Fatal); -// // } -// if looped_service_connecting(name, serv).await.is_err() { -// tx.send(6).await.unwrap(); -// tokio::task::yield_now().await; -// return Err(CustomError::Fatal); -// } -// } -// _ => { -// tx.send(101).await.unwrap(); -// return Err(CustomError::Fatal); -// } -// } -// } -// } -// tokio::time::sleep(Duration::from_millis(100)).await; -// Ok(()) -// } -/// # Fn `looped_service_connecting` -/// ## for service's state check in loop (with delay and restriction of attempts) -/// -/// *input* : `&str`, `&Services` -/// -/// *output* : Ok(()) if service now available | Err(er) if still not -/// -/// *initiator* : fn `service_handler` -/// -/// *managing* : process name, current service struct -/// -/// *depends on* : fn `check_service` -/// -// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { -// if serv.triggers.wait == 0 { -// loop { -// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; -// warn!( -// "Attempting to connect from {} process to {}:{}", -// &name, &serv.hostname, &serv.port -// ); -// match check_service(&serv.hostname, &serv.port).await { -// Ok(_) => { -// log::info!( -// "Successfully connected to {} from {} process!", -// &serv.hostname, -// &name -// ); -// break; -// } -// Err(_) => { -// tokio::task::yield_now().await; -// } -// } -// } -// Ok(()) -// } else { -// let start = Instant::now(); -// while start.elapsed().as_secs() < serv.triggers.wait.into() { -// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; -// warn!( -// "Attempting to connect from {} process to {}:{}", -// &name, &serv.hostname, &serv.port -// ); -// match check_service(&serv.hostname, &serv.port).await { -// Ok(_) => { -// log::info!( -// "Successfully connected to {} from {} process!", -// &serv.hostname, -// &name -// ); -// return Ok(()); -// } -// Err(_) => { -// tokio::task::yield_now().await; -// } -// } -// } -// Err(CustomError::Fatal) -// } -// } - -/// # Fn `check_service` -/// ## for check current service's availiability -/// -/// *input* : `&str`, `&u32` -/// -/// *output* : Ok(()) if service now available | Err(er) if still not -/// -/// *initiator* : fn `service_handler`, fn `looped_service_connecting` -/// -/// *managing* : hostname, port -/// -/// *depends on* : - -/// -// ! have to be rewritten -// todo: rewrite use async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { let addr = format!("{}:{}", hostname, port);