diff --git a/crates/api-grub/src/main.rs b/crates/api-grub/src/main.rs index 1db6f64..f7a79bc 100644 --- a/crates/api-grub/src/main.rs +++ b/crates/api-grub/src/main.rs @@ -23,7 +23,7 @@ async fn main() -> Result<()>{ setup_logger().await?; let config = get_config().await; // config update channel - let (tx, mut rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::(1); // futures // todo : rewrite with spawn // let config_fut = init_config_grub_mechanism(&tx); diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs index 61a394a..a6b80ff 100644 --- a/crates/api-grub/src/net.rs +++ b/crates/api-grub/src/net.rs @@ -10,23 +10,24 @@ use tokio::task::JoinHandle; // use tokio::sync::Mutex; use dotenv::dotenv; use crate::json::JsonParser; -use crate::export::Exporter; -use integr_structs::api::v3::Config; +use crate::export::{self, Exporter}; +use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials}; // type BufferType = Arc>>; // for api info pulling -pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver) -> Result<()> { - info!("Initializing API-info grubbing mechanism..."); - info!("Loading vars from .env file if exists..."); +pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver) -> Result<()> { + info!("Initializing API-info grubbing mechanism :"); + info!("1) Loading vars from .env file if exists..."); let _ = dotenv().ok(); let mut config = config; let mut poller = ApiPoll::new(&mut config).await; + info!("2) Api-Poller has initialized"); let client = Exporter::init(); + info!("3) Exporter has initialized"); let shared_pool = Arc::new(client); loop { - let shared_pool = shared_pool.clone(); if poller.is_default().await { sleep(Duration::from_secs(5)).await; } else { @@ -36,8 +37,9 @@ pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver { - config : &'a mut ApiConfigV2, + config : &'a mut Config, client : Client, } impl<'a> ApiPoll<'a> { - pub async fn new(poll_cfg : &'a mut ApiConfigV2) -> Self { + pub async fn new(poll_cfg : &'a mut Config) -> Self { Self { config : poll_cfg, client : Client::new(), } } // can be weak and with bug test needed - pub async fn change_config(&mut self, conf: ApiConfigV2) { + pub async fn change_config(&mut self, conf: Config) { *self.config = conf; } pub async fn is_default(&self) -> bool { - self.config.template.len() == 0 + self.config.is_default().await + } + // pub async fn get_delay(&self) -> u32 { + // self.config.timeout + // } + pub async fn process_endpoint( + client : Arc, + config : Arc, + creds : Credentials, + exporter : Arc + ) -> Result<()> { + + Ok(()) } pub async fn process_polling(&self, exporter: Arc) -> Result<()> { // let buffer: BufferType = Arc::new(Mutex::new(vec![])); - let mut join_handles: Vec>> = vec![]; + // let mut join_handles: Vec>> = vec![]; let client = Arc::new(self.client.clone()); - let template = Arc::new(self.config.template.clone()); + let config = Arc::new(self.config.clone()); + let endpoints: Vec> = ConfigEndpoint::from_config(config.clone()); + let mut join_handles: Vec>> = vec![]; - if self.is_default().await { return Err(Error::msg("Default config with no endpoints")) } - - // TODO: rewrite nextly to async - for point in template.iter() { - let point = Arc::new(point.clone()); - // let buffer = buffer.clone(); + for (idx, _) in config.config.iter().enumerate() { + // let for_creds = endpoints[idx].clone(); + let creds = Credentials::from_config_endpoint(endpoints[idx].clone()); + let endpoint = endpoints[idx].clone(); let client = client.clone(); let exporter = exporter.clone(); - let endpoint_processer = tokio::spawn(async move { - let point = point.clone(); - match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await { - Ok(resp) => { - if !resp.status().is_success() { - error!("ErrorCode in Response from API. Check configuration"); - return Err(Error::msg("Error during sending request")); - } - if let Ok(text) = resp.text().await { - // - let metrics = ProcessedEndpoint::from_target_response(&text, &point)?; - // dbg!(&metrics); - println!("{}", &metrics); - // - if let Some(conn) = exporter.get_connection_from_pool().await { - - // TEST: to exporter - let res = client.request( - RestMethod::from_str("post").await, - "http://192.168.2.34:9101/update") - .json(&metrics) - .send().await; - if let Err(er) = res { - error!("Cannot send data to exporter due to: {}", er); - } else { - println!("{:?}", res.unwrap().text().await); - } - - if let Err(er) = Exporter::export_data(conn, &metrics).await { - error!("Cannot export data to DB during to: {}", er); - return Err(Error::msg("Error during exporting data to DB")); - } - } else { - if !exporter.is_no_connection() { - return Err(Error::msg("Error during getting connection from pool")); - } - } - - // let mut buffer = buffer.lock().await; - // buffer.push(text); - } else { - error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url); - return Err(Error::msg("Error with extracting text field from Response")); - } - }, - Err(_) => { - error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url); - return Err(Error::msg("Endpoint is unreachable")); - }, - } - Ok(()) + let join_handler = tokio::spawn(async move { + Self::process_endpoint( + client, + endpoint, + creds, + exporter.clone() + ).await }); - join_handles.push(endpoint_processer); + join_handles.push(join_handler); } - for i in join_handles { let _ = i.await; } + // let template = Arc::new(self.config.template.clone()); - // let buffer = buffer.lock().await; - // match &buffer.len() { - // 0 => Err(Error::msg("Error due to API grubbing. Check config" )), - // _ => { + // if self.is_default().await { return Err(Error::msg("Default config with no endpoints")) } + + // // TODO: rewrite nextly to async + // for point in template.iter() { + // let point = Arc::new(point.clone()); + // // let buffer = buffer.clone(); + // let client = client.clone(); + // let exporter = exporter.clone(); + // let endpoint_processer = tokio::spawn(async move { + // let point = point.clone(); + // match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await { + // Ok(resp) => { + // if !resp.status().is_success() { + // error!("ErrorCode in Response from API. Check configuration"); + // return Err(Error::msg("Error during sending request")); + // } + // if let Ok(text) = resp.text().await { + // // + // let metrics = ProcessedEndpoint::from_target_response(&text, &point)?; + // // dbg!(&metrics); + // println!("{}", &metrics); + // // + // if let Some(conn) = exporter.get_connection_from_pool().await { + + // // TEST: to exporter + // let res = client.request( + // RestMethod::from_str("post").await, + // "http://192.168.2.34:9101/update") + // .json(&metrics) + // .send().await; + // if let Err(er) = res { + // error!("Cannot send data to exporter due to: {}", er); + // } else { + // println!("{:?}", res.unwrap().text().await); + // } + + // if let Err(er) = Exporter::export_data(conn, &metrics).await { + // error!("Cannot export data to DB during to: {}", er); + // return Err(Error::msg("Error during exporting data to DB")); + // } + // } else { + // if !exporter.is_no_connection() { + // return Err(Error::msg("Error during getting connection from pool")); + // } + // } + + // // let mut buffer = buffer.lock().await; + // // buffer.push(text); + // } else { + // error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url); + // return Err(Error::msg("Error with extracting text field from Response")); + // } + // }, + // Err(_) => { + // error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url); + // return Err(Error::msg("Endpoint is unreachable")); + // }, + // } // Ok(()) - // }, + // }); + // join_handles.push(endpoint_processer); // } + + // for i in join_handles { + // let _ = i.await; + // } + + // // let buffer = buffer.lock().await; + // // match &buffer.len() { + // // 0 => Err(Error::msg("Error due to API grubbing. Check config" )), + // // _ => { + // // Ok(()) + // // }, + // // } Ok(()) } - pub async fn get_delay(&self) -> u32 { - self.config.timeout - } } -#[cfg(test)] -mod net_unittests { - use super::*; - use tokio::test; +// #[cfg(test)] +// mod net_unittests { +// use super::*; +// use tokio::test; - #[test] - async fn check_str_to_rest_method() { - assert_eq!(RestMethod::from_str("get").await, Method::GET); - assert_eq!(RestMethod::from_str("post").await, Method::POST); - assert_eq!(RestMethod::from_str("patch").await, Method::PATCH); - assert_eq!(RestMethod::from_str("put").await, Method::PUT); - assert_eq!(RestMethod::from_str("delete").await, Method::DELETE); - assert_eq!(RestMethod::from_str("invalid_method").await, Method::GET); - } - #[test] - async fn check_api_poll_change_config() { - let mut conf1 = ApiConfigV2::default(); - let conf2 = ApiConfigV2::pattern(); - let mut poll = ApiPoll::new(&mut conf1).await; - poll.change_config(conf2).await; - assert_eq!(poll.config.timeout, 1) - } +// #[test] +// async fn check_str_to_rest_method() { +// assert_eq!(RestMethod::from_str("get").await, Method::GET); +// assert_eq!(RestMethod::from_str("post").await, Method::POST); +// assert_eq!(RestMethod::from_str("patch").await, Method::PATCH); +// assert_eq!(RestMethod::from_str("put").await, Method::PUT); +// assert_eq!(RestMethod::from_str("delete").await, Method::DELETE); +// assert_eq!(RestMethod::from_str("invalid_method").await, Method::GET); +// } +// #[test] +// async fn check_api_poll_change_config() { +// let mut conf1 = ApiConfigV2::default(); +// let conf2 = ApiConfigV2::pattern(); +// let mut poll = ApiPoll::new(&mut conf1).await; +// poll.change_config(conf2).await; +// assert_eq!(poll.config.timeout, 1) +// } - #[test] - async fn check_api_poll_is_default() { - let mut conf1 = ApiConfigV2::default(); - let poll = ApiPoll::new(&mut conf1).await; - assert!(poll.is_default().await) - } +// #[test] +// async fn check_api_poll_is_default() { +// let mut conf1 = ApiConfigV2::default(); +// let poll = ApiPoll::new(&mut conf1).await; +// assert!(poll.is_default().await) +// } - #[test] - async fn check_api_grubbing_mechanism_on_public_one() { - use log::{set_max_level, LevelFilter}; +// #[test] +// async fn check_api_grubbing_mechanism_on_public_one() { +// use log::{set_max_level, LevelFilter}; - set_max_level(LevelFilter::Off); - let mut conf1 = ApiConfigV2::pattern(); - let conf2 = ApiConfigV2::default(); - let exporter = Arc::new(Exporter::init()); +// set_max_level(LevelFilter::Off); +// let mut conf1 = ApiConfigV2::pattern(); +// let conf2 = ApiConfigV2::default(); +// let exporter = Arc::new(Exporter::init()); - let mut poll = ApiPoll::new(&mut conf1).await; - assert!(poll.process_polling(exporter.clone()).await.is_ok()); +// let mut poll = ApiPoll::new(&mut conf1).await; +// assert!(poll.process_polling(exporter.clone()).await.is_ok()); - dbg!(&poll.config); - poll.change_config(conf2).await; - dbg!(&poll.config); - assert!(poll.process_polling(exporter.clone()).await.is_err()); - } -} \ No newline at end of file +// dbg!(&poll.config); +// poll.change_config(conf2).await; +// dbg!(&poll.config); +// assert!(poll.process_polling(exporter.clone()).await.is_err()); +// } +// } \ No newline at end of file diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index 02ca610..03086e0 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -1,7 +1,9 @@ +use core::sync; use std::collections::HashMap; use serde::{Serialize, Deserialize}; use serde_json::{ to_string_pretty, Value }; use anyhow::Result; +use std::sync::Arc; #[derive(Serialize, Deserialize, Debug)] @@ -150,14 +152,14 @@ pub mod v3 { pub use super::*; // in config - #[derive(Serialize, Deserialize)] + #[derive(Serialize, Deserialize, Clone)] pub struct Metric { pub id : String, #[serde(rename = "type")] pub json_type : String, pub addr : String, } - #[derive(Serialize, Deserialize)] + #[derive(Serialize, Deserialize, Clone)] pub struct Metrics { pub name : String, pub url : String, @@ -165,9 +167,9 @@ pub mod v3 { pub measure : Vec } - #[derive(Serialize, Deserialize)] + #[derive(Serialize, Deserialize, Clone)] pub struct ConfigEndpoint { - ip : String, + id : String, login : String, #[serde(rename = "pass")] password : String, @@ -177,10 +179,27 @@ pub mod v3 { #[serde(default)] metrics : Vec, } + impl ConfigEndpoint { + pub fn from_config(config: Arc) -> Vec> { + let mut result: Vec> = Vec::new(); + config.config + .iter() + .for_each(|el| { + result.push(Arc::new(el.clone())) + }); + result + } + pub fn get_period(&self) -> Option { + self.period.parse().ok() + } + pub fn get_timeout(&self) -> Option { + self.timeout.parse().ok() + } + } - #[derive(Serialize, Deserialize)] + #[derive(Serialize, Deserialize, Clone)] pub struct Config { - config : Vec, + pub config : Vec, } impl Default for Config { @@ -197,25 +216,13 @@ pub mod v3 { } } - pub struct Credentials<'a> { - ip : &'a str, - login : &'a str, - password : &'a str, - api_key : &'a str, - period : &'a str, - timeout : &'a str, + pub struct Credentials { + endpoint : Arc, } - impl<'a> Credentials<'a> { - pub fn from_config_endpoint(endpoint: &'a ConfigEndpoint) -> Credentials<'a> { - Self { - ip : &endpoint.ip, - login : &endpoint.login, - password : &endpoint.password, - api_key : &endpoint.api_key, - period : &endpoint.period, - timeout : &endpoint.timeout, - } + impl Credentials { + pub fn from_config_endpoint(endpoint: Arc) -> Credentials { + Self { endpoint } } } @@ -225,13 +232,15 @@ pub mod v3 { id : String, #[serde(rename = "type")] json_type : String, + addr : String, value : Value, } impl MetricOutput { - pub fn new_with_slices(id : &str, json_type : &str, value : Value) -> Self { + pub fn new_with_slices(id : &str, json_type : &str, addr: &str,value : Value) -> Self { MetricOutput { id : id.to_string(), json_type : json_type.to_string(), + addr : addr.to_string(), value : value, } }