diff --git a/src/endpoints.rs b/src/endpoints.rs index 7369452..d167353 100644 --- a/src/endpoints.rs +++ b/src/endpoints.rs @@ -4,11 +4,12 @@ use axum::{ http }; use crate::structs::v3::PrometheusMetrics; -use prometheus::{ core::Collector, Encoder, Registry, TextEncoder}; -use std::sync::{ Arc, MutexGuard }; +use prometheus::{ core::Collector, Encoder, GaugeVec, Registry, TextEncoder, proto::Metric}; +use std::{collections::HashMap, fmt::Display, sync::{ Arc, MutexGuard }}; use crate::AppState; use tracing::{ debug, error, info, warn, trace }; use crate::metrics::{MetricsProcesser, MetricsValueType}; +use prometheus::opts; #[derive(Clone)] struct CloneableCollector(Arc); @@ -19,7 +20,41 @@ impl CloneableCollector { fn get_collector(&self) -> Box { Box::new(self.clone()) } + fn to_gauge_vec(&self) -> GaugeVec { + let mut fam = self.0.collect()[0].clone(); + + let metric_name = fam.take_name(); + let metric_help = fam.take_help(); + + let lables = fam.get_metric().iter() + .enumerate() + .filter(|(id, _)| id == &0) + .map(|(_, metric)| metric.get_label()) + .flat_map(|a| a.iter()) + .map(|a| a.get_name()) + .collect::>(); + + let gv = GaugeVec::new(opts!(&metric_name, &metric_help), &lables).unwrap(); + + for metric in fam.get_metric() { + // gather values + let vals = metric.get_label() + .iter() + .map(|pair| pair.get_value()) + .collect::>(); + gv.with_label_values(&vals).set(metric.get_gauge().get_value()); + } + gv + } } + +impl Display for CloneableCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + self.0.collect(); + write!(f, "{:#?}", self.0.collect()) + } +} + impl Collector for CloneableCollector { fn desc(&self) -> Vec<&prometheus::core::Desc> { self.0.desc() @@ -47,11 +82,18 @@ pub async fn update_metrics( ) -> impl IntoResponse { trace!("post on /update"); let service = &request.service_name; - let endpoint = &request.endpoint_name; + // let endpoint = &request.endpoint_name; + // if request.with_device_status_and_module() { + + // } else { + + // } + + let mut metrics = Vec::new(); for i in request.metrics { debug!("Processing metric: {:?}", &i); - let metric_name = format!("{}_{}_{}", service, endpoint, &i.id); + let metric_name = format!("{}_{}", service, &i.id); match MetricsProcesser::get_type_of_value(&i) { MetricsValueType::Array | MetricsValueType::TaggedArray => { @@ -65,21 +107,25 @@ pub async fn update_metrics( &metric_name, &i.desc.clone().unwrap_or_else(|| std::borrow::Cow::Borrowed(&i.id)) ); - if let Some(gauge) = gauge { - match state.registry.lock() { - Err(er) => { - error!("Cannot lock Metric Registry due to {} ", er) - }, - Ok(registry) => { - // todo: error handler - let _ = update_or_insert_metric( - gauge, - registry, - &metric_name - ); - }, - } - } + metrics.push(gauge); + // if let Some(gauge) = gauge { + // match state.registry.lock() { + // Err(er) => { + // error!("Cannot lock Metric Registry due to {} ", er) + // }, + // Ok(registry) => { + // // todo: error handler + // if let Err(er) = update_or_insert_metric( + // gauge, + // registry, + // &metric_name + // ) { + // error!("Update or insert metric crushed: {}", er); + // return (http::StatusCode::INTERNAL_SERVER_ERROR, er.to_string()) + // } + // }, + // } + // } }, MetricsValueType::ArrayOfStrings => { trace!("processing an array of strings"); @@ -91,8 +137,26 @@ pub async fn update_metrics( } } } - - (http::StatusCode::ACCEPTED, "Ok") + for gauge in metrics { + if let Some(gauge) = gauge { + match state.registry.lock() { + Err(er) => { + error!("Cannot lock Metric Registry due to {} ", er) + }, + Ok(registry) => { + // todo: error handler + if let Err(er) = update_or_insert_metric( + gauge, + registry, + ) { + error!("Update or insert metric crushed: {}", er); + return (http::StatusCode::INTERNAL_SERVER_ERROR, er.to_string()) + } + }, + } + } + } + (http::StatusCode::ACCEPTED, String::from("Ok")) } /// An `Metrics` endpoint @@ -116,7 +180,7 @@ pub async fn metrics_handler(State(state): State>) -> String { let encoder = TextEncoder::new(); let mut buffer = Vec::new(); let metric_families = registry.gather(); - + // dbg!(&metric_families); debug!("vec of metric families - {:?}", &metric_families); encoder.encode(&metric_families, &mut buffer).unwrap(); @@ -136,48 +200,169 @@ pub async fn metrics_handler(State(state): State>) -> String { pub fn update_or_insert_metric<'a>( metric: Box, registry: MutexGuard<'a, Registry>, - metric_name: &str + // metric_name: &str ) -> anyhow::Result<()> { trace!("fn update_or_insert_metric is running"); - use prometheus::Error; let prod = CloneableCollector::from_boxed(metric); - match registry.register(prod.get_collector()) { - Ok(_) => { - info!("Metric `{}` was registered!", metric_name); - }, - Err(er) => { - // update or throw away - match er { - Error::AlreadyReg => { - trace!("processing already regged metric"); - match registry.unregister(prod.get_collector()) { - Ok(_) => { - if let Err(er) = registry.register(prod.get_collector()) { - warn!("Cannot update metric `{}`", metric_name); - return Err(anyhow::Error::msg( - format!("Cannot update metric `{}` due to {}", metric_name, er) - )) - } else { - info!("OK on metric `{}` update", metric_name); - } - }, - Err(er) => { - error!("Cannot unregister metric `{}` due to {}", metric_name, er); - return Err(anyhow::Error::msg( - format!("Cannot unregister metric `{}` due to {}", metric_name, er) - )) - }, - } - }, - _ => { - error!("Cannot register new metric `{}` due to {}", metric_name, er); - return Err(anyhow::Error::msg( - format!("Cannot register new metric `{}` due to {}", metric_name, er) - )) - } - } - }, + let family = registry.gather(); + + let new_metric = prod.get_collector().collect(); + let new_metric_family = &new_metric[0]; + let new_metric_name = new_metric_family.get_name(); + + if let Some(mut fam) = family.into_iter() + .find(|fam| fam.get_name() == new_metric_name) { + trace!("found exising metric. processing update ..."); + debug!("found metric : {:?}", &fam); + let metric_name = fam.take_name(); + let metric_help = fam.take_help(); + let lables = fam.get_metric().iter() + .enumerate() + .filter(|(id, _)| id == &0) + .map(|(_, metric)| metric.get_label()) + .flat_map(|a| a.iter()) + .map(|a| a.get_name()) + .collect::>(); + + let gv = GaugeVec::new(opts!(&metric_name, &metric_help), &lables)?; + + for metric in fam.get_metric() { + let vals = metric.get_label() + .iter() + .map(|pair| pair.get_value()) + .collect::>(); + gv.with_label_values(&vals).set(metric.get_gauge().get_value()); + } + trace!("recreated metric as gauge vec {:?}", gv.collect()); + + let new_vec = prod.to_gauge_vec().compare_with_old(&gv); + + registry.unregister(Box::new(gv))?; + info!("Unregistered old metric!"); + registry.register(Box::new(new_vec))?; + info!("Metric `{}` was re-registered!", new_metric_name); + } else { + match registry.register(prod.get_collector()) { + Ok(_) => info!("Metric `{}` was registered!", new_metric_name), + Err(er) => { + error!("Cannot register new metric `{}` due to {}", new_metric_name, er); + return Err(anyhow::Error::msg( + format!("Cannot register new metric `{}` due to {}", new_metric_name, er) + )) + }, + } } + + // match registry.register(prod.get_collector()) { + // Ok(_) => { + // info!("Metric `{}` was registered!", metric_name); + // }, + // Err(er) => { + // // update or throw away + // match er { + // Error::AlreadyReg => { + // trace!("processing already regged metric"); + // match registry.unregister(prod.get_collector()) { + // Ok(_) => { + // if let Err(er) = registry.register(prod.get_collector()) { + // warn!("Cannot update metric `{}`", metric_name); + // return Err(anyhow::Error::msg( + // format!("Cannot update metric `{}` due to {}", metric_name, er) + // )) + // } else { + // info!("OK on metric `{}` update", metric_name); + // } + // }, + // Err(er) => { + // error!("Cannot unregister metric `{}` due to {}", metric_name, er); + // return Err(anyhow::Error::msg( + // format!("Cannot unregister metric `{}` due to {}", metric_name, er) + // )) + // }, + // } + // }, + // _ => { + // error!("Cannot register new metric `{}` due to {}", metric_name, er); + // return Err(anyhow::Error::msg( + // format!("Cannot register new metric `{}` due to {}", metric_name, er) + // )) + // } + // } + // }, + // } Ok(()) } + + +trait CompareGaugeVec { + fn compare_with_old(self, old: &GaugeVec) -> Self; +} + +impl CompareGaugeVec for GaugeVec { + fn compare_with_old(mut self, old: &GaugeVec) -> Self { + // old. + let old_fam = old.collect(); + let new = self.collect(); + + 'outer: for old_fam in old_fam { + for metric in old_fam.get_metric() { + // labels for current old version of metric in vec + let lables = get_hashmap_lables_from_metric(metric); + dbg!(&lables); + let value = metric.get_gauge().get_value(); + + for new in &new { + for new_metric in new.get_metric() { + let new_lables = get_hashmap_lables_from_metric(new_metric); + dbg!(&new_lables); + if lables.len() != new_lables.len() { + error!("Trying to save invalid metric type. Reseting changes ..."); + self = old.clone(); + break 'outer; + } else { + match (lables.get("status"), new_lables.get("status")) { + (Some(&status), Some(_)) => { + match ( + (lables.get("device"), lables.get("module")), + (new_lables.get("device"), new_lables.get("module")), + ) { + ((Some(&device), Some(&module)), + (Some(&new_device), Some(&new_module))) => { + /* */ + dbg!(1); + if device != new_device || module != new_module { + dbg!(2); + self.with_label_values(&[device, module, status]).set(value); + // continue 'outer; + } + }, + ((Some(&device), None), + (Some(&new_device), None)) => { + /* */ + if device != new_device { + self.with_label_values(&[device, status]).set(value); + // continue 'outer; + } + }, + _ => { /* DEAD END */}, + } + }, + _ => { /* DEAD END */ }, + } + } + } + } + } + } + + self + } +} + +fn get_hashmap_lables_from_metric(metric: &Metric) -> HashMap<&str, &str> { + metric.get_label() + .iter() + .map(|pair| (pair.get_name(), pair.get_value())) + .collect::>() +} \ No newline at end of file diff --git a/src/metrics.rs b/src/metrics.rs index 3376aab..b47270d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,10 +1,7 @@ -use std::sync::Arc; - use crate::structs::v3::MetricOutput; use serde_json::{Map, Value}; use prometheus::Gauge; use tracing::error; -use prometheus::Opts; use prometheus::{opts, GaugeVec, core::Collector}; use tracing::{debug, trace}; @@ -47,14 +44,31 @@ impl MetricsProcesser { pub fn gauge_from_number( metric: &MetricOutput, metric_name: &str, - metric_desc: &str + metric_desc: &str, + // status: Option, + // device: Option, ) -> Option> { trace!("fn gauge_from_number is running"); if let Some(status) = metric.status { - let vec = GaugeVec::new(opts!(metric_name, metric_desc), &["status"]).unwrap(); - vec.with_label_values(&[&status.to_string()]).set(metric.value.as_f64().unwrap_or_else(|| 0.0)); - debug!("processed metric: {:?}", &vec); - return Some(Box::new(vec)); + if let Some(device) = metric.device { + if let Some(module) = metric.module { + let vec = GaugeVec::new(opts!(metric_name, metric_desc), &["status", "device", "module"]).unwrap(); + vec.with_label_values(&[&status.to_string(), &device.to_string(), &module.to_string()]).set(metric.value.as_f64().unwrap_or_else(|| 0.0)); + debug!("processed metric: {:?}", &vec); + return Some(Box::new(vec)); + } + else { + let vec = GaugeVec::new(opts!(metric_name, metric_desc), &["status", "device"]).unwrap(); + vec.with_label_values(&[&status.to_string(), &device.to_string()]).set(metric.value.as_f64().unwrap_or_else(|| 0.0)); + debug!("processed metric: {:?}", &vec); + return Some(Box::new(vec)); + } + } else { + let vec = GaugeVec::new(opts!(metric_name, metric_desc), &["status"]).unwrap(); + vec.with_label_values(&[&status.to_string()]).set(metric.value.as_f64().unwrap_or_else(|| 0.0)); + debug!("processed metric: {:?}", &vec); + return Some(Box::new(vec)); + } } let gauge = Gauge::new( @@ -83,89 +97,6 @@ impl MetricsProcesser { } None } - pub fn gauge_from_map_metrics( - map: &Map, - service: &str, - endpoint: &str - ) -> Option { - let map = map.clone(); - let help: String = map.keys() - .enumerate() - .map(|(idx, key)| { - if idx == 1 { - return key.to_owned(); - } - "".to_owned() - }) - .collect(); - let name = format!("{}_{}_{}", service, endpoint, &help); - if map.len() > 1 { - // tagged - if map.len() > 2 { - error!("Cannot create Gauge {}. It can be only 1 tag", &name); - } else { - let mut label_name = String::new(); - let mut label_value = String::new(); - let mut metric_value = 0.0; - map.iter() - .enumerate() - .for_each(|(idx, (key, value))|{ - if idx == 0 { - label_name = key.to_owned(); - label_value = value.as_str() - .unwrap_or("") - .to_owned(); - } else { - metric_value = value.as_f64().unwrap_or(0.0) - } - }); - let opts = Opts::new(&name, &help); - let gauge_vec = GaugeVec::new(opts, &[&label_name]); - match gauge_vec { - Ok(vec) => { - match vec.get_metric_with_label_values(&[&label_value]) { - Ok(metric) => { - metric.set(metric_value); - return Some(metric.clone()); - }, - Err(er) => { - error!("Cannot create Gauge {} due to {}", &name, er); - }, - } - }, - Err(er) => error!("Cannot create Gauge {} due to {}", &name, er), - } - } - } else { - // not-tagged - let metric = Gauge::new(&name, &help); - match metric { - Ok(gauge) => { - let mut value = 0.0; - map.values() - .map(|val| val.clone().as_f64()) - .for_each(|val| { - value = val.unwrap_or(0.0) - }); - gauge.set(value); - return Some(gauge); - }, - Err(er) => { - error!("Cannot create Gauge {} due to {}", &name, er); - } - } - } - None - } - pub fn get_value_as_vec_map(metrics: &MetricOutput) -> Vec>{ - let mut vec: Vec> = Vec::new(); - let arr = metrics.value.as_array().unwrap(); - arr.iter() - .for_each(|a| { - vec.push(serde_json::from_value(a.clone()).unwrap()); - }); - vec - } pub fn is_array_of_string_values(metrics: &MetricOutput) -> bool { let arr = metrics.value.clone(); let arr = arr.as_array().unwrap(); @@ -175,9 +106,6 @@ impl MetricsProcesser { map.values() .all(|val| val.is_string()) } - // fn is_valid(metrics: &PrometheusMetrics) -> bool { - // false - // } fn is_array(metrics: &MetricOutput) -> bool { metrics.value.is_array() } @@ -190,19 +118,4 @@ impl MetricsProcesser { metrics .value.is_number() } -} - - - -trait IsTaggedArray { - fn is_tagged_array(&self) -> bool; -} - -impl IsTaggedArray for Value { - fn is_tagged_array(&self) -> bool { - if let Some(arr) = self.as_array() { - return arr[0].get("tag_name").is_some(); - } - false - } } \ No newline at end of file diff --git a/src/structs.rs b/src/structs.rs index a15eee7..8095768 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -17,7 +17,9 @@ pub mod v3 { pub value : Value, #[serde(rename = "description")] pub desc : Option>, - pub status: Option + pub status: Option, + pub device: Option, + pub module: Option, } #[derive(Serialize, Deserialize, Debug)]