Compare commits

..

7 Commits

Author SHA1 Message Date
deployer3000 03c9a12ffa Merge pull request 'rc' (#35) from rc into master 2025-05-20 12:30:28 +03:00
Vladislav Drozdov f4b560a454 Merge pull request 'feature/160' (#34) from feature/160 into rc
test-org/integration-module/pipeline/pr-master Build started... Details
Reviewed-on: http://git.enode/deployer3000/integration-module/pulls/34
Reviewed-by: DmitriyA <faleo1999@mail.ru>
2025-05-20 12:26:32 +03:00
prplV adb1ba4c07 refactor
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-05-20 04:39:50 -04:00
prplV 774a517def final fixes 2025-05-20 04:37:09 -04:00
prplV 3cca316978 Merge branch 'master' into feature/160 2025-05-20 03:14:26 -04:00
prplV 05b173408e -comments 2025-05-20 03:10:11 -04:00
prplV 7c7db5e510 enode big update 2025-05-20 03:09:59 -04:00
6 changed files with 189 additions and 67 deletions

View File

@ -20,6 +20,7 @@ STATUS_SYSTEM_URL = "http://192.168.2.39:9999/api/input"
ENODE_MONITORING_IP = "ip.ip.ip.ip" ENODE_MONITORING_IP = "ip.ip.ip.ip"
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required
ENODE_TARGET_DEVICES = "device$18,device$19"
# IM configuration for max level of logging info # IM configuration for max level of logging info
# for example DEBUG, INFO, WARN, ERROR, TRACE # for example DEBUG, INFO, WARN, ERROR, TRACE

View File

@ -183,7 +183,7 @@ impl Requester {
error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er); error!("Error casting jitter value from participant {} (id: {}), conference - {} (id: {}). Error: {}", name, id, conf_name, conf_id, er);
Value::Null Value::Null
}); });
metrics.add(MetricOutputExtended::new_with_slices(&metric_id, "int", "Vinteo native", &desc, val)); metrics.add(MetricOutputExtended::new_with_slices(&metric_id, &metric_id,"int", "Vinteo native", &desc, None, None,val));
}); });
}); });
metrics metrics

View File

@ -4,7 +4,7 @@ use serde_json::{Map, Value};
use reqwest::Client; use reqwest::Client;
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
use std::sync::Arc; use std::sync::Arc;
use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, GenericUrl, LazyUnzip, get_chunk_size}; use integr_structs::api::enode_monitoring::{AuthResponse, ForTokenCredentials, get_chunk_size};
use integr_structs::api::enode_monitoring::cmdb::Query; use integr_structs::api::enode_monitoring::cmdb::Query;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use std::pin::Pin; use std::pin::Pin;
@ -13,7 +13,99 @@ use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use std::collections::HashMap; use std::collections::HashMap;
// const IM_CONNECTION_TIMEOUT: String = std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()); trait AsDeviceRequest {
fn as_devices(self) -> Vec<String>;
}
trait IntoEnodeRequset {
fn into_enode_request(self) -> String;
}
impl AsDeviceRequest for Vec<String> {
fn as_devices(mut self) -> Vec<String> {
self.iter_mut()
.for_each(|dev| *dev = format!("/measures/device${}", dev));
self
}
}
#[derive(Debug)]
struct MetricInstance {
dola_id : String,
name : String,
desc : String,
device : String,
source : String,
}
impl IntoEnodeRequset for &[MetricInstance] {
fn into_enode_request(self) -> String {
let mut vec: Vec<String> = Vec::new();
vec.push("%5B".to_owned());
self.iter()
.enumerate()
.for_each(|(id, val)| {
if id > 0 {
vec.push(",".to_owned());
}
vec.push(format!("%22{}%22", val.dola_id));
});
vec.push("%5D".to_owned());
vec.concat()
}
}
#[derive(Debug)]
struct MetricMeta {
name : String,
desc : String,
device : String,
source : String,
}
impl Default for MetricMeta {
fn default() -> Self {
Self {
name : String::new(),
desc : String::new(),
device : String::new(),
source : String::new(),
}
}
}
#[allow(private_interfaces)]
pub trait LazyUnzipInstance {
fn lazy_unzip(&self) -> HashMap<String, MetricMeta>;
}
impl LazyUnzipInstance for &[MetricInstance] {
fn lazy_unzip(&self) -> HashMap<String, MetricMeta> {
self.iter().map(|obj| (obj.dola_id.to_string(), MetricMeta::new(&obj.name, &obj.desc, &obj.device, &obj.source))).collect()
}
}
impl MetricInstance {
fn new(id : &str, name : &str, desc : &str, device : &str, source : &str) -> Self {
Self {
dola_id : id.to_owned(),
name : name.to_owned(),
desc : desc.to_owned(),
device : device.to_owned(),
source : source.to_owned(),
}
}
}
impl MetricMeta {
fn new(name : &str, desc : &str, device : &str, source : &str) -> Self {
Self {
name : name.to_owned(),
desc : desc.to_owned(),
device : device.to_owned(),
source : source.to_owned(),
}
}
}
/// # Fn `get_metrics_from_monitoring` /// # Fn `get_metrics_from_monitoring`
/// ///
@ -185,32 +277,63 @@ impl MonitoringImporter {
/// , where `(String, String)` is a tuple of measure `id` and `description` /// , where `(String, String)` is a tuple of measure `id` and `description`
/// (`name`) /// (`name`)
#[tracing::instrument(name = "CM get metrics list mechanism", skip_all)] #[tracing::instrument(name = "CM get metrics list mechanism", skip_all)]
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> { pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<MetricInstance>> {
tracing::trace!("Trying ti get measures list from CM ..."); tracing::trace!("Trying ti get measures list from CM ...");
let client = Client::new(); let client = Client::new();
let mut vec: Vec<(String, String)> = Vec::new(); let mut vec: Vec<MetricInstance> = Vec::new();
let url = format!("http://{}/e-cmdb/api/query", self.ip); let url = format!("http://{}/e-cmdb/api/query", self.ip);
let id_list = {
match std::env::var("ENODE_TARGET_DEVICES") {
Err(_) => vec![String::from("18"), String::from("19")],
Ok(var) => var.split(',').into_iter().map(|st| st.trim().to_string()).collect::<Vec<String>>(),
}
};
let list_of_devices = id_list.clone().as_devices();
let client = client let client = client
.post(url) .post(url)
.timeout(tokio::time::Duration::from_secs(self.timeout as u64)) .timeout(tokio::time::Duration::from_secs(self.timeout as u64))
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
.bearer_auth(&self.access_token) .bearer_auth(&self.access_token)
.json(&Query::default()); .json(&Query::device_oriented(list_of_devices));
let resp = client.send().await?.text().await?; let resp = client.send().await?.text().await?;
let resp: Value = serde_json::from_str(&resp)?; let resp: Value = serde_json::from_str(&resp)?;
if let Some(arr) = resp.as_array() { if let Some(arr) = resp.as_array() {
for measure in arr { for device in arr {
let id = measure.get("id"); let device_id = {
let cls = measure.get("cls"); match device.get("name") {
let name = measure.get("name"); Some(name) => {
if id.is_some() && cls.is_some() { match serde_json::to_string(name) {
let id = id.unwrap().as_i64().unwrap_or_default(); Ok(name) => {
let cls = cls.unwrap().as_str().unwrap_or_else(|| ""); name.split('$').last().unwrap_or_else(|| "undefined-device").to_owned()
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null"); },
if cls.is_empty() { Err(_) => "undefined-device".to_string(),
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)")); }
},
None => "undefined-device".to_string(),
}
};
let device_id = device_id.trim_end_matches('"');
if let Some(links) = device.get("links") {
if let Some(measures) = links.as_array() {
for measure in measures.iter() {
let dola_id = measure.get("id");
let id = measure.get("measure_id");
let source = measure.get("source_id");
let desc = measure.get("name");
if id.is_some() && source.is_some() && dola_id.is_some() {
let dola_id = format!("measure${}", dola_id.unwrap().as_i64().unwrap_or_else(|| 0));
let id = id.unwrap().as_str().unwrap_or_else(|| "no-name");
let source = source.unwrap().as_str().unwrap_or_else(|| "no-source");
let desc = desc.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "no description");
if source.is_empty() {
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`measure_id` or `source_id`)"));
}
vec.push(MetricInstance::new(&dola_id, id, desc, device_id.as_ref(), source));
}
}
} }
vec.push((format!("{}${}", cls, id), name.to_string()));
} }
} }
} else { } else {
@ -236,8 +359,8 @@ impl MonitoringImporter {
/// exprots all data by itselfs /// exprots all data by itselfs
/// ///
#[tracing::instrument(name = "CM get measures info mechanism", skip_all)] #[tracing::instrument(name = "CM get measures info mechanism", skip_all)]
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> { pub async fn get_measure_info(&self, measures: Arc<Vec<MetricInstance>>) -> anyhow::Result<()> {
tracing::trace!("Trying ti get info about each measure ..."); tracing::trace!("Trying to get info about each measure ...");
let mut sys = sysinfo::System::new(); let mut sys = sysinfo::System::new();
sys.refresh_cpu_all(); sys.refresh_cpu_all();
// adaptive permition on task spawm to prevent system overload // adaptive permition on task spawm to prevent system overload
@ -254,7 +377,7 @@ impl MonitoringImporter {
let arc = arc.clone(); let arc = arc.clone();
let client = client.clone(); let client = client.clone();
let hm = measure.lazy_unzip(); let hm = measure.lazy_unzip();
let measure = Arc::new(measure.display()); let measure = Arc::new(measure.into_enode_request());
let _permit = permit.acquire().await.unwrap(); let _permit = permit.acquire().await.unwrap();
let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move { let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
@ -300,7 +423,7 @@ impl MonitoringImporter {
measure: Arc<String>, measure: Arc<String>,
client: Arc<Client>, client: Arc<Client>,
arc: Arc<Self>, arc: Arc<Self>,
hm: &HashMap<String, String>, hm: &HashMap<String, MetricMeta>,
) -> anyhow::Result<PrometheusMetricsExtended> { ) -> anyhow::Result<PrometheusMetricsExtended> {
tracing::trace!("Processing CM endpoint with one or more measure names"); tracing::trace!("Processing CM endpoint with one or more measure names");
let resp = client let resp = client
@ -332,7 +455,7 @@ impl MonitoringImporter {
/// ///
/// 3) if `Value` is `_` -> returns error **Invalid JSON format** /// 3) if `Value` is `_` -> returns error **Invalid JSON format**
/// ///
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> { fn extract_metric_data(json: Value, hm: &HashMap<String, MetricMeta>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
Box::pin(async move { Box::pin(async move {
return match json { return match json {
Value::Object(obj) => { Value::Object(obj) => {
@ -357,53 +480,48 @@ impl MonitoringImporter {
/// ///
/// Searches for certain fields and aggregates it in the `MetricOutputExtended` /// Searches for certain fields and aggregates it in the `MetricOutputExtended`
/// object /// object
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> { async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, MetricMeta>) -> anyhow::Result<MetricOutputExtended> {
tracing::trace!("Processing atomic Object value in CM JSON-response"); tracing::trace!("Processing atomic Object value in CM JSON-response");
let id = obj.get("$id"); let id = obj.get("$id");
let val = obj.get("value"); let val = obj.get("value");
let description = {
let dola_ip = obj.get("$id").unwrap_or_else(|| &Value::Null);
let zero = String::new();
if dola_ip.is_null() {
zero
} else {
hm.get(
dola_ip.as_str().unwrap_or_else(|| "")
)
.unwrap_or_else(|| &zero)
.to_owned()
}
};
if id.is_none() || val.is_none() { if id.is_none() || val.is_none() {
return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response")) return Err(Error::msg("Cannot get values of fields `id` and `value` from JSON Response"))
} }
let id = id.unwrap().as_str().unwrap_or_else(|| "").replace("$", "_");
let id = id.unwrap().as_str().unwrap_or_else(|| "");
let default_meta = MetricMeta::default();
let meta = hm.get(id).unwrap_or_else(|| &default_meta);
let id = id.replace("$", "_");
let val = val.unwrap(); let val = val.unwrap();
let device = meta.device.parse::<usize>().unwrap_or_else(|_| 0);
if id.is_empty() { if id.is_empty() {
return Err(Error::msg("Empty `id` field. Invalid JSON response")) return Err(Error::msg("Empty `id` field. Invalid JSON response"))
} }
Ok(MetricOutputExtended::new_with_slices(
Ok(MetricOutputExtended { id.as_ref(),
id : id.to_owned(), &meta.name,
json_type : match val { {
match val {
Value::Number(val) => { Value::Number(val) => {
if val.is_i64() { if val.is_i64() {
"i64".to_owned() "i64"
} else if val.is_u64() { } else if val.is_u64() {
"u64".to_owned() "u64"
} else { } else {
"f64".to_owned() "f64"
} }
}, },
_ => "unknown".to_owned(), _ => "unknown",
}
}, },
addr : "enode.monitoring.api".to_owned(), "enode.monitoring.api",
desc : description, &meta.desc,
value : val.clone(), Some(device),
status: 0, Some(meta.source.clone()),
}) val.clone(),
))
} }
} }

View File

@ -97,9 +97,8 @@ impl<'a> ApiPoll<'a> {
error!("Bad JSON in response. Error: {}", er); error!("Bad JSON in response. Error: {}", er);
}, },
Ok(_) => { Ok(_) => {
let endpoint_name = &metrics.name;
let preproc = JsonParser::parse(&metrics.measure, &response); let preproc = JsonParser::parse(&metrics.measure, &response);
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc); let preproc = PrometheusMetrics::new(&service_id, preproc);
match Exporter::export_metrics(preproc).await { match Exporter::export_metrics(preproc).await {
Ok(bytes) => { Ok(bytes) => {
info!("Successfully exported {} bytes of metrics data to Prometheus", bytes); info!("Successfully exported {} bytes of metrics data to Prometheus", bytes);

View File

@ -12,5 +12,6 @@ publish = ["kellnr"]
[dependencies] [dependencies]
anyhow = "1.0.95" anyhow = "1.0.95"
chrono = "0.4.40" chrono = "0.4.40"
dotenv = "0.15.0"
serde = { version = "1.0.217", features = ["derive"] } serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.135" serde_json = "1.0.135"

View File

@ -265,6 +265,7 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct MetricOutputExtended { pub struct MetricOutputExtended {
pub id : String, pub id : String,
pub name : String,
#[serde(rename = "type")] #[serde(rename = "type")]
pub json_type : String, pub json_type : String,
pub addr : String, pub addr : String,
@ -272,15 +273,20 @@ pub mod v3 {
#[serde(rename = "description")] #[serde(rename = "description")]
pub desc : String, pub desc : String,
pub status: usize, pub status: usize,
pub device: Option<usize>,
pub source: Option<String>,
} }
impl MetricOutputExtended { impl MetricOutputExtended {
pub fn new_with_slices(id : &str, json_type : &str, addr: &str, desc : &str, value : Value) -> Self { pub fn new_with_slices(id : &str, name: &str, json_type : &str, addr: &str, desc : &str, device: Option<usize>, source: Option<String>, value : Value) -> Self {
MetricOutputExtended { MetricOutputExtended {
id : id.to_string(), id : id.to_string(),
name : name.to_string(),
json_type : json_type.to_string(), json_type : json_type.to_string(),
addr : addr.to_string(), addr : addr.to_string(),
value : value, value : value,
desc : desc.to_string(), desc : desc.to_string(),
device,
source,
status: 0, status: 0,
} }
} }
@ -289,21 +295,21 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct PrometheusMetrics { pub struct PrometheusMetrics {
pub service_name: String, pub service_name: String,
pub endpoint_name: String, // pub endpoint_name: String,
pub metrics: Vec<MetricOutput>, pub metrics: Vec<MetricOutput>,
} }
impl PrometheusMetrics { impl PrometheusMetrics {
pub fn new(service: &str, endpoint: &str, metrics: Vec<MetricOutput>) -> Self { pub fn new(service: &str, metrics: Vec<MetricOutput>) -> Self {
Self { Self {
service_name: service.to_string(), service_name: service.to_string(),
endpoint_name: endpoint.to_string(), // endpoint_name: endpoint.to_string(),
metrics: metrics metrics: metrics
} }
} }
pub async fn new_zvks(metrics: Vec<MetricOutput>) -> Self { pub async fn new_zvks(metrics: Vec<MetricOutput>) -> Self {
Self { Self {
service_name : "zvks".to_owned(), service_name : "zvks".to_owned(),
endpoint_name : "apiforsnmp".to_owned(), // endpoint_name : "apiforsnmp".to_owned(),
metrics : metrics, metrics : metrics,
} }
} }
@ -317,14 +323,12 @@ pub mod v3 {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct PrometheusMetricsExtended { pub struct PrometheusMetricsExtended {
pub service_name: String, pub service_name: String,
pub endpoint_name: String,
pub metrics: Vec<MetricOutputExtended>, pub metrics: Vec<MetricOutputExtended>,
} }
impl PrometheusMetricsExtended { impl PrometheusMetricsExtended {
pub fn new_empty_jitter() -> Self { pub fn new_empty_jitter() -> Self {
Self { Self {
service_name : "zvks".to_owned(), service_name : "zvks".to_owned(),
endpoint_name : "jitter".to_owned(),
metrics : Vec::new(), metrics : Vec::new(),
} }
} }
@ -334,7 +338,6 @@ pub mod v3 {
pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self { pub async fn new_zvks(metrics: Vec<MetricOutputExtended>) -> Self {
Self { Self {
service_name : "zvks".to_owned(), service_name : "zvks".to_owned(),
endpoint_name : "apiforsnmp".to_owned(),
metrics : metrics, metrics : metrics,
} }
} }
@ -377,18 +380,18 @@ pub mod enode_monitoring {
pub struct Query { pub struct Query {
id : Vec<String>, id : Vec<String>,
data : Data, data : Data,
#[serde(rename = "postQuery")] // #[serde(rename = "postQuery")]
post_query : String, // post_query : String,
#[serde(rename = "enableActions")] #[serde(rename = "enableActions")]
enable_actions : bool, enable_actions : bool,
ts : usize ts : usize
} }
impl Default for Query { impl Query {
fn default() -> Self { pub fn device_oriented(devices: Vec<String>) -> Self {
Self { Self {
id : vec!["/measures/device$18".to_owned(), "/measures/device$19".to_owned()], id : devices,
data : Data::default(), data : Data::default(),
post_query : "links".to_owned(), // post_query : "links".to_owned(),
enable_actions : false, enable_actions : false,
ts : 1740060679399 ts : 1740060679399
} }
@ -416,7 +419,7 @@ pub mod enode_monitoring {
fn default() -> Self { fn default() -> Self {
Self { Self {
flatten : true, flatten : true,
filter : Filter::default() filter : Filter::default(),
} }
} }
} }