migration: monitoring module

pull/6/head
prplV 2025-02-27 13:00:23 +03:00
parent 2adb706a69
commit 6297ea2a50
1 changed files with 243 additions and 0 deletions

View File

@ -0,0 +1,243 @@
use std::env;
use anyhow::Error;
use serde_json::{Map, Value};
use reqwest::Client;
use tokio::sync::Semaphore;
use std::sync::Arc;
// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl};
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size};
// use crate::structs::cmdb::Query;
use integr_structs::api::enode_monitoring::cmdb::Query;
use tokio::task::JoinHandle;
// use crate::structs::get_chunk_size;
use std::pin::Pin;
use std::future::Future;
use integr_structs::api::v3::{MetricOutput, PrometheusMetrics};
// use chrono::{Local, DateTime};
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
let timer = tokio::time::Instant::now();
loop {
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
break;
}
let mut a = MonitoringImporter::new().await;
a.start_session().await?;
let vec = a.get_metrics_list().await?;
let _ = a.get_measure_info(Arc::new(vec)).await;
a.close_session().await?;
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
}
Ok(())
}
#[derive(Debug, Clone)]
pub struct MonitoringImporter {
ip : String,
login : String,
password : String,
access_token : String,
ts : String,
}
impl MonitoringImporter {
pub async fn new() -> Self {
MonitoringImporter {
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()),
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
access_token : String::new(),
ts : String::new(),
}
}
async fn is_valid(&self) -> bool {
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
}
async fn set_ts(&mut self, ts: &str) {
self.ts = ts.to_owned();
}
pub async fn start_session(&mut self) -> anyhow::Result<()> {
if !self.is_valid().await {
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
}
let client = Client::new();
let url = format!("http://{}/e-data-front/auth/login", self.ip);
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
// dbg!(&fortoken);
let client = client
.post(url)
.header("Content-Type", "application/json")
.json(&fortoken);
let resp = client.send().await?;
let auth = resp.json::<AuthResponse>().await?;
// dbg!(&auth);
self.set_ts(&fortoken.ts).await;
self.access_token = auth.access_token.to_owned();
Ok(())
}
pub async fn close_session(&mut self) -> anyhow::Result<()> {
let client = Client::new();
let url = format!("http://{}/e-data-front/auth/logout", self.ip);
let client = client
.post(url)
.header("Content-Type", "application/json")
.header("access-token", &self.access_token);
let _ = client.send().await?;
self.access_token.clear();
Ok(())
}
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<String>> {
let client = Client::new();
let mut vec: Vec<String> = Vec::new();
let url = format!("http://{}/e-cmdb/api/query", self.ip);
let client = client
.post(url)
.header("Content-Type", "application/json")
.header("access-token", &self.access_token)
.json(&Query::default());
let resp = client.send().await?.text().await?;
// dbg!(&resp.text().await);
let resp: Value = serde_json::from_str(&resp)?;
if let Some(arr) = resp.as_array() {
for measure in arr {
let id = measure.get("id");
let cls = measure.get("cls");
if id.is_some() && cls.is_some() {
// todo: later wait for Vaitaliy call of classification
let id = id.unwrap().as_i64().unwrap_or_default();
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
if cls.is_empty() {
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
}
// let measure_name = format!("{}${}", cls, id);
vec.push(format!("{}${}", cls, id));
}
}
// dbg!(vec);
} else {
return Err(Error::msg("Invalid JSON in response"));
}
Ok(vec)
}
pub async fn get_measure_info(&self, measures: Arc<Vec<String>>) -> anyhow::Result<()> {
let mut sys = sysinfo::System::new();
sys.refresh_cpu_all();
// adaptive permition on task spawm to prevent system overload
let sem = Arc::new(Semaphore::new(sys.cpus().len()));
let mut jh_vec = Vec::new();
let client = Arc::new(Client::new());
let measures = measures.clone();
let arc = Arc::new(self.clone());
// dbg!(&measures.display());
// dbg!(&measures.len());
for measure in measures.chunks(get_chunk_size(measures.len())) {
let permit = sem.clone();
let arc = arc.clone();
let client = client.clone();
let measure = Arc::new(measure.display());
let _permit = permit.acquire().await.unwrap();
let jh: JoinHandle<anyhow::Result<PrometheusMetrics>> = tokio::spawn(async move {
Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await
});
jh_vec.push(jh);
}
let mut vals = Vec::new();
for event in jh_vec {
match event.await {
Ok(val) => {
if let Ok(val) = val {
vals.push(val);
}
},
Err(er) => println!("Fatal error on async task: {}", er),
}
}
// dbg!(&vals);
// dbg!(&vals.len());
Ok(())
}
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>) -> anyhow::Result<PrometheusMetrics> {
let resp = client
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
.header("Content-Type", "application/json")
.header("access-token", &arc.access_token)
.send().await?
.text().await?;
tokio::task::yield_now().await;
let resp: Value = serde_json::from_str(&resp)?;
// let a = Self::extract_metric_data(resp);
Ok(
PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await
)
}
fn extract_metric_data(json: Value) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutput>>> + Send>> {
Box::pin(async move {
return match json {
Value::Object(obj) => {
// let resp: Value = serde_json::from_str(&obj)?;
return Ok(vec![Self::process_value(&obj).await?])
},
Value::Array(arr) => {
let mut vec = Vec::new();
for obj in arr {
if let Ok(mut val) = Self::extract_metric_data(obj).await {
// vec.push(val);
vec.append(&mut val);
}
}
return Ok(vec)
},
_ => Err(Error::msg("Invalid JSON format")),
}
})
}
async fn process_value(obj : &Map<String, Value>) -> anyhow::Result<MetricOutput> {
let id = obj.get("id");
let val = obj.get("value");
if id.is_none() || val.is_none() {
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
}
let id = id.unwrap().as_str().unwrap_or_else(|| "");
let val = val.unwrap();
if id.is_empty() {
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
}
// pub struct MetricOutput {
// pub id : String,
// #[serde(rename = "type")]
// json_type : String,
// addr : String,
// pub value : Value,
// }
Ok(MetricOutput {
id : id.to_owned(),
json_type : match val {
Value::Number(val) => {
if val.is_i64() {
"i64".to_owned()
} else if val.is_u64() {
"u64".to_owned()
} else {
"f64".to_owned()
}
},
_ => "unknown".to_owned(),
},
addr : "enode.monitoring.api".to_owned(),
value : val.clone()
})
}
}