357 lines
13 KiB
Rust
357 lines
13 KiB
Rust
use crate::options::structs::CustomError;
|
|
use log::{error, warn};
|
|
use std::net::{TcpStream, ToSocketAddrs};
|
|
use std::sync::Arc;
|
|
use tokio::time::Duration;
|
|
use tokio::sync::mpsc::Sender as Sender;
|
|
use async_trait::async_trait;
|
|
|
|
pub mod v2 {
|
|
use log::info;
|
|
|
|
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState};
|
|
|
|
use super::*;
|
|
use std::collections::{HashMap, BTreeMap, VecDeque};
|
|
|
|
type MpscSender = Arc<Sender<Events>>;
|
|
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
|
|
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
|
|
// type wrapper for service wait queue
|
|
type ConnectionQueue = BTreeMap<u32, VecDeque<Arc<str>>>;
|
|
|
|
#[derive(Debug)]
|
|
pub struct ServicesController {
|
|
// i.e. yandex.ru
|
|
#[allow(unused)]
|
|
name : String,
|
|
// i.e. yandex.ru:443
|
|
access_url : Arc<str>,
|
|
// "OK" or "Unavailable"
|
|
state: ServiceState,
|
|
// btree map with key as max wait time and it's key to hashmap
|
|
config: ConnectionQueue,
|
|
// Map of processes with their (trigger and mpsc sender)
|
|
event_registrator : EventHandlers,
|
|
}
|
|
|
|
impl PartialEq for ServicesController {
|
|
fn eq(&self, other: &Self) -> bool {
|
|
self.access_url == other.access_url
|
|
}
|
|
}
|
|
|
|
impl ServicesController {
|
|
pub fn new() -> ServicesController {
|
|
ServicesController {
|
|
name : String::new(),
|
|
access_url : Arc::from(String::new()),
|
|
state : ServiceState::Unavailable,
|
|
config: ConnectionQueue::new(),
|
|
event_registrator : EventHandlers::new(),
|
|
}
|
|
}
|
|
pub fn with_access_name(
|
|
mut self,
|
|
hostname: &str,
|
|
access_url: &str,
|
|
) -> ServicesController {
|
|
self.name = hostname.to_string();
|
|
self.access_url = Arc::from(access_url);
|
|
self
|
|
}
|
|
|
|
pub fn with_params(
|
|
mut self,
|
|
conn_queue: ConnectionQueue,
|
|
event_reg: EventHandlers,
|
|
) -> ServicesController {
|
|
self.config = conn_queue;
|
|
self.event_registrator = event_reg;
|
|
self
|
|
}
|
|
|
|
/// Builds the access URL for a service: `hostname:port` when a port is given,
/// otherwise just `hostname`.
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
    match port {
        Some(number) => format!("{hostname}:{number}"),
        None => hostname.to_owned(),
    }
}
|
|
pub fn add_process(
|
|
&mut self,
|
|
proc_name: &str,
|
|
trigger: Triggers,
|
|
sender: MpscSender,
|
|
) {
|
|
let proc_name: Arc<str> = Arc::from(proc_name);
|
|
// queue add
|
|
if let Triggers::Service { wait, .. } = trigger {
|
|
self.config.entry(wait)
|
|
.and_modify(|el| el.push_back(proc_name.clone()))
|
|
.or_insert({
|
|
let mut temp = VecDeque::new();
|
|
temp.push_back(proc_name.clone());
|
|
temp
|
|
});
|
|
}
|
|
// event add
|
|
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
|
|
}
|
|
async fn check_state(&self) -> anyhow::Result<()> {
|
|
let mut addrs = self.access_url.to_socket_addrs()?;
|
|
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
|
|
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
|
|
}
|
|
Ok(())
|
|
}
|
|
async fn trigger_on(&mut self) {
|
|
match self.state {
|
|
ServiceState::Ok => {
|
|
let _ = self.event_registrator
|
|
.iter()
|
|
.map(|(_, (_, el))| async {
|
|
let _ = el.send(Events::Positive(self.access_url.clone())).await;
|
|
});
|
|
},
|
|
ServiceState::Unavailable => {
|
|
// looped check and notifying
|
|
self.looped_check().await;
|
|
},
|
|
}
|
|
}
|
|
async fn looped_check(self: &mut Self) {
|
|
let longest = self.config.last_entry().unwrap();
|
|
let longest = longest.key();
|
|
let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1));
|
|
let timer = tokio::time::Instant::now();
|
|
let mut attempt: u32 = 1;
|
|
let access_url = Arc::new(self.access_url.clone());
|
|
// let event_registrator = &mut self.event_registrator;
|
|
|
|
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
|
|
// let access_url = access_url.clone();
|
|
loop {
|
|
interapter.tick().await;
|
|
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt);
|
|
attempt += 1;
|
|
|
|
let state_check_result = self.check_state().await;
|
|
|
|
if state_check_result.is_ok() {
|
|
info!("Connection to {} is `OK` now", &access_url);
|
|
self.state = ServiceState::Ok;
|
|
break;
|
|
} else {
|
|
let now = timer.elapsed();
|
|
let iterator = self.config.iter()
|
|
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
|
|
.flat_map(|(_, a)| a.iter().cloned())
|
|
.collect::<VecDeque<Arc<str>>>();
|
|
|
|
for name in iterator {
|
|
let proc_name = name.to_string();
|
|
info!("Trying to notify process `{}` ...", &proc_name);
|
|
let sender_opt = self.event_registrator.get(&name)
|
|
.map(|(trigger, sender)|
|
|
(trigger.to_service_negative_event(name.clone()), sender)
|
|
);
|
|
|
|
if let Some((tr, tx)) = sender_opt {
|
|
let _ = tx.send(tr.unwrap()).await;
|
|
} else {
|
|
error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}).await {
|
|
info!("Timeout of establishing connection to {}. ", &access_url);
|
|
}
|
|
}
|
|
}
|
|
#[async_trait]
|
|
impl ProcessUnit for ServicesController {
|
|
async fn process(&mut self) {
|
|
// check_service(hostname, port)
|
|
let current_state = self.check_state().await;
|
|
match (&self.state, current_state) {
|
|
(ServiceState::Unavailable, Ok(_)) => {
|
|
warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
|
|
self.state = ServiceState::Ok;
|
|
self.trigger_on().await;
|
|
},
|
|
(ServiceState::Ok, Err(_)) => {
|
|
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
|
|
self.state = ServiceState::Unavailable;
|
|
self.trigger_on().await;
|
|
},
|
|
(ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url),
|
|
_ => { /* DEAD END WITH NO INTEREST */ },
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// # Fn `service_handler`
|
|
/// ## Monitors the services that the current process depends on
|
|
///
|
|
/// *input* : `&str`, `&Vec<Services>`, `Arc<mpsc::Sender<u8>>`
|
|
///
|
|
/// *output* : ()
|
|
///
|
|
/// *initiator* : fn `utils::running_handler`
|
|
///
|
|
/// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer
|
|
///
|
|
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
|
|
///
|
|
// pub async fn service_handler(
|
|
// name: &str,
|
|
// services: &Vec<Services>,
|
|
// tx: Arc<mpsc::Sender<u8>>,
|
|
// ) -> Result<(), CustomError> {
|
|
// // println!("service daemon on {}", name);
|
|
// for serv in services {
|
|
// if check_service(&serv.hostname, &serv.port).await.is_err() {
|
|
// if !is_active(name).await || is_frozen(name).await {
|
|
// return Err(CustomError::Fatal);
|
|
// }
|
|
// error!(
|
|
// "Service {}:{} is unreachable for process {}",
|
|
// &serv.hostname, &serv.port, &name
|
|
// );
|
|
// match serv.triggers.on_lost.as_str() {
|
|
// "stay" => {
|
|
// tx.send(4).await.unwrap();
|
|
// continue;
|
|
// }
|
|
// "stop" => {
|
|
// if looped_service_connecting(name, serv).await.is_err() {
|
|
// tx.send(5).await.unwrap();
|
|
// tokio::task::yield_now().await;
|
|
// return Err(CustomError::Fatal);
|
|
// }
|
|
// }
|
|
// "hold" => {
|
|
// // if is_frozen(name).await {
|
|
// // return Err(CustomError::Fatal);
|
|
// // }
|
|
// if looped_service_connecting(name, serv).await.is_err() {
|
|
// tx.send(6).await.unwrap();
|
|
// tokio::task::yield_now().await;
|
|
// return Err(CustomError::Fatal);
|
|
// }
|
|
// }
|
|
// _ => {
|
|
// tx.send(101).await.unwrap();
|
|
// return Err(CustomError::Fatal);
|
|
// }
|
|
// }
|
|
// }
|
|
// }
|
|
// tokio::time::sleep(Duration::from_millis(100)).await;
|
|
// Ok(())
|
|
// }
|
|
|
|
/// # Fn `looped_service_connecting`
|
|
/// ## Re-checks a service's state in a loop (with a delay between attempts and a bounded total wait time)
|
|
///
|
|
/// *input* : `&str`, `&Services`
|
|
///
|
|
/// *output* : Ok(()) if service now available | Err(er) if still not
|
|
///
|
|
/// *initiator* : fn `service_handler`
|
|
///
|
|
/// *managing* : process name, current service struct
|
|
///
|
|
/// *depends on* : fn `check_service`
|
|
///
|
|
// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
|
|
// if serv.triggers.wait == 0 {
|
|
// loop {
|
|
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
|
// warn!(
|
|
// "Attempting to connect from {} process to {}:{}",
|
|
// &name, &serv.hostname, &serv.port
|
|
// );
|
|
// match check_service(&serv.hostname, &serv.port).await {
|
|
// Ok(_) => {
|
|
// log::info!(
|
|
// "Successfully connected to {} from {} process!",
|
|
// &serv.hostname,
|
|
// &name
|
|
// );
|
|
// break;
|
|
// }
|
|
// Err(_) => {
|
|
// tokio::task::yield_now().await;
|
|
// }
|
|
// }
|
|
// }
|
|
// Ok(())
|
|
// } else {
|
|
// let start = Instant::now();
|
|
// while start.elapsed().as_secs() < serv.triggers.wait.into() {
|
|
// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
|
// warn!(
|
|
// "Attempting to connect from {} process to {}:{}",
|
|
// &name, &serv.hostname, &serv.port
|
|
// );
|
|
// match check_service(&serv.hostname, &serv.port).await {
|
|
// Ok(_) => {
|
|
// log::info!(
|
|
// "Successfully connected to {} from {} process!",
|
|
// &serv.hostname,
|
|
// &name
|
|
// );
|
|
// return Ok(());
|
|
// }
|
|
// Err(_) => {
|
|
// tokio::task::yield_now().await;
|
|
// }
|
|
// }
|
|
// }
|
|
// Err(CustomError::Fatal)
|
|
// }
|
|
// }
|
|
|
|
/// # Fn `check_service`
|
|
/// ## for check current service's availiability
|
|
///
|
|
/// *input* : `&str`, `&u32`
|
|
///
|
|
/// *output* : Ok(()) if service now available | Err(er) if still not
|
|
///
|
|
/// *initiator* : fn `service_handler`, fn `looped_service_connecting`
|
|
///
|
|
/// *managing* : hostname, port
|
|
///
|
|
/// *depends on* : -
|
|
///
|
|
// ! have to be rewritten
|
|
// todo: rewrite use
|
|
async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {
|
|
let addr = format!("{}:{}", hostname, port);
|
|
|
|
match addr.to_socket_addrs() {
|
|
Ok(mut addrs) => {
|
|
if addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
|
|
Ok(())
|
|
} else {
|
|
Err(CustomError::Fatal)
|
|
}
|
|
}
|
|
Err(_) => Err(CustomError::Fatal),
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod service_unittests {
    use super::check_service;
    use std::net::TcpListener;

    /// Binds a listener to an OS-assigned loopback port so the tests need no
    /// external network or DNS.
    fn local_listener() -> (TcpListener, u32) {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind loopback listener");
        let port = listener
            .local_addr()
            .expect("read listener address")
            .port();
        (listener, u32::from(port))
    }

    #[tokio::test]
    async fn check_available_service() {
        // The listener stays alive for the whole check; the connection is
        // accepted into the backlog without an explicit `accept`.
        let (_listener, port) = local_listener();
        assert!(check_service("127.0.0.1", &port).await.is_ok());
    }

    #[tokio::test]
    async fn check_unavailable_service() {
        // Closing the listener frees the port, so the connect is refused.
        let (listener, port) = local_listener();
        drop(listener);
        assert!(check_service("127.0.0.1", &port).await.is_err());
    }

    #[tokio::test]
    async fn check_invalid_port() {
        // 70000 does not fit a TCP port, so the address cannot be resolved.
        assert!(check_service("127.0.0.1", &70_000).await.is_err());
    }
}
|