monitor/noxis-rs/src/utils/services.rs

336 lines
12 KiB
Rust

use crate::options::structs::{CustomError, Services};
use super::prcs::{is_active, is_frozen};
use log::{error, warn};
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use tokio::sync::mpsc::Sender as Sender;
pub mod v2 {
use log::info;
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
use super::*;
use std::collections::{HashMap, BTreeMap, VecDeque};
// Shared (reference-counted) sending half of a tokio mpsc channel carrying `Events`.
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
// Registered event handlers, keyed by process name: each entry holds the
// trigger configured for that process and the channel used to notify it.
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
// Service wait queue: the key is the `wait` value taken from `Triggers::Service`
// (interpreted as seconds by the reconnection loop), the value is the queue of
// process names registered with that wait time. Being a `BTreeMap`, it is
// ordered by wait time.
type ConnectionQueue<'a> = BTreeMap<u32, VecDeque<&'a str>>;
/// Tracks the availability of one external service and the processes that
/// must be notified when that availability changes.
#[derive(Debug)]
struct ServicesController<'a> {
    /// Host name of the service, e.g. `yandex.ru`.
    name: &'a str,
    /// Address used for connection probes, e.g. `yandex.ru:443`.
    access_url: String,
    /// Last known availability of the service.
    state: ServiceState,
    /// Wait queue: maximum wait time mapped to the names of the processes
    /// registered with that wait time; those names are the keys of
    /// `event_registrator`.
    config: ConnectionQueue<'a>,
    /// Per-process trigger and mpsc sender, keyed by process name.
    event_registrator: EventHandlers<'a>,
}
impl<'a> ServicesController<'a> {
pub fn new() -> ServicesController<'a> {
ServicesController {
name : "",
access_url : String::new(),
state : ServiceState::Unavailable,
config: ConnectionQueue::new(),
event_registrator : EventHandlers::new(),
}
}
pub fn with_params(
&mut self,
hostname: &'a str,
port: Option<&'a str>,
conn_queue: ConnectionQueue<'a>,
event_reg: EventHandlers<'a>,
) -> &mut ServicesController<'a> {
self.name = hostname;
self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p)));
self.config = conn_queue;
self.event_registrator = event_reg;
self
}
pub fn add_process(
&mut self,
proc_name: &'a str,
trigger: Triggers<'a>,
sender: MpscSender<'a>,
) {
// queue add
if let Triggers::Service { wait, .. } = trigger {
self.config.entry(wait)
.and_modify(|el| el.push_back(proc_name))
.or_insert({
let mut temp = VecDeque::new();
temp.push_back(proc_name);
temp
});
}
// event add
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
}
async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?;
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
}
Ok(())
}
async fn trigger_on(&'a mut self) {
match self.state {
ServiceState::Ok => {
let _ = self.event_registrator
.iter()
.map(|(_, (_, el))| async {
let _ = el.send(Events::Positive(&self.access_url)).await;
});
},
ServiceState::Unavailable => {
// looped check and notifying
self.looped_check().await;
},
}
}
async fn looped_check(self: &'a mut Self) {
let longest = self.config.last_entry().unwrap();
let longest = longest.key();
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
let timer = tokio::time::Instant::now();
let mut attempt: u32 = 1;
let access_url = Arc::new(self.access_url.clone());
// let event_registrator = &mut self.event_registrator;
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
// let access_url = access_url.clone();
loop {
interapter.tick().await;
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
attempt += 1;
let state_check_result = self.check_state().await;
if state_check_result.is_ok() {
info!("Connection to {} is `OK` now", &access_url);
self.state = ServiceState::Ok;
break;
} else {
let now = timer.elapsed();
let iterator = self.config.iter()
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
.flat_map(|(_, a)| a.iter().copied())
.collect::<VecDeque<&str>>();
for name in iterator {
let sender_opt = self.event_registrator.get(name)
.map(|(trigger, sender)|
(trigger.to_service_negative_event(name), sender)
);
if let Some((tr, tx)) = sender_opt {
let _ = tx.send(tr.unwrap()).await;
} else {
error!("Cannot find {} channel sender in {} service", name, &self.access_url)
}
}
}
}
}).await {
info!("Timeout of establishing connection to {}. ", &access_url);
}
}
}
impl<'a> ProcessUnit<'a> for ServicesController<'a> {
    /// Probes the service once and reacts to a change of availability.
    ///
    /// * unavailable -> reachable: state becomes `Ok` and `trigger_on` runs;
    /// * ok -> unreachable: state becomes `Unavailable` and `trigger_on` runs;
    /// * still unreachable: only a warning is logged;
    /// * still reachable: nothing happens.
    async fn process(&'a mut self) {
        let reachable_now = self.check_state().await.is_ok();
        let was_ok = matches!(self.state, ServiceState::Ok);
        match (was_ok, reachable_now) {
            (false, true) => {
                warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
                self.state = ServiceState::Ok;
                self.trigger_on().await;
            }
            (true, false) => {
                warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
                self.state = ServiceState::Unavailable;
                self.trigger_on().await;
            }
            (false, false) => {
                warn!("Service {} is still unreachable", &self.access_url)
            }
            // Service was fine and still is: nothing to report.
            (true, true) => {}
        }
    }
}
}
/// # Fn `service_handler`
/// ## monitors the services a process depends on and reacts when one is lost
///
/// Every service in `services` is probed with `check_service`. For an
/// unreachable service the reaction is chosen by its `triggers.on_lost` value:
///
/// * `"stay"` — code `4` is sent to the managing channel and the next service is checked;
/// * `"stop"` — reconnection is attempted; on failure code `5` is sent and `Fatal` is returned;
/// * `"hold"` — reconnection is attempted; on failure code `6` is sent and `Fatal` is returned;
/// * anything else — code `101` is sent and `Fatal` is returned.
///
/// `Fatal` is also returned, without sending anything, when the process is
/// not active or is frozen, and when the managing channel is closed (the
/// failure is logged instead of panicking).
///
/// *input* : `&str`, `&Vec<Services>`, `Arc<mpsc::Sender<u8>>`
///
/// *output* : Ok(()) after all services were handled | Err(CustomError::Fatal) otherwise
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer
///
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
///
pub async fn service_handler(
    name: &str,
    services: &Vec<Services>,
    tx: Arc<mpsc::Sender<u8>>,
) -> Result<(), CustomError> {
    // Codes sent to the managing channel, one per `on_lost` outcome.
    const STAY_CODE: u8 = 4;
    const STOP_CODE: u8 = 5;
    const HOLD_CODE: u8 = 6;
    const UNKNOWN_POLICY_CODE: u8 = 101;

    // Sends `code` to the manager. A closed channel is logged and reported as
    // `Fatal` rather than panicking inside the monitoring task.
    async fn notify(tx: &mpsc::Sender<u8>, code: u8, process: &str) -> Result<(), CustomError> {
        tx.send(code).await.map_err(|err| {
            error!(
                "Cannot send code {} to the manager of process {}: {}",
                code, process, err
            );
            CustomError::Fatal
        })
    }

    for serv in services {
        if check_service(&serv.hostname, &serv.port).await.is_ok() {
            continue;
        }
        // A stopped or frozen process is not handled here.
        if !is_active(name).await || is_frozen(name).await {
            return Err(CustomError::Fatal);
        }
        error!(
            "Service {}:{} is unreachable for process {}",
            &serv.hostname, &serv.port, &name
        );
        match serv.triggers.on_lost.as_str() {
            "stay" => notify(&tx, STAY_CODE, name).await?,
            policy @ ("stop" | "hold") => {
                if looped_service_connecting(name, serv).await.is_err() {
                    let code = if policy == "stop" { STOP_CODE } else { HOLD_CODE };
                    notify(&tx, code, name).await?;
                    tokio::task::yield_now().await;
                    return Err(CustomError::Fatal);
                }
            }
            _ => {
                notify(&tx, UNKNOWN_POLICY_CODE, name).await?;
                return Err(CustomError::Fatal);
            }
        }
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
/// # Fn `looped_service_connecting`
/// ## retries the connection to a lost service, pausing `triggers.delay` seconds before each attempt
///
/// With `triggers.wait == 0` the retries continue until the service answers.
/// Otherwise a new attempt is started only while fewer than `triggers.wait`
/// whole seconds have elapsed since the call.
///
/// *input* : `&str`, `&Services`
///
/// *output* : Ok(()) if service now available | Err(CustomError::Fatal) if the wait time ran out
///
/// *initiator* : fn `service_handler`
///
/// *managing* : process name, current service struct
///
/// *depends on* : fn `check_service`
///
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
    // A zero wait time means "retry without a deadline".
    let unlimited = serv.triggers.wait == 0;
    let started = Instant::now();
    loop {
        if !unlimited && started.elapsed().as_secs() >= serv.triggers.wait.into() {
            return Err(CustomError::Fatal);
        }
        tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
        warn!(
            "Attempting to connect from {} process to {}:{}",
            &name, &serv.hostname, &serv.port
        );
        if check_service(&serv.hostname, &serv.port).await.is_ok() {
            log::info!(
                "Successfully connected to {} from {} process!",
                &serv.hostname,
                &name
            );
            return Ok(());
        }
        tokio::task::yield_now().await;
    }
}
/// # Fn `check_service`
/// ## probes whether a TCP service is currently reachable
///
/// Resolves `hostname:port` and tries the resolved addresses one by one,
/// allowing one second per connection attempt; the first successful
/// connection ends the probe.
///
/// *input* : `&str`, `&u32`
///
/// *output* : Ok(()) if service now available | Err(CustomError::Fatal) if resolution failed or no address accepted the connection
///
/// *initiator* : fn `service_handler`, fn `looped_service_connecting`
///
/// *managing* : hostname, port
///
/// *depends on* : -
///
// TODO: rewrite — resolution and connect are blocking `std::net` calls made
// inside an `async fn`.
async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
    let target = format!("{}:{}", hostname, port);
    let candidates = target
        .to_socket_addrs()
        .map_err(|_| CustomError::Fatal)?;
    for candidate in candidates {
        if TcpStream::connect_timeout(&candidate, Duration::new(1, 0)).is_ok() {
            return Ok(());
        }
    }
    Err(CustomError::Fatal)
}
#[cfg(test)]
mod service_unittests {
    use super::check_service;
    use std::net::TcpListener;

    /// A listener bound on the loopback interface must be reported as
    /// reachable; no Internet access is needed.
    #[tokio::test]
    async fn check_available_service() {
        // Port 0 lets the OS pick a free port; pending connections are queued
        // by the OS even though `accept` is never called.
        let listener = TcpListener::bind("127.0.0.1:0").expect("loopback bind must succeed");
        let port = u32::from(
            listener
                .local_addr()
                .expect("bound listener has a local address")
                .port(),
        );
        assert!(check_service("127.0.0.1", &port).await.is_ok());
    }

    /// A loopback port with no listener behind it must be reported as
    /// unreachable.
    #[tokio::test]
    async fn check_unavailable_service() {
        // Reserve a free port, then close the listener so nothing accepts on it.
        let port = {
            let listener = TcpListener::bind("127.0.0.1:0").expect("loopback bind must succeed");
            u32::from(
                listener
                    .local_addr()
                    .expect("bound listener has a local address")
                    .port(),
            )
        };
        assert!(check_service("127.0.0.1", &port).await.is_err());
    }

    /// A host name that cannot be resolved must be reported as unreachable.
    /// The `.invalid` top-level domain is reserved and never resolves.
    #[tokio::test]
    async fn check_unresolvable_host() {
        assert!(check_service("unavailable.invalid", &1111).await.is_err());
    }
}