PROMETHEUSv1

pull/6/head
prplV 2025-02-14 18:44:08 +03:00
parent 39ad05c21c
commit 3eb8e749b5
7 changed files with 297 additions and 102 deletions

View File

@ -56,6 +56,7 @@ impl Exporter {
let url = env::var("EXPORTER_URL")?; let url = env::var("EXPORTER_URL")?;
// let req = Request::new(Method::PUT, // let req = Request::new(Method::PUT,
// Url::parse(metrics)?); // Url::parse(metrics)?);
dbg!(&metrics);
let req = Client::new() let req = Client::new()
.post(url) .post(url)
.json(&metrics) .json(&metrics)

View File

@ -5,7 +5,7 @@ use integr_structs::api::v3::{Metric, MetricOutput};
pub struct JsonParser; pub struct JsonParser;
impl JsonParser { impl JsonParser {
pub fn parse(targets: &Vec<Metric>, json: &str) -> Value { pub fn parse(targets: &Vec<Metric>, json: &str) -> Vec<MetricOutput> {
let mut res_vec: Vec<MetricOutput> = Vec::new(); let mut res_vec: Vec<MetricOutput> = Vec::new();
for target in targets { for target in targets {
let metric = match target.addr.contains("[") { let metric = match target.addr.contains("[") {
@ -14,7 +14,7 @@ impl JsonParser {
}; };
res_vec.push(MetricOutput::new_with_slices(&target.id, &target.json_type, &target.addr, metric)); res_vec.push(MetricOutput::new_with_slices(&target.id, &target.json_type, &target.addr, metric));
} }
serde_json::to_value(res_vec).unwrap_or(Value::Null) res_vec
} }
fn get_sum_of_metrics_in_array(target: &Metric, json: &str) -> Value { fn get_sum_of_metrics_in_array(target: &Metric, json: &str) -> Value {
if target.addr.is_empty() { if target.addr.is_empty() {

View File

@ -140,20 +140,6 @@ impl<'a> ApiPoll<'a> {
let preproc = JsonParser::parse(&metrics.measure, &response); let preproc = JsonParser::parse(&metrics.measure, &response);
// dbg!(serde_json::to_string_pretty(&preproc)); // dbg!(serde_json::to_string_pretty(&preproc));
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc); let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
// let metrics = serde_json::to_string_pretty(&preproc)
// .unwrap_or_else(|_| {
// error!("Cannot parse grabbed metrics data to String");
// String::from(r#"
// {
// "service_name" : null,
// "endpoint_name" : null,
// "value" : null
// }
// "#)
// });
// println!("{}", &metrics);
match Exporter::export_metrics(preproc).await { match Exporter::export_metrics(preproc).await {
Ok(bytes) => { Ok(bytes) => {
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes); info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);

View File

@ -1,12 +1,15 @@
use axum::{ use axum::{
body::Body, extract::{Json, Request, State}, http::{self, HeaderMap, Response as Resp}, response::{IntoResponse, Response}, routing::get, Router extract::{Json, State},
response::IntoResponse,
http
}; };
use integr_structs::api::v3::PrometheusMetrics; use integr_structs::api::v3::PrometheusMetrics;
use prometheus::{core::{Collector, Metric}, proto::{Quantile, Summary}, Counter, Encoder, Opts, Registry, TextEncoder}; use prometheus::{Encoder, Registry, TextEncoder, Gauge};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, MutexGuard};
use crate::AppState; use crate::AppState;
// use log::{warn, info, error}; // use log::{warn, info, error};
use tracing::{info, error, warn, debug}; use tracing::{info, error, warn};
use crate::metrics::{MetricsProcesser, MetricsValueType};
pub async fn update_metrics( pub async fn update_metrics(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
@ -14,55 +17,86 @@ pub async fn update_metrics(
) -> impl IntoResponse { ) -> impl IntoResponse {
info!("post on /update"); info!("post on /update");
// let resp = Response::new("body"); // let resp = Response::new("body");
debug!("{:?}", &request); // println!("{:?}", request);
// // resp // debug!("{:?}", MetricsProcesser::get_type_of_value(&request));
// let mut header_map = HeaderMap::new(); let service = &request.service_name;
// header_map.insert(http::header::CONTENT_TYPE, let endpoint = &request.endpoint_name;
// "text/plain".parse().unwrap());
for i in request.metrics {
// debug!("{:?}", &i);
// debug!("{:?}", MetricsProcesser::get_type_of_value(&i));
let metric_name = format!("{}_{}_{}", service, endpoint, &i.id);
match MetricsProcesser::get_type_of_value(&i) {
MetricsValueType::Array |
MetricsValueType::TaggedArray => {
},
MetricsValueType::Number => {
let gauge = MetricsProcesser::gauge_from_number(
&i,
&metric_name
);
if let Some(gauge) = gauge {
match state.registry.lock() {
Err(er) => {
error!("Cannot lock Metric Registry due to {} ", er)
},
Ok(registry) => {
update_or_insert_metric(
gauge,
registry,
&metric_name
);
},
}
}
// dbg!(gauge);
},
MetricsValueType::ArrayOfStrings => {
warn!("String arrays are unsupported, ignoring ...");
},
_ => {
warn!("Unrecognized metric type was supplied, ignoring ...");
}
}
}
(http::StatusCode::ACCEPTED, "Ok") (http::StatusCode::ACCEPTED, "Ok")
} }
pub async fn metrics_handler(State(state): State<Arc<AppState>>) -> String { pub async fn metrics_handler(State(state): State<Arc<AppState>>) -> String {
let encoder = TextEncoder::new(); let registry = state.registry.lock();
let mut buffer = Vec::new();
//
let mut fm = prometheus::proto::MetricFamily::new();
// prometheus::Registry::
let mut met = prometheus::proto::Metric::new();
let mutex_guard = state.sum.lock().unwrap();
met.set_summary(mutex_guard.clone());
// let metric = met.take_label();
fm.set_metric(vec![met].into());
fm.set_help("example summary".to_string());
fm.set_name("example_summary".to_string());
fm.set_field_type(prometheus::proto::MetricType::SUMMARY);
let mut metric_families = state.registry.gather(); return match registry {
metric_families.push(fm); Ok(registry) => {
encoder.encode(&metric_families, &mut buffer).unwrap(); let encoder = TextEncoder::new();
String::from_utf8(buffer).unwrap() let mut buffer = Vec::new();
let metric_families = registry.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
},
Err(er) => {
format!("Cannot lock Metric Registry due to {} ", er)
}
}
} }
pub async fn increment_handler(State(state): State<Arc<AppState>>) -> &'static str { pub fn update_or_insert_metric<'a>(
let mut counter = state.counter.lock().unwrap(); metric: Gauge,
counter.inc(); registry: MutexGuard<'a, Registry>,
"Counter incremented" metric_name: &str
} ) {
// let mut counter = 0;
match registry.register(Box::new(metric)) {
Ok(_) => {
info!("Metric `{}` was added!", metric_name);
},
Err(_er) => {
// update
},
}
// registry.gather()
// .iter()
// .filter(|fam| fam.get_name().)
pub async fn summary_handler(State(state): State<Arc<AppState>>) -> &'static str {
let mut sum = state.sum.lock().unwrap();
let qs = sum.get_quantile();
let new_qs: Vec<Quantile> = qs
.into_iter()
.enumerate()
.map(|(idx, q)| {
let mut q = q.to_owned();
q.set_value(q.get_value() + idx as f64);
// q.set_quantile(q.get_quantile() * 2.0);
q
})
.collect();
sum.set_quantile(new_qs.into());
"Summary changed"
} }

View File

@ -1,67 +1,60 @@
mod endpoints; mod endpoints;
// mod logger; // mod logger;
mod metrics;
// use logger::setup_logger; // use logger::setup_logger;
use axum::{ use axum::{
extract::State,
routing::{get, post}, routing::{get, post},
Router}; Router};
use prometheus::{ use prometheus::Registry;
core::{Collector, Metric},
proto::{Quantile, Summary},
Counter,
Encoder,
Opts,
Registry,
TextEncoder};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use endpoints::*; use endpoints::*;
use tokio::net::TcpListener; use tokio::net::TcpListener;
// use log::{warn, info, error}; // use log::{warn, info, error};
use tracing::{info, error, warn, debug}; use tracing::info;
struct AppState { struct AppState {
registry: Registry, registry: Mutex<Registry>,
counter: Mutex<Counter>, // counter: Mutex<Counter>,
sum : Mutex<Summary>, // sum : Mutex<Summary>,
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// let _ = setup_logger().await; // let _ = setup_logger().await;
let registry = Registry::new(); let registry = Registry::new();
let counter_opts = Opts::new("example_counter", "Пример счётчика"); // let counter_opts = Opts::new("example_counter", "Пример счётчика");
// let histogram_opts = Opts::new("example_histogram", "Пример histogram"); // let histogram_opts = Opts::new("example_histogram", "Пример histogram");
use prometheus::proto::{Summary, Quantile}; // use prometheus::proto::{Summary, Quantile};
// use prometheus::proto:: // use prometheus::proto::
// let guage = prometheus::ProtobufEncoder::new(); // let guage = prometheus::ProtobufEncoder::new();
let mut sunops = Summary::new(); // let mut sunops = Summary::new();
let mut q1 = Quantile::new(); // let mut q1 = Quantile::new();
let mut q2 = Quantile::new(); // let mut q2 = Quantile::new();
q1.set_quantile(25.0); // q1.set_quantile(25.0);
q2.set_quantile(75.0); // q2.set_quantile(75.0);
// prometheus::proto::Metric:: // // prometheus::proto::Metric::
let vq = vec![q1, q2]; // let vq = vec![q1, q2];
sunops.set_quantile(vq.into()); // sunops.set_quantile(vq.into());
let counter = Counter::with_opts(counter_opts).unwrap(); // let counter = Counter::with_opts(counter_opts).unwrap();
// counter.desc() // // counter.desc()
registry.register(Box::new(counter.clone())).unwrap(); // registry.register(Box::new(counter.clone())).unwrap();
// registry.register(Box::new(prometheus::proto::MetricFamily::)); // registry.register(Box::new(prometheus::proto::MetricFamily::));
// registry.register(Box::new(sunops.clone())).unwrap(); // registry.register(Box::new(sunops.clone())).unwrap();
let state = Arc::new(AppState { let state = Arc::new(AppState {
registry, registry: Mutex::new(registry),
counter: Mutex::new(counter), // counter: Mutex::new(counter),
sum : Mutex::new(sunops) // sum : Mutex::new(sunops)
}); });
// info!("Configurating Web-Server..."); // info!("Configurating Web-Server...");
@ -74,12 +67,12 @@ async fn main() {
let app = Router::new() let app = Router::new()
.route("/metrics", get(metrics_handler)) .route("/metrics", get(metrics_handler))
.route("/increment", get(increment_handler)) // .route("/increment", get(increment_handler))
.route("/sum", get(summary_handler)) // .route("/sum", get(summary_handler))
.route("/update", post(update_metrics)) .route("/update", post(update_metrics))
.with_state(state.clone()); .with_state(state.clone());
let listener = TcpListener::bind("127.0.0.1:9100").await.unwrap(); let listener = TcpListener::bind("0.0.0.0:9100").await.unwrap();
info!("Serving on ...:9100"); info!("Serving on ...:9100");
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }

View File

@ -0,0 +1,181 @@
// use serde_json::;
use integr_structs::api::v3::MetricOutput;
use serde_json::{Map, Value};
use prometheus::Gauge;
use tracing::error;
#[derive(Debug)]
pub enum MetricsValueType {
Number,
Array,
TaggedArray,
ArrayOfStrings,
None,
}
pub struct MetricsProcesser;
impl MetricsProcesser {
pub fn get_type_of_value(metrics: &MetricOutput) -> MetricsValueType {
if Self::is_number(metrics) {
return MetricsValueType::Number;
}
else if Self::is_array(metrics) {
if Self::is_tagged_array(metrics) {
return MetricsValueType::TaggedArray;
}
if Self::is_array_of_string_values(metrics) {
return MetricsValueType::ArrayOfStrings;
}
return MetricsValueType::Array;
}
MetricsValueType::None
}
pub fn gauge_from_number(
metric: &MetricOutput,
metric_name: &str,
) -> Option<Gauge> {
let gauge = Gauge::new(
metric_name,
&metric.id
);
match gauge {
Ok(gauge) => {
// let value = metric.value.as_number().unwrap_or({
// error!("Cannot convert {} metric value to f64 type. Value was set to 0.0", &metric_name);
// });
// let value = value.as_f64()
let val = match metric.value.as_number() {
Some(val) => {
val.as_f64().unwrap_or_else(||
0.0
)
},
None => {
error!("Cannot convert {} metric value to f64 type. Value was set to 0.0", &metric_name);
0.0
},
};
gauge.set(val);
return Some(gauge);
},
Err(er) => error!("Cannot create Gauge metric {} due to {}", &metric_name, er),
}
None
}
pub fn gauge_from_map_metrics(
map: &Map<String, Value>,
service: &str,
endpoint: &str
) -> Option<Gauge> {
let map = map.clone();
let help: String = map.keys()
.enumerate()
.map(|(idx, key)| {
if idx == 1 {
return key.to_owned();
}
"".to_owned()
})
.collect();
let name = format!("{}_{}_{}", service, endpoint, &help);
if map.len() > 1 {
// tagged
if map.len() > 2 {
error!("Cannot create Gauge {}. It can be only 1 tag", &name);
} else {
let mut label_name = String::new();
let mut label_value = String::new();
let mut metric_value = 0.0;
map.iter()
.enumerate()
.for_each(|(idx, (key, value))|{
if idx == 0 {
label_name = key.to_owned();
label_value = value.as_str()
.unwrap_or("")
.to_owned();
} else {
metric_value = value.as_f64().unwrap_or(0.0)
}
});
use prometheus::Opts;
use prometheus::GaugeVec;
let opts = Opts::new(&name, &help);
let gauge_vec = GaugeVec::new(opts, &[&label_name]);
match gauge_vec {
Ok(vec) => {
// vec.get_metric_with_label_values(vals)
match vec.get_metric_with_label_values(&[&label_value]) {
Ok(metric) => {
metric.set(metric_value); // Устанавливаем значение метрики
return Some(metric.clone()); // Возвращаем `Gauge`
},
Err(er) => {
error!("Cannot create Gauge {} due to {}", &name, er);
},
}
},
Err(er) => error!("Cannot create Gauge {} due to {}", &name, er),
}
}
} else {
// not-tagged
let metric = Gauge::new(&name, &help);
match metric {
Ok(gauge) => {
let mut value = 0.0;
map.values()
.map(|val| val.clone().as_f64())
.for_each(|val| {
value = val.unwrap_or(0.0)
});
gauge.set(value);
return Some(gauge);
},
Err(er) => {
error!("Cannot create Gauge {} due to {}", &name, er);
}
}
}
None
}
pub fn get_value_as_vec_map(metrics: &MetricOutput) -> Vec<Map<String, Value>>{
let mut vec: Vec<Map<String, Value>> = Vec::new();
let arr = metrics.value.as_array().unwrap();
arr.iter()
.for_each(|a| {
vec.push(serde_json::from_value(a.clone()).unwrap());
});
vec
}
pub fn is_array_of_string_values(metrics: &MetricOutput) -> bool {
let arr = metrics.value.clone();
let arr = arr.as_array().unwrap();
let map: Map<String, Value> = serde_json::from_value(
arr[0].clone()
).unwrap();
map.values()
.all(|val| val.is_string())
}
// fn is_valid(metrics: &PrometheusMetrics) -> bool {
// false
// }
fn is_array(metrics: &MetricOutput) -> bool {
metrics.value.is_array()
}
fn is_tagged_array(metrics: &MetricOutput) -> bool {
let arr = metrics.value.as_array().unwrap();
let map: Map<String, Value> = serde_json::from_value(arr[0].clone()).unwrap();
map.len() > 1
}
fn is_number(metrics: &MetricOutput) -> bool {
metrics
.value.is_number()
}
}

View File

@ -235,11 +235,11 @@ pub mod v3 {
// to prometheus and nmns // to prometheus and nmns
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct MetricOutput { pub struct MetricOutput {
id : String, pub id : String,
#[serde(rename = "type")] #[serde(rename = "type")]
json_type : String, json_type : String,
addr : String, addr : String,
value : Value, pub value : Value,
} }
impl MetricOutput { impl MetricOutput {
pub fn new_with_slices(id : &str, json_type : &str, addr: &str,value : Value) -> Self { pub fn new_with_slices(id : &str, json_type : &str, addr: &str,value : Value) -> Self {
@ -254,12 +254,12 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct PrometheusMetrics { pub struct PrometheusMetrics {
service_name: String, pub service_name: String,
endpoint_name: String, pub endpoint_name: String,
metrics: Value, pub metrics: Vec<MetricOutput>,
} }
impl PrometheusMetrics { impl PrometheusMetrics {
pub fn new(service: &str, endpoint: &str, metrics: Value) -> Self { pub fn new(service: &str, endpoint: &str, metrics: Vec<MetricOutput>) -> Self {
Self { Self {
service_name: service.to_string(), service_name: service.to_string(),
endpoint_name: endpoint.to_string(), endpoint_name: endpoint.to_string(),