diff --git a/crates/api-grub/src/export.rs b/crates/api-grub/src/export.rs index 84dfc93..4f183c7 100644 --- a/crates/api-grub/src/export.rs +++ b/crates/api-grub/src/export.rs @@ -1,4 +1,5 @@ use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient}; +use integr_structs::api::v3::PrometheusMetrics; use reqwest::Client; use tokio_postgres::NoTls; use std::env; @@ -51,16 +52,19 @@ impl Exporter { let _ = client.query(&query, &[&metrics]).await?; Ok(()) } - pub async fn export_metrics(metrics: &str) -> Result { + pub async fn export_metrics(metrics: PrometheusMetrics) -> Result { let url = env::var("EXPORTER_URL")?; // let req = Request::new(Method::PUT, // Url::parse(metrics)?); let req = Client::new() - .put(url) - .json(metrics) + .post(url) + .json(&metrics) .send().await; - req?; - Ok(metrics.as_bytes().len()) + // dbg!(&req); + // dbg!(&req.unwrap().text().await); + // todo : rewrite with status code wrapping + req?; + Ok(metrics.get_bytes_len()) } } \ No newline at end of file diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs index 70057b9..a4326d3 100644 --- a/crates/api-grub/src/net.rs +++ b/crates/api-grub/src/net.rs @@ -142,21 +142,21 @@ impl<'a> ApiPoll<'a> { let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc); - let metrics = serde_json::to_string_pretty(&preproc) - .unwrap_or_else(|_| { - error!("Cannot parse grabbed metrics data to String"); - String::from(r#" - { - "service_name" : null, - "endpoint_name" : null, - "value" : null - } - "#) - }); - println!("{}", &metrics); - match Exporter::export_metrics(&metrics).await { + // let metrics = serde_json::to_string_pretty(&preproc) + // .unwrap_or_else(|_| { + // error!("Cannot parse grabbed metrics data to String"); + // String::from(r#" + // { + // "service_name" : null, + // "endpoint_name" : null, + // "value" : null + // } + // "#) + // }); + // println!("{}", &metrics); + match Exporter::export_metrics(preproc).await { Ok(bytes) => { - info!("Successfully imported {} bytes of metrics data to Prometheus", bytes); + info!("Successfully exported {} bytes of metrics data to Prometheus", bytes); }, Err(er) => { error!("Failed to export data to Prometheus due to {}", er); diff --git a/crates/exporter/Cargo.toml b/crates/exporter/Cargo.toml index a3f0402..af3b5f0 100644 --- a/crates/exporter/Cargo.toml +++ b/crates/exporter/Cargo.toml @@ -7,4 +7,11 @@ edition = "2021" axum = "0.8.1" prometheus = "0.13.4" tokio = { version = "1.43.0", features = ["full"] } +integr-structs = {path = "../integr-structs"} +anyhow = "1.0.95" +chrono = "0.4.39" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.138" diff --git a/crates/exporter/src/endpoints.rs b/crates/exporter/src/endpoints.rs new file mode 100644 index 0000000..0deeb39 --- /dev/null +++ b/crates/exporter/src/endpoints.rs @@ -0,0 +1,68 @@ +use axum::{ + body::Body, extract::{Json, Request, State}, http::{self, HeaderMap, Response as Resp}, response::{IntoResponse, Response}, routing::get, Router +}; +use integr_structs::api::v3::PrometheusMetrics; +use prometheus::{core::{Collector, Metric}, proto::{Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder}; +use std::sync::{Arc, Mutex}; +use crate::AppState; +// use log::{warn, info, error}; +use tracing::{info, error, warn, debug}; + +pub async fn update_metrics( + State(state): State>, + Json(request) : Json +) -> impl IntoResponse { + info!("post on /update"); +// let resp = Response::new("body"); + debug!("{:?}", &request); +// // resp +// let mut header_map = HeaderMap::new(); +// header_map.insert(http::header::CONTENT_TYPE, +// "text/plain".parse().unwrap()); + + (http::StatusCode::ACCEPTED, "Ok") +} + +pub async fn metrics_handler(State(state): State>) -> String { + let encoder = TextEncoder::new(); + let mut buffer = Vec::new(); + // + let mut fm = prometheus::proto::MetricFamily::new(); + // prometheus::Registry:: + let mut met = prometheus::proto::Metric::new(); + let mutex_guard = state.sum.lock().unwrap(); + met.set_summary(mutex_guard.clone()); + // let metric = met.take_label(); + fm.set_metric(vec![met].into()); + fm.set_help("example summary".to_string()); + fm.set_name("example_summary".to_string()); + fm.set_field_type(prometheus::proto::MetricType::SUMMARY); + + let mut metric_families = state.registry.gather(); + metric_families.push(fm); + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() +} + +pub async fn increment_handler(State(state): State>) -> &'static str { + let mut counter = state.counter.lock().unwrap(); + counter.inc(); + "Counter incremented" +} + +pub async fn summary_handler(State(state): State>) -> &'static str { + let mut sum = state.sum.lock().unwrap(); + let qs = sum.get_quantile(); + let new_qs: Vec = qs + .into_iter() + .enumerate() + .map(|(idx, q)| { + let mut q = q.to_owned(); + q.set_value(q.get_value() + idx as f64); + // q.set_quantile(q.get_quantile() * 2.0); + q + }) + .collect(); + sum.set_quantile(new_qs.into()); + "Summary changed" +} diff --git a/crates/exporter/src/main.rs b/crates/exporter/src/main.rs index 93bcb8b..d4b6086 100644 --- a/crates/exporter/src/main.rs +++ b/crates/exporter/src/main.rs @@ -1,7 +1,24 @@ -use axum::{extract::State, routing::get, Router}; -use prometheus::{proto::{Histogram, Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder}; +mod endpoints; +// mod logger; + +// use logger::setup_logger; +use axum::{ + extract::State, + routing::{get, post}, + Router}; +use prometheus::{ + core::{Collector, Metric}, + proto::{Quantile, Summary}, + Counter, + Encoder, + Opts, + Registry, + TextEncoder}; use std::sync::{Arc, Mutex}; +use endpoints::*; use tokio::net::TcpListener; +// use log::{warn, info, error}; +use tracing::{info, error, warn, debug}; struct AppState { registry: Registry, @@ -9,53 +26,9 @@ struct AppState { sum : Mutex, } -async fn metrics_handler(State(state): State>) -> String { - let encoder = TextEncoder::new(); - let mut buffer = Vec::new(); - // - let mut fm = prometheus::proto::MetricFamily::new(); - // prometheus::Registry:: - let mut met = prometheus::proto::Metric::new(); - let mutex_guard = state.sum.lock().unwrap(); - met.set_summary(mutex_guard.clone()); - // let metric = met.take_label(); - fm.set_metric(vec![met].into()); - fm.set_help("example summary".to_string()); - fm.set_name("example_summary".to_string()); - fm.set_field_type(prometheus::proto::MetricType::SUMMARY); - - let mut metric_families = state.registry.gather(); - metric_families.push(fm); - encoder.encode(&metric_families, &mut buffer).unwrap(); - String::from_utf8(buffer).unwrap() -} - -async fn increment_handler(State(state): State>) -> &'static str { - let mut counter = state.counter.lock().unwrap(); - counter.inc(); - "Counter incremented" -} - -async fn summary_handler(State(state): State>) -> &'static str { - let mut sum = state.sum.lock().unwrap(); - let qs = sum.get_quantile(); - let new_qs: Vec = qs - .into_iter() - .enumerate() - .map(|(idx, q)| { - let mut q = q.to_owned(); - q.set_value(q.get_value() + idx as f64); - // q.set_quantile(q.get_quantile() * 2.0); - q - }) - .collect(); - sum.set_quantile(new_qs.into()); - "Summary changed" -} - - #[tokio::main] async fn main() { + // let _ = setup_logger().await; let registry = Registry::new(); let counter_opts = Opts::new("example_counter", "Пример счётчика"); // let histogram_opts = Opts::new("example_histogram", "Пример histogram"); @@ -79,6 +52,7 @@ async fn main() { sunops.set_quantile(vq.into()); let counter = Counter::with_opts(counter_opts).unwrap(); + // counter.desc() registry.register(Box::new(counter.clone())).unwrap(); // registry.register(Box::new(prometheus::proto::MetricFamily::)); // registry.register(Box::new(sunops.clone())).unwrap(); @@ -90,12 +64,22 @@ async fn main() { sum : Mutex::new(sunops) }); + // info!("Configurating Web-Server..."); + + tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .init(); + + info!("Configurating Web-Server..."); + let app = Router::new() .route("/metrics", get(metrics_handler)) .route("/increment", get(increment_handler)) .route("/sum", get(summary_handler)) + .route("/update", post(update_metrics)) .with_state(state.clone()); - let listener = TcpListener::bind("0.0.0.0:9100").await.unwrap(); + let listener = TcpListener::bind("127.0.0.1:9100").await.unwrap(); + info!("Serving on ...:9100"); axum::serve(listener, app).await.unwrap(); } \ No newline at end of file diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index 260df18..f9ca124 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -266,5 +266,11 @@ pub mod v3 { metrics: metrics } } + pub fn get_bytes_len(&self) -> usize { + let str_metrics = serde_json::to_vec(self).unwrap_or_else( + |_| Vec::new() + ); + str_metrics.len() + } } } \ No newline at end of file