use crate::options::structs::{CustomError, Services}; use crate::utils::prcs::{is_active, is_frozen}; use log::{error, warn}; use std::net::{TcpStream, ToSocketAddrs}; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; /// # Fn `service_handler` /// ## function to realize mechanism of current process' dep services monitoring /// /// *input* : `&str`, `&Vec`, `Arc>` /// /// *output* : () /// /// *initiator* : fn `utils::running_handler` /// /// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer /// /// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` /// pub async fn service_handler( name: &str, services: &Vec, tx: Arc>, ) -> Result<(), CustomError> { // println!("service daemon on {}", name); for serv in services { if check_service(&serv.hostname, &serv.port).await.is_err() { if !is_active(name).await || is_frozen(name).await { return Err(CustomError::Fatal); } error!( "Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name ); match serv.triggers.on_lost.as_str() { "stay" => { tx.send(4).await.unwrap(); continue; } "stop" => { if looped_service_connecting(name, serv).await.is_err() { tx.send(5).await.unwrap(); tokio::task::yield_now().await; return Err(CustomError::Fatal); } } "hold" => { // if is_frozen(name).await { // return Err(CustomError::Fatal); // } if looped_service_connecting(name, serv).await.is_err() { tx.send(6).await.unwrap(); tokio::task::yield_now().await; return Err(CustomError::Fatal); } } _ => { tx.send(101).await.unwrap(); return Err(CustomError::Fatal); } } } } tokio::time::sleep(Duration::from_millis(100)).await; Ok(()) } /// # Fn `looped_service_connecting` /// ## for service's state check in loop (with delay and restriction of attempts) /// /// *input* : `&str`, `&Services` /// /// *output* : Ok(()) if service now available | Err(er) if still not /// /// *initiator* : fn `service_handler` /// /// *managing* : process name, current service struct /// /// *depends on* : fn `check_service` /// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { if serv.triggers.wait == 0 { loop { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; warn!( "Attempting to connect from {} process to {}:{}", &name, &serv.hostname, &serv.port ); match check_service(&serv.hostname, &serv.port).await { Ok(_) => { log::info!( "Successfully connected to {} from {} process!", &serv.hostname, &name ); break; } Err(_) => { tokio::task::yield_now().await; } } } Ok(()) } else { let start = Instant::now(); while start.elapsed().as_secs() < serv.triggers.wait.into() { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; warn!( "Attempting to connect from {} process to {}:{}", &name, &serv.hostname, &serv.port ); match check_service(&serv.hostname, &serv.port).await { Ok(_) => { log::info!( "Successfully connected to {} from {} process!", &serv.hostname, &name ); return Ok(()); } Err(_) => { tokio::task::yield_now().await; } } } Err(CustomError::Fatal) } } /// # Fn `check_service` /// ## for check current service's availiability /// /// *input* : `&str`, `&u32` /// /// *output* : Ok(()) if service now available | Err(er) if still not /// /// *initiator* : fn `service_handler`, fn `looped_service_connecting` /// /// *managing* : hostname, port /// /// *depends on* : - /// // ! have to be rewritten // todo: rewrite use async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { let addr = format!("{}:{}", hostname, port); match addr.to_socket_addrs() { Ok(mut addrs) => { if addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { Ok(()) } else { Err(CustomError::Fatal) } } Err(_) => Err(CustomError::Fatal), } } #[cfg(test)] mod service_unittests { use super::check_service; #[tokio::test] async fn check_available_service() { assert!(check_service("ya.ru", &443).await.is_ok()); } #[tokio::test] async fn check_unavailable_service() { assert!(check_service("unavailable.service", &1111).await.is_err()); } }