diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index b27d9c7..4dae157 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -21,11 +21,11 @@ "port": 443, "triggers": { "wait": 10, - "onLost": "restart" + "onLost": "stop" } } ] } } ] -} +} \ No newline at end of file diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 11350cc..3a461e1 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -189,7 +189,7 @@ pub mod v2 { }, Ok(_) => { info!("Successfully subscribed to {} pubsub channel", channel_name); - let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3))); + let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(1))); loop { if let Ok(msg) = pub_sub.get_message() { // dbg!("ok on get message"); @@ -325,10 +325,6 @@ pub mod v2 { if !events.is_empty() { warn!("Local config file was overwritten. Discarding changes ..."); need_to_export_config = true; - // events - // .iter() - // .any(|event| *event == EventMask::DELETE_SELF) - // .then(|| need_to_recreate_watcher = true); } } } @@ -686,7 +682,7 @@ fn get_connection_watcher(client: &Client) -> Connection { /// fn restart_main_thread() -> std::io::Result<()> { let current_exe = env::current_exe()?; - Command::new(current_exe).exec(); + let _ = Command::new(current_exe).exec(); Ok(()) } diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 4ed600b..23a712c 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -88,7 +88,7 @@ pub mod v2 { "stop" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); - terminate_process(&self.name).await; + let _ = terminate_process(&self.name).await; self.state = ProcessState::Stopped; self.pid = Pid::new(); } @@ -96,7 +96,7 @@ pub mod v2 { "user-stop" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); - terminate_process(&self.name).await; + let _ = terminate_process(&self.name).await; self.state = ProcessState::StoppedByCli; self.pid = Pid::new(); } diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index a381cb1..33737a3 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -95,10 +95,37 @@ pub mod v2 { self.event_registrator.entry(proc_name).or_insert((trigger, sender)); } async fn check_state(&self) -> anyhow::Result<()> { - let mut addrs = self.access_url.to_socket_addrs()?; - if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { - return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))) + let url = self.access_url.clone(); + let resolve_future = tokio::task::spawn_blocking(move || { + url.to_socket_addrs() + }); + let addrs: Vec<_> = match tokio::time::timeout(Duration::from_secs(1), resolve_future).await { + Ok(Ok(addrs)) => addrs?.collect(), + Ok(Err(er)) => return Err(er.into()), + Err(_) => return Err(anyhow::Error::msg("DNS resolution timeout")), + }; + + if addrs.is_empty() { + return Err(anyhow::Error::msg("No addresses resolved")); } + + let tasks: Vec<_> = addrs.into_iter().map(|addr| async move { + match tokio::time::timeout(Duration::from_secs(1), tokio::net::TcpStream::connect(&addr)).await { + Ok(Ok(_)) => Some(addr), + _ => None, + } + }).collect(); + let mut any_success = false; + for task in futures::future::join_all(tasks).await { + if task.is_some() { + any_success = true; + break; + } + } + if !any_success { + return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))); + } + Ok(()) } async fn trigger_on(&mut self) { @@ -123,7 +150,6 @@ pub mod v2 { let timer = tokio::time::Instant::now(); let mut attempt: u32 = 1; let access_url = Arc::new(self.access_url.clone()); - // let event_registrator = &mut self.event_registrator; if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async { // let access_url = access_url.clone(); @@ -133,15 +159,16 @@ pub mod v2 { attempt += 1; let state_check_result = self.check_state().await; - + if state_check_result.is_ok() { info!("Connection to {} is `OK` now", &access_url); - self.state = ServiceState::Ok; + self.state = ServiceState::Ok; break; } else { let now = timer.elapsed(); + let iterator = self.config.iter() - .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now) + .filter(|(&wait, _)| tokio::time::Duration::from_secs(wait as u64) <= now) .flat_map(|(_, a)| a.iter().cloned()) .collect::>>(); @@ -178,7 +205,7 @@ pub mod v2 { self.trigger_on().await; }, (ServiceState::Ok, Err(_)) => { - warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); + warn!("Unreachable for connection service `{}`. Initializing reconnect mechanism ...", &self.access_url); self.state = ServiceState::Unavailable; self.trigger_on().await; },