use axum::{ extract::{Json, State}, response::IntoResponse, http }; use crate::structs::v3::PrometheusMetrics; use prometheus::{ core::Collector, Encoder, GaugeVec, Registry, TextEncoder, proto::Metric}; use std::{collections::HashMap, fmt::Display, sync::{ Arc, MutexGuard }}; use crate::AppState; use tracing::{ debug, error, info, warn, trace }; use crate::metrics::{MetricsProcesser, MetricsValueType}; use prometheus::opts; #[derive(Clone)] struct CloneableCollector(Arc); impl CloneableCollector { fn from_boxed(collector: Box) -> Self { CloneableCollector(Arc::from(collector)) } fn get_collector(&self) -> Box { Box::new(self.clone()) } fn to_gauge_vec(&self) -> GaugeVec { let mut fam = self.0.collect()[0].clone(); let metric_name = fam.take_name(); let metric_help = fam.take_help(); let lables = fam.get_metric().iter() .enumerate() .filter(|(id, _)| id == &0) .map(|(_, metric)| metric.get_label()) .flat_map(|a| a.iter()) .map(|a| a.get_name()) .collect::>(); let gv = GaugeVec::new(opts!(&metric_name, &metric_help), &lables).unwrap(); for metric in fam.get_metric() { // gather values let vals = metric.get_label() .iter() .map(|pair| pair.get_value()) .collect::>(); gv.with_label_values(&vals).set(metric.get_gauge().get_value()); } gv } } impl Display for CloneableCollector { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { self.0.collect(); write!(f, "{:#?}", self.0.collect()) } } impl Collector for CloneableCollector { fn desc(&self) -> Vec<&prometheus::core::Desc> { self.0.desc() } fn collect(&self) -> Vec { self.0.collect() } } /// An `Update` endpoint /// /// Used to registrate new metrics and to update already /// existing in local metrics `Registry` /// /// # Usage /// /// ``` bash /// curl -X POST -d '...' 'http://127.0.0.1:9100/update' -d ... /// ``` /// pub async fn update_metrics( State(state): State>, Json(request) : Json> ) -> impl IntoResponse { trace!("post on /update"); let service = &request.service_name; // let endpoint = &request.endpoint_name; // if request.with_device_status_and_module() { // } else { // } let mut metrics = Vec::new(); for i in request.metrics { debug!("Processing metric: {:?}", &i); let metric_name = format!("{}_{}", service, &i.id); match MetricsProcesser::get_type_of_value(&i) { MetricsValueType::Array | MetricsValueType::TaggedArray => { trace!("processing an array of metrics"); // ... }, MetricsValueType::Number => { trace!("processing a number type of metric"); let gauge = MetricsProcesser::gauge_from_number( &i, &metric_name, &i.desc.clone().unwrap_or_else(|| std::borrow::Cow::Borrowed(&i.id)) ); metrics.push(gauge); // if let Some(gauge) = gauge { // match state.registry.lock() { // Err(er) => { // error!("Cannot lock Metric Registry due to {} ", er) // }, // Ok(registry) => { // // todo: error handler // if let Err(er) = update_or_insert_metric( // gauge, // registry, // &metric_name // ) { // error!("Update or insert metric crushed: {}", er); // return (http::StatusCode::INTERNAL_SERVER_ERROR, er.to_string()) // } // }, // } // } }, MetricsValueType::ArrayOfStrings => { trace!("processing an array of strings"); warn!("String arrays are unsupported, ignoring ..."); }, _ => { trace!("processing unrecognized type of metric"); warn!("Unrecognized metric type was supplied, ignoring ..."); } } } for gauge in metrics { if let Some(gauge) = gauge { match state.registry.lock() { Err(er) => { error!("Cannot lock Metric Registry due to {} ", er) }, Ok(registry) => { // todo: error handler if let Err(er) = update_or_insert_metric( gauge, registry, ) { error!("Update or insert metric crushed: {}", er); return (http::StatusCode::INTERNAL_SERVER_ERROR, er.to_string()) } }, } } } (http::StatusCode::ACCEPTED, String::from("Ok")) } /// An `Metrics` endpoint /// /// Needed in sharing current local metrics `Registry` state /// and all stored metrics /// /// # Usage /// /// ``` bash /// curl -X GET 'http://127.0.0.1:9100/metrics' /// ``` /// pub async fn metrics_handler(State(state): State>) -> String { trace!("get on /metrics"); let registry = state.registry.lock(); debug!("registry mutex lock is {}", registry.is_ok()); return match registry { Ok(registry) => { let encoder = TextEncoder::new(); let mut buffer = Vec::new(); let metric_families = registry.gather(); // dbg!(&metric_families); debug!("vec of metric families - {:?}", &metric_families); encoder.encode(&metric_families, &mut buffer).unwrap(); String::from_utf8(buffer).unwrap() }, Err(er) => { format!("Cannot lock Metric Registry due to {} ", er) } } } /// A function-registrator /// /// Registrates or updates metrics state in local /// `Registry` /// pub fn update_or_insert_metric<'a>( metric: Box, registry: MutexGuard<'a, Registry>, // metric_name: &str ) -> anyhow::Result<()> { trace!("fn update_or_insert_metric is running"); let prod = CloneableCollector::from_boxed(metric); let family = registry.gather(); let new_metric = prod.get_collector().collect(); let new_metric_family = &new_metric[0]; let new_metric_name = new_metric_family.get_name(); if let Some(mut fam) = family.into_iter() .find(|fam| fam.get_name() == new_metric_name) { trace!("found exising metric. processing update ..."); debug!("found metric : {:?}", &fam); let metric_name = fam.take_name(); let metric_help = fam.take_help(); let lables = fam.get_metric().iter() .enumerate() .filter(|(id, _)| id == &0) .map(|(_, metric)| metric.get_label()) .flat_map(|a| a.iter()) .map(|a| a.get_name()) .collect::>(); let gv = GaugeVec::new(opts!(&metric_name, &metric_help), &lables)?; for metric in fam.get_metric() { let vals = metric.get_label() .iter() .map(|pair| pair.get_value()) .collect::>(); gv.with_label_values(&vals).set(metric.get_gauge().get_value()); } trace!("recreated metric as gauge vec {:?}", gv.collect()); let new_vec = prod.to_gauge_vec().compare_with_old(&gv); registry.unregister(Box::new(gv))?; info!("Unregistered old metric!"); registry.register(Box::new(new_vec))?; info!("Metric `{}` was re-registered!", new_metric_name); } else { match registry.register(prod.get_collector()) { Ok(_) => info!("Metric `{}` was registered!", new_metric_name), Err(er) => { error!("Cannot register new metric `{}` due to {}", new_metric_name, er); return Err(anyhow::Error::msg( format!("Cannot register new metric `{}` due to {}", new_metric_name, er) )) }, } } // match registry.register(prod.get_collector()) { // Ok(_) => { // info!("Metric `{}` was registered!", metric_name); // }, // Err(er) => { // // update or throw away // match er { // Error::AlreadyReg => { // trace!("processing already regged metric"); // match registry.unregister(prod.get_collector()) { // Ok(_) => { // if let Err(er) = registry.register(prod.get_collector()) { // warn!("Cannot update metric `{}`", metric_name); // return Err(anyhow::Error::msg( // format!("Cannot update metric `{}` due to {}", metric_name, er) // )) // } else { // info!("OK on metric `{}` update", metric_name); // } // }, // Err(er) => { // error!("Cannot unregister metric `{}` due to {}", metric_name, er); // return Err(anyhow::Error::msg( // format!("Cannot unregister metric `{}` due to {}", metric_name, er) // )) // }, // } // }, // _ => { // error!("Cannot register new metric `{}` due to {}", metric_name, er); // return Err(anyhow::Error::msg( // format!("Cannot register new metric `{}` due to {}", metric_name, er) // )) // } // } // }, // } Ok(()) } trait CompareGaugeVec { fn compare_with_old(self, old: &GaugeVec) -> Self; } impl CompareGaugeVec for GaugeVec { fn compare_with_old(mut self, old: &GaugeVec) -> Self { // old. let old_fam = old.collect(); let new = self.collect(); 'outer: for old_fam in old_fam { for metric in old_fam.get_metric() { // labels for current old version of metric in vec let lables = get_hashmap_lables_from_metric(metric); dbg!(&lables); let value = metric.get_gauge().get_value(); for new in &new { for new_metric in new.get_metric() { let new_lables = get_hashmap_lables_from_metric(new_metric); dbg!(&new_lables); if lables.len() != new_lables.len() { error!("Trying to save invalid metric type. Reseting changes ..."); self = old.clone(); break 'outer; } else { match (lables.get("status"), new_lables.get("status")) { (Some(&status), Some(_)) => { match ( (lables.get("device"), lables.get("module")), (new_lables.get("device"), new_lables.get("module")), ) { ((Some(&device), Some(&module)), (Some(&new_device), Some(&new_module))) => { /* */ dbg!(1); if device != new_device || module != new_module { dbg!(2); self.with_label_values(&[device, module, status]).set(value); // continue 'outer; } }, ((Some(&device), None), (Some(&new_device), None)) => { /* */ if device != new_device { self.with_label_values(&[device, status]).set(value); // continue 'outer; } }, _ => { /* DEAD END */}, } }, _ => { /* DEAD END */ }, } } } } } } self } } fn get_hashmap_lables_from_metric(metric: &Metric) -> HashMap<&str, &str> { metric.get_label() .iter() .map(|pair| (pair.get_name(), pair.get_value())) .collect::>() }