fixed metrics export + getting it on exporter

pull/6/head
prplV 2025-02-14 14:06:13 +03:00
parent 32ab234236
commit 39ad05c21c
6 changed files with 136 additions and 67 deletions

View File

@ -1,4 +1,5 @@
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
use integr_structs::api::v3::PrometheusMetrics;
use reqwest::Client;
use tokio_postgres::NoTls;
use std::env;
@ -51,16 +52,19 @@ impl Exporter {
let _ = client.query(&query, &[&metrics]).await?;
Ok(())
}
pub async fn export_metrics(metrics: &str) -> Result<usize> {
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
let url = env::var("EXPORTER_URL")?;
// let req = Request::new(Method::PUT,
// Url::parse(metrics)?);
let req = Client::new()
.put(url)
.json(metrics)
.post(url)
.json(&metrics)
.send().await;
req?;
Ok(metrics.as_bytes().len())
// dbg!(&req);
// dbg!(&req.unwrap().text().await);
// todo : rewrite with status code wrapping
req?;
Ok(metrics.get_bytes_len())
}
}

View File

@ -142,21 +142,21 @@ impl<'a> ApiPoll<'a> {
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
let metrics = serde_json::to_string_pretty(&preproc)
.unwrap_or_else(|_| {
error!("Cannot parse grabbed metrics data to String");
String::from(r#"
{
"service_name" : null,
"endpoint_name" : null,
"value" : null
}
"#)
});
println!("{}", &metrics);
match Exporter::export_metrics(&metrics).await {
// let metrics = serde_json::to_string_pretty(&preproc)
// .unwrap_or_else(|_| {
// error!("Cannot parse grabbed metrics data to String");
// String::from(r#"
// {
// "service_name" : null,
// "endpoint_name" : null,
// "value" : null
// }
// "#)
// });
// println!("{}", &metrics);
match Exporter::export_metrics(preproc).await {
Ok(bytes) => {
info!("Successfully imported {} bytes of metrics data to Prometheus", bytes);
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);
},
Err(er) => {
error!("Failed to export data to Prometheus due to {}", er);

View File

@ -7,4 +7,11 @@ edition = "2021"
axum = "0.8.1"
prometheus = "0.13.4"
tokio = { version = "1.43.0", features = ["full"] }
integr-structs = {path = "../integr-structs"}
anyhow = "1.0.95"
chrono = "0.4.39"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.138"

View File

@ -0,0 +1,68 @@
use axum::{
body::Body, extract::{Json, Request, State}, http::{self, HeaderMap, Response as Resp}, response::{IntoResponse, Response}, routing::get, Router
};
use integr_structs::api::v3::PrometheusMetrics;
use prometheus::{core::{Collector, Metric}, proto::{Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder};
use std::sync::{Arc, Mutex};
use crate::AppState;
// use log::{warn, info, error};
use tracing::{info, error, warn, debug};
pub async fn update_metrics(
State(state): State<Arc<AppState>>,
Json(request) : Json<PrometheusMetrics>
) -> impl IntoResponse {
info!("post on /update");
// let resp = Response::new("body");
debug!("{:?}", &request);
// // resp
// let mut header_map = HeaderMap::new();
// header_map.insert(http::header::CONTENT_TYPE,
// "text/plain".parse().unwrap());
(http::StatusCode::ACCEPTED, "Ok")
}
pub async fn metrics_handler(State(state): State<Arc<AppState>>) -> String {
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
//
let mut fm = prometheus::proto::MetricFamily::new();
// prometheus::Registry::
let mut met = prometheus::proto::Metric::new();
let mutex_guard = state.sum.lock().unwrap();
met.set_summary(mutex_guard.clone());
// let metric = met.take_label();
fm.set_metric(vec![met].into());
fm.set_help("example summary".to_string());
fm.set_name("example_summary".to_string());
fm.set_field_type(prometheus::proto::MetricType::SUMMARY);
let mut metric_families = state.registry.gather();
metric_families.push(fm);
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
pub async fn increment_handler(State(state): State<Arc<AppState>>) -> &'static str {
let mut counter = state.counter.lock().unwrap();
counter.inc();
"Counter incremented"
}
pub async fn summary_handler(State(state): State<Arc<AppState>>) -> &'static str {
let mut sum = state.sum.lock().unwrap();
let qs = sum.get_quantile();
let new_qs: Vec<Quantile> = qs
.into_iter()
.enumerate()
.map(|(idx, q)| {
let mut q = q.to_owned();
q.set_value(q.get_value() + idx as f64);
// q.set_quantile(q.get_quantile() * 2.0);
q
})
.collect();
sum.set_quantile(new_qs.into());
"Summary changed"
}

View File

@ -1,7 +1,24 @@
use axum::{extract::State, routing::get, Router};
use prometheus::{proto::{Histogram, Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder};
mod endpoints;
// mod logger;
// use logger::setup_logger;
use axum::{
extract::State,
routing::{get, post},
Router};
use prometheus::{
core::{Collector, Metric},
proto::{Quantile, Summary},
Counter,
Encoder,
Opts,
Registry,
TextEncoder};
use std::sync::{Arc, Mutex};
use endpoints::*;
use tokio::net::TcpListener;
// use log::{warn, info, error};
use tracing::{info, error, warn, debug};
struct AppState {
registry: Registry,
@ -9,53 +26,9 @@ struct AppState {
sum : Mutex<Summary>,
}
async fn metrics_handler(State(state): State<Arc<AppState>>) -> String {
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
//
let mut fm = prometheus::proto::MetricFamily::new();
// prometheus::Registry::
let mut met = prometheus::proto::Metric::new();
let mutex_guard = state.sum.lock().unwrap();
met.set_summary(mutex_guard.clone());
// let metric = met.take_label();
fm.set_metric(vec![met].into());
fm.set_help("example summary".to_string());
fm.set_name("example_summary".to_string());
fm.set_field_type(prometheus::proto::MetricType::SUMMARY);
let mut metric_families = state.registry.gather();
metric_families.push(fm);
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
async fn increment_handler(State(state): State<Arc<AppState>>) -> &'static str {
let mut counter = state.counter.lock().unwrap();
counter.inc();
"Counter incremented"
}
async fn summary_handler(State(state): State<Arc<AppState>>) -> &'static str {
let mut sum = state.sum.lock().unwrap();
let qs = sum.get_quantile();
let new_qs: Vec<Quantile> = qs
.into_iter()
.enumerate()
.map(|(idx, q)| {
let mut q = q.to_owned();
q.set_value(q.get_value() + idx as f64);
// q.set_quantile(q.get_quantile() * 2.0);
q
})
.collect();
sum.set_quantile(new_qs.into());
"Summary changed"
}
#[tokio::main]
async fn main() {
// let _ = setup_logger().await;
let registry = Registry::new();
let counter_opts = Opts::new("example_counter", "Пример счётчика");
// let histogram_opts = Opts::new("example_histogram", "Пример histogram");
@ -79,6 +52,7 @@ async fn main() {
sunops.set_quantile(vq.into());
let counter = Counter::with_opts(counter_opts).unwrap();
// counter.desc()
registry.register(Box::new(counter.clone())).unwrap();
// registry.register(Box::new(prometheus::proto::MetricFamily::));
// registry.register(Box::new(sunops.clone())).unwrap();
@ -90,12 +64,22 @@ async fn main() {
sum : Mutex::new(sunops)
});
// info!("Configurating Web-Server...");
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
info!("Configurating Web-Server...");
let app = Router::new()
.route("/metrics", get(metrics_handler))
.route("/increment", get(increment_handler))
.route("/sum", get(summary_handler))
.route("/update", post(update_metrics))
.with_state(state.clone());
let listener = TcpListener::bind("0.0.0.0:9100").await.unwrap();
let listener = TcpListener::bind("127.0.0.1:9100").await.unwrap();
info!("Serving on ...:9100");
axum::serve(listener, app).await.unwrap();
}

View File

@ -266,5 +266,11 @@ pub mod v3 {
metrics: metrics
}
}
pub fn get_bytes_len(&self) -> usize {
let str_metrics = serde_json::to_vec(self).unwrap_or_else(
|_| Vec::new()
);
str_metrics.len()
}
}
}