Compare commits

...

2 Commits

Author SHA1 Message Date
prplV 39ad05c21c fixed metrics export + getting it on exporter 2025-02-14 14:06:13 +03:00
prplV 32ab234236 api grub set up 2025-02-14 12:32:30 +03:00
7 changed files with 150 additions and 142 deletions

View File

@ -6,7 +6,7 @@
"pass" : "", "pass" : "",
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711", "api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
"period" : "", "period" : "",
"timeout" : "10", "timeout" : "5",
"metrics" : [ "metrics" : [
{ {
"name": "conferences", "name": "conferences",

View File

@ -1,4 +1,5 @@
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient}; use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
use integr_structs::api::v3::PrometheusMetrics;
use reqwest::Client; use reqwest::Client;
use tokio_postgres::NoTls; use tokio_postgres::NoTls;
use std::env; use std::env;
@ -51,16 +52,19 @@ impl Exporter {
let _ = client.query(&query, &[&metrics]).await?; let _ = client.query(&query, &[&metrics]).await?;
Ok(()) Ok(())
} }
pub async fn export_metrics(metrics: &str) -> Result<()> { pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
let url = env::var("EXPORTER_URL")?; let url = env::var("EXPORTER_URL")?;
// let req = Request::new(Method::PUT, // let req = Request::new(Method::PUT,
// Url::parse(metrics)?); // Url::parse(metrics)?);
let req = Client::new() let req = Client::new()
.put(url) .post(url)
.json(metrics) .json(&metrics)
.send().await; .send().await;
req?; // dbg!(&req);
Ok(()) // dbg!(&req.unwrap().text().await);
// todo : rewrite with status code wrapping
req?;
Ok(metrics.get_bytes_len())
} }
} }

View File

@ -12,7 +12,7 @@ use tokio::task::JoinHandle;
use dotenv::dotenv; use dotenv::dotenv;
use crate::json::JsonParser; use crate::json::JsonParser;
use crate::export::{self, Exporter}; use crate::export::{self, Exporter};
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics}; use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
// type BufferType = Arc<Mutex<Vec<String>>>; // type BufferType = Arc<Mutex<Vec<String>>>;
@ -139,15 +139,24 @@ impl<'a> ApiPoll<'a> {
let endpoint_name = &metrics.name; let endpoint_name = &metrics.name;
let preproc = JsonParser::parse(&metrics.measure, &response); let preproc = JsonParser::parse(&metrics.measure, &response);
// dbg!(serde_json::to_string_pretty(&preproc)); // dbg!(serde_json::to_string_pretty(&preproc));
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
let metrics = serde_json::to_string_pretty(&preproc)
.unwrap_or_else(|_| { // let metrics = serde_json::to_string_pretty(&preproc)
error!("Cannot parse grabbed metrics data to String"); // .unwrap_or_else(|_| {
String::from(r#""value" : null"#) // error!("Cannot parse grabbed metrics data to String");
}); // String::from(r#"
match Exporter::export_metrics(&metrics).await { // {
Ok(_) => { // "service_name" : null,
info!("Successfully imported metrics data to Prometheus"); // "endpoint_name" : null,
// "value" : null
// }
// "#)
// });
// println!("{}", &metrics);
match Exporter::export_metrics(preproc).await {
Ok(bytes) => {
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);
}, },
Err(er) => { Err(er) => {
error!("Failed to export data to Prometheus due to {}", er); error!("Failed to export data to Prometheus due to {}", er);
@ -184,7 +193,6 @@ impl<'a> ApiPoll<'a> {
let mut jh = Vec::<JoinHandle::<Result<()>>>::new(); let mut jh = Vec::<JoinHandle::<Result<()>>>::new();
for idx in 0..metrics.len() { for idx in 0..metrics.len() {
// let exporter = exporter.clone();
let creds = creds.clone(); let creds = creds.clone();
let metrics = metrics.clone(); let metrics = metrics.clone();
let service_id = service_id.clone(); let service_id = service_id.clone();
@ -193,7 +201,6 @@ impl<'a> ApiPoll<'a> {
service_id.clone(), service_id.clone(),
metrics[idx].clone().into(), metrics[idx].clone().into(),
creds.clone(), creds.clone(),
// exporter.clone()
).await ).await
}); });
jh.push(event); jh.push(event);
@ -239,82 +246,6 @@ impl<'a> ApiPoll<'a> {
for i in join_handles { for i in join_handles {
let _ = i.await; let _ = i.await;
} }
// let template = Arc::new(self.config.template.clone());
// if self.is_default().await { return Err(Error::msg("Default config with no endpoints")) }
// // TODO: rewrite nextly to async
// for point in template.iter() {
// let point = Arc::new(point.clone());
// // let buffer = buffer.clone();
// let client = client.clone();
// let exporter = exporter.clone();
// let endpoint_processer = tokio::spawn(async move {
// let point = point.clone();
// match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
// Ok(resp) => {
// if !resp.status().is_success() {
// error!("ErrorCode in Response from API. Check configuration");
// return Err(Error::msg("Error during sending request"));
// }
// if let Ok(text) = resp.text().await {
// //
// let metrics = ProcessedEndpoint::from_target_response(&text, &point)?;
// // dbg!(&metrics);
// println!("{}", &metrics);
// //
// if let Some(conn) = exporter.get_connection_from_pool().await {
// // TEST: to exporter
// let res = client.request(
// RestMethod::from_str("post").await,
// "http://192.168.2.34:9101/update")
// .json(&metrics)
// .send().await;
// if let Err(er) = res {
// error!("Cannot send data to exporter due to: {}", er);
// } else {
// println!("{:?}", res.unwrap().text().await);
// }
// if let Err(er) = Exporter::export_data(conn, &metrics).await {
// error!("Cannot export data to DB during to: {}", er);
// return Err(Error::msg("Error during exporting data to DB"));
// }
// } else {
// if !exporter.is_no_connection() {
// return Err(Error::msg("Error during getting connection from pool"));
// }
// }
// // let mut buffer = buffer.lock().await;
// // buffer.push(text);
// } else {
// error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
// return Err(Error::msg("Error with extracting text field from Response"));
// }
// },
// Err(_) => {
// error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url);
// return Err(Error::msg("Endpoint is unreachable"));
// },
// }
// Ok(())
// });
// join_handles.push(endpoint_processer);
// }
// for i in join_handles {
// let _ = i.await;
// }
// // let buffer = buffer.lock().await;
// // match &buffer.len() {
// // 0 => Err(Error::msg("Error due to API grubbing. Check config" )),
// // _ => {
// // Ok(())
// // },
// // }
Ok(()) Ok(())
} }
} }

View File

@ -7,4 +7,11 @@ edition = "2021"
axum = "0.8.1" axum = "0.8.1"
prometheus = "0.13.4" prometheus = "0.13.4"
tokio = { version = "1.43.0", features = ["full"] } tokio = { version = "1.43.0", features = ["full"] }
integr-structs = {path = "../integr-structs"}
anyhow = "1.0.95"
chrono = "0.4.39"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.138"

View File

@ -0,0 +1,68 @@
use axum::{
body::Body, extract::{Json, Request, State}, http::{self, HeaderMap, Response as Resp}, response::{IntoResponse, Response}, routing::get, Router
};
use integr_structs::api::v3::PrometheusMetrics;
use prometheus::{core::{Collector, Metric}, proto::{Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder};
use std::sync::{Arc, Mutex};
use crate::AppState;
// use log::{warn, info, error};
use tracing::{info, error, warn, debug};
pub async fn update_metrics(
State(state): State<Arc<AppState>>,
Json(request) : Json<PrometheusMetrics>
) -> impl IntoResponse {
info!("post on /update");
// let resp = Response::new("body");
debug!("{:?}", &request);
// // resp
// let mut header_map = HeaderMap::new();
// header_map.insert(http::header::CONTENT_TYPE,
// "text/plain".parse().unwrap());
(http::StatusCode::ACCEPTED, "Ok")
}
pub async fn metrics_handler(State(state): State<Arc<AppState>>) -> String {
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
//
let mut fm = prometheus::proto::MetricFamily::new();
// prometheus::Registry::
let mut met = prometheus::proto::Metric::new();
let mutex_guard = state.sum.lock().unwrap();
met.set_summary(mutex_guard.clone());
// let metric = met.take_label();
fm.set_metric(vec![met].into());
fm.set_help("example summary".to_string());
fm.set_name("example_summary".to_string());
fm.set_field_type(prometheus::proto::MetricType::SUMMARY);
let mut metric_families = state.registry.gather();
metric_families.push(fm);
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
pub async fn increment_handler(State(state): State<Arc<AppState>>) -> &'static str {
let mut counter = state.counter.lock().unwrap();
counter.inc();
"Counter incremented"
}
pub async fn summary_handler(State(state): State<Arc<AppState>>) -> &'static str {
let mut sum = state.sum.lock().unwrap();
let qs = sum.get_quantile();
let new_qs: Vec<Quantile> = qs
.into_iter()
.enumerate()
.map(|(idx, q)| {
let mut q = q.to_owned();
q.set_value(q.get_value() + idx as f64);
// q.set_quantile(q.get_quantile() * 2.0);
q
})
.collect();
sum.set_quantile(new_qs.into());
"Summary changed"
}

View File

@ -1,7 +1,24 @@
use axum::{extract::State, routing::get, Router}; mod endpoints;
use prometheus::{proto::{Histogram, Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder}; // mod logger;
// use logger::setup_logger;
use axum::{
extract::State,
routing::{get, post},
Router};
use prometheus::{
core::{Collector, Metric},
proto::{Quantile, Summary},
Counter,
Encoder,
Opts,
Registry,
TextEncoder};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use endpoints::*;
use tokio::net::TcpListener; use tokio::net::TcpListener;
// use log::{warn, info, error};
use tracing::{info, error, warn, debug};
struct AppState { struct AppState {
registry: Registry, registry: Registry,
@ -9,53 +26,9 @@ struct AppState {
sum : Mutex<Summary>, sum : Mutex<Summary>,
} }
async fn metrics_handler(State(state): State<Arc<AppState>>) -> String {
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
//
let mut fm = prometheus::proto::MetricFamily::new();
// prometheus::Registry::
let mut met = prometheus::proto::Metric::new();
let mutex_guard = state.sum.lock().unwrap();
met.set_summary(mutex_guard.clone());
// let metric = met.take_label();
fm.set_metric(vec![met].into());
fm.set_help("example summary".to_string());
fm.set_name("example_summary".to_string());
fm.set_field_type(prometheus::proto::MetricType::SUMMARY);
let mut metric_families = state.registry.gather();
metric_families.push(fm);
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
async fn increment_handler(State(state): State<Arc<AppState>>) -> &'static str {
let mut counter = state.counter.lock().unwrap();
counter.inc();
"Counter incremented"
}
async fn summary_handler(State(state): State<Arc<AppState>>) -> &'static str {
let mut sum = state.sum.lock().unwrap();
let qs = sum.get_quantile();
let new_qs: Vec<Quantile> = qs
.into_iter()
.enumerate()
.map(|(idx, q)| {
let mut q = q.to_owned();
q.set_value(q.get_value() + idx as f64);
// q.set_quantile(q.get_quantile() * 2.0);
q
})
.collect();
sum.set_quantile(new_qs.into());
"Summary changed"
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// let _ = setup_logger().await;
let registry = Registry::new(); let registry = Registry::new();
let counter_opts = Opts::new("example_counter", "Пример счётчика"); let counter_opts = Opts::new("example_counter", "Пример счётчика");
// let histogram_opts = Opts::new("example_histogram", "Пример histogram"); // let histogram_opts = Opts::new("example_histogram", "Пример histogram");
@ -79,6 +52,7 @@ async fn main() {
sunops.set_quantile(vq.into()); sunops.set_quantile(vq.into());
let counter = Counter::with_opts(counter_opts).unwrap(); let counter = Counter::with_opts(counter_opts).unwrap();
// counter.desc()
registry.register(Box::new(counter.clone())).unwrap(); registry.register(Box::new(counter.clone())).unwrap();
// registry.register(Box::new(prometheus::proto::MetricFamily::)); // registry.register(Box::new(prometheus::proto::MetricFamily::));
// registry.register(Box::new(sunops.clone())).unwrap(); // registry.register(Box::new(sunops.clone())).unwrap();
@ -90,12 +64,22 @@ async fn main() {
sum : Mutex::new(sunops) sum : Mutex::new(sunops)
}); });
// info!("Configurating Web-Server...");
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
info!("Configurating Web-Server...");
let app = Router::new() let app = Router::new()
.route("/metrics", get(metrics_handler)) .route("/metrics", get(metrics_handler))
.route("/increment", get(increment_handler)) .route("/increment", get(increment_handler))
.route("/sum", get(summary_handler)) .route("/sum", get(summary_handler))
.route("/update", post(update_metrics))
.with_state(state.clone()); .with_state(state.clone());
let listener = TcpListener::bind("0.0.0.0:9100").await.unwrap(); let listener = TcpListener::bind("127.0.0.1:9100").await.unwrap();
info!("Serving on ...:9100");
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }

View File

@ -258,5 +258,19 @@ pub mod v3 {
endpoint_name: String, endpoint_name: String,
metrics: Value, metrics: Value,
} }
impl PrometheusMetrics {
pub fn new(service: &str, endpoint: &str, metrics: Value) -> Self {
Self {
service_name: service.to_string(),
endpoint_name: endpoint.to_string(),
metrics: metrics
}
}
pub fn get_bytes_len(&self) -> usize {
let str_metrics = serde_json::to_vec(self).unwrap_or_else(
|_| Vec::new()
);
str_metrics.len()
}
}
} }