From 6d56d1e39c38cc9ca17433d68420a680210fe313 Mon Sep 17 00:00:00 2001 From: prplV Date: Fri, 25 Apr 2025 10:56:42 -0400 Subject: [PATCH] big change but it's still not working in utils.rs --- noxis-rs/Cargo.toml | 2 + noxis-rs/settings.json | 1 - noxis-rs/src/main.rs | 14 +- noxis-rs/src/options/config.rs | 4 +- noxis-rs/src/options/structs.rs | 10 +- noxis-rs/src/utils.rs | 252 +++++++++++++++++++------------- noxis-rs/src/utils/files.rs | 22 ++- noxis-rs/src/utils/prcs.rs | 7 +- noxis-rs/src/utils/services.rs | 214 ++++++++++++++------------- 9 files changed, 308 insertions(+), 218 deletions(-) diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index 8207b26..8e35c7d 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -18,3 +18,5 @@ sysinfo = "0.32.0" tokio = { version = "1.38.0", features = ["full", "time"] } noxis-cli = { path = "../noxis-cli" } dotenv = "0.15.0" +futures = "0.3.31" +async-trait = "0.1.88" diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index bb5f44a..496fe2a 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -22,7 +22,6 @@ "port": 443, "triggers": { "wait": 10, - "delay": 2, "onLost": "restart" } } diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index aa9eeb1..79e5b8c 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -58,7 +58,19 @@ async fn main() -> anyhow::Result<()>{ handler.push(ctrlc); let monitoring = tokio::spawn(async move { - if let Err(er) = init_monitoring(&mut rx_brd).await { + let config = if !rx_brd.is_empty() { + rx_brd.recv().await? + } else { + let mut tick = tokio::time::interval(Duration::from_millis(500)); + loop { + tick.tick().await; + break match rx_brd.try_recv() { + Ok(conf) => conf, + Err(_) => continue, + } + } + }; + if let Err(er) = init_monitoring(config).await { error!("Monitoring mod failed due to {}", er); } }); diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 9c2e69f..9e21042 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -276,7 +276,7 @@ pub mod v2 { // 100% local exists here // create watcher on local config file - match create_watcher("", local_config_path).await { + match create_watcher("", local_config_path) { Ok(mut watcher) => { loop { let mut need_to_export_config = false; @@ -340,7 +340,7 @@ pub mod v2 { // recreation watcher (draining activity buffer mechanism) // if local config file was deleted and recreated // if local config file was modified locally - match create_watcher("", local_config_path).await { + match create_watcher("", local_config_path) { Ok(new) => watcher = new, Err(er) => error!("Cannot create new watcher due to {}", er), } diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 7d4d81b..c253282 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -2,6 +2,7 @@ use std::net::Ipv4Addr; use serde::{Deserialize, Serialize}; +use async_trait::async_trait; pub enum DependencyType { File, @@ -71,6 +72,7 @@ impl<'a> Triggers<'a> { } } +#[derive(Debug)] pub struct FileTriggersForController<'a> { pub on_change: &'a str, pub on_delete: &'a str } pub struct ServiceTriggersForController<'a>(&'a str); @@ -83,6 +85,7 @@ impl std::fmt::Display for DependencyType { } } +#[derive(Debug)] pub enum ProcessState { Pending, Holding, @@ -99,10 +102,10 @@ pub enum NegativeOutcomes<'a> { ServiceIsUnreachable(&'a str, DependencyType, &'a str), } +#[async_trait] pub trait ProcessUnit<'a> { - fn process(&'a mut self) -> impl std::future::Future + Send; + async fn process(&'a mut self); } - /// # an Error enum (next will be deleted and replaced) pub enum CustomError { Fatal, @@ -251,7 +254,7 @@ pub struct Files { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Services { pub hostname: String, - pub port: u32, + pub port: Option, pub triggers: ServiceTriggers, } @@ -275,7 +278,6 @@ pub struct Services { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ServiceTriggers { pub wait: u32, - pub delay: u32, #[serde(rename = "onLost")] pub on_lost: String, } diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index 3430fac..3018f69 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -7,35 +7,42 @@ pub mod services; // TODO : saving current flags state use crate::options::structs::{CustomError, TrackingProcess, Processes}; -use files::create_watcher; -use files::file_handler; -use inotify::Inotify; +// use files::create_watcher; +// use files::file_handler; +// use inotify::Inotify; use log::{error, warn, info}; use prcs::{ freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process, unfreeze_process, }; -use services::service_handler; +// use services::service_handler; use std::process::Command; use std::sync::Arc; -use tokio::join; +// use tokio::join; use tokio::sync::mpsc; use tokio::time::Duration; use tokio::sync::broadcast::Receiver; -use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; +// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; // controllers import use prcs::v2::ProcessesController; use files::v2::FilesController; use services::v2::ServicesController; +use async_trait::async_trait; const GET_ID_CMD: &str = "hostname"; pub mod v2 { - use std::collections::{HashMap, LinkedList}; - use crate::options::structs::{Events, FileTriggersForController, ProcessUnit}; - + use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque}; + use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers}; use super::*; + enum ControllerResult<'a> { + Process(Option>), + File(Option>), + Service(Option>), + } + + #[derive(Debug)] struct Supervisor<'a> { prcs : LinkedList>, files : LinkedList>, @@ -43,35 +50,92 @@ pub mod v2 { } impl<'a> Supervisor<'a> { - pub fn new(config: &'a Processes) -> Supervisor<'a> { - let mut p = LinkedList::new(); - let mut f = LinkedList::new(); - let mut s = LinkedList::new(); - + pub fn new() -> Supervisor<'a> { + Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()} + } + pub async fn with_config(mut self, config: &'a Processes) -> Supervisor<'a> { let _ = config.processes.iter() - .map(|prc| { + .for_each(|prc| { let (rx, tx) = mpsc::channel::>(10); - let temp = ProcessesController::new(&prc.name, tx); - if !p.contains(&temp) { - p.push_back(temp); + let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); + if !self.prcs.contains(&temp) { + self.prcs.push_back(temp); } let rx = Arc::new(rx); - // files + let _ = prc.dependencies.files.iter() - .map(|file| async { + .for_each(|file| { let mut hm = HashMap::new(); let triggers = FileTriggersForController { on_change: &file.triggers.on_change, on_delete: &file.triggers.on_delete}; - hm.insert(&prc.name, (triggers, rx.clone())); - let tempfile = FilesController::new(&file.filename, hm).with_path(file.src).await; + hm.insert(prc.name.as_str(), (triggers, rx.clone())); + + let tempfile = FilesController::new(&file.filename.as_str(), hm) + .with_path(&file.src); + + + if let Ok(file) = tempfile { + if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) { + current_file.add_event(file); + } else { + self.files.push_back(file); + } + } }); + // servs let _ = prc.dependencies.services.iter() - .map(|serv| { - + .for_each(|serv| { + let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref()); + // preparations + let rx = rx.clone(); + let serv_cont = ServicesController::new().with_access_name( + &serv.hostname, + access_url + ); + // triggers + let triggers = Triggers::new_service(&serv.triggers.on_lost, serv.triggers.wait); + + if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) { + proc.add_process(&prc.name, triggers, rx); + } else { + // vecdeque for queue + let mut vec: VecDeque<&'a str> = VecDeque::new(); + vec.push_back(&prc.name); + // connection_queue + let mut connection_queue: BTreeMap> = BTreeMap::new(); + connection_queue.insert(serv.triggers.wait, vec); + // event_reg + let mut hm = HashMap::new(); + hm.insert(prc.name.as_str(), (triggers, rx)); + + let serv_cont = serv_cont.with_params(connection_queue, hm); + self.services.push_back(serv_cont); + } }); }); + self + } + pub fn get_stats(&self) -> String { + format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len()) + } + async fn proccess_prc(&mut self) { - Supervisor { prcs: p, files: f, services: s } + } + } + + #[async_trait] + impl<'a> ProcessUnit<'a> for Supervisor<'a> { + async fn process(&'a mut self) { + info!("Initializing monitoring ..."); + loop { + // let mut tasks: Vec> = vec![]; + // let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap()); + // let res = tokio::join!(prc.process(), file.process(), serv.process()); + if let Some(mut val) = self.prcs.pop_front() { + tokio::spawn(async move {val.process().await;}).await; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } } } @@ -81,23 +145,11 @@ pub mod v2 { // spawn services // ## for ... i.await in loop pub async fn init_monitoring( - local_config: &mut Receiver, + config: Processes ) -> anyhow::Result<()> { - let config = if !local_config.is_empty() { - local_config.recv().await? - } else { - let mut tick = tokio::time::interval(Duration::from_millis(500)); - loop { - tick.tick().await; - break match local_config.try_recv() { - Ok(conf) => conf, - Err(_) => continue, - } - } - }; - info!("Processing {} processes ...", config.processes.len()); - // LinkedList - // LinkedList + let mut supervisor = Supervisor::new().with_config(&config).await; + info!("Monitoring: {} ", &supervisor.get_stats()); + supervisor.process().await; Ok(()) } @@ -140,37 +192,37 @@ pub mod v2 { /// /// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process /// -pub async fn run_daemons( - proc: Arc, - tx: Arc>, - rx: &mut mpsc::Receiver, -) { - // creating watchers + ---buffers--- - let mut watchers: Vec = vec![]; - for file in proc.dependencies.files.clone().into_iter() { - if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { - watchers.push(watcher); - } else { - let _ = tx.send(121).await; - } - // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); - } - let watchers_clone: Arc>> = - Arc::new(tokio::sync::Mutex::new(watchers)); +// pub async fn run_daemons( +// proc: Arc, +// tx: Arc>, +// rx: &mut mpsc::Receiver, +// ) { +// // creating watchers + ---buffers--- +// let mut watchers: Vec = vec![]; +// for file in proc.dependencies.files.clone().into_iter() { +// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { +// watchers.push(watcher); +// } else { +// let _ = tx.send(121).await; +// } +// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); +// } +// let watchers_clone: Arc>> = +// Arc::new(tokio::sync::Mutex::new(watchers)); - loop { - let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); - tokio::select! { - _ = run_hand => continue, - _val = rx.recv() => { - if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { - return; - } - }, - } - tokio::task::yield_now().await; - } -} +// loop { +// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); +// tokio::select! { +// _ = run_hand => continue, +// _val = rx.recv() => { +// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { +// return; +// } +// }, +// } +// tokio::task::yield_now().await; +// } +// } async fn process_protocol_symbol(proc: Arc, val: u8) -> Result<(), CustomError>{ match val { @@ -300,36 +352,36 @@ async fn process_protocol_symbol(proc: Arc, val: u8) -> Result< /// /// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` /// -pub async fn running_handler( - prc: Arc, - tx: Arc>, - watchers: Arc>>, -) { - // services and files check (once) - let files_check = file_handler( - &prc.name, - &prc.dependencies.files, - tx.clone(), - watchers.clone(), - ); - let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); +// pub async fn running_handler( +// prc: Arc, +// tx: Arc>, +// watchers: Arc>>, +// ) { +// // services and files check (once) +// let files_check = file_handler( +// &prc.name, +// &prc.dependencies.files, +// tx.clone(), +// watchers.clone(), +// ); +// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); - let res = join!(files_check, services_check); - // if inactive -> spawn checks -> active is true - if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { - if start_process(&prc.name, &prc.path).await.is_err() { - tx.send(3).await.unwrap(); - return; - } - } - // if frozen -> spawn checks -> unfreeze is true - else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { - tx.send(10).await.unwrap(); - return; - } - // tokio::time::sleep(Duration::from_millis(100)).await; - tokio::task::yield_now().await; -} +// let res = join!(files_check, services_check); +// // if inactive -> spawn checks -> active is true +// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { +// if start_process(&prc.name, &prc.path).await.is_err() { +// tx.send(3).await.unwrap(); +// return; +// } +// } +// // if frozen -> spawn checks -> unfreeze is true +// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { +// tx.send(10).await.unwrap(); +// return; +// } +// // tokio::time::sleep(Duration::from_millis(100)).await; +// tokio::task::yield_now().await; +// } // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' /// # Fn `get_container_id` diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index 00d4033..435813c 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -8,6 +8,7 @@ use tokio::sync::mpsc::Sender as Sender; use tokio::time::Duration; use crate::options::structs::Events; + use async_trait::async_trait; pub mod v2 { use log::{error, info, warn}; @@ -21,6 +22,7 @@ // type EventHandlers<'a> = HashMap type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; + #[derive(Debug)] pub struct FilesController<'a> { name : &'a str, path : String, @@ -46,10 +48,10 @@ code_name : name.to_string(), } } - pub async fn with_path(mut self, path: impl AsRef) -> anyhow::Result> { + pub fn with_path(mut self, path: impl AsRef) -> anyhow::Result> { self.path = path.as_ref().to_string_lossy().into_owned(); self.watcher = { - match create_watcher(self.name, &self.path).await { + match create_watcher(self.name, &self.path) { Ok(val) => Some(val), Err(er) => { error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er); @@ -60,6 +62,11 @@ self.code_name = format!("{}{}", &self.path, &self.code_name); Ok(self) } + pub fn add_event(&mut self, file_controller : FilesController<'a>) { + for (k, v) in file_controller.triggers { + self.triggers.entry(k).or_insert(v); + } + } async fn trigger_on(&'a mut self, trigger_type: Option) { let _ = self.triggers.iter() .map(|(prc_name, (triggers, channel))| async { @@ -77,6 +84,7 @@ }); } } + #[async_trait] impl<'a> ProcessUnit<'a> for FilesController<'a> { async fn process(&'a mut self) { // polling file check @@ -92,7 +100,7 @@ ) { warn!("File {} ({}) was changed", self.name, &self.path); if recreate_watcher { - self.watcher = match create_watcher(self.name, &self.path).await { + self.watcher = match create_watcher(self.name, &self.path) { Ok(notifier) => Some(notifier), Err(er) => { error!("Failed to recreate watcher for {} ({}) due to {}", @@ -135,7 +143,7 @@ /// /// *depends on* : - /// - pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result { + pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result { let src = format!("{}{}", path, filename); let inotify: Inotify = Inotify::init()?; inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; @@ -214,7 +222,7 @@ let mutex = notify.borrow_mut(); // *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); - if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { + if let Ok(watcher) = create_watcher(&file.filename, &file.src) { *mutex = watcher; } } @@ -277,12 +285,12 @@ use super::*; #[tokio::test] async fn try_to_create_watcher() { - let res = create_watcher("dep-file", "./tests/examples/").await; + let res = create_watcher("dep-file", "./tests/examples/"); assert!(res.is_ok()); } #[tokio::test] async fn try_to_create_invalid_watcher() { - let res = create_watcher("invalid-file", "/path/to/the/no/dir").await; + let res = create_watcher("invalid-file", "/path/to/the/no/dir"); assert!(res.is_err()); } #[tokio::test] diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index b2a966f..bb0b94d 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -5,6 +5,7 @@ use tokio::time::Duration; use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit}; use std::collections::HashSet; use tokio::sync::mpsc::Receiver as MpscReciever; +use async_trait::async_trait; pub mod v2 { use log::info; @@ -13,6 +14,7 @@ pub mod v2 { use super::*; + #[derive(Debug)] pub struct ProcessesController<'a> { name: &'a str, bin: String, @@ -70,10 +72,11 @@ pub mod v2 { } tokio::time::sleep(Duration::from_micros(100)).await; } - } + } + #[async_trait] impl<'a> ProcessUnit<'a> for ProcessesController<'a> { - async fn process(&mut self) { + async fn process(&'a mut self) { if self.negative_events.len() == 0 { match self.state { ProcessState::Holding => { diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index 0dd56f6..9cec823 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; use tokio::sync::mpsc::Sender as Sender; +use async_trait::async_trait; pub mod v2 { use log::info; @@ -52,19 +53,29 @@ pub mod v2 { event_registrator : EventHandlers::new(), } } - pub fn with_params( - &mut self, + pub fn with_access_name( + mut self, hostname: &'a str, - port: Option<&'a str>, + access_url: String, + ) -> ServicesController<'a> { + self.name = hostname; + self.access_url = access_url; + self + } + + pub fn with_params( + mut self, conn_queue: ConnectionQueue<'a>, event_reg: EventHandlers<'a>, - ) -> &mut ServicesController<'a> { - self.name = hostname; - self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))); + ) -> ServicesController<'a> { self.config = conn_queue; self.event_registrator = event_reg; self } + + pub fn get_access_url(hostname: &'a str, port: Option<&u32>) -> String { + format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) + } pub fn add_process( &mut self, proc_name: &'a str, @@ -154,6 +165,7 @@ pub mod v2 { } } } + #[async_trait] impl<'a> ProcessUnit<'a> for ServicesController<'a> { async fn process(&'a mut self) { // check_service(hostname, port) @@ -189,53 +201,53 @@ pub mod v2 { /// /// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` /// -pub async fn service_handler( - name: &str, - services: &Vec, - tx: Arc>, -) -> Result<(), CustomError> { - // println!("service daemon on {}", name); - for serv in services { - if check_service(&serv.hostname, &serv.port).await.is_err() { - if !is_active(name).await || is_frozen(name).await { - return Err(CustomError::Fatal); - } - error!( - "Service {}:{} is unreachable for process {}", - &serv.hostname, &serv.port, &name - ); - match serv.triggers.on_lost.as_str() { - "stay" => { - tx.send(4).await.unwrap(); - continue; - } - "stop" => { - if looped_service_connecting(name, serv).await.is_err() { - tx.send(5).await.unwrap(); - tokio::task::yield_now().await; - return Err(CustomError::Fatal); - } - } - "hold" => { - // if is_frozen(name).await { - // return Err(CustomError::Fatal); - // } - if looped_service_connecting(name, serv).await.is_err() { - tx.send(6).await.unwrap(); - tokio::task::yield_now().await; - return Err(CustomError::Fatal); - } - } - _ => { - tx.send(101).await.unwrap(); - return Err(CustomError::Fatal); - } - } - } - } - tokio::time::sleep(Duration::from_millis(100)).await; - Ok(()) -} +// pub async fn service_handler( +// name: &str, +// services: &Vec, +// tx: Arc>, +// ) -> Result<(), CustomError> { +// // println!("service daemon on {}", name); +// for serv in services { +// if check_service(&serv.hostname, &serv.port).await.is_err() { +// if !is_active(name).await || is_frozen(name).await { +// return Err(CustomError::Fatal); +// } +// error!( +// "Service {}:{} is unreachable for process {}", +// &serv.hostname, &serv.port, &name +// ); +// match serv.triggers.on_lost.as_str() { +// "stay" => { +// tx.send(4).await.unwrap(); +// continue; +// } +// "stop" => { +// if looped_service_connecting(name, serv).await.is_err() { +// tx.send(5).await.unwrap(); +// tokio::task::yield_now().await; +// return Err(CustomError::Fatal); +// } +// } +// "hold" => { +// // if is_frozen(name).await { +// // return Err(CustomError::Fatal); +// // } +// if looped_service_connecting(name, serv).await.is_err() { +// tx.send(6).await.unwrap(); +// tokio::task::yield_now().await; +// return Err(CustomError::Fatal); +// } +// } +// _ => { +// tx.send(101).await.unwrap(); +// return Err(CustomError::Fatal); +// } +// } +// } +// } +// tokio::time::sleep(Duration::from_millis(100)).await; +// Ok(()) +// } /// # Fn `looped_service_connecting` /// ## for service's state check in loop (with delay and restriction of attempts) @@ -250,54 +262,54 @@ pub async fn service_handler( /// /// *depends on* : fn `check_service` /// -async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { - if serv.triggers.wait == 0 { - loop { - tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; - warn!( - "Attempting to connect from {} process to {}:{}", - &name, &serv.hostname, &serv.port - ); - match check_service(&serv.hostname, &serv.port).await { - Ok(_) => { - log::info!( - "Successfully connected to {} from {} process!", - &serv.hostname, - &name - ); - break; - } - Err(_) => { - tokio::task::yield_now().await; - } - } - } - Ok(()) - } else { - let start = Instant::now(); - while start.elapsed().as_secs() < serv.triggers.wait.into() { - tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; - warn!( - "Attempting to connect from {} process to {}:{}", - &name, &serv.hostname, &serv.port - ); - match check_service(&serv.hostname, &serv.port).await { - Ok(_) => { - log::info!( - "Successfully connected to {} from {} process!", - &serv.hostname, - &name - ); - return Ok(()); - } - Err(_) => { - tokio::task::yield_now().await; - } - } - } - Err(CustomError::Fatal) - } -} +// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { +// if serv.triggers.wait == 0 { +// loop { +// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; +// warn!( +// "Attempting to connect from {} process to {}:{}", +// &name, &serv.hostname, &serv.port +// ); +// match check_service(&serv.hostname, &serv.port).await { +// Ok(_) => { +// log::info!( +// "Successfully connected to {} from {} process!", +// &serv.hostname, +// &name +// ); +// break; +// } +// Err(_) => { +// tokio::task::yield_now().await; +// } +// } +// } +// Ok(()) +// } else { +// let start = Instant::now(); +// while start.elapsed().as_secs() < serv.triggers.wait.into() { +// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; +// warn!( +// "Attempting to connect from {} process to {}:{}", +// &name, &serv.hostname, &serv.port +// ); +// match check_service(&serv.hostname, &serv.port).await { +// Ok(_) => { +// log::info!( +// "Successfully connected to {} from {} process!", +// &serv.hostname, +// &name +// ); +// return Ok(()); +// } +// Err(_) => { +// tokio::task::yield_now().await; +// } +// } +// } +// Err(CustomError::Fatal) +// } +// } /// # Fn `check_service` /// ## for check current service's availiability