// module to handle unix-socket connection + pulling info from api use anyhow::Result; use tracing::{error, info}; use rand::random; use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, Duration}; use reqwest::Client; use std::hash::{Hash, Hasher}; use std::sync::Arc; use tokio::task::JoinHandle; use dotenv::dotenv; use crate::json::JsonParser; use crate::export::Exporter; use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics}; // for api info pulling pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver) -> Result<()> { info!("Initializing API-info grubbing mechanism :"); info!("1) Loading vars from .env file if exists..."); let _ = dotenv().ok(); let mut config = config; let mut poller = ApiPoll::new(&mut config).await; info!("2) Api-Poller has initialized"); let client = Exporter::init(); info!("3) Exporter has initialized"); let shared_pool = Arc::new(client); loop { if poller.is_default().await { sleep(Duration::from_secs(5)).await; } else { if rx.len() > 0 { if let Some(conf) = rx.recv().await { poller.change_config(conf).await; info!("Config changed"); } } let shared_pool = shared_pool.clone(); info!("Data from API: {:?}", poller.process_polling(shared_pool).await); } } } struct ApiPoll<'a> { config : &'a mut Config, #[allow(unused)] client : Client, } impl<'a> ApiPoll<'a> { pub async fn new(poll_cfg : &'a mut Config) -> Self { Self { config : poll_cfg, client : Client::new(), } } pub async fn change_config(&mut self, conf: Config) { *self.config = conf; } pub async fn is_default(&self) -> bool { self.config.is_default().await } pub async fn process_metrics( service_id: Arc, metrics: Arc, creds: Credentials, ) -> Result<()> { // processing metrics use std::hash::DefaultHasher; let rand = random::(); let mut hash = DefaultHasher::new(); rand.hash(&mut hash); let client = Client::builder() .user_agent(format!("api-grabber-{}", hash.finish())); let mut req = client.build().unwrap().get(&metrics.url); let login = &creds.endpoint.login; let password = &creds.endpoint.password; let api_key = &creds.endpoint.api_key; if !login.is_empty() && !password.is_empty() { req = req.basic_auth(login, Some(password)); } if !api_key.is_empty() { req = req.header("accept", "application/json"); req = req.header("x-api-key", api_key); } match req.send().await { Ok(resp) => { if let Ok(response) = resp.text().await { match serde_json::to_value(&response) { Err(er) => { error!("Bad JSON in response. Error: {}", er); }, Ok(_) => { let endpoint_name = &metrics.name; let preproc = JsonParser::parse(&metrics.measure, &response); let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc); match Exporter::export_metrics(preproc).await { Ok(bytes) => { info!("Successfully exported {} bytes of metrics data to Prometheus", bytes); }, Err(er) => { error!("Failed to export data to Prometheus due to {}", er); }, } }, } } else { error!("Bad response from {}. No data", &metrics.url); } }, Err(er) => { error!("Cannot API data from {} due to : {}", &metrics.url, er); }, } Ok(()) } pub async fn process_endpoint( config : Arc, creds : Credentials, ) -> Result<()> { // TODO: HAVE TO BE USED let _period = config.get_period().unwrap_or(0); let timeout = config.get_timeout().unwrap_or(5); let metrics = Arc::new(config.metrics.clone()); let service_id = Arc::new(config.id.clone()); loop { let creds = creds.clone(); let metrics = metrics.clone(); let service_id = service_id.clone(); let mut jh = Vec::>>::new(); for idx in 0..metrics.len() { let creds = creds.clone(); let metrics = metrics.clone(); let service_id = service_id.clone(); let event = tokio::spawn(async move { Self::process_metrics( service_id.clone(), metrics[idx].clone().into(), creds.clone(), ).await }); jh.push(event); } info!("Initializing another {} subjob(s) for `{}` service", jh.len(), &service_id ); for i in jh { let _ = i.await; } // processing sleep(Duration::from_secs(timeout)).await } } pub async fn process_polling(&self, exporter: Arc) -> Result<()> { let config = Arc::new(self.config.clone()); let endpoints: Vec> = ConfigEndpoint::from_config(config.clone()); let mut join_handles: Vec>> = vec![]; for (idx, _) in config.config.iter().enumerate() { let creds = Credentials::from_config_endpoint(endpoints[idx].clone()); let endpoint = endpoints[idx].clone(); // TODO: USE EXPORTER #[allow(unused)] let exporter = exporter.clone(); let join_handler = tokio::spawn(async move { Self::process_endpoint( endpoint, creds, ).await }); join_handles.push(join_handler); } info!("Initializing {} task(s) for current config", join_handles.len()); for i in join_handles { let _ = i.await; } Ok(()) } } // TODO: FIX TESTS // #[cfg(test)] // mod net_unittests { // use super::*; // use tokio::test; // #[test] // async fn check_str_to_rest_method() { // assert_eq!(RestMethod::from_str("get").await, Method::GET); // assert_eq!(RestMethod::from_str("post").await, Method::POST); // assert_eq!(RestMethod::from_str("patch").await, Method::PATCH); // assert_eq!(RestMethod::from_str("put").await, Method::PUT); // assert_eq!(RestMethod::from_str("delete").await, Method::DELETE); // assert_eq!(RestMethod::from_str("invalid_method").await, Method::GET); // } // #[test] // async fn check_api_poll_change_config() { // let mut conf1 = ApiConfigV2::default(); // let conf2 = ApiConfigV2::pattern(); // let mut poll = ApiPoll::new(&mut conf1).await; // poll.change_config(conf2).await; // assert_eq!(poll.config.timeout, 1) // } // #[test] // async fn check_api_poll_is_default() { // let mut conf1 = ApiConfigV2::default(); // let poll = ApiPoll::new(&mut conf1).await; // assert!(poll.is_default().await) // } // #[test] // async fn check_api_grubbing_mechanism_on_public_one() { // use log::{set_max_level, LevelFilter}; // set_max_level(LevelFilter::Off); // let mut conf1 = ApiConfigV2::pattern(); // let conf2 = ApiConfigV2::default(); // let exporter = Arc::new(Exporter::init()); // let mut poll = ApiPoll::new(&mut conf1).await; // assert!(poll.process_polling(exporter.clone()).await.is_ok()); // dbg!(&poll.config); // poll.change_config(conf2).await; // dbg!(&poll.config); // assert!(poll.process_polling(exporter.clone()).await.is_err()); // } // }