Compare commits
2 Commits
fe7aaaaef3
...
39ad05c21c
| Author | SHA1 | Date |
|---|---|---|
|
|
39ad05c21c | |
|
|
32ab234236 |
|
|
@ -6,7 +6,7 @@
|
||||||
"pass" : "",
|
"pass" : "",
|
||||||
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
||||||
"period" : "",
|
"period" : "",
|
||||||
"timeout" : "10",
|
"timeout" : "5",
|
||||||
"metrics" : [
|
"metrics" : [
|
||||||
{
|
{
|
||||||
"name": "conferences",
|
"name": "conferences",
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
|
use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
|
||||||
|
use integr_structs::api::v3::PrometheusMetrics;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use tokio_postgres::NoTls;
|
use tokio_postgres::NoTls;
|
||||||
use std::env;
|
use std::env;
|
||||||
|
|
@ -51,16 +52,19 @@ impl Exporter {
|
||||||
let _ = client.query(&query, &[&metrics]).await?;
|
let _ = client.query(&query, &[&metrics]).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn export_metrics(metrics: &str) -> Result<()> {
|
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
// let req = Request::new(Method::PUT,
|
// let req = Request::new(Method::PUT,
|
||||||
// Url::parse(metrics)?);
|
// Url::parse(metrics)?);
|
||||||
let req = Client::new()
|
let req = Client::new()
|
||||||
.put(url)
|
.post(url)
|
||||||
.json(metrics)
|
.json(&metrics)
|
||||||
.send().await;
|
.send().await;
|
||||||
req?;
|
// dbg!(&req);
|
||||||
Ok(())
|
// dbg!(&req.unwrap().text().await);
|
||||||
|
// todo : rewrite with status code wrapping
|
||||||
|
req?;
|
||||||
|
Ok(metrics.get_bytes_len())
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -12,7 +12,7 @@ use tokio::task::JoinHandle;
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use crate::json::JsonParser;
|
use crate::json::JsonParser;
|
||||||
use crate::export::{self, Exporter};
|
use crate::export::{self, Exporter};
|
||||||
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics};
|
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
|
||||||
|
|
||||||
// type BufferType = Arc<Mutex<Vec<String>>>;
|
// type BufferType = Arc<Mutex<Vec<String>>>;
|
||||||
|
|
||||||
|
|
@ -139,15 +139,24 @@ impl<'a> ApiPoll<'a> {
|
||||||
let endpoint_name = &metrics.name;
|
let endpoint_name = &metrics.name;
|
||||||
let preproc = JsonParser::parse(&metrics.measure, &response);
|
let preproc = JsonParser::parse(&metrics.measure, &response);
|
||||||
// dbg!(serde_json::to_string_pretty(&preproc));
|
// dbg!(serde_json::to_string_pretty(&preproc));
|
||||||
|
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
|
||||||
|
|
||||||
let metrics = serde_json::to_string_pretty(&preproc)
|
|
||||||
.unwrap_or_else(|_| {
|
// let metrics = serde_json::to_string_pretty(&preproc)
|
||||||
error!("Cannot parse grabbed metrics data to String");
|
// .unwrap_or_else(|_| {
|
||||||
String::from(r#""value" : null"#)
|
// error!("Cannot parse grabbed metrics data to String");
|
||||||
});
|
// String::from(r#"
|
||||||
match Exporter::export_metrics(&metrics).await {
|
// {
|
||||||
Ok(_) => {
|
// "service_name" : null,
|
||||||
info!("Successfully imported metrics data to Prometheus");
|
// "endpoint_name" : null,
|
||||||
|
// "value" : null
|
||||||
|
// }
|
||||||
|
// "#)
|
||||||
|
// });
|
||||||
|
// println!("{}", &metrics);
|
||||||
|
match Exporter::export_metrics(preproc).await {
|
||||||
|
Ok(bytes) => {
|
||||||
|
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);
|
||||||
},
|
},
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
error!("Failed to export data to Prometheus due to {}", er);
|
error!("Failed to export data to Prometheus due to {}", er);
|
||||||
|
|
@ -184,7 +193,6 @@ impl<'a> ApiPoll<'a> {
|
||||||
let mut jh = Vec::<JoinHandle::<Result<()>>>::new();
|
let mut jh = Vec::<JoinHandle::<Result<()>>>::new();
|
||||||
|
|
||||||
for idx in 0..metrics.len() {
|
for idx in 0..metrics.len() {
|
||||||
// let exporter = exporter.clone();
|
|
||||||
let creds = creds.clone();
|
let creds = creds.clone();
|
||||||
let metrics = metrics.clone();
|
let metrics = metrics.clone();
|
||||||
let service_id = service_id.clone();
|
let service_id = service_id.clone();
|
||||||
|
|
@ -193,7 +201,6 @@ impl<'a> ApiPoll<'a> {
|
||||||
service_id.clone(),
|
service_id.clone(),
|
||||||
metrics[idx].clone().into(),
|
metrics[idx].clone().into(),
|
||||||
creds.clone(),
|
creds.clone(),
|
||||||
// exporter.clone()
|
|
||||||
).await
|
).await
|
||||||
});
|
});
|
||||||
jh.push(event);
|
jh.push(event);
|
||||||
|
|
@ -239,82 +246,6 @@ impl<'a> ApiPoll<'a> {
|
||||||
for i in join_handles {
|
for i in join_handles {
|
||||||
let _ = i.await;
|
let _ = i.await;
|
||||||
}
|
}
|
||||||
// let template = Arc::new(self.config.template.clone());
|
|
||||||
|
|
||||||
// if self.is_default().await { return Err(Error::msg("Default config with no endpoints")) }
|
|
||||||
|
|
||||||
// // TODO: rewrite nextly to async
|
|
||||||
// for point in template.iter() {
|
|
||||||
// let point = Arc::new(point.clone());
|
|
||||||
// // let buffer = buffer.clone();
|
|
||||||
// let client = client.clone();
|
|
||||||
// let exporter = exporter.clone();
|
|
||||||
// let endpoint_processer = tokio::spawn(async move {
|
|
||||||
// let point = point.clone();
|
|
||||||
// match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
|
|
||||||
// Ok(resp) => {
|
|
||||||
// if !resp.status().is_success() {
|
|
||||||
// error!("ErrorCode in Response from API. Check configuration");
|
|
||||||
// return Err(Error::msg("Error during sending request"));
|
|
||||||
// }
|
|
||||||
// if let Ok(text) = resp.text().await {
|
|
||||||
// //
|
|
||||||
// let metrics = ProcessedEndpoint::from_target_response(&text, &point)?;
|
|
||||||
// // dbg!(&metrics);
|
|
||||||
// println!("{}", &metrics);
|
|
||||||
// //
|
|
||||||
// if let Some(conn) = exporter.get_connection_from_pool().await {
|
|
||||||
|
|
||||||
// // TEST: to exporter
|
|
||||||
// let res = client.request(
|
|
||||||
// RestMethod::from_str("post").await,
|
|
||||||
// "http://192.168.2.34:9101/update")
|
|
||||||
// .json(&metrics)
|
|
||||||
// .send().await;
|
|
||||||
// if let Err(er) = res {
|
|
||||||
// error!("Cannot send data to exporter due to: {}", er);
|
|
||||||
// } else {
|
|
||||||
// println!("{:?}", res.unwrap().text().await);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if let Err(er) = Exporter::export_data(conn, &metrics).await {
|
|
||||||
// error!("Cannot export data to DB during to: {}", er);
|
|
||||||
// return Err(Error::msg("Error during exporting data to DB"));
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// if !exporter.is_no_connection() {
|
|
||||||
// return Err(Error::msg("Error during getting connection from pool"));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // let mut buffer = buffer.lock().await;
|
|
||||||
// // buffer.push(text);
|
|
||||||
// } else {
|
|
||||||
// error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
|
|
||||||
// return Err(Error::msg("Error with extracting text field from Response"));
|
|
||||||
// }
|
|
||||||
// },
|
|
||||||
// Err(_) => {
|
|
||||||
// error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url);
|
|
||||||
// return Err(Error::msg("Endpoint is unreachable"));
|
|
||||||
// },
|
|
||||||
// }
|
|
||||||
// Ok(())
|
|
||||||
// });
|
|
||||||
// join_handles.push(endpoint_processer);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// for i in join_handles {
|
|
||||||
// let _ = i.await;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // let buffer = buffer.lock().await;
|
|
||||||
// // match &buffer.len() {
|
|
||||||
// // 0 => Err(Error::msg("Error due to API grubbing. Check config" )),
|
|
||||||
// // _ => {
|
|
||||||
// // Ok(())
|
|
||||||
// // },
|
|
||||||
// // }
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,4 +7,11 @@ edition = "2021"
|
||||||
axum = "0.8.1"
|
axum = "0.8.1"
|
||||||
prometheus = "0.13.4"
|
prometheus = "0.13.4"
|
||||||
tokio = { version = "1.43.0", features = ["full"] }
|
tokio = { version = "1.43.0", features = ["full"] }
|
||||||
|
integr-structs = {path = "../integr-structs"}
|
||||||
|
anyhow = "1.0.95"
|
||||||
|
chrono = "0.4.39"
|
||||||
|
tracing = "0.1.41"
|
||||||
|
tracing-subscriber = "0.3.19"
|
||||||
|
serde = { version = "1.0.217", features = ["derive"] }
|
||||||
|
serde_json = "1.0.138"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,68 @@
|
||||||
|
use axum::{
|
||||||
|
body::Body, extract::{Json, Request, State}, http::{self, HeaderMap, Response as Resp}, response::{IntoResponse, Response}, routing::get, Router
|
||||||
|
};
|
||||||
|
use integr_structs::api::v3::PrometheusMetrics;
|
||||||
|
use prometheus::{core::{Collector, Metric}, proto::{Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use crate::AppState;
|
||||||
|
// use log::{warn, info, error};
|
||||||
|
use tracing::{info, error, warn, debug};
|
||||||
|
|
||||||
|
pub async fn update_metrics(
|
||||||
|
State(state): State<Arc<AppState>>,
|
||||||
|
Json(request) : Json<PrometheusMetrics>
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
info!("post on /update");
|
||||||
|
// let resp = Response::new("body");
|
||||||
|
debug!("{:?}", &request);
|
||||||
|
// // resp
|
||||||
|
// let mut header_map = HeaderMap::new();
|
||||||
|
// header_map.insert(http::header::CONTENT_TYPE,
|
||||||
|
// "text/plain".parse().unwrap());
|
||||||
|
|
||||||
|
(http::StatusCode::ACCEPTED, "Ok")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn metrics_handler(State(state): State<Arc<AppState>>) -> String {
|
||||||
|
let encoder = TextEncoder::new();
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
//
|
||||||
|
let mut fm = prometheus::proto::MetricFamily::new();
|
||||||
|
// prometheus::Registry::
|
||||||
|
let mut met = prometheus::proto::Metric::new();
|
||||||
|
let mutex_guard = state.sum.lock().unwrap();
|
||||||
|
met.set_summary(mutex_guard.clone());
|
||||||
|
// let metric = met.take_label();
|
||||||
|
fm.set_metric(vec![met].into());
|
||||||
|
fm.set_help("example summary".to_string());
|
||||||
|
fm.set_name("example_summary".to_string());
|
||||||
|
fm.set_field_type(prometheus::proto::MetricType::SUMMARY);
|
||||||
|
|
||||||
|
let mut metric_families = state.registry.gather();
|
||||||
|
metric_families.push(fm);
|
||||||
|
encoder.encode(&metric_families, &mut buffer).unwrap();
|
||||||
|
String::from_utf8(buffer).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn increment_handler(State(state): State<Arc<AppState>>) -> &'static str {
|
||||||
|
let mut counter = state.counter.lock().unwrap();
|
||||||
|
counter.inc();
|
||||||
|
"Counter incremented"
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn summary_handler(State(state): State<Arc<AppState>>) -> &'static str {
|
||||||
|
let mut sum = state.sum.lock().unwrap();
|
||||||
|
let qs = sum.get_quantile();
|
||||||
|
let new_qs: Vec<Quantile> = qs
|
||||||
|
.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(idx, q)| {
|
||||||
|
let mut q = q.to_owned();
|
||||||
|
q.set_value(q.get_value() + idx as f64);
|
||||||
|
// q.set_quantile(q.get_quantile() * 2.0);
|
||||||
|
q
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
sum.set_quantile(new_qs.into());
|
||||||
|
"Summary changed"
|
||||||
|
}
|
||||||
|
|
@ -1,7 +1,24 @@
|
||||||
use axum::{extract::State, routing::get, Router};
|
mod endpoints;
|
||||||
use prometheus::{proto::{Histogram, Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder};
|
// mod logger;
|
||||||
|
|
||||||
|
// use logger::setup_logger;
|
||||||
|
use axum::{
|
||||||
|
extract::State,
|
||||||
|
routing::{get, post},
|
||||||
|
Router};
|
||||||
|
use prometheus::{
|
||||||
|
core::{Collector, Metric},
|
||||||
|
proto::{Quantile, Summary},
|
||||||
|
Counter,
|
||||||
|
Encoder,
|
||||||
|
Opts,
|
||||||
|
Registry,
|
||||||
|
TextEncoder};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
use endpoints::*;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
// use log::{warn, info, error};
|
||||||
|
use tracing::{info, error, warn, debug};
|
||||||
|
|
||||||
struct AppState {
|
struct AppState {
|
||||||
registry: Registry,
|
registry: Registry,
|
||||||
|
|
@ -9,53 +26,9 @@ struct AppState {
|
||||||
sum : Mutex<Summary>,
|
sum : Mutex<Summary>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn metrics_handler(State(state): State<Arc<AppState>>) -> String {
|
|
||||||
let encoder = TextEncoder::new();
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
//
|
|
||||||
let mut fm = prometheus::proto::MetricFamily::new();
|
|
||||||
// prometheus::Registry::
|
|
||||||
let mut met = prometheus::proto::Metric::new();
|
|
||||||
let mutex_guard = state.sum.lock().unwrap();
|
|
||||||
met.set_summary(mutex_guard.clone());
|
|
||||||
// let metric = met.take_label();
|
|
||||||
fm.set_metric(vec![met].into());
|
|
||||||
fm.set_help("example summary".to_string());
|
|
||||||
fm.set_name("example_summary".to_string());
|
|
||||||
fm.set_field_type(prometheus::proto::MetricType::SUMMARY);
|
|
||||||
|
|
||||||
let mut metric_families = state.registry.gather();
|
|
||||||
metric_families.push(fm);
|
|
||||||
encoder.encode(&metric_families, &mut buffer).unwrap();
|
|
||||||
String::from_utf8(buffer).unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn increment_handler(State(state): State<Arc<AppState>>) -> &'static str {
|
|
||||||
let mut counter = state.counter.lock().unwrap();
|
|
||||||
counter.inc();
|
|
||||||
"Counter incremented"
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn summary_handler(State(state): State<Arc<AppState>>) -> &'static str {
|
|
||||||
let mut sum = state.sum.lock().unwrap();
|
|
||||||
let qs = sum.get_quantile();
|
|
||||||
let new_qs: Vec<Quantile> = qs
|
|
||||||
.into_iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|(idx, q)| {
|
|
||||||
let mut q = q.to_owned();
|
|
||||||
q.set_value(q.get_value() + idx as f64);
|
|
||||||
// q.set_quantile(q.get_quantile() * 2.0);
|
|
||||||
q
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
sum.set_quantile(new_qs.into());
|
|
||||||
"Summary changed"
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
// let _ = setup_logger().await;
|
||||||
let registry = Registry::new();
|
let registry = Registry::new();
|
||||||
let counter_opts = Opts::new("example_counter", "Пример счётчика");
|
let counter_opts = Opts::new("example_counter", "Пример счётчика");
|
||||||
// let histogram_opts = Opts::new("example_histogram", "Пример histogram");
|
// let histogram_opts = Opts::new("example_histogram", "Пример histogram");
|
||||||
|
|
@ -79,6 +52,7 @@ async fn main() {
|
||||||
sunops.set_quantile(vq.into());
|
sunops.set_quantile(vq.into());
|
||||||
|
|
||||||
let counter = Counter::with_opts(counter_opts).unwrap();
|
let counter = Counter::with_opts(counter_opts).unwrap();
|
||||||
|
// counter.desc()
|
||||||
registry.register(Box::new(counter.clone())).unwrap();
|
registry.register(Box::new(counter.clone())).unwrap();
|
||||||
// registry.register(Box::new(prometheus::proto::MetricFamily::));
|
// registry.register(Box::new(prometheus::proto::MetricFamily::));
|
||||||
// registry.register(Box::new(sunops.clone())).unwrap();
|
// registry.register(Box::new(sunops.clone())).unwrap();
|
||||||
|
|
@ -90,12 +64,22 @@ async fn main() {
|
||||||
sum : Mutex::new(sunops)
|
sum : Mutex::new(sunops)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// info!("Configurating Web-Server...");
|
||||||
|
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_max_level(tracing::Level::DEBUG)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
info!("Configurating Web-Server...");
|
||||||
|
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/metrics", get(metrics_handler))
|
.route("/metrics", get(metrics_handler))
|
||||||
.route("/increment", get(increment_handler))
|
.route("/increment", get(increment_handler))
|
||||||
.route("/sum", get(summary_handler))
|
.route("/sum", get(summary_handler))
|
||||||
|
.route("/update", post(update_metrics))
|
||||||
.with_state(state.clone());
|
.with_state(state.clone());
|
||||||
|
|
||||||
let listener = TcpListener::bind("0.0.0.0:9100").await.unwrap();
|
let listener = TcpListener::bind("127.0.0.1:9100").await.unwrap();
|
||||||
|
info!("Serving on ...:9100");
|
||||||
axum::serve(listener, app).await.unwrap();
|
axum::serve(listener, app).await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
@ -258,5 +258,19 @@ pub mod v3 {
|
||||||
endpoint_name: String,
|
endpoint_name: String,
|
||||||
metrics: Value,
|
metrics: Value,
|
||||||
}
|
}
|
||||||
|
impl PrometheusMetrics {
|
||||||
|
pub fn new(service: &str, endpoint: &str, metrics: Value) -> Self {
|
||||||
|
Self {
|
||||||
|
service_name: service.to_string(),
|
||||||
|
endpoint_name: endpoint.to_string(),
|
||||||
|
metrics: metrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn get_bytes_len(&self) -> usize {
|
||||||
|
let str_metrics = serde_json::to_vec(self).unwrap_or_else(
|
||||||
|
|_| Vec::new()
|
||||||
|
);
|
||||||
|
str_metrics.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue