use crate::options::structs::CustomError; use log::{error, warn}; use std::net::{TcpStream, ToSocketAddrs}; use std::sync::Arc; use tokio::time::Duration; use tokio::sync::mpsc::Sender as Sender; use async_trait::async_trait; pub mod v2 { use log::info; use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState}; use super::*; use std::collections::{HashMap, BTreeMap, VecDeque}; type MpscSender = Arc>; // type EventHandlers<'a> = Vec>>; type EventHandlers = HashMap, (Triggers, MpscSender)>; // type wrapper for service wait queue type ConnectionQueue = BTreeMap>>; #[derive(Debug)] pub struct ServicesController { // i.e. yandex.ru #[allow(unused)] name : String, // i.e. yandex.ru:443 access_url : Arc, // "OK" or "Unavailable" state: ServiceState, // btree map with key as max wait time and it's key to hashmap config: ConnectionQueue, // Map of processes with their (trigger and mpsc sender) event_registrator : EventHandlers, } impl PartialEq for ServicesController { fn eq(&self, other: &Self) -> bool { self.access_url == other.access_url } } impl ServicesController { pub fn new() -> ServicesController { ServicesController { name : String::new(), access_url : Arc::from(String::new()), state : ServiceState::Unavailable, config: ConnectionQueue::new(), event_registrator : EventHandlers::new(), } } pub fn with_access_name( mut self, hostname: &str, access_url: &str, ) -> ServicesController { self.name = hostname.to_string(); self.access_url = Arc::from(access_url); self } pub fn with_params( mut self, conn_queue: ConnectionQueue, event_reg: EventHandlers, ) -> ServicesController { self.config = conn_queue; self.event_registrator = event_reg; self } pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String { format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) } pub fn add_process( &mut self, proc_name: &str, trigger: Triggers, sender: MpscSender, ) { let proc_name: Arc = Arc::from(proc_name); // queue add if let Triggers::Service { wait, .. } = trigger { self.config.entry(wait) .and_modify(|el| el.push_back(proc_name.clone())) .or_insert({ let mut temp = VecDeque::new(); temp.push_back(proc_name.clone()); temp }); } // event add self.event_registrator.entry(proc_name).or_insert((trigger, sender)); } async fn check_state(&self) -> anyhow::Result<()> { let mut addrs = self.access_url.to_socket_addrs()?; if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))) } Ok(()) } async fn trigger_on(&mut self) { match self.state { ServiceState::Ok => { let _ = self.event_registrator .iter() .map(|(_, (_, el))| async { let _ = el.send(Events::Positive(self.access_url.clone())).await; }); }, ServiceState::Unavailable => { // looped check and notifying self.looped_check().await; }, } } async fn looped_check(self: &mut Self) { let longest = self.config.last_entry().unwrap(); let longest = longest.key(); let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1)); let timer = tokio::time::Instant::now(); let mut attempt: u32 = 1; let access_url = Arc::new(self.access_url.clone()); // let event_registrator = &mut self.event_registrator; if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async { // let access_url = access_url.clone(); loop { interapter.tick().await; info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt); attempt += 1; let state_check_result = self.check_state().await; if state_check_result.is_ok() { info!("Connection to {} is `OK` now", &access_url); self.state = ServiceState::Ok; break; } else { let now = timer.elapsed(); let iterator = self.config.iter() .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now) .flat_map(|(_, a)| a.iter().cloned()) .collect::>>(); for name in iterator { let proc_name = name.to_string(); info!("Trying to notify process `{}` ...", &proc_name); let sender_opt = self.event_registrator.get(&name) .map(|(trigger, sender)| (trigger.to_service_negative_event(name.clone()), sender) ); if let Some((tr, tx)) = sender_opt { let _ = tx.send(tr.unwrap()).await; } else { error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url) } } } } }).await { info!("Timeout of establishing connection to {}. ", &access_url); } } } #[async_trait] impl ProcessUnit for ServicesController { async fn process(&mut self) { // check_service(hostname, port) let current_state = self.check_state().await; match (&self.state, current_state) { (ServiceState::Unavailable, Ok(_)) => { warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); self.state = ServiceState::Ok; self.trigger_on().await; }, (ServiceState::Ok, Err(_)) => { warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); self.state = ServiceState::Unavailable; self.trigger_on().await; }, (ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url), _ => { /* DEAD END WITH NO INTEREST */ }, } } } } /// # Fn `service_handler` /// ## function to realize mechanism of current process' dep services monitoring /// /// *input* : `&str`, `&Vec`, `Arc>` /// /// *output* : () /// /// *initiator* : fn `utils::running_handler` /// /// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer /// /// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` /// // pub async fn service_handler( // name: &str, // services: &Vec, // tx: Arc>, // ) -> Result<(), CustomError> { // // println!("service daemon on {}", name); // for serv in services { // if check_service(&serv.hostname, &serv.port).await.is_err() { // if !is_active(name).await || is_frozen(name).await { // return Err(CustomError::Fatal); // } // error!( // "Service {}:{} is unreachable for process {}", // &serv.hostname, &serv.port, &name // ); // match serv.triggers.on_lost.as_str() { // "stay" => { // tx.send(4).await.unwrap(); // continue; // } // "stop" => { // if looped_service_connecting(name, serv).await.is_err() { // tx.send(5).await.unwrap(); // tokio::task::yield_now().await; // return Err(CustomError::Fatal); // } // } // "hold" => { // // if is_frozen(name).await { // // return Err(CustomError::Fatal); // // } // if looped_service_connecting(name, serv).await.is_err() { // tx.send(6).await.unwrap(); // tokio::task::yield_now().await; // return Err(CustomError::Fatal); // } // } // _ => { // tx.send(101).await.unwrap(); // return Err(CustomError::Fatal); // } // } // } // } // tokio::time::sleep(Duration::from_millis(100)).await; // Ok(()) // } /// # Fn `looped_service_connecting` /// ## for service's state check in loop (with delay and restriction of attempts) /// /// *input* : `&str`, `&Services` /// /// *output* : Ok(()) if service now available | Err(er) if still not /// /// *initiator* : fn `service_handler` /// /// *managing* : process name, current service struct /// /// *depends on* : fn `check_service` /// // async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { // if serv.triggers.wait == 0 { // loop { // tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; // warn!( // "Attempting to connect from {} process to {}:{}", // &name, &serv.hostname, &serv.port // ); // match check_service(&serv.hostname, &serv.port).await { // Ok(_) => { // log::info!( // "Successfully connected to {} from {} process!", // &serv.hostname, // &name // ); // break; // } // Err(_) => { // tokio::task::yield_now().await; // } // } // } // Ok(()) // } else { // let start = Instant::now(); // while start.elapsed().as_secs() < serv.triggers.wait.into() { // tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; // warn!( // "Attempting to connect from {} process to {}:{}", // &name, &serv.hostname, &serv.port // ); // match check_service(&serv.hostname, &serv.port).await { // Ok(_) => { // log::info!( // "Successfully connected to {} from {} process!", // &serv.hostname, // &name // ); // return Ok(()); // } // Err(_) => { // tokio::task::yield_now().await; // } // } // } // Err(CustomError::Fatal) // } // } /// # Fn `check_service` /// ## for check current service's availiability /// /// *input* : `&str`, `&u32` /// /// *output* : Ok(()) if service now available | Err(er) if still not /// /// *initiator* : fn `service_handler`, fn `looped_service_connecting` /// /// *managing* : hostname, port /// /// *depends on* : - /// // ! have to be rewritten // todo: rewrite use async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { let addr = format!("{}:{}", hostname, port); match addr.to_socket_addrs() { Ok(mut addrs) => { if addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { Ok(()) } else { Err(CustomError::Fatal) } } Err(_) => Err(CustomError::Fatal), } } #[cfg(test)] mod service_unittests { use super::check_service; #[tokio::test] async fn check_available_service() { assert!(check_service("ya.ru", &443).await.is_ok()); } #[tokio::test] async fn check_unavailable_service() { assert!(check_service("unavailable.service", &1111).await.is_err()); } }