serv + config fix

migrate
prplV 2025-05-26 14:32:58 +03:00
parent 3f98fd7f24
commit 54d2b1aaf7
4 changed files with 41 additions and 18 deletions

View File

@ -21,11 +21,11 @@
"port": 443,
"triggers": {
"wait": 10,
"onLost": "restart"
"onLost": "stop"
}
}
]
}
}
]
}
}

View File

@ -189,7 +189,7 @@ pub mod v2 {
},
Ok(_) => {
info!("Successfully subscribed to {} pubsub channel", channel_name);
let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3)));
let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(1)));
loop {
if let Ok(msg) = pub_sub.get_message() {
// dbg!("ok on get message");
@ -325,10 +325,6 @@ pub mod v2 {
if !events.is_empty() {
warn!("Local config file was overwritten. Discarding changes ...");
need_to_export_config = true;
// events
// .iter()
// .any(|event| *event == EventMask::DELETE_SELF)
// .then(|| need_to_recreate_watcher = true);
}
}
}
@ -686,7 +682,7 @@ fn get_connection_watcher(client: &Client) -> Connection {
///
fn restart_main_thread() -> std::io::Result<()> {
let current_exe = env::current_exe()?;
Command::new(current_exe).exec();
let _ = Command::new(current_exe).exec();
Ok(())
}

View File

@ -88,7 +88,7 @@ pub mod v2 {
"stop" => {
if is_active(&self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
terminate_process(&self.name).await;
let _ = terminate_process(&self.name).await;
self.state = ProcessState::Stopped;
self.pid = Pid::new();
}
@ -96,7 +96,7 @@ pub mod v2 {
"user-stop" => {
if is_active(&self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name);
terminate_process(&self.name).await;
let _ = terminate_process(&self.name).await;
self.state = ProcessState::StoppedByCli;
self.pid = Pid::new();
}

View File

@ -95,10 +95,37 @@ pub mod v2 {
self.event_registrator.entry(proc_name).or_insert((trigger, sender));
}
async fn check_state(&self) -> anyhow::Result<()> {
let mut addrs = self.access_url.to_socket_addrs()?;
if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) {
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)))
let url = self.access_url.clone();
let resolve_future = tokio::task::spawn_blocking(move || {
url.to_socket_addrs()
});
let addrs: Vec<_> = match tokio::time::timeout(Duration::from_secs(1), resolve_future).await {
Ok(Ok(addrs)) => addrs?.collect(),
Ok(Err(er)) => return Err(er.into()),
Err(_) => return Err(anyhow::Error::msg("DNS resolution timeout")),
};
if addrs.is_empty() {
return Err(anyhow::Error::msg("No addresses resolved"));
}
let tasks: Vec<_> = addrs.into_iter().map(|addr| async move {
match tokio::time::timeout(Duration::from_secs(1), tokio::net::TcpStream::connect(&addr)).await {
Ok(Ok(_)) => Some(addr),
_ => None,
}
}).collect();
let mut any_success = false;
for task in futures::future::join_all(tasks).await {
if task.is_some() {
any_success = true;
break;
}
}
if !any_success {
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url)));
}
Ok(())
}
async fn trigger_on(&mut self) {
@ -123,7 +150,6 @@ pub mod v2 {
let timer = tokio::time::Instant::now();
let mut attempt: u32 = 1;
let access_url = Arc::new(self.access_url.clone());
// let event_registrator = &mut self.event_registrator;
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async {
// let access_url = access_url.clone();
@ -133,15 +159,16 @@ pub mod v2 {
attempt += 1;
let state_check_result = self.check_state().await;
if state_check_result.is_ok() {
info!("Connection to {} is `OK` now", &access_url);
self.state = ServiceState::Ok;
self.state = ServiceState::Ok;
break;
} else {
let now = timer.elapsed();
let iterator = self.config.iter()
.filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now)
.filter(|(&wait, _)| tokio::time::Duration::from_secs(wait as u64) <= now)
.flat_map(|(_, a)| a.iter().cloned())
.collect::<VecDeque<Arc<str>>>();
@ -178,7 +205,7 @@ pub mod v2 {
self.trigger_on().await;
},
(ServiceState::Ok, Err(_)) => {
warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
warn!("Unreachable for connection service `{}`. Initializing reconnect mechanism ...", &self.access_url);
self.state = ServiceState::Unavailable;
self.trigger_on().await;
},