From a5f67cbeb7ab39e3890c938a438365ec4cbf5ea5 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 18 Feb 2025 13:07:23 +0300 Subject: [PATCH] init commit --- .gitignore | 3 + Cargo.toml | 16 +++ Dockerfile | 17 +++ src/endpoints.rs | 103 ++++++++++++++++++ src/main.rs | 79 ++++++++++++++ src/metrics.rs | 181 +++++++++++++++++++++++++++++++ src/structs.rs | 276 +++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 675 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 Dockerfile create mode 100644 src/endpoints.rs create mode 100644 src/main.rs create mode 100644 src/metrics.rs create mode 100644 src/structs.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3549fae --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +Cargo.lock +.env \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..15e719f --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "exporter" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = "0.8.1" +prometheus = "0.13.4" +tokio = { version = "1.43.0", features = ["full"] } +anyhow = "1.0.95" +chrono = "0.4.39" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.138" + diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8d003bd --- /dev/null +++ b/Dockerfile @@ -0,0 +1,17 @@ +FROM rust:1.75 AS builder + +WORKDIR /app + +COPY . . + +RUN cargo build --release + +FROM debian:bullseye-slim + +WORKDIR /app +# RUN sudo apt update +# RUN sudo apt install libc6 +COPY --from=builder /app/target/release/exporter /app/server +EXPOSE 9100 + +CMD ["/app/server"] diff --git a/src/endpoints.rs b/src/endpoints.rs new file mode 100644 index 0000000..dd8f455 --- /dev/null +++ b/src/endpoints.rs @@ -0,0 +1,103 @@ +use axum::{ + extract::{Json, State}, + response::IntoResponse, + http +}; +use crate::structs +use integr_structs::api::v3::PrometheusMetrics; +use prometheus::{Encoder, Registry, TextEncoder, Gauge}; +use std::sync::{Arc, MutexGuard}; +use crate::AppState; +// use log::{warn, info, error}; +use tracing::{info, error, warn}; +use crate::metrics::{MetricsProcesser, MetricsValueType}; + +pub async fn update_metrics( + State(state): State>, + Json(request) : Json +) -> impl IntoResponse { + info!("post on /update"); +// let resp = Response::new("body"); + // println!("{:?}", request); + // debug!("{:?}", MetricsProcesser::get_type_of_value(&request)); + let service = &request.service_name; + let endpoint = &request.endpoint_name; + + for i in request.metrics { + // debug!("{:?}", &i); + // debug!("{:?}", MetricsProcesser::get_type_of_value(&i)); + let metric_name = format!("{}_{}_{}", service, endpoint, &i.id); + match MetricsProcesser::get_type_of_value(&i) { + MetricsValueType::Array | + MetricsValueType::TaggedArray => { + + }, + MetricsValueType::Number => { + let gauge = MetricsProcesser::gauge_from_number( + &i, + &metric_name + ); + if let Some(gauge) = gauge { + match state.registry.lock() { + Err(er) => { + error!("Cannot lock Metric Registry due to {} ", er) + }, + Ok(registry) => { + update_or_insert_metric( + gauge, + registry, + &metric_name + ); + }, + } + } + // dbg!(gauge); + }, + MetricsValueType::ArrayOfStrings => { + warn!("String arrays are unsupported, ignoring ..."); + }, + _ => { + warn!("Unrecognized metric type was supplied, ignoring ..."); + } + } + } + + (http::StatusCode::ACCEPTED, "Ok") +} + +pub async fn metrics_handler(State(state): State>) -> String { + let registry = state.registry.lock(); + + return match registry { + Ok(registry) => { + let encoder = TextEncoder::new(); + let mut buffer = Vec::new(); + let metric_families = registry.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() + }, + Err(er) => { + format!("Cannot lock Metric Registry due to {} ", er) + } + } +} + +pub fn update_or_insert_metric<'a>( + metric: Gauge, + registry: MutexGuard<'a, Registry>, + metric_name: &str +) { + // let mut counter = 0; + match registry.register(Box::new(metric)) { + Ok(_) => { + info!("Metric `{}` was added!", metric_name); + }, + Err(_er) => { + // update + }, + } + // registry.gather() + // .iter() + // .filter(|fam| fam.get_name().) + +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..80e4e15 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,79 @@ +mod endpoints; +// mod logger; +mod metrics; +mod structs; + +// use logger::setup_logger; +use axum::{ + routing::{get, post}, + Router}; +use prometheus::Registry; +use std::sync::{Arc, Mutex}; +use endpoints::*; +use tokio::net::TcpListener; +// use log::{warn, info, error}; +use tracing::info; + +struct AppState { + registry: Mutex, + // counter: Mutex, + // sum : Mutex, +} + +#[tokio::main] +async fn main() { + // let _ = setup_logger().await; + let registry = Registry::new(); + // let counter_opts = Opts::new("example_counter", "Пример счётчика"); + // let histogram_opts = Opts::new("example_histogram", "Пример histogram"); + + // use prometheus::proto::{Summary, Quantile}; + // use prometheus::proto:: + + // let guage = prometheus::ProtobufEncoder::new(); + + + + // let mut sunops = Summary::new(); + // let mut q1 = Quantile::new(); + // let mut q2 = Quantile::new(); + + // q1.set_quantile(25.0); + // q2.set_quantile(75.0); + // // prometheus::proto::Metric:: + + // let vq = vec![q1, q2]; + // sunops.set_quantile(vq.into()); + + // let counter = Counter::with_opts(counter_opts).unwrap(); + // // counter.desc() + // registry.register(Box::new(counter.clone())).unwrap(); + // registry.register(Box::new(prometheus::proto::MetricFamily::)); + // registry.register(Box::new(sunops.clone())).unwrap(); + + + let state = Arc::new(AppState { + registry: Mutex::new(registry), + // counter: Mutex::new(counter), + // sum : Mutex::new(sunops) + }); + + // info!("Configurating Web-Server..."); + + tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .init(); + + info!("Configurating Web-Server..."); + + let app = Router::new() + .route("/metrics", get(metrics_handler)) + // .route("/increment", get(increment_handler)) + // .route("/sum", get(summary_handler)) + .route("/update", post(update_metrics)) + .with_state(state.clone()); + + let listener = TcpListener::bind("0.0.0.0:9100").await.unwrap(); + info!("Serving on ...:9100"); + axum::serve(listener, app).await.unwrap(); +} \ No newline at end of file diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..5b94562 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,181 @@ +// use serde_json::; +use integr_structs::api::v3::MetricOutput; +use serde_json::{Map, Value}; +use prometheus::Gauge; +use tracing::error; + +#[derive(Debug)] +pub enum MetricsValueType { + Number, + Array, + TaggedArray, + ArrayOfStrings, + None, +} + +pub struct MetricsProcesser; + + +impl MetricsProcesser { + + pub fn get_type_of_value(metrics: &MetricOutput) -> MetricsValueType { + if Self::is_number(metrics) { + return MetricsValueType::Number; + } + else if Self::is_array(metrics) { + if Self::is_tagged_array(metrics) { + return MetricsValueType::TaggedArray; + } + if Self::is_array_of_string_values(metrics) { + return MetricsValueType::ArrayOfStrings; + } + return MetricsValueType::Array; + } + MetricsValueType::None + } + pub fn gauge_from_number( + metric: &MetricOutput, + metric_name: &str, + ) -> Option { + let gauge = Gauge::new( + metric_name, + &metric.id + ); + + match gauge { + Ok(gauge) => { + // let value = metric.value.as_number().unwrap_or({ + // error!("Cannot convert {} metric value to f64 type. Value was set to 0.0", &metric_name); + // }); + // let value = value.as_f64() + let val = match metric.value.as_number() { + Some(val) => { + val.as_f64().unwrap_or_else(|| + 0.0 + ) + }, + None => { + error!("Cannot convert {} metric value to f64 type. Value was set to 0.0", &metric_name); + 0.0 + }, + }; + gauge.set(val); + return Some(gauge); + }, + Err(er) => error!("Cannot create Gauge metric {} due to {}", &metric_name, er), + } + + None + } + pub fn gauge_from_map_metrics( + map: &Map, + service: &str, + endpoint: &str + ) -> Option { + let map = map.clone(); + let help: String = map.keys() + .enumerate() + .map(|(idx, key)| { + if idx == 1 { + return key.to_owned(); + } + "".to_owned() + }) + .collect(); + let name = format!("{}_{}_{}", service, endpoint, &help); + if map.len() > 1 { + // tagged + if map.len() > 2 { + error!("Cannot create Gauge {}. It can be only 1 tag", &name); + } else { + let mut label_name = String::new(); + let mut label_value = String::new(); + let mut metric_value = 0.0; + map.iter() + .enumerate() + .for_each(|(idx, (key, value))|{ + if idx == 0 { + label_name = key.to_owned(); + label_value = value.as_str() + .unwrap_or("") + .to_owned(); + } else { + metric_value = value.as_f64().unwrap_or(0.0) + } + }); + use prometheus::Opts; + use prometheus::GaugeVec; + + let opts = Opts::new(&name, &help); + let gauge_vec = GaugeVec::new(opts, &[&label_name]); + match gauge_vec { + Ok(vec) => { + // vec.get_metric_with_label_values(vals) + match vec.get_metric_with_label_values(&[&label_value]) { + Ok(metric) => { + metric.set(metric_value); // Устанавливаем значение метрики + return Some(metric.clone()); // Возвращаем `Gauge` + }, + Err(er) => { + error!("Cannot create Gauge {} due to {}", &name, er); + }, + } + }, + Err(er) => error!("Cannot create Gauge {} due to {}", &name, er), + } + } + } else { + // not-tagged + let metric = Gauge::new(&name, &help); + match metric { + Ok(gauge) => { + let mut value = 0.0; + map.values() + .map(|val| val.clone().as_f64()) + .for_each(|val| { + value = val.unwrap_or(0.0) + }); + gauge.set(value); + return Some(gauge); + }, + Err(er) => { + error!("Cannot create Gauge {} due to {}", &name, er); + } + } + } + None + } + pub fn get_value_as_vec_map(metrics: &MetricOutput) -> Vec>{ + let mut vec: Vec> = Vec::new(); + let arr = metrics.value.as_array().unwrap(); + arr.iter() + .for_each(|a| { + vec.push(serde_json::from_value(a.clone()).unwrap()); + }); + vec + } + pub fn is_array_of_string_values(metrics: &MetricOutput) -> bool { + let arr = metrics.value.clone(); + let arr = arr.as_array().unwrap(); + let map: Map = serde_json::from_value( + arr[0].clone() + ).unwrap(); + map.values() + .all(|val| val.is_string()) + } + // fn is_valid(metrics: &PrometheusMetrics) -> bool { + // false + // } + fn is_array(metrics: &MetricOutput) -> bool { + metrics.value.is_array() + } + fn is_tagged_array(metrics: &MetricOutput) -> bool { + let arr = metrics.value.as_array().unwrap(); + let map: Map = serde_json::from_value(arr[0].clone()).unwrap(); + map.len() > 1 + } + fn is_number(metrics: &MetricOutput) -> bool { + metrics + .value.is_number() + } +} \ No newline at end of file diff --git a/src/structs.rs b/src/structs.rs new file mode 100644 index 0000000..4a824f0 --- /dev/null +++ b/src/structs.rs @@ -0,0 +1,276 @@ +use core::sync; +use std::collections::HashMap; +use serde::{Serialize, Deserialize}; +use serde_json::{ to_string_pretty, Value }; +use anyhow::Result; +use std::sync::Arc; + + +#[derive(Serialize, Deserialize, Debug)] +pub struct ApiConfig { + #[serde(default)] + pub endpoints : Vec, + pub delay : u32, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ApiEndpoint { + pub url : String, + pub method : String, +} + +impl Default for ApiConfig { + fn default() -> Self { + ApiConfig { + endpoints : vec![], + delay : 0, + } + } +} + +// v2 +#[derive(Serialize, Deserialize, Debug, PartialEq)] +pub struct ApiConfigV2 { + pub id : u64, + #[serde(default)] + pub template : Vec