diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs index 0c6f3bd..e6a59d6 100644 --- a/crates/api-grub/src/net.rs +++ b/crates/api-grub/src/net.rs @@ -1,11 +1,15 @@ // module to handle unix-socket connection + pulling info from api use anyhow::{Error, Result}; -use integr_structs::api::{ApiConfig, ApiConfigV2, Template}; +use integr_structs::api::ApiConfigV2; use log::{error, info}; use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, Duration}; use reqwest::{Client, Method}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tokio::sync::Mutex; +type BufferType = Arc>>; struct RestMethod; impl RestMethod { @@ -42,33 +46,54 @@ impl<'a> ApiPoll<'a> { pub async fn is_default(&self) -> bool { self.config.template.len() == 0 } - pub async fn process_polling(&self) -> Result> { - let mut buffer: Vec = vec![]; + pub async fn process_polling(&self) -> Result<()> { + let buffer: BufferType = Arc::new(Mutex::new(vec![])); + let mut join_handles: Vec> = vec![]; + let client = Arc::new(self.client.clone()); + let template = Arc::new(self.config.template.clone()); + // TODO: rewrite nextly to async - for point in &self.config.template { + for point in template.iter() { + let point = Arc::new(point.clone()); + let buffer = buffer.clone(); + let client = client.clone(); // let a = self.client.get(&point.url).send().await.unwrap(); // a.text().await.unwrap(); - match self.client.request(RestMethod::from_str(&point.method).await, &point.url).send().await { - Ok(resp) => { - if !resp.status().is_success() { - error!("ErrorCode in Response from API. Check configuration"); - continue; - } - if let Ok(text) = resp.text().await { - buffer.push(text); - } else { - error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url); - } - }, - Err(er) => { - error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url); - }, - } + let endpoint_processer = tokio::spawn(async move { + let point = point.clone(); + match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await { + Ok(resp) => { + if !resp.status().is_success() { + error!("ErrorCode in Response from API. Check configuration"); + return; + } + if let Ok(text) = resp.text().await { + let mut buffer = buffer.lock().await; + buffer.push(text); + } else { + error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url); + } + }, + Err(_) => { + error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url); + }, + } + }); + join_handles.push(endpoint_processer); } + + for i in join_handles { + let _ = i.await; + } + + let buffer = buffer.lock().await; match &buffer.len() { 0 => Err(Error::msg("Error due to API grubbing. Check config" )), - _ => Ok(buffer), + _ => { + Ok(()) + }, } + // Ok(()) } pub async fn get_delay(&self) -> u32 { self.config.timeout @@ -80,6 +105,7 @@ pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver