diff --git a/.env.example b/.env.example index d82e45f..d6acc3c 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,8 @@ # port binding for prometheus-exporter # default value = 9100 PROMETHEUS_EXPORTER_PORT = 9100 + +# setting up max level of logging +# default - INFO +# values : WARN, ERROR, INFO, DEBUG, TRACE +PROMETHEUS_EXPORTER_LOG_LEVEL = "TRACE" \ No newline at end of file diff --git a/src/endpoints.rs b/src/endpoints.rs index d3d1525..7369452 100644 --- a/src/endpoints.rs +++ b/src/endpoints.rs @@ -4,12 +4,32 @@ use axum::{ http }; use crate::structs::v3::PrometheusMetrics; -use prometheus::{ Encoder, Gauge, Registry, TextEncoder}; +use prometheus::{ core::Collector, Encoder, Registry, TextEncoder}; use std::sync::{ Arc, MutexGuard }; use crate::AppState; -use tracing::{ debug, error, info, warn }; +use tracing::{ debug, error, info, warn, trace }; use crate::metrics::{MetricsProcesser, MetricsValueType}; +#[derive(Clone)] +struct CloneableCollector(Arc); +impl CloneableCollector { + fn from_boxed(collector: Box) -> Self { + CloneableCollector(Arc::from(collector)) + } + fn get_collector(&self) -> Box { + Box::new(self.clone()) + } +} +impl Collector for CloneableCollector { + fn desc(&self) -> Vec<&prometheus::core::Desc> { + self.0.desc() + } + + fn collect(&self) -> Vec { + self.0.collect() + } +} + /// An `Update` endpoint /// /// Used to registrate new metrics and to update already @@ -18,14 +38,14 @@ use crate::metrics::{MetricsProcesser, MetricsValueType}; /// # Usage /// /// ``` bash -/// curl -X POST -d '"id" : ...' 'http::/localhost:9100/update' +/// curl -X POST -d '...' 'http://127.0.0.1:9100/update' -d ... /// ``` /// pub async fn update_metrics( State(state): State>, Json(request) : Json> ) -> impl IntoResponse { - info!("post on /update"); + trace!("post on /update"); let service = &request.service_name; let endpoint = &request.endpoint_name; @@ -35,9 +55,11 @@ pub async fn update_metrics( match MetricsProcesser::get_type_of_value(&i) { MetricsValueType::Array | MetricsValueType::TaggedArray => { - + trace!("processing an array of metrics"); + // ... }, MetricsValueType::Number => { + trace!("processing a number type of metric"); let gauge = MetricsProcesser::gauge_from_number( &i, &metric_name, @@ -60,9 +82,11 @@ pub async fn update_metrics( } }, MetricsValueType::ArrayOfStrings => { + trace!("processing an array of strings"); warn!("String arrays are unsupported, ignoring ..."); }, _ => { + trace!("processing unrecognized type of metric"); warn!("Unrecognized metric type was supplied, ignoring ..."); } } @@ -79,17 +103,22 @@ pub async fn update_metrics( /// # Usage /// /// ``` bash -/// curl -X GET 'http::/localhost:9100/metrics' +/// curl -X GET 'http://127.0.0.1:9100/metrics' /// ``` /// pub async fn metrics_handler(State(state): State>) -> String { + trace!("get on /metrics"); let registry = state.registry.lock(); + debug!("registry mutex lock is {}", registry.is_ok()); return match registry { Ok(registry) => { let encoder = TextEncoder::new(); let mut buffer = Vec::new(); let metric_families = registry.gather(); + + debug!("vec of metric families - {:?}", &metric_families); + encoder.encode(&metric_families, &mut buffer).unwrap(); String::from_utf8(buffer).unwrap() }, @@ -105,13 +134,15 @@ pub async fn metrics_handler(State(state): State>) -> String { /// `Registry` /// pub fn update_or_insert_metric<'a>( - metric: Gauge, + metric: Box, registry: MutexGuard<'a, Registry>, metric_name: &str ) -> anyhow::Result<()> { + trace!("fn update_or_insert_metric is running"); use prometheus::Error; - // let mut counter = 0; - match registry.register(Box::new(metric.clone())) { + let prod = CloneableCollector::from_boxed(metric); + + match registry.register(prod.get_collector()) { Ok(_) => { info!("Metric `{}` was registered!", metric_name); }, @@ -119,10 +150,10 @@ pub fn update_or_insert_metric<'a>( // update or throw away match er { Error::AlreadyReg => { - - match registry.unregister(Box::new(metric.clone())) { + trace!("processing already regged metric"); + match registry.unregister(prod.get_collector()) { Ok(_) => { - if let Err(er) = registry.register(Box::new(metric)) { + if let Err(er) = registry.register(prod.get_collector()) { warn!("Cannot update metric `{}`", metric_name); return Err(anyhow::Error::msg( format!("Cannot update metric `{}` due to {}", metric_name, er) @@ -138,29 +169,6 @@ pub fn update_or_insert_metric<'a>( )) }, } - // use prometheus::opts; - // use prometheus::GaugeVec; - - // let vec = GaugeVec::new(opts!("test", "test_help"), &["label"]).unwrap(); - // // vec.with_label_values(&["default"]).set(42.0); - // if registry.unregister(Box::new(vec)).is_err() { - // debug!("unregister failed"); - // }; - - // let vec = GaugeVec::new(opts!("test1", "test_help1"), &["label"]).unwrap(); - // vec.with_label_values(&["goood!"]).set(412.0); - // let _ = registry.register(Box::new(vec)); - // registry - // .gather() - // .iter_mut() - // .filter(|target| target.get_name() == metric_name.trim()) - // .for_each(|family| { - // // let prev: &mut GaugeVec = family.mut_metric()[0].mut_gauge(); - - // // GaugeVec:: - - // // info!("Metric `{}` was updated, new value - {}", metric_name, new); - // }); }, _ => { error!("Cannot register new metric `{}` due to {}", metric_name, er); @@ -172,8 +180,4 @@ pub fn update_or_insert_metric<'a>( }, } Ok(()) - // registry.gather() - // .iter() - // .filter(|fam| fam.get_name().) - } diff --git a/src/main.rs b/src/main.rs index e691d1f..8a237c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use axum::{ routing::{get, post}, Router}; use prometheus::Registry; -use std::sync::{Arc, Mutex}; +use std::{str::FromStr, sync::{Arc, Mutex}}; use endpoints::*; use tokio::net::TcpListener; use tracing::info; @@ -27,13 +27,22 @@ struct AppState { #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt() - .with_max_level(tracing::Level::DEBUG) - .init(); - - info!("Loading env vars from .env if exists ..."); dotenv().ok(); + let log_level = std::env::var("PROMETHEUS_EXPORTER_LOG_LEVEL") + .unwrap_or_else(|_| "INFO".to_owned()); + + tracing_subscriber::fmt() + .with_max_level(tracing::Level::from_str(&log_level).unwrap_or_else(|_| tracing::Level::INFO)) + .with_writer(std::io::stdout) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NEW) + .with_line_number(false) + .with_target(false) + .with_file(false) + .compact() + .init(); + info!("Logger was created and configurated, dotenv vars were loaded (if exist)"); + info!("Initializing local Prometehus metrics registry ..."); let registry = Registry::new(); @@ -49,10 +58,12 @@ async fn main() -> anyhow::Result<()> { .route("/update", post(update_metrics)) .with_state(state.clone()); - let bind_address = format!("0.0.0.0:{}", std::env::var("PROMETHEUS_EXPORTER_PORT").unwrap_or_else(|_| "9100".to_owned())); - let listener = TcpListener::bind(bind_address).await.unwrap(); + let port = std::env::var("PROMETHEUS_EXPORTER_PORT") + .unwrap_or_else(|_| "9100".to_owned()); + let bind_address = format!("0.0.0.0:{}", &port); + let listener = TcpListener::bind(bind_address).await?; - info!("Serving on ...:9100"); + info!("Serving on ...:{}", &port); axum::serve(listener, app).await?; Ok(()) } \ No newline at end of file diff --git a/src/metrics.rs b/src/metrics.rs index 87aa996..3376aab 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,9 +1,12 @@ +use std::sync::Arc; + use crate::structs::v3::MetricOutput; use serde_json::{Map, Value}; use prometheus::Gauge; use tracing::error; use prometheus::Opts; -use prometheus::GaugeVec; +use prometheus::{opts, GaugeVec, core::Collector}; +use tracing::{debug, trace}; #[derive(Debug)] pub enum MetricsValueType { @@ -18,20 +21,25 @@ pub struct MetricsProcesser; impl MetricsProcesser { - pub fn get_type_of_value(metrics: &MetricOutput) -> MetricsValueType { + trace!("defining metric type"); if Self::is_number(metrics) { + debug!("processing Number"); return MetricsValueType::Number; } else if Self::is_array(metrics) { if Self::is_tagged_array(metrics) { + debug!("processing TaggedArray"); return MetricsValueType::TaggedArray; } if Self::is_array_of_string_values(metrics) { + debug!("processing ArrayOfStrings"); return MetricsValueType::ArrayOfStrings; } + debug!("processing Array"); return MetricsValueType::Array; } + debug!("processing undefined type"); MetricsValueType::None } @@ -40,7 +48,15 @@ impl MetricsProcesser { metric: &MetricOutput, metric_name: &str, metric_desc: &str - ) -> Option { + ) -> Option> { + trace!("fn gauge_from_number is running"); + if let Some(status) = metric.status { + let vec = GaugeVec::new(opts!(metric_name, metric_desc), &["status"]).unwrap(); + vec.with_label_values(&[&status.to_string()]).set(metric.value.as_f64().unwrap_or_else(|| 0.0)); + debug!("processed metric: {:?}", &vec); + return Some(Box::new(vec)); + } + let gauge = Gauge::new( metric_name, metric_desc @@ -48,10 +64,6 @@ impl MetricsProcesser { match gauge { Ok(gauge) => { - // let value = metric.value.as_number().unwrap_or({ - // error!("Cannot convert {} metric value to f64 type. Value was set to 0.0", &metric_name); - // }); - // let value = value.as_f64() let val = match metric.value.as_number() { Some(val) => { val.as_f64().unwrap_or_else(|| @@ -64,11 +76,11 @@ impl MetricsProcesser { }, }; gauge.set(val); - return Some(gauge); + debug!("processed metric: {:?}", &gauge); + return Some(Box::new(gauge)); }, Err(er) => error!("Cannot create Gauge metric {} due to {}", &metric_name, er), } - None } pub fn gauge_from_map_metrics( @@ -111,11 +123,10 @@ impl MetricsProcesser { let gauge_vec = GaugeVec::new(opts, &[&label_name]); match gauge_vec { Ok(vec) => { - // vec.get_metric_with_label_values(vals) match vec.get_metric_with_label_values(&[&label_value]) { Ok(metric) => { - metric.set(metric_value); // Устанавливаем значение метрики - return Some(metric.clone()); // Возвращаем `Gauge` + metric.set(metric_value); + return Some(metric.clone()); }, Err(er) => { error!("Cannot create Gauge {} due to {}", &name, er); diff --git a/src/structs.rs b/src/structs.rs index b480f3a..a15eee7 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -17,6 +17,7 @@ pub mod v3 { pub value : Value, #[serde(rename = "description")] pub desc : Option>, + pub status: Option } #[derive(Serialize, Deserialize, Debug)]