+ async endpoint polling

pull/3/head
prplV 2025-01-23 18:22:35 +03:00
parent 498bedf1f4
commit 70b7d41f87
1 changed files with 48 additions and 23 deletions

View File

@ -1,11 +1,15 @@
// module to handle unix-socket connection + pulling info from api
use anyhow::{Error, Result};
use integr_structs::api::{ApiConfig, ApiConfigV2, Template};
use integr_structs::api::ApiConfigV2;
use log::{error, info};
use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, Duration};
use reqwest::{Client, Method};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::sync::Mutex;
type BufferType = Arc<Mutex<Vec<String>>>;
struct RestMethod;
impl RestMethod {
@ -42,33 +46,54 @@ impl<'a> ApiPoll<'a> {
pub async fn is_default(&self) -> bool {
self.config.template.len() == 0
}
pub async fn process_polling(&self) -> Result<Vec<String>> {
let mut buffer: Vec<String> = vec![];
pub async fn process_polling(&self) -> Result<()> {
let buffer: BufferType = Arc::new(Mutex::new(vec![]));
let mut join_handles: Vec<JoinHandle<()>> = vec![];
let client = Arc::new(self.client.clone());
let template = Arc::new(self.config.template.clone());
// TODO: rewrite nextly to async
for point in &self.config.template {
for point in template.iter() {
let point = Arc::new(point.clone());
let buffer = buffer.clone();
let client = client.clone();
// let a = self.client.get(&point.url).send().await.unwrap();
// a.text().await.unwrap();
match self.client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
let endpoint_processer = tokio::spawn(async move {
let point = point.clone();
match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
Ok(resp) => {
if !resp.status().is_success() {
error!("ErrorCode in Response from API. Check configuration");
continue;
return;
}
if let Ok(text) = resp.text().await {
let mut buffer = buffer.lock().await;
buffer.push(text);
} else {
error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
}
},
Err(er) => {
Err(_) => {
error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url);
},
}
});
join_handles.push(endpoint_processer);
}
for i in join_handles {
let _ = i.await;
}
let buffer = buffer.lock().await;
match &buffer.len() {
0 => Err(Error::msg("Error due to API grubbing. Check config" )),
_ => Ok(buffer),
_ => {
Ok(())
},
}
// Ok(())
}
pub async fn get_delay(&self) -> u32 {
self.config.timeout
@ -80,6 +105,7 @@ pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiC
info!("Initializing API-info grubbing mechanism...");
let mut config = config;
let mut poller = ApiPoll::new(&mut config).await;
// let arc_poller = Arc::new(poller);
loop {
if poller.is_default().await {
sleep(Duration::from_secs(5)).await;
@ -101,7 +127,6 @@ pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver<ApiC
mod net_unittests {
use super::*;
use tokio::test;
use integr_structs::api::{ApiConfig, ApiEndpoint};
#[test]
async fn check_str_to_rest_method() {
@ -124,7 +149,7 @@ mod net_unittests {
#[test]
async fn check_api_poll_is_default() {
let mut conf1 = ApiConfigV2::default();
let poll = ApiPoll::new(&mut conf1).await;
let mut poll = ApiPoll::new(&mut conf1).await;
assert!(poll.is_default().await)
}