diff --git a/crates/api-grub/src/monitoring.rs b/crates/api-grub/src/monitoring.rs index d637746..e763003 100644 --- a/crates/api-grub/src/monitoring.rs +++ b/crates/api-grub/src/monitoring.rs @@ -5,15 +5,16 @@ use reqwest::Client; use tokio::sync::Semaphore; use std::sync::Arc; // use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl}; -use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size}; +use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size}; // use crate::structs::cmdb::Query; use integr_structs::api::enode_monitoring::cmdb::Query; use tokio::task::JoinHandle; // use crate::structs::get_chunk_size; use std::pin::Pin; use std::future::Future; -use integr_structs::api::v3::{MetricOutput, PrometheusMetrics}; +use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended}; use log::{error, info, warn}; +use std::collections::HashMap; // use chrono::{Local, DateTime}; pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> { @@ -21,17 +22,16 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho 'outer: loop { let mut a = MonitoringImporter::new().await; a.start_session().await?; + let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![])); + 'inner: loop { if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) { break 'outer; } - let vec = a.get_metrics_list().await.unwrap_or_else(|_| vec![]); - if vec.is_empty() { + if let Err(_) = a.get_measure_info(vec.clone()).await { warn!("Session dropped, creating new ..."); break 'inner; } - let _ = a.get_measure_info(Arc::new(vec)).await; - // a.close_session().await?; tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await } } @@ -84,22 +84,23 @@ impl MonitoringImporter { Ok(()) } - pub async fn close_session(&mut self) -> anyhow::Result<()> { - let client = Client::new(); - let url = format!("http://{}/e-data-front/auth/logout", self.ip); - let client = client - .post(url) - .header("Content-Type", "application/json") - .header("access-token", &self.access_token); + // [ DEPRECATED ] + // pub async fn close_session(&mut self) -> anyhow::Result<()> { + // let client = Client::new(); + // let url = format!("http://{}/e-data-front/auth/logout", self.ip); + // let client = client + // .post(url) + // .header("Content-Type", "application/json") + // .header("access-token", &self.access_token); - let _ = client.send().await?; + // let _ = client.send().await?; - self.access_token.clear(); - Ok(()) - } - pub async fn get_metrics_list(&self) -> anyhow::Result> { + // self.access_token.clear(); + // Ok(()) + // } + pub async fn get_metrics_list(&self) -> anyhow::Result> { let client = Client::new(); - let mut vec: Vec = Vec::new(); + let mut vec: Vec<(String, String)> = Vec::new(); let url = format!("http://{}/e-cmdb/api/query", self.ip); let client = client .post(url) @@ -113,24 +114,27 @@ impl MonitoringImporter { for measure in arr { let id = measure.get("id"); let cls = measure.get("cls"); + let name = measure.get("name"); if id.is_some() && cls.is_some() { // todo: later wait for Vaitaliy call of classification let id = id.unwrap().as_i64().unwrap_or_default(); let cls = cls.unwrap().as_str().unwrap_or_else(|| ""); + let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null"); if cls.is_empty() { return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)")); } // let measure_name = format!("{}${}", cls, id); - vec.push(format!("{}${}", cls, id)); + vec.push((format!("{}${}", cls, id), name.to_string())); } } // dbg!(vec); } else { return Err(Error::msg("Invalid JSON in response")); } + info!("List of measures was pulled, total - {}", &vec.len()); Ok(vec) } - pub async fn get_measure_info(&self, measures: Arc>) -> anyhow::Result<()> { + pub async fn get_measure_info(&self, measures: Arc>) -> anyhow::Result<()> { let mut sys = sysinfo::System::new(); sys.refresh_cpu_all(); // adaptive permition on task spawm to prevent system overload @@ -142,27 +146,32 @@ impl MonitoringImporter { // dbg!(&measures.display()); // dbg!(&measures.len()); - for measure in measures.chunks(get_chunk_size(measures.len())) { + let chunk_size = get_chunk_size(measures.len()); + info!("List of measures was divided by chunks with len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size); + + for measure in measures.chunks(chunk_size) { let permit = sem.clone(); let arc = arc.clone(); let client = client.clone(); + let hm = measure.lazy_unzip(); let measure = Arc::new(measure.display()); - let _permit = permit.acquire().await.unwrap(); - let jh: JoinHandle> = tokio::spawn(async move { - Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await + let jh: JoinHandle> = tokio::spawn(async move { + Self::process_endpoint(measure.clone(), client.clone(), arc.clone(), &hm).await }); jh_vec.push(jh); } // let mut vals = Vec::new(); + let mut counter = 0; for event in jh_vec { match event.await { Ok(val) => { if let Ok(val) = val { - match crate::export::Exporter::export_metrics(val).await { - Ok(bytes) => info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes), + counter+=1; + match crate::export::Exporter::export_extended_metrics(val).await { + Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes)}, Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er), } // vals.push(val); @@ -171,11 +180,12 @@ impl MonitoringImporter { Err(er) => println!("Fatal error on async task: {}", er), } } + info!("Total preproccessed metrics - {}", counter); // dbg!(&vals); // dbg!(&vals.len()); Ok(()) } - async fn process_endpoint(measure: Arc, client: Arc, arc: Arc) -> anyhow::Result { + async fn process_endpoint(measure: Arc, client: Arc, arc: Arc, hm: &HashMap) -> anyhow::Result { let resp = client .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure)) .header("Content-Type", "application/json") @@ -186,69 +196,88 @@ impl MonitoringImporter { let resp: Value = serde_json::from_str(&resp)?; // let a = Self::extract_metric_data(resp); - + // dbg!(&resp); Ok( - PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await + PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await ) } - fn extract_metric_data(json: Value) -> Pin>> + Send>> { + fn extract_metric_data(json: Value, hm: &HashMap) -> Pin>> + Send + '_>> { + // dbg!(hm); Box::pin(async move { + dbg!(&json); return match json { Value::Object(obj) => { // let resp: Value = serde_json::from_str(&obj)?; - return Ok(vec![Self::process_value(&obj).await?]) + // let desc = + return Ok(vec![Self::process_value(&obj, hm).await?]) }, Value::Array(arr) => { let mut vec = Vec::new(); for obj in arr { - if let Ok(mut val) = Self::extract_metric_data(obj).await { + if let Ok(mut val) = Self::extract_metric_data(obj, hm).await { // vec.push(val); vec.append(&mut val); } } + // dbg!(&vec); + // dbg!(vec.len()); + // todo!(); return Ok(vec) }, _ => Err(Error::msg("Invalid JSON format")), } }) } - async fn process_value(obj : &Map) -> anyhow::Result { - let id = obj.get("id"); - let val = obj.get("value"); - - if id.is_none() || val.is_none() { - return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response")) + async fn process_value(obj : &Map, hm: &HashMap) -> anyhow::Result { + // dbg!(obj); + // dbg!(hm); + // todo!(); + let id = obj.get("$id"); + let val = obj.get("value"); + let description = { + let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null); + let zero = String::new(); + if dola_ip.is_null() { + zero + } else { + hm.get( + dola_ip.as_str().unwrap_or_else(|| "") + ) + .unwrap_or_else(|| &zero) + .to_owned() } - let id = id.unwrap().as_str().unwrap_or_else(|| ""); - let val = val.unwrap(); + // dbg!(dola_ip) + }; - if id.is_empty() { - return Err(Error::msg("Empty `id` field. Invalid JSON response")) - } - // pub struct MetricOutput { - // pub id : String, - // #[serde(rename = "type")] - // json_type : String, - // addr : String, - // pub value : Value, - // } + if id.is_none() || val.is_none() { + return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response")) + } + let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_"); + let val = val.unwrap(); - Ok(MetricOutput { - id : id.to_owned(), - json_type : match val { - Value::Number(val) => { - if val.is_i64() { - "i64".to_owned() - } else if val.is_u64() { - "u64".to_owned() - } else { - "f64".to_owned() - } - }, - _ => "unknown".to_owned(), - }, - addr : "enode.monitoring.api".to_owned(), - value : val.clone() - }) + if id.is_empty() { + return Err(Error::msg("Empty `id` field. Invalid JSON response")) + } + // dbg!(obj); + // todo!(); + + Ok(MetricOutputExtended { + id : id.to_owned(), + json_type : match val { + Value::Number(val) => { + if val.is_i64() { + "i64".to_owned() + } else if val.is_u64() { + "u64".to_owned() + } else { + "f64".to_owned() + } + }, + _ => "unknown".to_owned(), + }, + addr : "enode.monitoring.api".to_owned(), + desc : description, + value : val.clone() + }) } } \ No newline at end of file