From 2495fb84cfc68eff3c2fdd970838513f1159dfa0 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 22 Apr 2025 11:22:22 -0400 Subject: [PATCH] services controller fixed (fuh) --- noxis-rs/src/utils/services.rs | 131 ++++++++++++++++++++++++++++----- 1 file changed, 114 insertions(+), 17 deletions(-) diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index 11141a7..9f54db3 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -5,24 +5,33 @@ use std::net::{TcpStream, ToSocketAddrs}; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; -use tokio::sync::mpsc::Sender as MpscSender; +use tokio::sync::mpsc::Sender as Sender; pub mod v2 { - use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceWaitConfig, ServiceState}; + use log::info; + + use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState}; use super::*; use std::collections::{HashMap, BTreeMap, VecDeque}; + type MpscSender<'a> = Arc>>; // type EventHandlers<'a> = Vec>>; - type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender>)>; + type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; // type wrapper for service wait queue - type ConnectionQueue<'a> = BTreeMap, MpscSender>)>>; + type ConnectionQueue<'a> = BTreeMap>; + #[derive(Debug)] struct ServicesController<'a> { + // i.e. yandex.ru name : &'a str, + // i.e. yandex.ru:443 access_url : String, + // "OK" or "Unavailable" state: ServiceState, - config: ServiceWaitConfig, + // btree map with key as max wait time and it's key to hashmap + config: ConnectionQueue<'a>, + // Map of processes with their (trigger and mpsc sender) event_registrator : EventHandlers<'a>, } impl<'a> ServicesController<'a> { @@ -31,40 +40,128 @@ pub mod v2 { name : "", access_url : String::new(), state : ServiceState::Unavailable, - config: ServiceWaitConfig::default(), + config: ConnectionQueue::new(), event_registrator : EventHandlers::new(), } } - pub async fn with_params(&mut self, hostname: &'a str, port: Option<&'a str>, event_registrator: EventHandlers<'a>) -> anyhow::Result<()> { + pub fn with_params( + &mut self, + hostname: &'a str, + port: Option<&'a str>, + conn_queue: ConnectionQueue<'a>, + event_reg: EventHandlers<'a>, + ) -> &mut ServicesController<'a> { self.name = hostname; self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))); - self.event_registrator = event_registrator; - Ok(()) + self.config = conn_queue; + self.event_registrator = event_reg; + self } - async fn check_state(&mut self) -> anyhow::Result<()> { + pub fn add_process( + &mut self, + proc_name: &'a str, + trigger: Triggers<'a>, + sender: MpscSender<'a>, + ) { + // queue add + if let Triggers::Service { wait, .. } = trigger { + self.config.entry(wait) + .and_modify(|el| el.push_back(proc_name)) + .or_insert({ + let mut temp = VecDeque::new(); + temp.push_back(proc_name); + temp + }); + } + // event add + self.event_registrator.entry(proc_name).or_insert((trigger, sender)); + } + async fn check_state(&self) -> anyhow::Result<()> { let mut addrs = self.access_url.to_socket_addrs()?; if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))) } Ok(()) } - async fn trigger_on(&mut self) {} + async fn trigger_on(&'a mut self) { + match self.state { + ServiceState::Ok => { + let _ = self.event_registrator + .iter() + .map(|(_, (_, el))| async { + let _ = el.send(Events::Positive(&self.access_url)).await; + }); + }, + ServiceState::Unavailable => { + // looped check and notifying + self.looped_check().await; + }, + } + } + async fn looped_check(self: &'a mut Self) { + let longest = self.config.last_entry().unwrap(); + let longest = longest.key(); + let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1)); + let timer = tokio::time::Instant::now(); + let mut attempt: u32 = 1; + let access_url = Arc::new(self.access_url.clone()); + // let event_registrator = &mut self.event_registrator; + + if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async { + // let access_url = access_url.clone(); + loop { + interapter.tick().await; + info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt); + attempt += 1; + + let state_check_result = self.check_state().await; + + if state_check_result.is_ok() { + info!("Connection to {} is `OK` now", &access_url); + self.state = ServiceState::Ok; + break; + } else { + let now = timer.elapsed(); + let iterator = self.config.iter() + .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now) + .flat_map(|(_, a)| a.iter().copied()) + .collect::>(); + + for name in iterator { + let sender_opt = self.event_registrator.get(name) + .map(|(trigger, sender)| + (trigger.to_service_negative_event(name), sender) + ); + + if let Some((tr, tx)) = sender_opt { + let _ = tx.send(tr.unwrap()).await; + } else { + error!("Cannot find {} channel sender in {} service", name, &self.access_url) + } + } + } + } + }).await { + info!("Timeout of establishing connection to {}. ", &access_url); + } + } } impl<'a> ProcessUnit<'a> for ServicesController<'a> { - async fn process(&mut self) { + async fn process(&'a mut self) { // check_service(hostname, port) let current_state = self.check_state().await; match (&self.state, current_state) { (ServiceState::Unavailable, Ok(_)) => { - warn!("Unreachable for connection service `{}`. Notifying {} process(es)", &self.access_url, self.event_registrator.len()); - // - self.state = ServiceState::Unavailable; + warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); + self.state = ServiceState::Ok; + self.trigger_on().await; }, (ServiceState::Ok, Err(_)) => { - warn!("Connection with `{}` service was established. Notifying {} process(es)", &self.access_url, self.event_registrator.len()); - // + warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); self.state = ServiceState::Unavailable; + self.trigger_on().await; }, + (ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url), _ => { /* DEAD END WITH NO INTEREST */ }, } }