Compare commits
6 Commits
6228b2393d
...
32a43f9ed8
| Author | SHA1 | Date |
|---|---|---|
|
|
32a43f9ed8 | |
|
|
6297ea2a50 | |
|
|
2adb706a69 | |
|
|
644b18cfd7 | |
|
|
fa47771cff | |
|
|
a758db9bb4 |
|
|
@ -18,3 +18,4 @@ tokio-postgres = "0.7.12"
|
|||
dotenv = "0.15.0"
|
||||
md5 = "0.7.0"
|
||||
rand = "0.9.0"
|
||||
sysinfo = "0.33.1"
|
||||
|
|
|
|||
|
|
@ -3,9 +3,10 @@ mod net;
|
|||
mod logger;
|
||||
mod json;
|
||||
mod export;
|
||||
mod monitoring;
|
||||
|
||||
use anyhow::Result;
|
||||
use integr_structs::api::ApiConfigV2;
|
||||
// use integr_structs::api::ApiConfigV2;
|
||||
use integr_structs::api::v3::Config;
|
||||
use logger::setup_logger;
|
||||
// use log::{info, warn};
|
||||
|
|
|
|||
|
|
@ -0,0 +1,243 @@
|
|||
use std::env;
|
||||
use anyhow::Error;
|
||||
use serde_json::{Map, Value};
|
||||
use reqwest::Client;
|
||||
use tokio::sync::Semaphore;
|
||||
use std::sync::Arc;
|
||||
// use crate::structs::{AuthResponse, ForTokenCredentials, GenericUrl};
|
||||
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, get_chunk_size};
|
||||
// use crate::structs::cmdb::Query;
|
||||
use integr_structs::api::enode_monitoring::cmdb::Query;
|
||||
use tokio::task::JoinHandle;
|
||||
// use crate::structs::get_chunk_size;
|
||||
use std::pin::Pin;
|
||||
use std::future::Future;
|
||||
use integr_structs::api::v3::{MetricOutput, PrometheusMetrics};
|
||||
// use chrono::{Local, DateTime};
|
||||
|
||||
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
||||
let timer = tokio::time::Instant::now();
|
||||
loop {
|
||||
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
|
||||
break;
|
||||
}
|
||||
let mut a = MonitoringImporter::new().await;
|
||||
a.start_session().await?;
|
||||
let vec = a.get_metrics_list().await?;
|
||||
let _ = a.get_measure_info(Arc::new(vec)).await;
|
||||
a.close_session().await?;
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MonitoringImporter {
|
||||
ip : String,
|
||||
login : String,
|
||||
password : String,
|
||||
access_token : String,
|
||||
ts : String,
|
||||
}
|
||||
|
||||
impl MonitoringImporter {
|
||||
pub async fn new() -> Self {
|
||||
MonitoringImporter {
|
||||
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
|
||||
login : env::var("ENODE_MONITORING_LOGIN").unwrap_or_else(|_| String::new()),
|
||||
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
|
||||
access_token : String::new(),
|
||||
ts : String::new(),
|
||||
}
|
||||
}
|
||||
async fn is_valid(&self) -> bool {
|
||||
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
|
||||
}
|
||||
async fn set_ts(&mut self, ts: &str) {
|
||||
self.ts = ts.to_owned();
|
||||
}
|
||||
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
||||
if !self.is_valid().await {
|
||||
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
||||
}
|
||||
let client = Client::new();
|
||||
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
||||
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
||||
// dbg!(&fortoken);
|
||||
let client = client
|
||||
.post(url)
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&fortoken);
|
||||
let resp = client.send().await?;
|
||||
let auth = resp.json::<AuthResponse>().await?;
|
||||
// dbg!(&auth);
|
||||
self.set_ts(&fortoken.ts).await;
|
||||
|
||||
self.access_token = auth.access_token.to_owned();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
pub async fn close_session(&mut self) -> anyhow::Result<()> {
|
||||
let client = Client::new();
|
||||
let url = format!("http://{}/e-data-front/auth/logout", self.ip);
|
||||
let client = client
|
||||
.post(url)
|
||||
.header("Content-Type", "application/json")
|
||||
.header("access-token", &self.access_token);
|
||||
|
||||
let _ = client.send().await?;
|
||||
|
||||
self.access_token.clear();
|
||||
Ok(())
|
||||
}
|
||||
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<String>> {
|
||||
let client = Client::new();
|
||||
let mut vec: Vec<String> = Vec::new();
|
||||
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
||||
let client = client
|
||||
.post(url)
|
||||
.header("Content-Type", "application/json")
|
||||
.header("access-token", &self.access_token)
|
||||
.json(&Query::default());
|
||||
let resp = client.send().await?.text().await?;
|
||||
// dbg!(&resp.text().await);
|
||||
let resp: Value = serde_json::from_str(&resp)?;
|
||||
if let Some(arr) = resp.as_array() {
|
||||
for measure in arr {
|
||||
let id = measure.get("id");
|
||||
let cls = measure.get("cls");
|
||||
if id.is_some() && cls.is_some() {
|
||||
// todo: later wait for Vaitaliy call of classification
|
||||
let id = id.unwrap().as_i64().unwrap_or_default();
|
||||
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
||||
if cls.is_empty() {
|
||||
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
|
||||
}
|
||||
// let measure_name = format!("{}${}", cls, id);
|
||||
vec.push(format!("{}${}", cls, id));
|
||||
}
|
||||
}
|
||||
// dbg!(vec);
|
||||
} else {
|
||||
return Err(Error::msg("Invalid JSON in response"));
|
||||
}
|
||||
Ok(vec)
|
||||
}
|
||||
pub async fn get_measure_info(&self, measures: Arc<Vec<String>>) -> anyhow::Result<()> {
|
||||
let mut sys = sysinfo::System::new();
|
||||
sys.refresh_cpu_all();
|
||||
// adaptive permition on task spawm to prevent system overload
|
||||
let sem = Arc::new(Semaphore::new(sys.cpus().len()));
|
||||
let mut jh_vec = Vec::new();
|
||||
let client = Arc::new(Client::new());
|
||||
let measures = measures.clone();
|
||||
let arc = Arc::new(self.clone());
|
||||
// dbg!(&measures.display());
|
||||
|
||||
// dbg!(&measures.len());
|
||||
for measure in measures.chunks(get_chunk_size(measures.len())) {
|
||||
let permit = sem.clone();
|
||||
let arc = arc.clone();
|
||||
let client = client.clone();
|
||||
let measure = Arc::new(measure.display());
|
||||
|
||||
let _permit = permit.acquire().await.unwrap();
|
||||
|
||||
let jh: JoinHandle<anyhow::Result<PrometheusMetrics>> = tokio::spawn(async move {
|
||||
Self::process_endpoint(measure.clone(), client.clone(), arc.clone()).await
|
||||
|
||||
});
|
||||
jh_vec.push(jh);
|
||||
}
|
||||
let mut vals = Vec::new();
|
||||
for event in jh_vec {
|
||||
match event.await {
|
||||
Ok(val) => {
|
||||
if let Ok(val) = val {
|
||||
vals.push(val);
|
||||
}
|
||||
},
|
||||
Err(er) => println!("Fatal error on async task: {}", er),
|
||||
}
|
||||
}
|
||||
// dbg!(&vals);
|
||||
// dbg!(&vals.len());
|
||||
Ok(())
|
||||
}
|
||||
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>) -> anyhow::Result<PrometheusMetrics> {
|
||||
let resp = client
|
||||
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
||||
.header("Content-Type", "application/json")
|
||||
.header("access-token", &arc.access_token)
|
||||
.send().await?
|
||||
.text().await?;
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
let resp: Value = serde_json::from_str(&resp)?;
|
||||
// let a = Self::extract_metric_data(resp);
|
||||
|
||||
Ok(
|
||||
PrometheusMetrics::new_zvks(Self::extract_metric_data(resp).await?).await
|
||||
)
|
||||
}
|
||||
fn extract_metric_data(json: Value) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutput>>> + Send>> {
|
||||
Box::pin(async move {
|
||||
return match json {
|
||||
Value::Object(obj) => {
|
||||
// let resp: Value = serde_json::from_str(&obj)?;
|
||||
return Ok(vec![Self::process_value(&obj).await?])
|
||||
},
|
||||
Value::Array(arr) => {
|
||||
let mut vec = Vec::new();
|
||||
for obj in arr {
|
||||
if let Ok(mut val) = Self::extract_metric_data(obj).await {
|
||||
// vec.push(val);
|
||||
vec.append(&mut val);
|
||||
}
|
||||
}
|
||||
return Ok(vec)
|
||||
},
|
||||
_ => Err(Error::msg("Invalid JSON format")),
|
||||
}
|
||||
})
|
||||
}
|
||||
async fn process_value(obj : &Map<String, Value>) -> anyhow::Result<MetricOutput> {
|
||||
let id = obj.get("id");
|
||||
let val = obj.get("value");
|
||||
|
||||
if id.is_none() || val.is_none() {
|
||||
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
|
||||
}
|
||||
let id = id.unwrap().as_str().unwrap_or_else(|| "");
|
||||
let val = val.unwrap();
|
||||
|
||||
if id.is_empty() {
|
||||
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
|
||||
}
|
||||
// pub struct MetricOutput {
|
||||
// pub id : String,
|
||||
// #[serde(rename = "type")]
|
||||
// json_type : String,
|
||||
// addr : String,
|
||||
// pub value : Value,
|
||||
// }
|
||||
|
||||
Ok(MetricOutput {
|
||||
id : id.to_owned(),
|
||||
json_type : match val {
|
||||
Value::Number(val) => {
|
||||
if val.is_i64() {
|
||||
"i64".to_owned()
|
||||
} else if val.is_u64() {
|
||||
"u64".to_owned()
|
||||
} else {
|
||||
"f64".to_owned()
|
||||
}
|
||||
},
|
||||
_ => "unknown".to_owned(),
|
||||
},
|
||||
addr : "enode.monitoring.api".to_owned(),
|
||||
value : val.clone()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -5,5 +5,6 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
anyhow = "1.0.95"
|
||||
chrono = "0.4.40"
|
||||
serde = { version = "1.0.217", features = ["derive"] }
|
||||
serde_json = "1.0.135"
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
use core::sync;
|
||||
use std::collections::HashMap;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use serde_json::{ to_string_pretty, Value };
|
||||
use anyhow::Result;
|
||||
use std::sync::Arc;
|
||||
use std::fmt::Display;
|
||||
use chrono::{DateTime, Local};
|
||||
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
|
@ -237,8 +238,8 @@ pub mod v3 {
|
|||
pub struct MetricOutput {
|
||||
pub id : String,
|
||||
#[serde(rename = "type")]
|
||||
json_type : String,
|
||||
addr : String,
|
||||
pub json_type : String,
|
||||
pub addr : String,
|
||||
pub value : Value,
|
||||
}
|
||||
impl MetricOutput {
|
||||
|
|
@ -266,6 +267,13 @@ pub mod v3 {
|
|||
metrics: metrics
|
||||
}
|
||||
}
|
||||
pub async fn new_zvks(metrics: Vec<MetricOutput>) -> Self {
|
||||
Self {
|
||||
service_name : "zvks".to_owned(),
|
||||
endpoint_name : "apiforsnmp".to_owned(),
|
||||
metrics : metrics,
|
||||
}
|
||||
}
|
||||
pub fn get_bytes_len(&self) -> usize {
|
||||
let str_metrics = serde_json::to_vec(self).unwrap_or_else(
|
||||
|_| Vec::new()
|
||||
|
|
@ -273,4 +281,137 @@ pub mod v3 {
|
|||
str_metrics.len()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub mod enode_monitoring {
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ForTokenCredentials {
|
||||
login : String,
|
||||
password : String,
|
||||
pub ts : String,
|
||||
}
|
||||
|
||||
impl ForTokenCredentials {
|
||||
pub fn new(login: &str, pass: &str) -> Self {
|
||||
Self {
|
||||
login : login.to_owned(),
|
||||
password : pass.to_owned(),
|
||||
ts : format!("{}", DateTime::timestamp(&Local::now())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod cmdb {
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct Query {
|
||||
id : Vec<String>,
|
||||
data : Data,
|
||||
#[serde(rename = "postQuery")]
|
||||
post_query : String,
|
||||
#[serde(rename = "enableActions")]
|
||||
enable_actions : bool,
|
||||
ts : usize
|
||||
}
|
||||
impl Default for Query {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
id : vec!["/measures/device$18".to_owned()],
|
||||
data : Data::default(),
|
||||
post_query : "links".to_owned(),
|
||||
enable_actions : false,
|
||||
ts : 1740060679399
|
||||
}
|
||||
}
|
||||
}
|
||||
#[derive(Debug, Serialize)]
|
||||
struct Data {
|
||||
links : Links,
|
||||
fields : Vec<String>,
|
||||
}
|
||||
impl Default for Data {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
links : Links::default(),
|
||||
fields : vec![ "$id".to_owned(), "id".to_owned(), "cls".to_owned(), "name".to_owned()]
|
||||
}
|
||||
}
|
||||
}
|
||||
#[derive(Debug, Serialize)]
|
||||
struct Links {
|
||||
flatten : bool,
|
||||
filter : Filter,
|
||||
}
|
||||
impl Default for Links {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
flatten : true,
|
||||
filter : Filter::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
#[derive(Debug, Serialize)]
|
||||
struct Filter {
|
||||
cls : String,
|
||||
}
|
||||
impl Default for Filter {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
cls : "measure".to_owned()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// "{\"access_token\":\"5BNQsmiGFQRNA651HeQxZekYgYUAWZ4e\",\"role\":\"administrator\",\"startRT\":\"2025-02-25T09:03:27.581Z\",\"login\":\"admin\",\"push_active\":null,\"$id\":\"systemuser$1\"}",
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct AuthResponse {
|
||||
pub access_token : String,
|
||||
// role : String,
|
||||
// startRT : String,
|
||||
// login : String,
|
||||
// push_active : Value,
|
||||
// #[serde(rename = "$id")]
|
||||
// id : String,
|
||||
}
|
||||
|
||||
pub trait GenericUrl {
|
||||
fn display(&self) -> String;
|
||||
}
|
||||
|
||||
impl<T> GenericUrl for [T]
|
||||
where T : Display {
|
||||
fn display(&self) -> String {
|
||||
let mut vec: Vec<String> = Vec::new();
|
||||
vec.push("%5B".to_owned());
|
||||
self.iter()
|
||||
.enumerate()
|
||||
.for_each(|(id, val)| {
|
||||
if id > 0 {
|
||||
vec.push(",".to_owned());
|
||||
}
|
||||
vec.push(format!("%22{}%22", val));
|
||||
});
|
||||
vec.push("%5D".to_owned());
|
||||
vec.concat()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_chunk_size(total_measures: usize) -> usize {
|
||||
match total_measures {
|
||||
0..=144 => total_measures,
|
||||
145..=288 => total_measures / 4,
|
||||
289..=432 => total_measures / 5,
|
||||
433..=576 => total_measures / 6,
|
||||
577..=720 => total_measures / 7,
|
||||
721..=864 => total_measures / 8,
|
||||
865..=1008 => total_measures / 9,
|
||||
_ => total_measures / 10,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue