423 lines
16 KiB
Rust
423 lines
16 KiB
Rust
use std::env;
|
|
use anyhow::Error;
|
|
use serde_json::{Map, Value};
|
|
use reqwest::Client;
|
|
use tokio::sync::Semaphore;
|
|
use std::sync::Arc;
|
|
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size};
|
|
use integr_structs::api::enode_monitoring::cmdb::Query;
|
|
use tokio::task::JoinHandle;
|
|
use std::pin::Pin;
|
|
use std::future::Future;
|
|
use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
|
|
use tracing::{error, info, warn};
|
|
use std::collections::HashMap;
|
|
|
|
// const IM_CONNECTION_TIMEOUT: String = std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string());
|
|
|
|
/// # Fn `get_metrics_from_monitoring`
|
|
///
|
|
/// A function to init pulling and exporting metrics mechanism
|
|
/// from CM to Exporter. It spawns async tasks to get measures
|
|
/// and their values and then extract needed info to export
|
|
///
|
|
/// ### Dev-Info :
|
|
///
|
|
/// *input* : duration and delay as `usize` (in secs)
|
|
///
|
|
/// *output* : `anyhow::Result<()>`
|
|
///
|
|
/// *initiator* : fn `main`
|
|
///
|
|
/// *managing* : runtime of N async tasks (N - count of chunks)
|
|
///
|
|
/// # Example
|
|
///
|
|
/// ```
|
|
/// use api-grub::monitoring::get_metrics_from_monitoring;
|
|
///
|
|
/// // exec func without time restriction but with delay in 5 secs
|
|
/// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(()));
|
|
/// ```
|
|
///
|
|
#[tracing::instrument(name = "cm_fn_initiator", skip_all)]
|
|
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
|
|
|
let timer = tokio::time::Instant::now();
|
|
let mut a = MonitoringImporter::new().await;
|
|
'outer: loop {
|
|
// let mut a = MonitoringImporter::new().await;
|
|
a.start_session().await?;
|
|
tracing::debug!("CM creds struct - {:#?}", a);
|
|
|
|
let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
|
|
tracing::debug!("Measures Vec - {:#?}", vec);
|
|
|
|
'inner: loop {
|
|
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
|
|
break 'outer;
|
|
}
|
|
if vec.len() == 0 || a.get_measure_info(vec.clone()).await.is_err() {
|
|
warn!("Session dropped, creating new ...");
|
|
break 'inner;
|
|
}
|
|
// if let Err(_) = a.get_measure_info(vec.clone()).await {
|
|
// warn!("Session dropped, creating new ...");
|
|
// break 'inner;
|
|
// }
|
|
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// An entity which handles CM creds
///
/// Used to capture measures and their values and to preprocess all measures
/// into the structure expected by the Exporter.
///
/// # Example:
///
/// ```ignore
/// use std::sync::Arc;
/// use api_grub::monitoring::MonitoringImporter;
///
/// let mut importer = MonitoringImporter::new().await;
/// importer.start_session().await?;
/// let measures = Arc::new(importer.get_metrics_list().await.unwrap_or_default());
///
/// assert!(importer.get_measure_info(measures.clone()).await.is_ok());
/// ```
///
#[derive(Clone)]
pub struct MonitoringImporter {
    /// Host part of every CM URL (`http://{ip}/...`).
    ip: String,
    /// Login used to create a CM session.
    login: String,
    /// Password used to create a CM session.
    password: String,
    /// Token of the current session, sent in the `access-token` header.
    access_token: String,
    /// Timestamp stored after a successful login.
    ts: String,
    /// Timeout of a single HTTP request, in seconds.
    timeout: usize,
}
|
|
|
|
impl MonitoringImporter {
|
|
/// The most simple constructor for `MonitoringImporter`
|
|
///
|
|
/// Returns `Self` object that is constructing according to
|
|
/// env vars:
|
|
/// - `ENODE_MONITORING_IP`
|
|
/// - `ENODE_MONITORING_LOGIN`
|
|
/// - `ENODE_MONITORING_PASSWORD`
|
|
///
|
|
/// If env vars will not be set, it returns `Self` with
|
|
/// empty fields
|
|
///
|
|
pub async fn new() -> Self {
|
|
MonitoringImporter {
|
|
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
|
|
login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()),
|
|
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
|
|
access_token : String::new(),
|
|
ts : String::new(),
|
|
timeout : std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()).parse().unwrap_or_else(|_| 10)
|
|
}
|
|
}
|
|
/// Function that checks is current `MonitoringImporter` valid
|
|
/// and can be used to pull and push info to and from CM
|
|
///
|
|
async fn is_valid(&self) -> bool {
|
|
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
|
|
}
|
|
/// A setter of `timestamp`
|
|
///
|
|
/// This function is needed to set a `timestamp` after
|
|
/// CM session creation.
|
|
///
|
|
/// This `timestamp` is a date of creation a session
|
|
/// on the CM Server
|
|
async fn set_ts(&mut self, ts: &str) {
|
|
self.ts = ts.to_owned();
|
|
}
|
|
/// A function for creation CM session
|
|
///
|
|
/// Returns OK(()) if session was created and there were
|
|
/// no errors (neither internal no external)
|
|
///
|
|
/// *Also* it saves ts and access-key in it's runtime environment,
|
|
/// there's no way to get access-key of session
|
|
#[tracing::instrument(name = "cm_fn_session_start", skip_all)]
|
|
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
|
if !self.is_valid().await {
|
|
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
|
}
|
|
let client = Client::new();
|
|
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
|
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
|
let mut delay = 1;
|
|
|
|
loop {
|
|
let client = client
|
|
.post(&url)
|
|
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
|
|
.header("Content-Type", "application/json")
|
|
.json(&fortoken);
|
|
// let resp = client.send().await?;
|
|
if let Ok(resp) = client.send().await {
|
|
// let auth = resp.json::<AuthResponse>().await?;
|
|
|
|
match resp.json::<AuthResponse>().await {
|
|
Ok(auth) => {
|
|
self.set_ts(&fortoken.ts).await;
|
|
self.access_token = auth.access_token.to_owned();
|
|
tracing::trace!("Access key was changed");
|
|
break;
|
|
},
|
|
Err(er) => error!("Error with extracting access-key from CM response due to {}", er),
|
|
}
|
|
}
|
|
error!("Error while trying to create a new session, waiting {} secs and retrying ...", delay);
|
|
tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
|
|
delay = delay * 2;
|
|
}
|
|
info!("Started a new CM session");
|
|
Ok(())
|
|
}
|
|
|
|
/// A function for pulling measures list
|
|
///
|
|
/// Used with actual credentials for current CM session
|
|
/// and returning measures in format of `Ok(Vec<(String, String)>)`
|
|
/// , where `(String, String)` is a tuple of measure `id` and `description`
|
|
/// (`name`)
|
|
#[tracing::instrument(name = "CM get metrics list mechanism", skip_all)]
|
|
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
|
|
tracing::trace!("Trying ti get measures list from CM ...");
|
|
let client = Client::new();
|
|
let mut vec: Vec<(String, String)> = Vec::new();
|
|
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
|
let client = client
|
|
.post(url)
|
|
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
|
|
.header("Content-Type", "application/json")
|
|
.header("access-token", &self.access_token)
|
|
.json(&Query::default());
|
|
let resp = client.send().await?.text().await?;
|
|
let resp: Value = serde_json::from_str(&resp)?;
|
|
if let Some(arr) = resp.as_array() {
|
|
for measure in arr {
|
|
let id = measure.get("id");
|
|
let cls = measure.get("cls");
|
|
let name = measure.get("name");
|
|
if id.is_some() && cls.is_some() {
|
|
let id = id.unwrap().as_i64().unwrap_or_default();
|
|
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
|
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
|
|
if cls.is_empty() {
|
|
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
|
|
}
|
|
vec.push((format!("{}${}", cls, id), name.to_string()));
|
|
}
|
|
}
|
|
} else {
|
|
return Err(Error::msg("Invalid JSON in response"));
|
|
}
|
|
info!("List of measures was pulled, total - {}", &vec.len());
|
|
Ok(vec)
|
|
}
|
|
|
|
/// A function to get realtime data
|
|
///
|
|
/// It pulles info about 1 measure or a slice of measures and
|
|
/// exports all data to Prometehus exporter
|
|
///
|
|
/// # How it works
|
|
/// 1) creates a restriction for max count of async
|
|
/// tasks (`tokio::sync::Semaphore`)
|
|
///
|
|
/// 2) divides vec of measures in case of creating chunks with
|
|
/// the most optimal sizes to optimize self and server load
|
|
///
|
|
/// 3) spawns async tasks-grabbers to get measures info which
|
|
/// exprots all data by itselfs
|
|
///
|
|
#[tracing::instrument(name = "CM get measures info mechanism", skip_all)]
|
|
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
|
|
tracing::trace!("Trying ti get info about each measure ...");
|
|
let mut sys = sysinfo::System::new();
|
|
sys.refresh_cpu_all();
|
|
// adaptive permition on task spawm to prevent system overload
|
|
let sem = Arc::new(Semaphore::new(sys.cpus().len()));
|
|
let mut jh_vec = Vec::new();
|
|
let client = Arc::new(Client::new());
|
|
let measures = measures.clone();
|
|
let arc = Arc::new(self.clone());
|
|
let chunk_size = get_chunk_size(measures.len());
|
|
info!("List of measures was divided by chunks with len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size);
|
|
|
|
for measure in measures.chunks(chunk_size) {
|
|
let permit = sem.clone();
|
|
let arc = arc.clone();
|
|
let client = client.clone();
|
|
let hm = measure.lazy_unzip();
|
|
let measure = Arc::new(measure.display());
|
|
let _permit = permit.acquire().await.unwrap();
|
|
|
|
let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
|
|
Self::process_endpoint(
|
|
measure.clone(),
|
|
client.clone(),
|
|
arc.clone(),
|
|
&hm,
|
|
).await
|
|
|
|
});
|
|
jh_vec.push(jh);
|
|
}
|
|
|
|
for event in jh_vec {
|
|
match event.await {
|
|
Ok(val) => {
|
|
match crate::export::Exporter::export_extended_metrics(val?).await {
|
|
Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes)},
|
|
Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er),
|
|
}
|
|
},
|
|
Err(er) => {
|
|
println!("Fatal error on async task: {}", er);
|
|
return Err(anyhow::Error::msg(format!("Fatal error on async task: {}", er)))
|
|
},
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// An async task-grabber
|
|
///
|
|
/// Used to create request to the CM server and
|
|
/// get all measure(s) data
|
|
///
|
|
/// # Also
|
|
/// An argument `measure: Arc<String>` can be a single measure like `measure$1` or
|
|
/// a slice of measures in special format `%5B%22measure$1%22,%20%22measure$2%22%5D`.
|
|
/// This is a neccesary measure to handle two types of requests and URL restrictions
|
|
///
|
|
async fn process_endpoint(
|
|
measure: Arc<String>,
|
|
client: Arc<Client>,
|
|
arc: Arc<Self>,
|
|
hm: &HashMap<String, String>,
|
|
) -> anyhow::Result<PrometheusMetricsExtended> {
|
|
tracing::trace!("Processing CM endpoint with one or more measure names");
|
|
let resp = client
|
|
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
|
.timeout(tokio::time::Duration::from_secs(arc.timeout as u64))
|
|
.header("Content-Type", "application/json")
|
|
.header("access-token", &arc.access_token)
|
|
.send().await?
|
|
.text().await?;
|
|
tokio::task::yield_now().await;
|
|
|
|
let resp: Value = serde_json::from_str(&resp)?;
|
|
Ok(
|
|
PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
|
|
)
|
|
}
|
|
|
|
/// An recursive extractor of data
|
|
///
|
|
/// Uses target-json CM-Server response as `Value` and HashMap of
|
|
/// measures' `id`s and their appropriate `description`s
|
|
///
|
|
/// # How it works
|
|
/// 1) if `Value` is an `Object` -> executes `Self::process_value` on it and
|
|
/// returns result of the function as `Vec`
|
|
///
|
|
/// 2) if `Value` is an `Array` -> self-executes for each pat of the array
|
|
/// and aggregates all data in the `Vec` by using `.append(&mut Vec<...>)`
|
|
///
|
|
/// 3) if `Value` is `_` -> returns error **Invalid JSON format**
|
|
///
|
|
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
|
|
Box::pin(async move {
|
|
return match json {
|
|
Value::Object(obj) => {
|
|
return Ok(vec![Self::process_value(&obj, hm).await?])
|
|
},
|
|
Value::Array(arr) => {
|
|
let mut vec = Vec::new();
|
|
for obj in arr {
|
|
if let Ok(mut val) = Self::extract_metric_data(obj, hm).await {
|
|
// vec.push(val);
|
|
vec.append(&mut val);
|
|
}
|
|
}
|
|
return Ok(vec)
|
|
},
|
|
_ => Err(Error::msg("Invalid JSON format")),
|
|
}
|
|
})
|
|
}
|
|
|
|
/// A function-extractor for single measure object
|
|
///
|
|
/// Searches for certain fields and aggregates it in the `MetricOutputExtended`
|
|
/// object
|
|
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
|
|
tracing::trace!("Processing atomic Object value in CM JSON-response");
|
|
let id = obj.get("$id");
|
|
let val = obj.get("value");
|
|
let description = {
|
|
let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null);
|
|
let zero = String::new();
|
|
if dola_ip.is_null() {
|
|
zero
|
|
} else {
|
|
hm.get(
|
|
dola_ip.as_str().unwrap_or_else(|| "")
|
|
)
|
|
.unwrap_or_else(|| &zero)
|
|
.to_owned()
|
|
}
|
|
};
|
|
|
|
if id.is_none() || val.is_none() {
|
|
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
|
|
}
|
|
let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_");
|
|
let val = val.unwrap();
|
|
|
|
if id.is_empty() {
|
|
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
|
|
}
|
|
|
|
Ok(MetricOutputExtended {
|
|
id : id.to_owned(),
|
|
json_type : match val {
|
|
Value::Number(val) => {
|
|
if val.is_i64() {
|
|
"i64".to_owned()
|
|
} else if val.is_u64() {
|
|
"u64".to_owned()
|
|
} else {
|
|
"f64".to_owned()
|
|
}
|
|
},
|
|
_ => "unknown".to_owned(),
|
|
},
|
|
addr : "enode.monitoring.api".to_owned(),
|
|
desc : description,
|
|
value : val.clone()
|
|
})
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Debug for MonitoringImporter {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
f.debug_struct("MonitoringImporter")
|
|
.field("ip", &self.ip)
|
|
.field("login", &self.login)
|
|
.field("password", &"****")
|
|
.field("access_key", &"HIDDEN")
|
|
.field("ts", &self.ts)
|
|
.finish()
|
|
}
|
|
} |