diff --git a/crates/api-grub/src/monitoring.rs b/crates/api-grub/src/monitoring.rs new file mode 100644 index 0000000..b77c141 --- /dev/null +++ b/crates/api-grub/src/monitoring.rs @@ -0,0 +1,243 @@ +use std::env; +use anyhow::Error; +use serde_json::{Map, Value}; +use reqwest::Client; +use tokio::sync::Semaphore; +use std::sync::Arc; +// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl}; +use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size}; +// use crate::structs::cmdb::Query; +use integr_structs::api::enode_monitoring::cmdb::Query; +use tokio::task::JoinHandle; +// use crate::structs::get_chunk_size; +use std::pin::Pin; +use std::future::Future; +use integr_structs::api::v3::{MetricOutput, PrometheusMetrics}; +// use chrono::{Local, DateTime}; + +pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> { + let timer = tokio::time::Instant::now(); + loop { + if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) { + break; + } + let mut a = MonitoringImporter::new().await; + a.start_session().await?; + let vec = a.get_metrics_list().await?; + let _ = a.get_measure_info(Arc::new(vec)).await; + a.close_session().await?; + tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await + } + Ok(()) +} + +#[derive(Debug, Clone)] +pub struct MonitoringImporter { + ip : String, + login : String, + password : String, + access_token : String, + ts : String, +} + +impl MonitoringImporter { + pub async fn new() -> Self { + MonitoringImporter { + ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()), + login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()), + password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()), + access_token : String::new(), + ts : String::new(), + } + } + async fn is_valid(&self) -> bool { + !self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty() + } + async fn set_ts(&mut self, ts: &str) { + self.ts = ts.to_owned(); + } + pub async fn start_session(&mut self) -> anyhow::Result<()> { + if !self.is_valid().await { + return Err(Error::msg("Invalid eNODE-Monitoring configuration")); + } + let client = Client::new(); + let url = format!("http://{}/e-data-front/auth/login", self.ip); + let fortoken = ForTokenCredentials::new(&self.login, &self.password); + // dbg!(&fortoken); + let client = client + .post(url) + .header("Content-Type", "application/json") + .json(&fortoken); + let resp = client.send().await?; + let auth = resp.json::().await?; + // dbg!(&auth); + self.set_ts(&fortoken.ts).await; + + self.access_token = auth.access_token.to_owned(); + + Ok(()) + } + pub async fn close_session(&mut self) -> anyhow::Result<()> { + let client = Client::new(); + let url = format!("http://{}/e-data-front/auth/logout", self.ip); + let client = client + .post(url) + .header("Content-Type", "application/json") + .header("access-token", &self.access_token); + + let _ = client.send().await?; + + self.access_token.clear(); + Ok(()) + } + pub async fn get_metrics_list(&self) -> anyhow::Result> { + let client = Client::new(); + let mut vec: Vec = Vec::new(); + let url = format!("http://{}/e-cmdb/api/query", self.ip); + let client = client + .post(url) + .header("Content-Type", "application/json") + .header("access-token", &self.access_token) + .json(&Query::default()); + let resp = client.send().await?.text().await?; + // dbg!(&resp.text().await); + let resp: Value = serde_json::from_str(&resp)?; + if let Some(arr) = resp.as_array() { + for measure in arr { + let id = measure.get("id"); + let cls = measure.get("cls"); + if id.is_some() && cls.is_some() { + // todo: later wait for Vaitaliy call of classification + let id = id.unwrap().as_i64().unwrap_or_default(); + let cls = cls.unwrap().as_str().unwrap_or_else(|| ""); + if cls.is_empty() { + return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)")); + } + // let measure_name = format!("{}${}", cls, id); + vec.push(format!("{}${}", cls, id)); + } + } + // dbg!(vec); + } else { + return Err(Error::msg("Invalid JSON in response")); + } + Ok(vec) + } + pub async fn get_measure_info(&self, measures: Arc>) -> anyhow::Result<()> { + let mut sys = sysinfo::System::new(); + sys.refresh_cpu_all(); + // adaptive permition on task spawm to prevent system overload + let sem = Arc::new(Semaphore::new(sys.cpus().len())); + let mut jh_vec = Vec::new(); + let client = Arc::new(Client::new()); + let measures = measures.clone(); + let arc = Arc::new(self.clone()); + // dbg!(&measures.display()); + + // dbg!(&measures.len()); + for measure in measures.chunks(get_chunk_size(measures.len())) { + let permit = sem.clone(); + let arc = arc.clone(); + let client = client.clone(); + let measure = Arc::new(measure.display()); + + let _permit = permit.acquire().await.unwrap(); + + let jh: JoinHandle> = tokio::spawn(async move { + Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await + + }); + jh_vec.push(jh); + } + let mut vals = Vec::new(); + for event in jh_vec { + match event.await { + Ok(val) => { + if let Ok(val) = val { + vals.push(val); + } + }, + Err(er) => println!("Fatal error on async task: {}", er), + } + } + // dbg!(&vals); + // dbg!(&vals.len()); + Ok(()) + } + async fn process_endpoint(measure: Arc, client: Arc, arc: Arc) -> anyhow::Result { + let resp = client + .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure)) + .header("Content-Type", "application/json") + .header("access-token", &arc.access_token) + .send().await? + .text().await?; + tokio::task::yield_now().await; + + let resp: Value = serde_json::from_str(&resp)?; + // let a = Self::extract_metric_data(resp); + + Ok( + PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await + ) + } + fn extract_metric_data(json: Value) -> Pin>> + Send>> { + Box::pin(async move { + return match json { + Value::Object(obj) => { + // let resp: Value = serde_json::from_str(&obj)?; + return Ok(vec![Self::process_value(&obj).await?]) + }, + Value::Array(arr) => { + let mut vec = Vec::new(); + for obj in arr { + if let Ok(mut val) = Self::extract_metric_data(obj).await { + // vec.push(val); + vec.append(&mut val); + } + } + return Ok(vec) + }, + _ => Err(Error::msg("Invalid JSON format")), + } + }) + } + async fn process_value(obj : &Map) -> anyhow::Result { + let id = obj.get("id"); + let val = obj.get("value"); + + if id.is_none() || val.is_none() { + return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response")) + } + let id = id.unwrap().as_str().unwrap_or_else(|| ""); + let val = val.unwrap(); + + if id.is_empty() { + return Err(Error::msg("Empty `id` field. Invalid JSON response")) + } + // pub struct MetricOutput { + // pub id : String, + // #[serde(rename = "type")] + // json_type : String, + // addr : String, + // pub value : Value, + // } + + Ok(MetricOutput { + id : id.to_owned(), + json_type : match val { + Value::Number(val) => { + if val.is_i64() { + "i64".to_owned() + } else if val.is_u64() { + "u64".to_owned() + } else { + "f64".to_owned() + } + }, + _ => "unknown".to_owned(), + }, + addr : "enode.monitoring.api".to_owned(), + value : val.clone() + }) + } +} \ No newline at end of file