service bug

migrate
prplV 2025-05-28 18:23:51 +03:00
parent 75940cb187
commit 81df3f8435
3 changed files with 33 additions and 148 deletions

View File

@ -20,7 +20,7 @@
"hostname": "ya.ru", "hostname": "ya.ru",
"port": 443, "port": 443,
"triggers": { "triggers": {
"wait": 10, "wait": 5,
"onLost": "stop" "onLost": "stop"
} }
} }

View File

@ -91,6 +91,7 @@ pub mod v2 {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
match terminate_process(&self.name).await { match terminate_process(&self.name).await {
Ok(_) => { Ok(_) => {
info!("Process {} was stopped ...", &self.name);
self.state = ProcessState::Stopped; self.state = ProcessState::Stopped;
self.pid = Pid::new(); self.pid = Pid::new();
}, },
@ -105,6 +106,7 @@ pub mod v2 {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name);
match terminate_process(&self.name).await { match terminate_process(&self.name).await {
Ok(_) => { Ok(_) => {
info!("Process {} was forcefully stopped ...", &self.name);
self.state = ProcessState::StoppedByCli; self.state = ProcessState::StoppedByCli;
self.pid = Pid::new(); self.pid = Pid::new();
}, },
@ -119,6 +121,7 @@ pub mod v2 {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name); info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name);
match freeze_process(&self.name).await { match freeze_process(&self.name).await {
Ok(_) => { Ok(_) => {
info!("Process {} was forcefully frozen ...", &self.name);
self.state = ProcessState::HoldingByCli; self.state = ProcessState::HoldingByCli;
// self.pid = Pid::new(); // self.pid = Pid::new();
}, },
@ -133,8 +136,8 @@ pub mod v2 {
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
match freeze_process(&self.name).await { match freeze_process(&self.name).await {
Ok(_) => { Ok(_) => {
info!("Process {} was frozen ...", &self.name);
self.state = ProcessState::Holding; self.state = ProcessState::Holding;
// self.pid = Pid::new();
}, },
Err(er) => { Err(er) => {
error!("Cannot freeze process {} : {}", self.name, er); error!("Cannot freeze process {} : {}", self.name, er);

View File

@ -5,8 +5,11 @@ use std::sync::Arc;
use tokio::time::Duration; use tokio::time::Duration;
use tokio::sync::mpsc::Sender as Sender; use tokio::sync::mpsc::Sender as Sender;
use async_trait::async_trait; use async_trait::async_trait;
use std::pin::Pin;
use futures::future::Future;
pub mod v2 { pub mod v2 {
use futures::FutureExt;
use log::info; use log::info;
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState}; use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
@ -72,6 +75,7 @@ pub mod v2 {
self.event_registrator = event_reg; self.event_registrator = event_reg;
self self
} }
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String { pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)))
} }
@ -111,7 +115,7 @@ pub mod v2 {
} }
let tasks: Vec<_> = addrs.into_iter().map(|addr| async move { let tasks: Vec<_> = addrs.into_iter().map(|addr| async move {
match tokio::time::timeout(Duration::from_secs(1), tokio::net::TcpStream::connect(&addr)).await { match tokio::time::timeout(Duration::from_secs(2), tokio::net::TcpStream::connect(&addr)).await {
Ok(Ok(_)) => Some(addr), Ok(Ok(_)) => Some(addr),
_ => None, _ => None,
} }
@ -132,11 +136,16 @@ pub mod v2 {
async fn trigger_on(&mut self) { async fn trigger_on(&mut self) {
match self.state { match self.state {
ServiceState::Ok => { ServiceState::Ok => {
let _ = self.event_registrator let futures : Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self.event_registrator.iter()
.iter() .map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt)))
.map(|(_, (_, el))| async { .map(|(prc, (serv, sender_opt))| async move {
let _ = el.send(Events::Positive(self.access_url.clone())).await; info!("Notifying process {} ...", prc);
}); let _ = sender_opt.send(Events::Positive(serv.clone()));
})
.map(|fut| fut.boxed())
.collect();
futures::future::join_all(futures).await;
}, },
ServiceState::Unavailable => { ServiceState::Unavailable => {
// looped check and notifying // looped check and notifying
@ -160,10 +169,20 @@ pub mod v2 {
attempt += 1; attempt += 1;
let state_check_result = self.check_state().await; let state_check_result = self.check_state().await;
if state_check_result.is_ok() { if state_check_result.is_ok() {
info!("Connection to {} is `OK` now", &access_url); info!("Connection to {} is `OK` now", &access_url);
self.state = ServiceState::Ok; self.state = ServiceState::Ok;
let futures : Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self.event_registrator.iter()
.map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt)))
.map(|(prc, (serv, sender_opt))| async move {
info!("Notifying process {} ...", prc);
let _ = sender_opt.send(Events::Positive(serv.clone()));
})
.map(|fut| fut.boxed())
.collect();
futures::future::join_all(futures).await;
break; break;
} else { } else {
let now = timer.elapsed(); let now = timer.elapsed();
@ -178,7 +197,7 @@ pub mod v2 {
info!("Trying to notify process `{}` ...", &proc_name); info!("Trying to notify process `{}` ...", &proc_name);
let sender_opt = self.event_registrator.get(&name) let sender_opt = self.event_registrator.get(&name)
.map(|(trigger, sender)| .map(|(trigger, sender)|
(trigger.to_service_negative_event(name.clone()), sender) (trigger.to_service_negative_event(self.access_url.clone()), sender)
); );
if let Some((tr, tx)) = sender_opt { if let Some((tr, tx)) = sender_opt {
@ -217,144 +236,7 @@ pub mod v2 {
} }
} }
/// # Fn `service_handler`
/// ## function to realize mechanism of current process' dep services monitoring
///
/// *input* : `&str`, `&Vec<Services>`, `Arc<mpsc::Sender<u8>>`
///
/// *output* : ()
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer
///
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
///
// pub async fn service_handler(
// name: &str,
// services: &Vec<Services>,
// tx: Arc<mpsc::Sender<u8>>,
// ) -> Result<(), CustomError> {
// // println!("service daemon on {}", name);
// for serv in services {
// if check_service(&serv.hostname, &serv.port).await.is_err() {
// if !is_active(name).await || is_frozen(name).await {
// return Err(CustomError::Fatal);
// }
// error!(
// "Service {}:{} is unreachable for process {}",
// &serv.hostname, &serv.port, &name
// );
// match serv.triggers.on_lost.as_str() {
// "stay" => {
// tx.send(4).await.unwrap();
// continue;
// }
// "stop" => {
// if looped_service_connecting(name, serv).await.is_err() {
// tx.send(5).await.unwrap();
// tokio::task::yield_now().await;
// return Err(CustomError::Fatal);
// }
// }
// "hold" => {
// // if is_frozen(name).await {
// // return Err(CustomError::Fatal);
// // }
// if looped_service_connecting(name, serv).await.is_err() {
// tx.send(6).await.unwrap();
// tokio::task::yield_now().await;
// return Err(CustomError::Fatal);
// }
// }
// _ => {
// tx.send(101).await.unwrap();
// return Err(CustomError::Fatal);
// }
// }
// }
// }
// tokio::time::sleep(Duration::from_millis(100)).await;
// Ok(())
// }
/// # Fn `looped_service_connecting`
/// ## for service's state check in loop (with delay and restriction of attempts)
///
/// *input* : `&str`, `&Services`
///
/// *output* : Ok(()) if service now available | Err(er) if still not
///
/// *initiator* : fn `service_handler`
///
/// *managing* : process name, current service struct
///
/// *depends on* : fn `check_service`
///
// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
// if serv.triggers.wait == 0 {
// loop {
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
// warn!(
// "Attempting to connect from {} process to {}:{}",
// &name, &serv.hostname, &serv.port
// );
// match check_service(&serv.hostname, &serv.port).await {
// Ok(_) => {
// log::info!(
// "Successfully connected to {} from {} process!",
// &serv.hostname,
// &name
// );
// break;
// }
// Err(_) => {
// tokio::task::yield_now().await;
// }
// }
// }
// Ok(())
// } else {
// let start = Instant::now();
// while start.elapsed().as_secs() < serv.triggers.wait.into() {
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
// warn!(
// "Attempting to connect from {} process to {}:{}",
// &name, &serv.hostname, &serv.port
// );
// match check_service(&serv.hostname, &serv.port).await {
// Ok(_) => {
// log::info!(
// "Successfully connected to {} from {} process!",
// &serv.hostname,
// &name
// );
// return Ok(());
// }
// Err(_) => {
// tokio::task::yield_now().await;
// }
// }
// }
// Err(CustomError::Fatal)
// }
// }
/// # Fn `check_service`
/// ## for check current service's availiability
///
/// *input* : `&str`, `&u32`
///
/// *output* : Ok(()) if service now available | Err(er) if still not
///
/// *initiator* : fn `service_handler`, fn `looped_service_connecting`
///
/// *managing* : hostname, port
///
/// *depends on* : -
///
// ! have to be rewritten
// todo: rewrite use
async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
let addr = format!("{}:{}", hostname, port); let addr = format!("{}:{}", hostname, port);