monitor/src/utils/services.rs

135 lines
4.4 KiB
Rust

use crate::options::structs::{CustomError, Services};
use crate::utils::prcs::{is_active, is_frozen};
use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
pub async fn service_handler(
name: &str,
services: &Vec<Services>,
tx: Arc<mpsc::Sender<u8>>,
) -> Result<(), CustomError> {
// println!("service daemon on {}", name);
for serv in services {
if check_service(&serv.hostname, &serv.port).await.is_err() {
if !is_active(name).await || is_frozen(name).await {
return Err(CustomError::Fatal);
}
error!(
"Service {}:{} is unreachable for process {}",
&serv.hostname, &serv.port, &name
);
match serv.triggers.on_lost.as_str() {
"stay" => {
tx.send(4).await.unwrap();
continue;
}
"stop" => {
if looped_service_connecting(name, serv).await.is_err() {
tx.send(5).await.unwrap();
tokio::task::yield_now().await;
return Err(CustomError::Fatal);
}
}
"hold" => {
// if is_frozen(name).await {
// return Err(CustomError::Fatal);
// }
if looped_service_connecting(name, serv).await.is_err() {
tx.send(6).await.unwrap();
tokio::task::yield_now().await;
return Err(CustomError::Fatal);
}
}
_ => {
tx.send(101).await.unwrap();
return Err(CustomError::Fatal);
}
}
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
if serv.triggers.wait == 0 {
loop {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!(
"Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port
);
match check_service(&serv.hostname, &serv.port).await {
Ok(_) => {
log::info!(
"Successfully connected to {} from {} process!",
&serv.hostname,
&name
);
break;
}
Err(_) => {
tokio::task::yield_now().await;
}
}
}
Ok(())
} else {
let start = Instant::now();
while start.elapsed().as_secs() < serv.triggers.wait.into() {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
warn!(
"Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port
);
match check_service(&serv.hostname, &serv.port).await {
Ok(_) => {
log::info!(
"Successfully connected to {} from {} process!",
&serv.hostname,
&name
);
return Ok(());
}
Err(_) => {
tokio::task::yield_now().await;
}
}
}
Err(CustomError::Fatal)
}
}
// ! have to be rewritten
// todo: rewrite use
async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
let addr = format!("{}:{}", hostname, port);
match addr.to_socket_addrs() {
Ok(mut addrs) => {
if addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
Ok(())
} else {
Err(CustomError::Fatal)
}
}
Err(_) => Err(CustomError::Fatal),
}
}
#[cfg(test)]
mod service_unittests {
use super::check_service;
#[tokio::test]
async fn check_available_service() {
assert!(check_service("ya.ru", &443).await.is_ok());
}
#[tokio::test]
async fn check_unavailable_service() {
assert!(check_service("unavailable.service", &1111).await.is_err());
}
}