From 7c7db5e510dc78c024c82e06c0cdacb1b15f8096 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 20 May 2025 03:09:59 -0400 Subject: [PATCH 1/2] enode big update --- .env.example | 1 + crates/api-grub/src/jitter.rs | 2 +- crates/api-grub/src/monitoring.rs | 211 +++++++++++++++++++++++++----- crates/integr-structs/Cargo.toml | 1 + crates/integr-structs/src/api.rs | 23 ++-- 5 files changed, 191 insertions(+), 47 deletions(-) diff --git a/.env.example b/.env.example index d90a1df..26da514 100644 --- a/.env.example +++ b/.env.example @@ -20,6 +20,7 @@ STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input" ENODE_MONITORING_IP = "ip.ip.ip.ip" ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required +ENODE_TARGET_DEVICES = "device$18,device$19" # IM configuration for max level of logging info # for example DEBUG, INFO, WARN, ERROR, TRACE diff --git a/crates/api-grub/src/jitter.rs b/crates/api-grub/src/jitter.rs index a41cd1c..ab0ac93 100644 --- a/crates/api-grub/src/jitter.rs +++ b/crates/api-grub/src/jitter.rs @@ -183,7 +183,7 @@ impl Requester { error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er); Value::Null }); - metrics.add(MetricOutputExtended::new_with_slices(&metric_id, "int", "Vinteo native", &desc, val)); + metrics.add(MetricOutputExtended::new_with_slices(&metric_id, "int", "Vinteo native", &desc, None, None,val)); }); }); metrics diff --git a/crates/api-grub/src/monitoring.rs b/crates/api-grub/src/monitoring.rs index d42a392..b0fa6d5 100644 --- a/crates/api-grub/src/monitoring.rs +++ b/crates/api-grub/src/monitoring.rs @@ -1,10 +1,11 @@ use std::env; +use std::fmt::Display; use anyhow::Error; use serde_json::{Map, Value}; use reqwest::Client; use tokio::sync::Semaphore; use std::sync::Arc; -use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size}; +use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size}; use integr_structs::api::enode_monitoring::cmdb::Query; use tokio::task::JoinHandle; use std::pin::Pin; @@ -15,6 +16,116 @@ use std::collections::HashMap; // const IM_CONNECTION_TIMEOUT: String = std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()); +trait AsDeviceRequest { + fn as_devices(self) -> Vec; +} +trait IntoEnodeRequset { + fn into_enode_request(self) -> String; +} +impl AsDeviceRequest for Vec { + fn as_devices(mut self) -> Vec { + self.iter_mut() + .for_each(|dev| *dev = format!("/measures/device${}", dev)); + self + } +} + + +#[derive(Debug)] +struct MetricInstance { + dola_id : String, + name : String, + desc : String, + device : String, + source : String, +} +impl IntoEnodeRequset for &[MetricInstance] { + fn into_enode_request(self) -> String { + let mut vec: Vec = Vec::new(); + vec.push("%5B".to_owned()); + self.iter() + .enumerate() + .for_each(|(id, val)| { + if id > 0 { + vec.push(",".to_owned()); + } + vec.push(format!("%22{}%22", val.dola_id)); + }); + vec.push("%5D".to_owned()); + vec.concat() + } +} +// impl GenericUrl for [(T, T)] +// where T : Display { +// fn display(&self) -> String { +// let mut vec: Vec = Vec::new(); +// vec.push("%5B".to_owned()); +// self.iter() +// .enumerate() +// .for_each(|(id, val)| { +// if id > 0 { +// vec.push(",".to_owned()); +// } +// vec.push(format!("%22{}%22", val.0)); +// }); +// vec.push("%5D".to_owned()); +// vec.concat() +// } +// } + +#[derive(Debug)] +struct MetricMeta { + name : String, + desc : String, + device : String, + source : String, +} + +impl Default for MetricMeta { + fn default() -> Self { + Self { + name : String::new(), + desc : String::new(), + device : String::new(), + source : String::new(), + } + } +} + +pub trait LazyUnzipInstance { + fn lazy_unzip(&self) -> HashMap; +} + +impl LazyUnzipInstance for &[MetricInstance] { + fn lazy_unzip(&self) -> HashMap { + self.iter().map(|obj| (obj.dola_id.to_string(), MetricMeta::new(&obj.name, &obj.desc, &obj.device, &obj.source))).collect() + } +} + +impl MetricInstance { + fn new(id : &str, name : &str, desc : &str, device : &str, source : &str) -> Self { + Self { + dola_id : id.to_owned(), + name : name.to_owned(), + desc : desc.to_owned(), + device : device.to_owned(), + source : source.to_owned(), + } + } +} + +impl MetricMeta { + fn new(name : &str, desc : &str, device : &str, source : &str) -> Self { + Self { + name : name.to_owned(), + desc : desc.to_owned(), + device : device.to_owned(), + source : source.to_owned(), + } + } +} + + /// # Fn `get_metrics_from_monitoring` /// /// A function to init pulling and exporting metrics mechanism @@ -185,32 +296,70 @@ impl MonitoringImporter { /// , where `(String, String)` is a tuple of measure `id` and `description` /// (`name`) #[tracing::instrument(name = "CM get metrics list mechanism", skip_all)] - pub async fn get_metrics_list(&self) -> anyhow::Result> { + pub async fn get_metrics_list(&self) -> anyhow::Result> { tracing::trace!("Trying ti get measures list from CM ..."); let client = Client::new(); - let mut vec: Vec<(String, String)> = Vec::new(); + let mut vec: Vec = Vec::new(); let url = format!("http://{}/e-cmdb/api/query", self.ip); + let id_list = { + match std::env::var("ENODE_TARGET_DEVICES") { + Err(_) => vec![String::from("18"), String::from("19")], + Ok(var) => var.split(',').into_iter().map(|st| st.trim().to_string()).collect::>(), + } + }; + let list_of_devices = id_list.clone().as_devices(); let client = client .post(url) .timeout(tokio::time::Duration::from_secs(self.timeout as u64)) .header("Content-Type", "application/json") .bearer_auth(&self.access_token) - .json(&Query::default()); + .json(&Query::device_oriented(list_of_devices)); + + // dbg!(Query::device_oriented(id_list.as_devices())); let resp = client.send().await?.text().await?; + // println!("{}", &resp); let resp: Value = serde_json::from_str(&resp)?; + + // todo!(); + if let Some(arr) = resp.as_array() { - for measure in arr { - let id = measure.get("id"); - let cls = measure.get("cls"); - let name = measure.get("name"); - if id.is_some() && cls.is_some() { - let id = id.unwrap().as_i64().unwrap_or_default(); - let cls = cls.unwrap().as_str().unwrap_or_else(|| ""); - let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null"); - if cls.is_empty() { - return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)")); + for device in arr { + let device_id = { + match device.get("name") { + Some(name) => { + match serde_json::to_string(name) { + Ok(name) => { + name.split('$').last().unwrap_or_else(|| "undefined-device").to_owned() + }, + Err(_) => "undefined-device".to_string(), + } + }, + None => "undefined-device".to_string(), + } + }; + if let Some(links) = device.get("links") { + if let Some(measures) = links.as_array() { + for measure in measures.iter() { + let dola_id = measure.get("$id"); + let id = measure.get("measure_id"); + let source = measure.get("source_id"); + let desc = measure.get("name"); + dbg!(&dola_id); + dbg!(&id); + dbg!(&source); + dbg!(&desc); + if id.is_some() && source.is_some() && dola_id.is_some() { + let dola_id = dola_id.unwrap().as_str().unwrap_or_else(|| "no-id"); + let id = id.unwrap().as_str().unwrap_or_else(|| "no-name"); + let source = source.unwrap().as_str().unwrap_or_else(|| "no-source"); + let desc = desc.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "no description"); + if source.is_empty() { + return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`measure_id` or `source_id`)")); + } + vec.push(MetricInstance::new(dola_id, id, desc, device_id.as_ref(), source)); + } + } } - vec.push((format!("{}${}", cls, id), name.to_string())); } } } else { @@ -236,8 +385,8 @@ impl MonitoringImporter { /// exprots all data by itselfs /// #[tracing::instrument(name = "CM get measures info mechanism", skip_all)] - pub async fn get_measure_info(&self, measures: Arc>) -> anyhow::Result<()> { - tracing::trace!("Trying ti get info about each measure ..."); + pub async fn get_measure_info(&self, measures: Arc>) -> anyhow::Result<()> { + tracing::trace!("Trying to get info about each measure ..."); let mut sys = sysinfo::System::new(); sys.refresh_cpu_all(); // adaptive permition on task spawm to prevent system overload @@ -254,7 +403,7 @@ impl MonitoringImporter { let arc = arc.clone(); let client = client.clone(); let hm = measure.lazy_unzip(); - let measure = Arc::new(measure.display()); + let measure = Arc::new(measure.into_enode_request()); let _permit = permit.acquire().await.unwrap(); let jh: JoinHandle> = tokio::spawn(async move { @@ -300,7 +449,7 @@ impl MonitoringImporter { measure: Arc, client: Arc, arc: Arc, - hm: &HashMap, + hm: &HashMap, ) -> anyhow::Result { tracing::trace!("Processing CM endpoint with one or more measure names"); let resp = client @@ -332,7 +481,7 @@ impl MonitoringImporter { /// /// 3) if `Value` is `_` -> returns error **Invalid JSON format** /// - fn extract_metric_data(json: Value, hm: &HashMap) -> Pin>> + Send + '_>> { + fn extract_metric_data(json: Value, hm: &HashMap) -> Pin>> + Send + '_>> { Box::pin(async move { return match json { Value::Object(obj) => { @@ -357,23 +506,10 @@ impl MonitoringImporter { /// /// Searches for certain fields and aggregates it in the `MetricOutputExtended` /// object - async fn process_value(obj : &Map, hm: &HashMap) -> anyhow::Result { + async fn process_value(obj : &Map, hm: &HashMap) -> anyhow::Result { tracing::trace!("Processing atomic Object value in CM JSON-response"); let id = obj.get("$id"); let val = obj.get("value"); - let description = { - let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null); - let zero = String::new(); - if dola_ip.is_null() { - zero - } else { - hm.get( - dola_ip.as_str().unwrap_or_else(|| "") - ) - .unwrap_or_else(|| &zero) - .to_owned() - } - }; if id.is_none() || val.is_none() { return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response")) @@ -384,6 +520,9 @@ impl MonitoringImporter { if id.is_empty() { return Err(Error::msg("Empty `id` field. Invalid JSON response")) } + let default_meta = MetricMeta::default(); + let meta = hm.get(&id).unwrap_or_else(|| &default_meta); + let device = meta.device.parse::().unwrap_or_else(|_| 0); Ok(MetricOutputExtended { id : id.to_owned(), @@ -400,8 +539,10 @@ impl MonitoringImporter { _ => "unknown".to_owned(), }, addr : "enode.monitoring.api".to_owned(), - desc : description, + desc : meta.desc.clone(), value : val.clone(), + device: Some(device), + source: Some(meta.source.clone()), status: 0, }) } diff --git a/crates/integr-structs/Cargo.toml b/crates/integr-structs/Cargo.toml index ea15533..1c42efc 100644 --- a/crates/integr-structs/Cargo.toml +++ b/crates/integr-structs/Cargo.toml @@ -12,5 +12,6 @@ publish = ["kellnr"] [dependencies] anyhow = "1.0.95" chrono = "0.4.40" +dotenv = "0.15.0" serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.135" diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index 349b84b..493529c 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -272,15 +272,19 @@ pub mod v3 { #[serde(rename = "description")] pub desc : String, pub status: usize, + pub device: Option, + pub source: Option, } impl MetricOutputExtended { - pub fn new_with_slices(id : &str, json_type : &str, addr: &str, desc : &str, value : Value) -> Self { + pub fn new_with_slices(id : &str, json_type : &str, addr: &str, desc : &str, device: Option, source: Option, value : Value) -> Self { MetricOutputExtended { id : id.to_string(), json_type : json_type.to_string(), addr : addr.to_string(), value : value, desc : desc.to_string(), + device, + source, status: 0, } } @@ -317,14 +321,12 @@ pub mod v3 { #[derive(Serialize, Deserialize, Debug)] pub struct PrometheusMetricsExtended { pub service_name: String, - pub endpoint_name: String, pub metrics: Vec, } impl PrometheusMetricsExtended { pub fn new_empty_jitter() -> Self { Self { service_name : "zvks".to_owned(), - endpoint_name : "jitter".to_owned(), metrics : Vec::new(), } } @@ -334,7 +336,6 @@ pub mod v3 { pub async fn new_zvks(metrics: Vec) -> Self { Self { service_name : "zvks".to_owned(), - endpoint_name : "apiforsnmp".to_owned(), metrics : metrics, } } @@ -377,18 +378,18 @@ pub mod enode_monitoring { pub struct Query { id : Vec, data : Data, - #[serde(rename = "postQuery")] - post_query : String, + // #[serde(rename = "postQuery")] + // post_query : String, #[serde(rename = "enableActions")] enable_actions : bool, ts : usize } - impl Default for Query { - fn default() -> Self { + impl Query { + pub fn device_oriented(devices: Vec) -> Self { Self { - id : vec!["/measures/device$18".to_owned(), "/measures/device$19".to_owned()], + id : devices, data : Data::default(), - post_query : "links".to_owned(), + // post_query : "links".to_owned(), enable_actions : false, ts : 1740060679399 } @@ -416,7 +417,7 @@ pub mod enode_monitoring { fn default() -> Self { Self { flatten : true, - filter : Filter::default() + filter : Filter::default(), } } } From 05b173408eda45a393aae21f30267fbf94f9ca32 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 20 May 2025 03:10:11 -0400 Subject: [PATCH 2/2] -comments --- crates/api-grub/src/monitoring.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/api-grub/src/monitoring.rs b/crates/api-grub/src/monitoring.rs index b0fa6d5..e055b43 100644 --- a/crates/api-grub/src/monitoring.rs +++ b/crates/api-grub/src/monitoring.rs @@ -315,13 +315,9 @@ impl MonitoringImporter { .bearer_auth(&self.access_token) .json(&Query::device_oriented(list_of_devices)); - // dbg!(Query::device_oriented(id_list.as_devices())); let resp = client.send().await?.text().await?; - // println!("{}", &resp); let resp: Value = serde_json::from_str(&resp)?; - // todo!(); - if let Some(arr) = resp.as_array() { for device in arr { let device_id = {