// module to handle unix-socket connection + pulling info from api use anyhow::{Error, Result}; use integr_structs::api::{ApiConfigV2, ProcessedEndpoint}; use log::{error, info}; use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, Duration}; use reqwest::{Client, Method}; use std::sync::Arc; use tokio::task::JoinHandle; // use tokio::sync::Mutex; use dotenv::dotenv; use crate::json::JsonParser; use crate::export::Exporter; use integr_structs::api::v3::Config; // type BufferType = Arc>>; // for api info pulling pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver) -> Result<()> { info!("Initializing API-info grubbing mechanism..."); info!("Loading vars from .env file if exists..."); let _ = dotenv().ok(); let mut config = config; let mut poller = ApiPoll::new(&mut config).await; let client = Exporter::init(); let shared_pool = Arc::new(client); loop { let shared_pool = shared_pool.clone(); if poller.is_default().await { sleep(Duration::from_secs(5)).await; } else { if rx.len() > 0 { if let Some(conf) = rx.recv().await { poller.change_config(conf).await; info!("Config changed"); } } info!("Data from API: {:?}", poller.process_polling(shared_pool).await); sleep(Duration::from_secs(poller.get_delay().await as u64)).await; } } // Ok(()) } struct RestMethod; impl RestMethod { pub async fn from_str(method: &str) -> Method { return match method.trim().to_lowercase().as_str() { "post" => Method::POST, "patch" => Method::PATCH, "put" => Method::PUT, "delete" => Method::DELETE, "head" => Method::HEAD, "trace" => Method::TRACE, "options" => Method::OPTIONS, "connect" => Method::CONNECT, "get" | _ => Method::GET } } } struct ApiPoll<'a> { config : &'a mut ApiConfigV2, client : Client, } impl<'a> ApiPoll<'a> { pub async fn new(poll_cfg : &'a mut ApiConfigV2) -> Self { Self { config : poll_cfg, client : Client::new(), } } // can be weak and with bug test needed pub async fn change_config(&mut self, conf: ApiConfigV2) { *self.config = conf; } pub async fn is_default(&self) -> bool { self.config.template.len() == 0 } pub async fn process_polling(&self, exporter: Arc) -> Result<()> { // let buffer: BufferType = Arc::new(Mutex::new(vec![])); let mut join_handles: Vec>> = vec![]; let client = Arc::new(self.client.clone()); let template = Arc::new(self.config.template.clone()); if self.is_default().await { return Err(Error::msg("Default config with no endpoints")) } // TODO: rewrite nextly to async for point in template.iter() { let point = Arc::new(point.clone()); // let buffer = buffer.clone(); let client = client.clone(); let exporter = exporter.clone(); let endpoint_processer = tokio::spawn(async move { let point = point.clone(); match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await { Ok(resp) => { if !resp.status().is_success() { error!("ErrorCode in Response from API. Check configuration"); return Err(Error::msg("Error during sending request")); } if let Ok(text) = resp.text().await { // let metrics = ProcessedEndpoint::from_target_response(&text, &point)?; // dbg!(&metrics); println!("{}", &metrics); // if let Some(conn) = exporter.get_connection_from_pool().await { // TEST: to exporter let res = client.request( RestMethod::from_str("post").await, "http://192.168.2.34:9101/update") .json(&metrics) .send().await; if let Err(er) = res { error!("Cannot send data to exporter due to: {}", er); } else { println!("{:?}", res.unwrap().text().await); } if let Err(er) = Exporter::export_data(conn, &metrics).await { error!("Cannot export data to DB during to: {}", er); return Err(Error::msg("Error during exporting data to DB")); } } else { if !exporter.is_no_connection() { return Err(Error::msg("Error during getting connection from pool")); } } // let mut buffer = buffer.lock().await; // buffer.push(text); } else { error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url); return Err(Error::msg("Error with extracting text field from Response")); } }, Err(_) => { error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url); return Err(Error::msg("Endpoint is unreachable")); }, } Ok(()) }); join_handles.push(endpoint_processer); } for i in join_handles { let _ = i.await; } // let buffer = buffer.lock().await; // match &buffer.len() { // 0 => Err(Error::msg("Error due to API grubbing. Check config" )), // _ => { // Ok(()) // }, // } Ok(()) } pub async fn get_delay(&self) -> u32 { self.config.timeout } } #[cfg(test)] mod net_unittests { use super::*; use tokio::test; #[test] async fn check_str_to_rest_method() { assert_eq!(RestMethod::from_str("get").await, Method::GET); assert_eq!(RestMethod::from_str("post").await, Method::POST); assert_eq!(RestMethod::from_str("patch").await, Method::PATCH); assert_eq!(RestMethod::from_str("put").await, Method::PUT); assert_eq!(RestMethod::from_str("delete").await, Method::DELETE); assert_eq!(RestMethod::from_str("invalid_method").await, Method::GET); } #[test] async fn check_api_poll_change_config() { let mut conf1 = ApiConfigV2::default(); let conf2 = ApiConfigV2::pattern(); let mut poll = ApiPoll::new(&mut conf1).await; poll.change_config(conf2).await; assert_eq!(poll.config.timeout, 1) } #[test] async fn check_api_poll_is_default() { let mut conf1 = ApiConfigV2::default(); let poll = ApiPoll::new(&mut conf1).await; assert!(poll.is_default().await) } #[test] async fn check_api_grubbing_mechanism_on_public_one() { use log::{set_max_level, LevelFilter}; set_max_level(LevelFilter::Off); let mut conf1 = ApiConfigV2::pattern(); let conf2 = ApiConfigV2::default(); let exporter = Arc::new(Exporter::init()); let mut poll = ApiPoll::new(&mut conf1).await; assert!(poll.process_polling(exporter.clone()).await.is_ok()); dbg!(&poll.config); poll.change_config(conf2).await; dbg!(&poll.config); assert!(poll.process_polling(exporter.clone()).await.is_err()); } }