exporter migration
parent
3eb8e749b5
commit
000ca7c3c2
|
|
@ -1,7 +1,7 @@
|
|||
[workspace]
|
||||
resolver = "2"
|
||||
members = [
|
||||
"crates/api-grub", "crates/config-delivery", "crates/exporter", "crates/integr-structs", "crates/preproc",
|
||||
"crates/api-grub", "crates/config-delivery", "crates/integr-structs", "crates/preproc",
|
||||
]
|
||||
|
||||
[profile.dev]
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ impl Exporter {
|
|||
let url = env::var("EXPORTER_URL")?;
|
||||
// let req = Request::new(Method::PUT,
|
||||
// Url::parse(metrics)?);
|
||||
dbg!(&metrics);
|
||||
// dbg!(&metrics);
|
||||
let req = Client::new()
|
||||
.post(url)
|
||||
.json(&metrics)
|
||||
|
|
|
|||
|
|
@ -1,17 +0,0 @@
|
|||
[package]
|
||||
name = "exporter"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
axum = "0.8.1"
|
||||
prometheus = "0.13.4"
|
||||
tokio = { version = "1.43.0", features = ["full"] }
|
||||
integr-structs = {path = "../integr-structs"}
|
||||
anyhow = "1.0.95"
|
||||
chrono = "0.4.39"
|
||||
tracing = "0.1.41"
|
||||
tracing-subscriber = "0.3.19"
|
||||
serde = { version = "1.0.217", features = ["derive"] }
|
||||
serde_json = "1.0.138"
|
||||
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
FROM rust:1.75 AS builder
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY . .
|
||||
|
||||
RUN cargo build --release
|
||||
|
||||
FROM debian:bullseye-slim
|
||||
|
||||
WORKDIR /app
|
||||
# RUN sudo apt update
|
||||
# RUN sudo apt install libc6
|
||||
COPY --from=builder /app/target/release/exporter /app/server
|
||||
EXPOSE 9100
|
||||
|
||||
CMD ["/app/server"]
|
||||
|
|
@ -1,102 +0,0 @@
|
|||
use axum::{
|
||||
extract::{Json, State},
|
||||
response::IntoResponse,
|
||||
http
|
||||
};
|
||||
use integr_structs::api::v3::PrometheusMetrics;
|
||||
use prometheus::{Encoder, Registry, TextEncoder, Gauge};
|
||||
use std::sync::{Arc, MutexGuard};
|
||||
use crate::AppState;
|
||||
// use log::{warn, info, error};
|
||||
use tracing::{info, error, warn};
|
||||
use crate::metrics::{MetricsProcesser, MetricsValueType};
|
||||
|
||||
pub async fn update_metrics(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Json(request) : Json<PrometheusMetrics>
|
||||
) -> impl IntoResponse {
|
||||
info!("post on /update");
|
||||
// let resp = Response::new("body");
|
||||
// println!("{:?}", request);
|
||||
// debug!("{:?}", MetricsProcesser::get_type_of_value(&request));
|
||||
let service = &request.service_name;
|
||||
let endpoint = &request.endpoint_name;
|
||||
|
||||
for i in request.metrics {
|
||||
// debug!("{:?}", &i);
|
||||
// debug!("{:?}", MetricsProcesser::get_type_of_value(&i));
|
||||
let metric_name = format!("{}_{}_{}", service, endpoint, &i.id);
|
||||
match MetricsProcesser::get_type_of_value(&i) {
|
||||
MetricsValueType::Array |
|
||||
MetricsValueType::TaggedArray => {
|
||||
|
||||
},
|
||||
MetricsValueType::Number => {
|
||||
let gauge = MetricsProcesser::gauge_from_number(
|
||||
&i,
|
||||
&metric_name
|
||||
);
|
||||
if let Some(gauge) = gauge {
|
||||
match state.registry.lock() {
|
||||
Err(er) => {
|
||||
error!("Cannot lock Metric Registry due to {} ", er)
|
||||
},
|
||||
Ok(registry) => {
|
||||
update_or_insert_metric(
|
||||
gauge,
|
||||
registry,
|
||||
&metric_name
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
// dbg!(gauge);
|
||||
},
|
||||
MetricsValueType::ArrayOfStrings => {
|
||||
warn!("String arrays are unsupported, ignoring ...");
|
||||
},
|
||||
_ => {
|
||||
warn!("Unrecognized metric type was supplied, ignoring ...");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(http::StatusCode::ACCEPTED, "Ok")
|
||||
}
|
||||
|
||||
pub async fn metrics_handler(State(state): State<Arc<AppState>>) -> String {
|
||||
let registry = state.registry.lock();
|
||||
|
||||
return match registry {
|
||||
Ok(registry) => {
|
||||
let encoder = TextEncoder::new();
|
||||
let mut buffer = Vec::new();
|
||||
let metric_families = registry.gather();
|
||||
encoder.encode(&metric_families, &mut buffer).unwrap();
|
||||
String::from_utf8(buffer).unwrap()
|
||||
},
|
||||
Err(er) => {
|
||||
format!("Cannot lock Metric Registry due to {} ", er)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_or_insert_metric<'a>(
|
||||
metric: Gauge,
|
||||
registry: MutexGuard<'a, Registry>,
|
||||
metric_name: &str
|
||||
) {
|
||||
// let mut counter = 0;
|
||||
match registry.register(Box::new(metric)) {
|
||||
Ok(_) => {
|
||||
info!("Metric `{}` was added!", metric_name);
|
||||
},
|
||||
Err(_er) => {
|
||||
// update
|
||||
},
|
||||
}
|
||||
// registry.gather()
|
||||
// .iter()
|
||||
// .filter(|fam| fam.get_name().)
|
||||
|
||||
}
|
||||
|
|
@ -1,78 +0,0 @@
|
|||
mod endpoints;
|
||||
// mod logger;
|
||||
mod metrics;
|
||||
|
||||
// use logger::setup_logger;
|
||||
use axum::{
|
||||
routing::{get, post},
|
||||
Router};
|
||||
use prometheus::Registry;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use endpoints::*;
|
||||
use tokio::net::TcpListener;
|
||||
// use log::{warn, info, error};
|
||||
use tracing::info;
|
||||
|
||||
struct AppState {
|
||||
registry: Mutex<Registry>,
|
||||
// counter: Mutex<Counter>,
|
||||
// sum : Mutex<Summary>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// let _ = setup_logger().await;
|
||||
let registry = Registry::new();
|
||||
// let counter_opts = Opts::new("example_counter", "Пример счётчика");
|
||||
// let histogram_opts = Opts::new("example_histogram", "Пример histogram");
|
||||
|
||||
// use prometheus::proto::{Summary, Quantile};
|
||||
// use prometheus::proto::
|
||||
|
||||
// let guage = prometheus::ProtobufEncoder::new();
|
||||
|
||||
|
||||
|
||||
// let mut sunops = Summary::new();
|
||||
// let mut q1 = Quantile::new();
|
||||
// let mut q2 = Quantile::new();
|
||||
|
||||
// q1.set_quantile(25.0);
|
||||
// q2.set_quantile(75.0);
|
||||
// // prometheus::proto::Metric::
|
||||
|
||||
// let vq = vec![q1, q2];
|
||||
// sunops.set_quantile(vq.into());
|
||||
|
||||
// let counter = Counter::with_opts(counter_opts).unwrap();
|
||||
// // counter.desc()
|
||||
// registry.register(Box::new(counter.clone())).unwrap();
|
||||
// registry.register(Box::new(prometheus::proto::MetricFamily::));
|
||||
// registry.register(Box::new(sunops.clone())).unwrap();
|
||||
|
||||
|
||||
let state = Arc::new(AppState {
|
||||
registry: Mutex::new(registry),
|
||||
// counter: Mutex::new(counter),
|
||||
// sum : Mutex::new(sunops)
|
||||
});
|
||||
|
||||
// info!("Configurating Web-Server...");
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(tracing::Level::DEBUG)
|
||||
.init();
|
||||
|
||||
info!("Configurating Web-Server...");
|
||||
|
||||
let app = Router::new()
|
||||
.route("/metrics", get(metrics_handler))
|
||||
// .route("/increment", get(increment_handler))
|
||||
// .route("/sum", get(summary_handler))
|
||||
.route("/update", post(update_metrics))
|
||||
.with_state(state.clone());
|
||||
|
||||
let listener = TcpListener::bind("0.0.0.0:9100").await.unwrap();
|
||||
info!("Serving on ...:9100");
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
}
|
||||
|
|
@ -1,181 +0,0 @@
|
|||
// use serde_json::;
|
||||
use integr_structs::api::v3::MetricOutput;
|
||||
use serde_json::{Map, Value};
|
||||
use prometheus::Gauge;
|
||||
use tracing::error;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MetricsValueType {
|
||||
Number,
|
||||
Array,
|
||||
TaggedArray,
|
||||
ArrayOfStrings,
|
||||
None,
|
||||
}
|
||||
|
||||
pub struct MetricsProcesser;
|
||||
|
||||
|
||||
impl MetricsProcesser {
|
||||
|
||||
pub fn get_type_of_value(metrics: &MetricOutput) -> MetricsValueType {
|
||||
if Self::is_number(metrics) {
|
||||
return MetricsValueType::Number;
|
||||
}
|
||||
else if Self::is_array(metrics) {
|
||||
if Self::is_tagged_array(metrics) {
|
||||
return MetricsValueType::TaggedArray;
|
||||
}
|
||||
if Self::is_array_of_string_values(metrics) {
|
||||
return MetricsValueType::ArrayOfStrings;
|
||||
}
|
||||
return MetricsValueType::Array;
|
||||
}
|
||||
MetricsValueType::None
|
||||
}
|
||||
pub fn gauge_from_number(
|
||||
metric: &MetricOutput,
|
||||
metric_name: &str,
|
||||
) -> Option<Gauge> {
|
||||
let gauge = Gauge::new(
|
||||
metric_name,
|
||||
&metric.id
|
||||
);
|
||||
|
||||
match gauge {
|
||||
Ok(gauge) => {
|
||||
// let value = metric.value.as_number().unwrap_or({
|
||||
// error!("Cannot convert {} metric value to f64 type. Value was set to 0.0", &metric_name);
|
||||
// });
|
||||
// let value = value.as_f64()
|
||||
let val = match metric.value.as_number() {
|
||||
Some(val) => {
|
||||
val.as_f64().unwrap_or_else(||
|
||||
0.0
|
||||
)
|
||||
},
|
||||
None => {
|
||||
error!("Cannot convert {} metric value to f64 type. Value was set to 0.0", &metric_name);
|
||||
0.0
|
||||
},
|
||||
};
|
||||
gauge.set(val);
|
||||
return Some(gauge);
|
||||
},
|
||||
Err(er) => error!("Cannot create Gauge metric {} due to {}", &metric_name, er),
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
pub fn gauge_from_map_metrics(
|
||||
map: &Map<String, Value>,
|
||||
service: &str,
|
||||
endpoint: &str
|
||||
) -> Option<Gauge> {
|
||||
let map = map.clone();
|
||||
let help: String = map.keys()
|
||||
.enumerate()
|
||||
.map(|(idx, key)| {
|
||||
if idx == 1 {
|
||||
return key.to_owned();
|
||||
}
|
||||
"".to_owned()
|
||||
})
|
||||
.collect();
|
||||
let name = format!("{}_{}_{}", service, endpoint, &help);
|
||||
if map.len() > 1 {
|
||||
// tagged
|
||||
if map.len() > 2 {
|
||||
error!("Cannot create Gauge {}. It can be only 1 tag", &name);
|
||||
} else {
|
||||
let mut label_name = String::new();
|
||||
let mut label_value = String::new();
|
||||
let mut metric_value = 0.0;
|
||||
map.iter()
|
||||
.enumerate()
|
||||
.for_each(|(idx, (key, value))|{
|
||||
if idx == 0 {
|
||||
label_name = key.to_owned();
|
||||
label_value = value.as_str()
|
||||
.unwrap_or("")
|
||||
.to_owned();
|
||||
} else {
|
||||
metric_value = value.as_f64().unwrap_or(0.0)
|
||||
}
|
||||
});
|
||||
use prometheus::Opts;
|
||||
use prometheus::GaugeVec;
|
||||
|
||||
let opts = Opts::new(&name, &help);
|
||||
let gauge_vec = GaugeVec::new(opts, &[&label_name]);
|
||||
match gauge_vec {
|
||||
Ok(vec) => {
|
||||
// vec.get_metric_with_label_values(vals)
|
||||
match vec.get_metric_with_label_values(&[&label_value]) {
|
||||
Ok(metric) => {
|
||||
metric.set(metric_value); // Устанавливаем значение метрики
|
||||
return Some(metric.clone()); // Возвращаем `Gauge`
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Cannot create Gauge {} due to {}", &name, er);
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(er) => error!("Cannot create Gauge {} due to {}", &name, er),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// not-tagged
|
||||
let metric = Gauge::new(&name, &help);
|
||||
match metric {
|
||||
Ok(gauge) => {
|
||||
let mut value = 0.0;
|
||||
map.values()
|
||||
.map(|val| val.clone().as_f64())
|
||||
.for_each(|val| {
|
||||
value = val.unwrap_or(0.0)
|
||||
});
|
||||
gauge.set(value);
|
||||
return Some(gauge);
|
||||
},
|
||||
Err(er) => {
|
||||
error!("Cannot create Gauge {} due to {}", &name, er);
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
pub fn get_value_as_vec_map(metrics: &MetricOutput) -> Vec<Map<String, Value>>{
|
||||
let mut vec: Vec<Map<String, Value>> = Vec::new();
|
||||
let arr = metrics.value.as_array().unwrap();
|
||||
arr.iter()
|
||||
.for_each(|a| {
|
||||
vec.push(serde_json::from_value(a.clone()).unwrap());
|
||||
});
|
||||
vec
|
||||
}
|
||||
pub fn is_array_of_string_values(metrics: &MetricOutput) -> bool {
|
||||
let arr = metrics.value.clone();
|
||||
let arr = arr.as_array().unwrap();
|
||||
let map: Map<String, Value> = serde_json::from_value(
|
||||
arr[0].clone()
|
||||
).unwrap();
|
||||
map.values()
|
||||
.all(|val| val.is_string())
|
||||
}
|
||||
// fn is_valid(metrics: &PrometheusMetrics) -> bool {
|
||||
// false
|
||||
// }
|
||||
fn is_array(metrics: &MetricOutput) -> bool {
|
||||
metrics.value.is_array()
|
||||
}
|
||||
fn is_tagged_array(metrics: &MetricOutput) -> bool {
|
||||
let arr = metrics.value.as_array().unwrap();
|
||||
let map: Map<String, Value> = serde_json::from_value(arr[0].clone()).unwrap();
|
||||
map.len() > 1
|
||||
}
|
||||
fn is_number(metrics: &MetricOutput) -> bool {
|
||||
metrics
|
||||
.value.is_number()
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue