changed to work with tagged with description data

feature/1117
prplV 2025-03-04 14:58:34 +03:00
parent 3b5976b09c
commit 2ca105e67f
1 changed files with 97 additions and 68 deletions

View File

@ -5,15 +5,16 @@ use reqwest::Client;
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
use std::sync::Arc; use std::sync::Arc;
// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl}; // use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl};
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size}; use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size};
// use crate::structs::cmdb::Query; // use crate::structs::cmdb::Query;
use integr_structs::api::enode_monitoring::cmdb::Query; use integr_structs::api::enode_monitoring::cmdb::Query;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
// use crate::structs::get_chunk_size; // use crate::structs::get_chunk_size;
use std::pin::Pin; use std::pin::Pin;
use std::future::Future; use std::future::Future;
use integr_structs::api::v3::{MetricOutput, PrometheusMetrics}; use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
use log::{error, info, warn}; use log::{error, info, warn};
use std::collections::HashMap;
// use chrono::{Local, DateTime}; // use chrono::{Local, DateTime};
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> { pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
@ -21,17 +22,16 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho
'outer: loop { 'outer: loop {
let mut a = MonitoringImporter::new().await; let mut a = MonitoringImporter::new().await;
a.start_session().await?; a.start_session().await?;
let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
'inner: loop { 'inner: loop {
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) { if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
break 'outer; break 'outer;
} }
let vec = a.get_metrics_list().await.unwrap_or_else(|_| vec![]); if let Err(_) = a.get_measure_info(vec.clone()).await {
if vec.is_empty() {
warn!("Session dropped, creating new ..."); warn!("Session dropped, creating new ...");
break 'inner; break 'inner;
} }
let _ = a.get_measure_info(Arc::new(vec)).await;
// a.close_session().await?;
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
} }
} }
@ -84,22 +84,23 @@ impl MonitoringImporter {
Ok(()) Ok(())
} }
pub async fn close_session(&mut self) -> anyhow::Result<()> { // [ DEPRECATED ]
let client = Client::new(); // pub async fn close_session(&mut self) -> anyhow::Result<()> {
let url = format!("http://{}/e-data-front/auth/logout", self.ip); // let client = Client::new();
let client = client // let url = format!("http://{}/e-data-front/auth/logout", self.ip);
.post(url) // let client = client
.header("Content-Type", "application/json") // .post(url)
.header("access-token", &self.access_token); // .header("Content-Type", "application/json")
// .header("access-token", &self.access_token);
let _ = client.send().await?; // let _ = client.send().await?;
self.access_token.clear(); // self.access_token.clear();
Ok(()) // Ok(())
} // }
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<String>> { pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
let client = Client::new(); let client = Client::new();
let mut vec: Vec<String> = Vec::new(); let mut vec: Vec<(String, String)> = Vec::new();
let url = format!("http://{}/e-cmdb/api/query", self.ip); let url = format!("http://{}/e-cmdb/api/query", self.ip);
let client = client let client = client
.post(url) .post(url)
@ -113,24 +114,27 @@ impl MonitoringImporter {
for measure in arr { for measure in arr {
let id = measure.get("id"); let id = measure.get("id");
let cls = measure.get("cls"); let cls = measure.get("cls");
let name = measure.get("name");
if id.is_some() && cls.is_some() { if id.is_some() && cls.is_some() {
// todo: later wait for Vaitaliy call of classification // todo: later wait for Vaitaliy call of classification
let id = id.unwrap().as_i64().unwrap_or_default(); let id = id.unwrap().as_i64().unwrap_or_default();
let cls = cls.unwrap().as_str().unwrap_or_else(|| ""); let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
if cls.is_empty() { if cls.is_empty() {
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)")); return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
} }
// let measure_name = format!("{}${}", cls, id); // let measure_name = format!("{}${}", cls, id);
vec.push(format!("{}${}", cls, id)); vec.push((format!("{}${}", cls, id), name.to_string()));
} }
} }
// dbg!(vec); // dbg!(vec);
} else { } else {
return Err(Error::msg("Invalid JSON in response")); return Err(Error::msg("Invalid JSON in response"));
} }
info!("List of measures was pulled, total - {}", &vec.len());
Ok(vec) Ok(vec)
} }
pub async fn get_measure_info(&self, measures: Arc<Vec<String>>) -> anyhow::Result<()> { pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
let mut sys = sysinfo::System::new(); let mut sys = sysinfo::System::new();
sys.refresh_cpu_all(); sys.refresh_cpu_all();
// adaptive permition on task spawm to prevent system overload // adaptive permition on task spawm to prevent system overload
@ -142,27 +146,32 @@ impl MonitoringImporter {
// dbg!(&measures.display()); // dbg!(&measures.display());
// dbg!(&measures.len()); // dbg!(&measures.len());
for measure in measures.chunks(get_chunk_size(measures.len())) { let chunk_size = get_chunk_size(measures.len());
info!("List of measures was divided by chunks with len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size);
for measure in measures.chunks(chunk_size) {
let permit = sem.clone(); let permit = sem.clone();
let arc = arc.clone(); let arc = arc.clone();
let client = client.clone(); let client = client.clone();
let hm = measure.lazy_unzip();
let measure = Arc::new(measure.display()); let measure = Arc::new(measure.display());
let _permit = permit.acquire().await.unwrap(); let _permit = permit.acquire().await.unwrap();
let jh: JoinHandle<anyhow::Result<PrometheusMetrics>> = tokio::spawn(async move { let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await Self::process_endpoint(measure.clone(), client.clone(), arc.clone(), &hm).await
}); });
jh_vec.push(jh); jh_vec.push(jh);
} }
// let mut vals = Vec::new(); // let mut vals = Vec::new();
let mut counter = 0;
for event in jh_vec { for event in jh_vec {
match event.await { match event.await {
Ok(val) => { Ok(val) => {
if let Ok(val) = val { if let Ok(val) = val {
match crate::export::Exporter::export_metrics(val).await { counter+=1;
Ok(bytes) => info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes), match crate::export::Exporter::export_extended_metrics(val).await {
Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes)},
Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er), Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er),
} }
// vals.push(val); // vals.push(val);
@ -171,11 +180,12 @@ impl MonitoringImporter {
Err(er) => println!("Fatal error on async task: {}", er), Err(er) => println!("Fatal error on async task: {}", er),
} }
} }
info!("Total preproccessed metrics - {}", counter);
// dbg!(&vals); // dbg!(&vals);
// dbg!(&vals.len()); // dbg!(&vals.len());
Ok(()) Ok(())
} }
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>) -> anyhow::Result<PrometheusMetrics> { async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
let resp = client let resp = client
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure)) .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
@ -186,54 +196,72 @@ impl MonitoringImporter {
let resp: Value = serde_json::from_str(&resp)?; let resp: Value = serde_json::from_str(&resp)?;
// let a = Self::extract_metric_data(resp); // let a = Self::extract_metric_data(resp);
// dbg!(&resp);
Ok( Ok(
PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
) )
} }
fn extract_metric_data(json: Value) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutput>>> + Send>> { fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
// dbg!(hm);
Box::pin(async move { Box::pin(async move {
dbg!(&json);
return match json { return match json {
Value::Object(obj) => { Value::Object(obj) => {
// let resp: Value = serde_json::from_str(&obj)?; // let resp: Value = serde_json::from_str(&obj)?;
return Ok(vec![Self::process_value(&obj).await?]) // let desc =
return Ok(vec![Self::process_value(&obj, hm).await?])
}, },
Value::Array(arr) => { Value::Array(arr) => {
let mut vec = Vec::new(); let mut vec = Vec::new();
for obj in arr { for obj in arr {
if let Ok(mut val) = Self::extract_metric_data(obj).await { if let Ok(mut val) = Self::extract_metric_data(obj, hm).await {
// vec.push(val); // vec.push(val);
vec.append(&mut val); vec.append(&mut val);
} }
} }
// dbg!(&vec);
// dbg!(vec.len());
// todo!();
return Ok(vec) return Ok(vec)
}, },
_ => Err(Error::msg("Invalid JSON format")), _ => Err(Error::msg("Invalid JSON format")),
} }
}) })
} }
async fn process_value(obj : &Map<String, Value>) -> anyhow::Result<MetricOutput> { async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
let id = obj.get("id"); // dbg!(obj);
// dbg!(hm);
// todo!();
let id = obj.get("$id");
let val = obj.get("value"); let val = obj.get("value");
let description = {
let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null);
let zero = String::new();
if dola_ip.is_null() {
zero
} else {
hm.get(
dola_ip.as_str().unwrap_or_else(|| "")
)
.unwrap_or_else(|| &zero)
.to_owned()
}
// dbg!(dola_ip)
};
if id.is_none() || val.is_none() { if id.is_none() || val.is_none() {
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response")) return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
} }
let id = id.unwrap().as_str().unwrap_or_else(|| ""); let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_");
let val = val.unwrap(); let val = val.unwrap();
if id.is_empty() { if id.is_empty() {
return Err(Error::msg("Empty `id` field. Invalid JSON response")) return Err(Error::msg("Empty `id` field. Invalid JSON response"))
} }
// pub struct MetricOutput { // dbg!(obj);
// pub id : String, // todo!();
// #[serde(rename = "type")]
// json_type : String,
// addr : String,
// pub value : Value,
// }
Ok(MetricOutput { Ok(MetricOutputExtended {
id : id.to_owned(), id : id.to_owned(),
json_type : match val { json_type : match val {
Value::Number(val) => { Value::Number(val) => {
@ -248,6 +276,7 @@ impl MonitoringImporter {
_ => "unknown".to_owned(), _ => "unknown".to_owned(),
}, },
addr : "enode.monitoring.api".to_owned(), addr : "enode.monitoring.api".to_owned(),
desc : description,
value : val.clone() value : val.clone()
}) })
} }