use std::env; use anyhow::Error; use serde_json::{Map, Value}; use reqwest::Client; use tokio::sync::Semaphore; use std::sync::Arc; use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, get_chunk_size}; use integr_structs::api::enode_monitoring::cmdb::Query; use tokio::task::JoinHandle; use std::pin::Pin; use std::future::Future; use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended}; use tracing::{error, info, warn}; use std::collections::HashMap; trait AsDeviceRequest { fn as_devices(self) -> Vec; } trait IntoEnodeRequset { fn into_enode_request(self) -> String; } impl AsDeviceRequest for Vec { fn as_devices(mut self) -> Vec { self.iter_mut() .for_each(|dev| *dev = format!("/measures/device${}", dev)); self } } #[derive(Debug)] struct MetricInstance { dola_id : String, name : String, desc : String, device : String, source : String, } impl IntoEnodeRequset for &[MetricInstance] { fn into_enode_request(self) -> String { let mut vec: Vec = Vec::new(); vec.push("%5B".to_owned()); self.iter() .enumerate() .for_each(|(id, val)| { if id > 0 { vec.push(",".to_owned()); } vec.push(format!("%22{}%22", val.dola_id)); }); vec.push("%5D".to_owned()); vec.concat() } } #[derive(Debug)] struct MetricMeta { name : String, desc : String, device : String, source : String, } impl Default for MetricMeta { fn default() -> Self { Self { name : String::new(), desc : String::new(), device : String::new(), source : String::new(), } } } #[allow(private_interfaces)] pub trait LazyUnzipInstance { fn lazy_unzip(&self) -> HashMap; } impl LazyUnzipInstance for &[MetricInstance] { fn lazy_unzip(&self) -> HashMap { self.iter().map(|obj| (obj.dola_id.to_string(), MetricMeta::new(&obj.name, &obj.desc, &obj.device, &obj.source))).collect() } } impl MetricInstance { fn new(id : &str, name : &str, desc : &str, device : &str, source : &str) -> Self { Self { dola_id : id.to_owned(), name : name.to_owned(), desc : desc.to_owned(), device : device.to_owned(), source : source.to_owned(), } } } impl MetricMeta { fn new(name : &str, desc : &str, device : &str, source : &str) -> Self { Self { name : name.to_owned(), desc : desc.to_owned(), device : device.to_owned(), source : source.to_owned(), } } } /// # Fn `get_metrics_from_monitoring` /// /// A function to init pulling and exporting metrics mechanism /// from CM to Exporter. It spawns async tasks to get measures /// and their values and then extract needed info to export /// /// ### Dev-Info : /// /// *input* : duration and delay as `usize` (in secs) /// /// *output* : `anyhow::Result<()>` /// /// *initiator* : fn `main` /// /// *managing* : runtime of N async tasks (N - count of chunks) /// /// # Example /// /// ``` /// use api-grub::monitoring::get_metrics_from_monitoring; /// /// // exec func without time restriction but with delay in 5 secs /// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(())); /// ``` /// #[tracing::instrument(name = "cm_fn_initiator", skip_all)] pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> { let timer = tokio::time::Instant::now(); let mut a = MonitoringImporter::new().await; 'outer: loop { // let mut a = MonitoringImporter::new().await; a.start_session().await?; tracing::debug!("CM creds struct - {:#?}", a); let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![])); tracing::debug!("Measures Vec - {:#?}", vec); 'inner: loop { if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) { break 'outer; } if vec.len() == 0 || a.get_measure_info(vec.clone()).await.is_err() { warn!("Session dropped, creating new ..."); break 'inner; } tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await } } Ok(()) } /// An entity which handle CM creds /// /// Used to capture measures and there values, to preprocess all measures to /// relevant Exporter's structure /// /// # Example: /// /// ``` /// use api-grub::monitoring::MonitoringImporter; /// /// let mut a = MonitoringImporter::new().await; /// a.start_session().await?; /// let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![])); /// /// assert_eq!(a.get_measure_info(vec.clone()).await, Ok(())); /// ``` /// #[derive(Clone)] pub struct MonitoringImporter { ip : String, login : String, password : String, access_token : String, ts : String, timeout : usize, } impl MonitoringImporter { /// The most simple constructor for `MonitoringImporter` /// /// Returns `Self` object that is constructing according to /// env vars: /// - `ENODE_MONITORING_IP` /// - `ENODE_MONITORING_LOGIN` /// - `ENODE_MONITORING_PASSWORD` /// /// If env vars will not be set, it returns `Self` with /// empty fields /// pub async fn new() -> Self { MonitoringImporter { ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()), login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()), password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()), access_token : String::new(), ts : String::new(), timeout : std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()).parse().unwrap_or_else(|_| 10) } } /// Function that checks is current `MonitoringImporter` valid /// and can be used to pull and push info to and from CM /// async fn is_valid(&self) -> bool { !self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty() } /// A setter of `timestamp` /// /// This function is needed to set a `timestamp` after /// CM session creation. /// /// This `timestamp` is a date of creation a session /// on the CM Server async fn set_ts(&mut self, ts: &str) { self.ts = ts.to_owned(); } /// A function for creation CM session /// /// Returns OK(()) if session was created and there were /// no errors (neither internal no external) /// /// *Also* it saves ts and access-key in it's runtime environment, /// there's no way to get access-key of session #[tracing::instrument(name = "cm_fn_session_start", skip_all)] pub async fn start_session(&mut self) -> anyhow::Result<()> { if !self.is_valid().await { return Err(Error::msg("Invalid eNODE-Monitoring configuration")); } let client = Client::new(); let url = format!("http://{}/e-data-front/auth/login", self.ip); let fortoken = ForTokenCredentials::new(&self.login, &self.password); let mut delay = 1; loop { let client = client .post(&url) .timeout(tokio::time::Duration::from_secs(self.timeout as u64)) .header("Content-Type", "application/json") .json(&fortoken); // let resp = client.send().await?; if let Ok(resp) = client.send().await { // let auth = resp.json::().await?; match resp.json::().await { Ok(auth) => { self.set_ts(&fortoken.ts).await; self.access_token = auth.access_token.to_owned(); tracing::trace!("Access key was changed"); break; }, Err(er) => error!("Error with extracting access-key from CM response due to {}", er), } } error!("Error while trying to create a new session, waiting {} secs and retrying ...", delay); tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await; delay = delay * 2; } info!("Started a new CM session"); Ok(()) } /// A function for pulling measures list /// /// Used with actual credentials for current CM session /// and returning measures in format of `Ok(Vec<(String, String)>)` /// , where `(String, String)` is a tuple of measure `id` and `description` /// (`name`) #[tracing::instrument(name = "CM get metrics list mechanism", skip_all)] pub async fn get_metrics_list(&self) -> anyhow::Result> { tracing::trace!("Trying ti get measures list from CM ..."); let client = Client::new(); let mut vec: Vec = Vec::new(); let url = format!("http://{}/e-cmdb/api/query", self.ip); let id_list = { match std::env::var("ENODE_TARGET_DEVICES") { Err(_) => vec![String::from("18"), String::from("19")], Ok(var) => var.split(',').into_iter().map(|st| st.trim().to_string()).collect::>(), } }; let list_of_devices = id_list.clone().as_devices(); let client = client .post(url) .timeout(tokio::time::Duration::from_secs(self.timeout as u64)) .header("Content-Type", "application/json") .bearer_auth(&self.access_token) .json(&Query::device_oriented(list_of_devices)); let resp = client.send().await?.text().await?; let resp: Value = serde_json::from_str(&resp)?; if let Some(arr) = resp.as_array() { for device in arr { let device_id = { match device.get("name") { Some(name) => { match serde_json::to_string(name) { Ok(name) => { name.split('$').last().unwrap_or_else(|| "undefined-device").to_owned() }, Err(_) => "undefined-device".to_string(), } }, None => "undefined-device".to_string(), } }; let device_id = device_id.trim_end_matches('"'); if let Some(links) = device.get("links") { if let Some(measures) = links.as_array() { for measure in measures.iter() { let dola_id = measure.get("id"); let id = measure.get("measure_id"); let source = measure.get("source_id"); let desc = measure.get("name"); if id.is_some() && source.is_some() && dola_id.is_some() { let dola_id = format!("measure${}", dola_id.unwrap().as_i64().unwrap_or_else(|| 0)); let id = id.unwrap().as_str().unwrap_or_else(|| "no-name"); let source = source.unwrap().as_str().unwrap_or_else(|| "no-source"); let desc = desc.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "no description"); if source.is_empty() { return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`measure_id` or `source_id`)")); } vec.push(MetricInstance::new(&dola_id, id, desc, device_id.as_ref(), source)); } } } } } } else { return Err(Error::msg("Invalid JSON in response")); } info!("List of measures was pulled, total - {}", &vec.len()); Ok(vec) } /// A function to get realtime data /// /// It pulles info about 1 measure or a slice of measures and /// exports all data to Prometehus exporter /// /// # How it works /// 1) creates a restriction for max count of async /// tasks (`tokio::sync::Semaphore`) /// /// 2) divides vec of measures in case of creating chunks with /// the most optimal sizes to optimize self and server load /// /// 3) spawns async tasks-grabbers to get measures info which /// exprots all data by itselfs /// #[tracing::instrument(name = "CM get measures info mechanism", skip_all)] pub async fn get_measure_info(&self, measures: Arc>) -> anyhow::Result<()> { tracing::trace!("Trying to get info about each measure ..."); let mut sys = sysinfo::System::new(); sys.refresh_cpu_all(); // adaptive permition on task spawm to prevent system overload let sem = Arc::new(Semaphore::new(sys.cpus().len())); let mut jh_vec = Vec::new(); let client = Arc::new(Client::new()); let measures = measures.clone(); let arc = Arc::new(self.clone()); let chunk_size = get_chunk_size(measures.len()); info!("List of measures was divided by chunks with len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size); for measure in measures.chunks(chunk_size) { let permit = sem.clone(); let arc = arc.clone(); let client = client.clone(); let hm = measure.lazy_unzip(); let measure = Arc::new(measure.into_enode_request()); let _permit = permit.acquire().await.unwrap(); let jh: JoinHandle> = tokio::spawn(async move { Self::process_endpoint( measure.clone(), client.clone(), arc.clone(), &hm, ).await }); jh_vec.push(jh); } for event in jh_vec { match event.await { Ok(val) => { match crate::export::Exporter::export_extended_metrics(val?).await { Ok(bytes) => {info!("Successfully transmitted {} bytes", bytes)}, Err(er) => error!("Cannot export data due to : `{}`", er), } }, Err(er) => { println!("Fatal error on async task: {}", er); return Err(anyhow::Error::msg(format!("Fatal error on async task: {}", er))) }, } } Ok(()) } /// An async task-grabber /// /// Used to create request to the CM server and /// get all measure(s) data /// /// # Also /// An argument `measure: Arc` can be a single measure like `measure$1` or /// a slice of measures in special format `%5B%22measure$1%22,%20%22measure$2%22%5D`. /// This is a neccesary measure to handle two types of requests and URL restrictions /// async fn process_endpoint( measure: Arc, client: Arc, arc: Arc, hm: &HashMap, ) -> anyhow::Result { tracing::trace!("Processing CM endpoint with one or more measure names"); let resp = client .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure)) .timeout(tokio::time::Duration::from_secs(arc.timeout as u64)) .header("Content-Type", "application/json") .bearer_auth(&arc.access_token) .send().await? .text().await?; tokio::task::yield_now().await; let resp: Value = serde_json::from_str(&resp)?; Ok( PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await ) } /// An recursive extractor of data /// /// Uses target-json CM-Server response as `Value` and HashMap of /// measures' `id`s and their appropriate `description`s /// /// # How it works /// 1) if `Value` is an `Object` -> executes `Self::process_value` on it and /// returns result of the function as `Vec` /// /// 2) if `Value` is an `Array` -> self-executes for each pat of the array /// and aggregates all data in the `Vec` by using `.append(&mut Vec<...>)` /// /// 3) if `Value` is `_` -> returns error **Invalid JSON format** /// fn extract_metric_data(json: Value, hm: &HashMap) -> Pin>> + Send + '_>> { Box::pin(async move { return match json { Value::Object(obj) => { return Ok(vec![Self::process_value(&obj, hm).await?]) }, Value::Array(arr) => { let mut vec = Vec::new(); for obj in arr { if let Ok(mut val) = Self::extract_metric_data(obj, hm).await { // vec.push(val); vec.append(&mut val); } } return Ok(vec) }, _ => Err(Error::msg("Invalid JSON format")), } }) } /// A function-extractor for single measure object /// /// Searches for certain fields and aggregates it in the `MetricOutputExtended` /// object async fn process_value(obj : &Map, hm: &HashMap) -> anyhow::Result { tracing::trace!("Processing atomic Object value in CM JSON-response"); let id = obj.get("$id"); let val = obj.get("value"); if id.is_none() || val.is_none() { return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response")) } let id = id.unwrap().as_str().unwrap_or_else(|| ""); let default_meta = MetricMeta::default(); let meta = hm.get(id).unwrap_or_else(|| &default_meta); let id = id.replace("$", "_"); let val = val.unwrap(); let device = meta.device.parse::().unwrap_or_else(|_| 0); if id.is_empty() { return Err(Error::msg("Empty `id` field. Invalid JSON response")) } Ok(MetricOutputExtended::new_with_slices( id.as_ref(), &meta.name, { match val { Value::Number(val) => { if val.is_i64() { "i64" } else if val.is_u64() { "u64" } else { "f64" } }, _ => "unknown", } }, "enode.monitoring.api", &meta.desc, Some(device), Some(meta.source.clone()), val.clone(), )) } } impl std::fmt::Debug for MonitoringImporter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MonitoringImporter") .field("ip", &self.ip) .field("login", &self.login) .field("password", &"****") .field("access_key", &"HIDDEN") .field("ts", &self.ts) .finish() } }