diff --git a/config_api.json b/config_api.json index d216deb..f88bdfa 100644 --- a/config_api.json +++ b/config_api.json @@ -6,7 +6,7 @@ "pass" : "", "api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711", "period" : "", - "timeout" : "3", + "timeout" : "10", "metrics" : [ { "name": "conferences", diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs index a6b80ff..33f3457 100644 --- a/crates/api-grub/src/net.rs +++ b/crates/api-grub/src/net.rs @@ -2,16 +2,17 @@ use anyhow::{Error, Result}; use integr_structs::api::{ApiConfigV2, ProcessedEndpoint}; use log::{error, info}; +use serde_json::Value; use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, Duration}; -use reqwest::{Client, Method}; +use reqwest::{Client, Method, RequestBuilder}; use std::sync::Arc; use tokio::task::JoinHandle; // use tokio::sync::Mutex; use dotenv::dotenv; use crate::json::JsonParser; use crate::export::{self, Exporter}; -use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials}; +use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics}; // type BufferType = Arc>>; @@ -85,19 +86,127 @@ impl<'a> ApiPoll<'a> { // pub async fn get_delay(&self) -> u32 { // self.config.timeout // } + pub async fn process_metrics( + service_id: Arc, + metrics: Arc, + creds: Credentials, + // exporter: Arc + ) -> Result<()> { + // processing metrics + let mut req = Client::new() + .get(&metrics.url); + let login = creds.endpoint.login.clone(); + let password = creds.endpoint.password.clone(); + let api_key = creds.endpoint.api_key.clone(); + if !login.is_empty() && !password.is_empty() { + dbg!("kjgbkasgksjd"); + req = req.basic_auth(login, Some(password)); + } + if !api_key.is_empty() { + req = req.bearer_auth(&api_key); + // req = req.header("authorization", "bearer "); + + req = req.header("accept", "application/json"); + req = req.header("x-api-key", &api_key); + + // req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"]) + } + dbg!(&req); + // let (client, res) = req.build_split(); + // let res = res.unwrap(); + // res.url_mut().is_special() + + + + // dbg!(client); + // dbg!(res); + // todo!(); + + match req.send().await { + Ok(resp) => { + dbg!(&resp); + if let Ok(response) = resp.text().await { + match serde_json::from_str::(&response) { + Err(er) => {error!("Bad JSON in response. Error: {}", er);}, + Ok(_) => { + let endpoint_name = &metrics.name; + let preproc = JsonParser::parse(&metrics.measure, &response); + let metrics: String = serde_json::from_value(preproc.clone()) + .unwrap_or({ + error!("Cannot parse grabbed metrics data to String"); + String::from(r#""value" : null"#) + }); + dbg!(&metrics); + match Exporter::export_metrics(&metrics).await { + Ok(_) => { + info!("Successfully imported metrics data to Prometheus"); + }, + Err(er) => { + error!("Failed to export data to Prometheus due to {}", er); + }, + } + }, + } + } else { + error!("Bad response from {}. No data", &metrics.url); + } + }, + Err(er) => { + error!("Cannot API data from {} due to : {}", &metrics.url, er); + }, + } + Ok(()) + } pub async fn process_endpoint( - client : Arc, + // client : Arc, config : Arc, creds : Credentials, - exporter : Arc + // exporter : Arc ) -> Result<()> { + // + let period = config.get_period().unwrap_or(0); + let timeout = config.get_timeout().unwrap_or(5); + let metrics = Arc::new(config.metrics.clone()); + let service_id = Arc::new(config.id.clone()); + loop { + // let exporter = exporter.clone(); + let creds = creds.clone(); + let metrics = metrics.clone(); + let service_id = service_id.clone(); + let mut jh = Vec::>>::new(); + + for idx in 0..metrics.len() { + // let exporter = exporter.clone(); + let creds = creds.clone(); + let metrics = metrics.clone(); + let service_id = service_id.clone(); + let event = tokio::spawn(async move { + Self::process_metrics( + service_id.clone(), + metrics[idx].clone().into(), + creds.clone(), + // exporter.clone() + ).await + }); + jh.push(event); + } + info!("Initializing another {} subjob(s) for `{}` service", + jh.len(), + &service_id + ); + for i in jh { + let _ = i.await; + } + // processing + sleep(Duration::from_secs(timeout)).await + } Ok(()) } pub async fn process_polling(&self, exporter: Arc) -> Result<()> { // let buffer: BufferType = Arc::new(Mutex::new(vec![])); // let mut join_handles: Vec>> = vec![]; - let client = Arc::new(self.client.clone()); + // let client = Arc::new(self.client.clone()); let config = Arc::new(self.config.clone()); let endpoints: Vec> = ConfigEndpoint::from_config(config.clone()); let mut join_handles: Vec>> = vec![]; @@ -106,18 +215,19 @@ impl<'a> ApiPoll<'a> { // let for_creds = endpoints[idx].clone(); let creds = Credentials::from_config_endpoint(endpoints[idx].clone()); let endpoint = endpoints[idx].clone(); - let client = client.clone(); + // let client = client.clone(); let exporter = exporter.clone(); let join_handler = tokio::spawn(async move { Self::process_endpoint( - client, + // client, endpoint, creds, - exporter.clone() + // exporter.clone() ).await }); join_handles.push(join_handler); } + info!("Initializing {} task(s) for current config", join_handles.len()); for i in join_handles { let _ = i.await; } diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index 03086e0..024a410 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -152,14 +152,14 @@ pub mod v3 { pub use super::*; // in config - #[derive(Serialize, Deserialize, Clone)] + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Metric { pub id : String, #[serde(rename = "type")] pub json_type : String, pub addr : String, } - #[derive(Serialize, Deserialize, Clone)] + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Metrics { pub name : String, pub url : String, @@ -167,17 +167,17 @@ pub mod v3 { pub measure : Vec } - #[derive(Serialize, Deserialize, Clone)] + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct ConfigEndpoint { - id : String, - login : String, + pub id : String, + pub login : String, #[serde(rename = "pass")] - password : String, - api_key : String, + pub password : String, + pub api_key : String, period : String, timeout : String, #[serde(default)] - metrics : Vec, + pub metrics : Vec, } impl ConfigEndpoint { pub fn from_config(config: Arc) -> Vec> { @@ -192,12 +192,12 @@ pub mod v3 { pub fn get_period(&self) -> Option { self.period.parse().ok() } - pub fn get_timeout(&self) -> Option { + pub fn get_timeout(&self) -> Option { self.timeout.parse().ok() } } - #[derive(Serialize, Deserialize, Clone)] + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Config { pub config : Vec, } @@ -216,18 +216,24 @@ pub mod v3 { } } + #[derive(Clone)] pub struct Credentials { - endpoint : Arc, + pub endpoint : Arc, } impl Credentials { pub fn from_config_endpoint(endpoint: Arc) -> Credentials { Self { endpoint } } + // pub fn clone(self) -> Self { + // Self { + // endpoint : self.endpoint.clone() + // } + // } } // to prometheus and nmns - #[derive(Serialize, Deserialize)] + #[derive(Serialize, Deserialize, Debug)] pub struct MetricOutput { id : String, #[serde(rename = "type")] @@ -245,4 +251,11 @@ pub mod v3 { } } } + + #[derive(Serialize, Deserialize, Debug)] + pub struct PrometheusMetrics { + service_name: String, + endpoint_name: String, + metrics: Value, + } } \ No newline at end of file