diff --git a/crates/api-grub/src/export.rs b/crates/api-grub/src/export.rs index 4f183c7..88dfaf4 100644 --- a/crates/api-grub/src/export.rs +++ b/crates/api-grub/src/export.rs @@ -56,6 +56,7 @@ impl Exporter { let url = env::var("EXPORTER_URL")?; // let req = Request::new(Method::PUT, // Url::parse(metrics)?); + dbg!(&metrics); let req = Client::new() .post(url) .json(&metrics) diff --git a/crates/api-grub/src/json.rs b/crates/api-grub/src/json.rs index 2cc3289..6208724 100644 --- a/crates/api-grub/src/json.rs +++ b/crates/api-grub/src/json.rs @@ -5,7 +5,7 @@ use integr_structs::api::v3::{Metric, MetricOutput}; pub struct JsonParser; impl JsonParser { - pub fn parse(targets: &Vec, json: &str) -> Value { + pub fn parse(targets: &Vec, json: &str) -> Vec { let mut res_vec: Vec = Vec::new(); for target in targets { let metric = match target.addr.contains("[") { @@ -14,7 +14,7 @@ impl JsonParser { }; res_vec.push(MetricOutput::new_with_slices(&target.id, &target.json_type, &target.addr, metric)); } - serde_json::to_value(res_vec).unwrap_or(Value::Null) + res_vec } fn get_sum_of_metrics_in_array(target: &Metric, json: &str) -> Value { if target.addr.is_empty() { diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs index a4326d3..82887a3 100644 --- a/crates/api-grub/src/net.rs +++ b/crates/api-grub/src/net.rs @@ -140,20 +140,6 @@ impl<'a> ApiPoll<'a> { let preproc = JsonParser::parse(&metrics.measure, &response); // dbg!(serde_json::to_string_pretty(&preproc)); let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc); - - - // let metrics = serde_json::to_string_pretty(&preproc) - // .unwrap_or_else(|_| { - // error!("Cannot parse grabbed metrics data to String"); - // String::from(r#" - // { - // "service_name" : null, - // "endpoint_name" : null, - // "value" : null - // } - // "#) - // }); - // println!("{}", &metrics); match Exporter::export_metrics(preproc).await { Ok(bytes) => { info!("Successfully exported {} bytes of metrics data to Prometheus", bytes); diff --git a/crates/exporter/src/endpoints.rs b/crates/exporter/src/endpoints.rs index 0deeb39..c76ecce 100644 --- a/crates/exporter/src/endpoints.rs +++ b/crates/exporter/src/endpoints.rs @@ -1,12 +1,15 @@ -use axum::{ - body::Body, extract::{Json, Request, State}, http::{self, HeaderMap, Response as Resp}, response::{IntoResponse, Response}, routing::get, Router +use axum::{ + extract::{Json, State}, + response::IntoResponse, + http }; use integr_structs::api::v3::PrometheusMetrics; -use prometheus::{core::{Collector, Metric}, proto::{Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder}; -use std::sync::{Arc, Mutex}; +use prometheus::{Encoder, Registry, TextEncoder, Gauge}; +use std::sync::{Arc, MutexGuard}; use crate::AppState; // use log::{warn, info, error}; -use tracing::{info, error, warn, debug}; +use tracing::{info, error, warn}; +use crate::metrics::{MetricsProcesser, MetricsValueType}; pub async fn update_metrics( State(state): State>, @@ -14,55 +17,86 @@ pub async fn update_metrics( ) -> impl IntoResponse { info!("post on /update"); // let resp = Response::new("body"); - debug!("{:?}", &request); -// // resp -// let mut header_map = HeaderMap::new(); -// header_map.insert(http::header::CONTENT_TYPE, -// "text/plain".parse().unwrap()); + // println!("{:?}", request); + // debug!("{:?}", MetricsProcesser::get_type_of_value(&request)); + let service = &request.service_name; + let endpoint = &request.endpoint_name; + + for i in request.metrics { + // debug!("{:?}", &i); + // debug!("{:?}", MetricsProcesser::get_type_of_value(&i)); + let metric_name = format!("{}_{}_{}", service, endpoint, &i.id); + match MetricsProcesser::get_type_of_value(&i) { + MetricsValueType::Array | + MetricsValueType::TaggedArray => { + + }, + MetricsValueType::Number => { + let gauge = MetricsProcesser::gauge_from_number( + &i, + &metric_name + ); + if let Some(gauge) = gauge { + match state.registry.lock() { + Err(er) => { + error!("Cannot lock Metric Registry due to {} ", er) + }, + Ok(registry) => { + update_or_insert_metric( + gauge, + registry, + &metric_name + ); + }, + } + } + // dbg!(gauge); + }, + MetricsValueType::ArrayOfStrings => { + warn!("String arrays are unsupported, ignoring ..."); + }, + _ => { + warn!("Unrecognized metric type was supplied, ignoring ..."); + } + } + } (http::StatusCode::ACCEPTED, "Ok") } pub async fn metrics_handler(State(state): State>) -> String { - let encoder = TextEncoder::new(); - let mut buffer = Vec::new(); - // - let mut fm = prometheus::proto::MetricFamily::new(); - // prometheus::Registry:: - let mut met = prometheus::proto::Metric::new(); - let mutex_guard = state.sum.lock().unwrap(); - met.set_summary(mutex_guard.clone()); - // let metric = met.take_label(); - fm.set_metric(vec![met].into()); - fm.set_help("example summary".to_string()); - fm.set_name("example_summary".to_string()); - fm.set_field_type(prometheus::proto::MetricType::SUMMARY); + let registry = state.registry.lock(); - let mut metric_families = state.registry.gather(); - metric_families.push(fm); - encoder.encode(&metric_families, &mut buffer).unwrap(); - String::from_utf8(buffer).unwrap() + return match registry { + Ok(registry) => { + let encoder = TextEncoder::new(); + let mut buffer = Vec::new(); + let metric_families = registry.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() + }, + Err(er) => { + format!("Cannot lock Metric Registry due to {} ", er) + } + } } -pub async fn increment_handler(State(state): State>) -> &'static str { - let mut counter = state.counter.lock().unwrap(); - counter.inc(); - "Counter incremented" -} - -pub async fn summary_handler(State(state): State>) -> &'static str { - let mut sum = state.sum.lock().unwrap(); - let qs = sum.get_quantile(); - let new_qs: Vec = qs - .into_iter() - .enumerate() - .map(|(idx, q)| { - let mut q = q.to_owned(); - q.set_value(q.get_value() + idx as f64); - // q.set_quantile(q.get_quantile() * 2.0); - q - }) - .collect(); - sum.set_quantile(new_qs.into()); - "Summary changed" +pub fn update_or_insert_metric<'a>( + metric: Gauge, + registry: MutexGuard<'a, Registry>, + metric_name: &str +) { + // let mut counter = 0; + match registry.register(Box::new(metric)) { + Ok(_) => { + info!("Metric `{}` was added!", metric_name); + }, + Err(_er) => { + // update + }, + } + // registry.gather() + // .iter() + // .filter(|fam| fam.get_name().) + } diff --git a/crates/exporter/src/main.rs b/crates/exporter/src/main.rs index d4b6086..7bc1b19 100644 --- a/crates/exporter/src/main.rs +++ b/crates/exporter/src/main.rs @@ -1,67 +1,60 @@ mod endpoints; // mod logger; +mod metrics; // use logger::setup_logger; use axum::{ - extract::State, routing::{get, post}, Router}; -use prometheus::{ - core::{Collector, Metric}, - proto::{Quantile, Summary}, - Counter, - Encoder, - Opts, - Registry, - TextEncoder}; +use prometheus::Registry; use std::sync::{Arc, Mutex}; use endpoints::*; use tokio::net::TcpListener; // use log::{warn, info, error}; -use tracing::{info, error, warn, debug}; +use tracing::info; struct AppState { - registry: Registry, - counter: Mutex, - sum : Mutex, + registry: Mutex, + // counter: Mutex, + // sum : Mutex, } #[tokio::main] async fn main() { // let _ = setup_logger().await; let registry = Registry::new(); - let counter_opts = Opts::new("example_counter", "Пример счётчика"); + // let counter_opts = Opts::new("example_counter", "Пример счётчика"); // let histogram_opts = Opts::new("example_histogram", "Пример histogram"); - use prometheus::proto::{Summary, Quantile}; + // use prometheus::proto::{Summary, Quantile}; // use prometheus::proto:: // let guage = prometheus::ProtobufEncoder::new(); - let mut sunops = Summary::new(); - let mut q1 = Quantile::new(); - let mut q2 = Quantile::new(); + // let mut sunops = Summary::new(); + // let mut q1 = Quantile::new(); + // let mut q2 = Quantile::new(); - q1.set_quantile(25.0); - q2.set_quantile(75.0); - // prometheus::proto::Metric:: + // q1.set_quantile(25.0); + // q2.set_quantile(75.0); + // // prometheus::proto::Metric:: - let vq = vec![q1, q2]; - sunops.set_quantile(vq.into()); + // let vq = vec![q1, q2]; + // sunops.set_quantile(vq.into()); - let counter = Counter::with_opts(counter_opts).unwrap(); - // counter.desc() - registry.register(Box::new(counter.clone())).unwrap(); + // let counter = Counter::with_opts(counter_opts).unwrap(); + // // counter.desc() + // registry.register(Box::new(counter.clone())).unwrap(); // registry.register(Box::new(prometheus::proto::MetricFamily::)); // registry.register(Box::new(sunops.clone())).unwrap(); let state = Arc::new(AppState { - registry, - counter: Mutex::new(counter), - sum : Mutex::new(sunops) + registry: Mutex::new(registry), + // counter: Mutex::new(counter), + // sum : Mutex::new(sunops) }); // info!("Configurating Web-Server..."); @@ -74,12 +67,12 @@ async fn main() { let app = Router::new() .route("/metrics", get(metrics_handler)) - .route("/increment", get(increment_handler)) - .route("/sum", get(summary_handler)) + // .route("/increment", get(increment_handler)) + // .route("/sum", get(summary_handler)) .route("/update", post(update_metrics)) .with_state(state.clone()); - let listener = TcpListener::bind("127.0.0.1:9100").await.unwrap(); + let listener = TcpListener::bind("0.0.0.0:9100").await.unwrap(); info!("Serving on ...:9100"); axum::serve(listener, app).await.unwrap(); } \ No newline at end of file diff --git a/crates/exporter/src/metrics.rs b/crates/exporter/src/metrics.rs new file mode 100644 index 0000000..5b94562 --- /dev/null +++ b/crates/exporter/src/metrics.rs @@ -0,0 +1,181 @@ +// use serde_json::; +use integr_structs::api::v3::MetricOutput; +use serde_json::{Map, Value}; +use prometheus::Gauge; +use tracing::error; + +#[derive(Debug)] +pub enum MetricsValueType { + Number, + Array, + TaggedArray, + ArrayOfStrings, + None, +} + +pub struct MetricsProcesser; + + +impl MetricsProcesser { + + pub fn get_type_of_value(metrics: &MetricOutput) -> MetricsValueType { + if Self::is_number(metrics) { + return MetricsValueType::Number; + } + else if Self::is_array(metrics) { + if Self::is_tagged_array(metrics) { + return MetricsValueType::TaggedArray; + } + if Self::is_array_of_string_values(metrics) { + return MetricsValueType::ArrayOfStrings; + } + return MetricsValueType::Array; + } + MetricsValueType::None + } + pub fn gauge_from_number( + metric: &MetricOutput, + metric_name: &str, + ) -> Option { + let gauge = Gauge::new( + metric_name, + &metric.id + ); + + match gauge { + Ok(gauge) => { + // let value = metric.value.as_number().unwrap_or({ + // error!("Cannot convert {} metric value to f64 type. Value was set to 0.0", &metric_name); + // }); + // let value = value.as_f64() + let val = match metric.value.as_number() { + Some(val) => { + val.as_f64().unwrap_or_else(|| + 0.0 + ) + }, + None => { + error!("Cannot convert {} metric value to f64 type. Value was set to 0.0", &metric_name); + 0.0 + }, + }; + gauge.set(val); + return Some(gauge); + }, + Err(er) => error!("Cannot create Gauge metric {} due to {}", &metric_name, er), + } + + None + } + pub fn gauge_from_map_metrics( + map: &Map, + service: &str, + endpoint: &str + ) -> Option { + let map = map.clone(); + let help: String = map.keys() + .enumerate() + .map(|(idx, key)| { + if idx == 1 { + return key.to_owned(); + } + "".to_owned() + }) + .collect(); + let name = format!("{}_{}_{}", service, endpoint, &help); + if map.len() > 1 { + // tagged + if map.len() > 2 { + error!("Cannot create Gauge {}. It can be only 1 tag", &name); + } else { + let mut label_name = String::new(); + let mut label_value = String::new(); + let mut metric_value = 0.0; + map.iter() + .enumerate() + .for_each(|(idx, (key, value))|{ + if idx == 0 { + label_name = key.to_owned(); + label_value = value.as_str() + .unwrap_or("") + .to_owned(); + } else { + metric_value = value.as_f64().unwrap_or(0.0) + } + }); + use prometheus::Opts; + use prometheus::GaugeVec; + + let opts = Opts::new(&name, &help); + let gauge_vec = GaugeVec::new(opts, &[&label_name]); + match gauge_vec { + Ok(vec) => { + // vec.get_metric_with_label_values(vals) + match vec.get_metric_with_label_values(&[&label_value]) { + Ok(metric) => { + metric.set(metric_value); // Устанавливаем значение метрики + return Some(metric.clone()); // Возвращаем `Gauge` + }, + Err(er) => { + error!("Cannot create Gauge {} due to {}", &name, er); + }, + } + }, + Err(er) => error!("Cannot create Gauge {} due to {}", &name, er), + } + } + } else { + // not-tagged + let metric = Gauge::new(&name, &help); + match metric { + Ok(gauge) => { + let mut value = 0.0; + map.values() + .map(|val| val.clone().as_f64()) + .for_each(|val| { + value = val.unwrap_or(0.0) + }); + gauge.set(value); + return Some(gauge); + }, + Err(er) => { + error!("Cannot create Gauge {} due to {}", &name, er); + } + } + } + None + } + pub fn get_value_as_vec_map(metrics: &MetricOutput) -> Vec>{ + let mut vec: Vec> = Vec::new(); + let arr = metrics.value.as_array().unwrap(); + arr.iter() + .for_each(|a| { + vec.push(serde_json::from_value(a.clone()).unwrap()); + }); + vec + } + pub fn is_array_of_string_values(metrics: &MetricOutput) -> bool { + let arr = metrics.value.clone(); + let arr = arr.as_array().unwrap(); + let map: Map = serde_json::from_value( + arr[0].clone() + ).unwrap(); + map.values() + .all(|val| val.is_string()) + } + // fn is_valid(metrics: &PrometheusMetrics) -> bool { + // false + // } + fn is_array(metrics: &MetricOutput) -> bool { + metrics.value.is_array() + } + fn is_tagged_array(metrics: &MetricOutput) -> bool { + let arr = metrics.value.as_array().unwrap(); + let map: Map = serde_json::from_value(arr[0].clone()).unwrap(); + map.len() > 1 + } + fn is_number(metrics: &MetricOutput) -> bool { + metrics + .value.is_number() + } +} \ No newline at end of file diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index f9ca124..4a824f0 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -235,11 +235,11 @@ pub mod v3 { // to prometheus and nmns #[derive(Serialize, Deserialize, Debug)] pub struct MetricOutput { - id : String, + pub id : String, #[serde(rename = "type")] json_type : String, addr : String, - value : Value, + pub value : Value, } impl MetricOutput { pub fn new_with_slices(id : &str, json_type : &str, addr: &str,value : Value) -> Self { @@ -254,12 +254,12 @@ pub mod v3 { #[derive(Serialize, Deserialize, Debug)] pub struct PrometheusMetrics { - service_name: String, - endpoint_name: String, - metrics: Value, + pub service_name: String, + pub endpoint_name: String, + pub metrics: Vec, } impl PrometheusMetrics { - pub fn new(service: &str, endpoint: &str, metrics: Value) -> Self { + pub fn new(service: &str, endpoint: &str, metrics: Vec) -> Self { Self { service_name: service.to_string(), endpoint_name: endpoint.to_string(),