diff --git a/src/endpoints.rs b/src/endpoints.rs index 7369452..a369f09 100644 --- a/src/endpoints.rs +++ b/src/endpoints.rs @@ -4,11 +4,12 @@ use axum::{ http }; use crate::structs::v3::PrometheusMetrics; -use prometheus::{ core::Collector, Encoder, Registry, TextEncoder}; -use std::sync::{ Arc, MutexGuard }; +use prometheus::{ core::Collector, Encoder, GaugeVec, Registry, TextEncoder, proto::Metric}; +use std::{collections::HashMap, fmt::Display, sync::{ Arc, MutexGuard }}; use crate::AppState; use tracing::{ debug, error, info, warn, trace }; -use crate::metrics::{MetricsProcesser, MetricsValueType}; +use crate::metrics::{MetricsProcesser, MetricsValueType, IsTaggedWithParams}; +use prometheus::opts; #[derive(Clone)] struct CloneableCollector(Arc); @@ -19,7 +20,41 @@ impl CloneableCollector { fn get_collector(&self) -> Box { Box::new(self.clone()) } + fn to_gauge_vec(&self) -> GaugeVec { + let mut fam = self.0.collect()[0].clone(); + + let metric_name = fam.take_name(); + let metric_help = fam.take_help(); + + let lables = fam.get_metric().iter() + .enumerate() + .filter(|(id, _)| id == &0) + .map(|(_, metric)| metric.get_label()) + .flat_map(|a| a.iter()) + .map(|a| a.get_name()) + .collect::>(); + + let gv = GaugeVec::new(opts!(&metric_name, &metric_help), &lables).unwrap(); + + for metric in fam.get_metric() { + // gather values + let vals = metric.get_label() + .iter() + .map(|pair| pair.get_value()) + .collect::>(); + gv.with_label_values(&vals).set(metric.get_gauge().get_value()); + } + gv + } } + +impl Display for CloneableCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + self.0.collect(); + write!(f, "{:#?}", self.0.collect()) + } +} + impl Collector for CloneableCollector { fn desc(&self) -> Vec<&prometheus::core::Desc> { self.0.desc() @@ -47,11 +82,18 @@ pub async fn update_metrics( ) -> impl IntoResponse { trace!("post on /update"); let service = &request.service_name; - let endpoint = &request.endpoint_name; + // let endpoint = &request.endpoint_name; + // if request.with_device_status_and_module() { + + // } else { + + // } + + let mut metrics = Vec::new(); for i in request.metrics { debug!("Processing metric: {:?}", &i); - let metric_name = format!("{}_{}_{}", service, endpoint, &i.id); + let metric_name = format!("{}_{}", service, &i.id); match MetricsProcesser::get_type_of_value(&i) { MetricsValueType::Array | MetricsValueType::TaggedArray => { @@ -65,21 +107,25 @@ pub async fn update_metrics( &metric_name, &i.desc.clone().unwrap_or_else(|| std::borrow::Cow::Borrowed(&i.id)) ); - if let Some(gauge) = gauge { - match state.registry.lock() { - Err(er) => { - error!("Cannot lock Metric Registry due to {} ", er) - }, - Ok(registry) => { - // todo: error handler - let _ = update_or_insert_metric( - gauge, - registry, - &metric_name - ); - }, - } - } + metrics.push(gauge); + // if let Some(gauge) = gauge { + // match state.registry.lock() { + // Err(er) => { + // error!("Cannot lock Metric Registry due to {} ", er) + // }, + // Ok(registry) => { + // // todo: error handler + // if let Err(er) = update_or_insert_metric( + // gauge, + // registry, + // &metric_name + // ) { + // error!("Update or insert metric crushed: {}", er); + // return (http::StatusCode::INTERNAL_SERVER_ERROR, er.to_string()) + // } + // }, + // } + // } }, MetricsValueType::ArrayOfStrings => { trace!("processing an array of strings"); @@ -91,8 +137,26 @@ pub async fn update_metrics( } } } - - (http::StatusCode::ACCEPTED, "Ok") + for gauge in metrics { + if let Some(gauge) = gauge { + match state.registry.lock() { + Err(er) => { + error!("Cannot lock Metric Registry due to {} ", er) + }, + Ok(registry) => { + // todo: error handler + if let Err(er) = update_or_insert_metric( + gauge, + registry, + ) { + error!("Update or insert metric crushed: {}", er); + return (http::StatusCode::INTERNAL_SERVER_ERROR, er.to_string()) + } + }, + } + } + } + (http::StatusCode::ACCEPTED, String::from("Ok")) } /// An `Metrics` endpoint @@ -116,7 +180,7 @@ pub async fn metrics_handler(State(state): State>) -> String { let encoder = TextEncoder::new(); let mut buffer = Vec::new(); let metric_families = registry.gather(); - + // dbg!(&metric_families); debug!("vec of metric families - {:?}", &metric_families); encoder.encode(&metric_families, &mut buffer).unwrap(); @@ -136,48 +200,266 @@ pub async fn metrics_handler(State(state): State>) -> String { pub fn update_or_insert_metric<'a>( metric: Box, registry: MutexGuard<'a, Registry>, - metric_name: &str + // metric_name: &str ) -> anyhow::Result<()> { trace!("fn update_or_insert_metric is running"); - use prometheus::Error; let prod = CloneableCollector::from_boxed(metric); - match registry.register(prod.get_collector()) { - Ok(_) => { - info!("Metric `{}` was registered!", metric_name); - }, - Err(er) => { - // update or throw away - match er { - Error::AlreadyReg => { - trace!("processing already regged metric"); - match registry.unregister(prod.get_collector()) { - Ok(_) => { - if let Err(er) = registry.register(prod.get_collector()) { - warn!("Cannot update metric `{}`", metric_name); - return Err(anyhow::Error::msg( - format!("Cannot update metric `{}` due to {}", metric_name, er) - )) - } else { - info!("OK on metric `{}` update", metric_name); - } - }, - Err(er) => { - error!("Cannot unregister metric `{}` due to {}", metric_name, er); - return Err(anyhow::Error::msg( - format!("Cannot unregister metric `{}` due to {}", metric_name, er) - )) - }, - } - }, - _ => { - error!("Cannot register new metric `{}` due to {}", metric_name, er); - return Err(anyhow::Error::msg( - format!("Cannot register new metric `{}` due to {}", metric_name, er) - )) - } - } - }, + let family = registry.gather(); + + let new_metric = prod.get_collector().collect(); + let new_metric_family = &new_metric[0]; + let new_metric_name = new_metric_family.get_name(); + + if let Some(mut fam) = family.into_iter() + .find(|fam| fam.get_name() == new_metric_name) { + trace!("found exising metric. processing update ..."); + debug!("found metric : {:?}", &fam); + let metric_name = fam.take_name(); + let metric_help = fam.take_help(); + let lables = fam.get_metric().iter() + .enumerate() + .filter(|(id, _)| id == &0) + .map(|(_, metric)| metric.get_label()) + .flat_map(|a| a.iter()) + .map(|a| a.get_name()) + .collect::>(); + + let gv = GaugeVec::new(opts!(&metric_name, &metric_help), &lables)?; + + for metric in fam.get_metric() { + let vals = metric.get_label() + .iter() + .map(|pair| pair.get_value()) + .collect::>(); + gv.with_label_values(&vals).set(metric.get_gauge().get_value()); + } + trace!("recreated metric as gauge vec {:?}", gv.collect()); + + let new_vec = prod.to_gauge_vec().compare_with_old(&gv); + + registry.unregister(Box::new(gv))?; + info!("Unregistered old metric!"); + registry.register(Box::new(new_vec))?; + info!("Metric `{}` was re-registered!", new_metric_name); + } else { + match registry.register(prod.get_collector()) { + Ok(_) => info!("Metric `{}` was registered!", new_metric_name), + Err(er) => { + error!("Cannot register new metric `{}` due to {}", new_metric_name, er); + return Err(anyhow::Error::msg( + format!("Cannot register new metric `{}` due to {}", new_metric_name, er) + )) + }, + } } + + // match registry.register(prod.get_collector()) { + // Ok(_) => { + // info!("Metric `{}` was registered!", metric_name); + // }, + // Err(er) => { + // // update or throw away + // match er { + // Error::AlreadyReg => { + // trace!("processing already regged metric"); + // match registry.unregister(prod.get_collector()) { + // Ok(_) => { + // if let Err(er) = registry.register(prod.get_collector()) { + // warn!("Cannot update metric `{}`", metric_name); + // return Err(anyhow::Error::msg( + // format!("Cannot update metric `{}` due to {}", metric_name, er) + // )) + // } else { + // info!("OK on metric `{}` update", metric_name); + // } + // }, + // Err(er) => { + // error!("Cannot unregister metric `{}` due to {}", metric_name, er); + // return Err(anyhow::Error::msg( + // format!("Cannot unregister metric `{}` due to {}", metric_name, er) + // )) + // }, + // } + // }, + // _ => { + // error!("Cannot register new metric `{}` due to {}", metric_name, er); + // return Err(anyhow::Error::msg( + // format!("Cannot register new metric `{}` due to {}", metric_name, er) + // )) + // } + // } + // }, + // } Ok(()) } + + +trait CompareGaugeVec { + fn compare_with_old(self, old: &GaugeVec) -> Self; +} + +impl CompareGaugeVec for GaugeVec { + fn compare_with_old(mut self, old: &GaugeVec) -> Self { + // old. + let old_fam = old.collect(); + let new = self.collect(); + + 'outer: for old_fam in old_fam { + for metric in old_fam.get_metric() { + // labels for current old version of metric in vec + let lables = get_hashmap_lables_from_metric(metric); + dbg!(&lables); + let value = metric.get_gauge().get_value(); + + for new in &new { + for new_metric in new.get_metric() { + let new_lables = get_hashmap_lables_from_metric(new_metric); + dbg!(&new_lables); + if lables.len() != new_lables.len() { + error!("Trying to save invalid metric type. Reseting changes ..."); + self = old.clone(); + break 'outer; + } else { + match (lables.get("status"), new_lables.get("status")) { + (Some(&status), Some(&new_status)) => { + match ( + (lables.get("device"), lables.get("module")), + (new_lables.get("device"), new_lables.get("module")), + ) { + ((Some(&device), Some(&module)), + (Some(&new_device), Some(&new_module))) => { + /* */ + dbg!(1); + if device != new_device || module != new_module { + dbg!(2); + self.with_label_values(&[device, module, status]).set(value); + // continue 'outer; + } + }, + ((Some(&device), None), + (Some(&new_device), None)) => { + /* */ + if device != new_device { + self.with_label_values(&[device, status]).set(value); + // continue 'outer; + } + }, + _ => { /* DEAD END */}, + } + }, + _ => { /* DEAD END */ }, + } + } + } + } + } + } + + self + } +} + +fn get_hashmap_lables_from_metric(metric: &Metric) -> HashMap<&str, &str> { + metric.get_label() + .iter() + .map(|pair| (pair.get_name(), pair.get_value())) + .collect::>() +} + +fn get_vectored_lables_from_metric(metric: &Metric) -> Vec<&str> { + metric.get_label() + .iter() + .map(|pair| pair.get_value()) + .collect::>() +} + +struct MeasureLables<'a> { + status : Option<&'a str>, + device : Option<&'a str>, + module : Option<&'a str>, +} + +impl<'a> MeasureLables<'a> { + fn new_from_hashed(entry: HashMap<&'a str, &'a str>) -> MeasureLables<'a> { + MeasureLables { + status : entry.get("status").map(|v| &**v), + device : entry.get("device").map(|v| &**v), + module : entry.get("module").map(|v| &**v), + } + } + fn self_type(&'a self) -> LablesList<'a> { + return match self.status { + None => LablesList::Empty, + Some(status) => { + match self.status { + None => LablesList::OnlyStatus(status), + Some(device) => { + match self.module { + None => LablesList::WithDevice(status, device), + Some(module) => LablesList::Full(status, device, module) + } + }, + } + }, + } + } +} + +enum LablesList<'a> { + Full(&'a str, &'a str, &'a str), + WithDevice(&'a str, &'a str), + OnlyStatus(&'a str), + Empty, +} + +enum LablesType { + Full, + WithDevice, + OnlyStatus, + Empty, +} + +enum CompareLables { + NeedToSave, + Ignore +} + + + +impl<'a> LablesList<'a> { + fn type_of(&'a self) -> LablesType { + return match self { + LablesList::Full(_, _, _) => LablesType::Full, + LablesList::WithDevice(_, _) => LablesType::WithDevice, + LablesList::OnlyStatus(_) => LablesType::OnlyStatus, + LablesList::Empty => LablesType::Empty, + } + } + fn is_changed(&self, other: &'a LablesList) -> CompareLables { + return match (self, other) { + (LablesList::Full(s, d, m), + LablesList::Full(s1, d1, m1)) => { + if s != s1 && d == d1 && m == m1 { + return CompareLables::NeedToSave + } + CompareLables::Ignore + }, + (LablesList::WithDevice(s, d), + LablesList::WithDevice(s1, d1)) => { + if s != s1 && d == d1 { + return CompareLables::NeedToSave + } + CompareLables::Ignore + }, + (LablesList::OnlyStatus(s), + LablesList::OnlyStatus(s1)) => { + if s != s1 { + return CompareLables::NeedToSave + } + CompareLables::Ignore + }, + _ => CompareLables::Ignore, + } + } +} \ No newline at end of file diff --git a/src/metrics.rs b/src/metrics.rs index 3376aab..e7a9eb1 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use crate::structs::v3::MetricOutput; +use crate::structs::v3::{MetricOutput, PrometheusMetrics}; use serde_json::{Map, Value}; use prometheus::Gauge; use tracing::error; @@ -47,14 +47,31 @@ impl MetricsProcesser { pub fn gauge_from_number( metric: &MetricOutput, metric_name: &str, - metric_desc: &str + metric_desc: &str, + // status: Option, + // device: Option, ) -> Option> { trace!("fn gauge_from_number is running"); if let Some(status) = metric.status { - let vec = GaugeVec::new(opts!(metric_name, metric_desc), &["status"]).unwrap(); - vec.with_label_values(&[&status.to_string()]).set(metric.value.as_f64().unwrap_or_else(|| 0.0)); - debug!("processed metric: {:?}", &vec); - return Some(Box::new(vec)); + if let Some(device) = metric.device { + if let Some(module) = metric.module { + let vec = GaugeVec::new(opts!(metric_name, metric_desc), &["status", "device", "module"]).unwrap(); + vec.with_label_values(&[&status.to_string(), &device.to_string(), &module.to_string()]).set(metric.value.as_f64().unwrap_or_else(|| 0.0)); + debug!("processed metric: {:?}", &vec); + return Some(Box::new(vec)); + } + else { + let vec = GaugeVec::new(opts!(metric_name, metric_desc), &["status", "device"]).unwrap(); + vec.with_label_values(&[&status.to_string(), &device.to_string()]).set(metric.value.as_f64().unwrap_or_else(|| 0.0)); + debug!("processed metric: {:?}", &vec); + return Some(Box::new(vec)); + } + } else { + let vec = GaugeVec::new(opts!(metric_name, metric_desc), &["status"]).unwrap(); + vec.with_label_values(&[&status.to_string()]).set(metric.value.as_f64().unwrap_or_else(|| 0.0)); + debug!("processed metric: {:?}", &vec); + return Some(Box::new(vec)); + } } let gauge = Gauge::new( @@ -86,7 +103,7 @@ impl MetricsProcesser { pub fn gauge_from_map_metrics( map: &Map, service: &str, - endpoint: &str + endpoint: &str, ) -> Option { let map = map.clone(); let help: String = map.keys() @@ -205,4 +222,14 @@ impl IsTaggedArray for Value { } false } +} + +pub trait IsTaggedWithParams<'a> { + fn with_device_status_and_module(&self) -> bool; +} + +impl<'a> IsTaggedWithParams<'a> for PrometheusMetrics<'a> { + fn with_device_status_and_module(&self) -> bool { + self.metrics.iter().any(|metric| metric.module.is_some() && metric.device.is_some()) + } } \ No newline at end of file