timeout for reqs added

pull/18/head
prplV 2025-03-12 12:58:54 +03:00
parent 330463ea47
commit f238f2ce28
3 changed files with 32 additions and 5 deletions

View File

@ -17,3 +17,6 @@ ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password
# IM configuration for max level of logging info # IM configuration for max level of logging info
# for example DEBUG, INFO, WARN, ERROR, TRACE # for example DEBUG, INFO, WARN, ERROR, TRACE
IM_LOG_INFO = "INFO" IM_LOG_INFO = "INFO"
# IM configuration for setting up API connetion
# timeout (in secs). Default value - 10
IM_CONNECTION_TIMEOUT = "10"

View File

@ -2,6 +2,13 @@
name = "api-grub" name = "api-grub"
version = "1.0.2" version = "1.0.2"
edition = "2021" edition = "2021"
authors = ["Vladislav Drozdov <maseeeeeeeed@gmail.com>"]
description = "API poller for ZVKS project"
homepage = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/api-grub"
repository = "http://git.enode/deployer3000/integration-module/src/branch/master/crates/api-grub"
license = "MIT OR Apache-2.0"
keywords = ["api", "grub", "zvks"]
publish = ["kellnr"]
[dependencies] [dependencies]
serde = { version = "1.0.217", features = ["derive"] } serde = { version = "1.0.217", features = ["derive"] }

View File

@ -13,6 +13,8 @@ use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use std::collections::HashMap; use std::collections::HashMap;
// const IM_CONNECTION_TIMEOUT: String = std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string());
/// # Fn `get_metrics_from_monitoring` /// # Fn `get_metrics_from_monitoring`
/// ///
/// A function to init pulling and exporting metrics mechanism /// A function to init pulling and exporting metrics mechanism
@ -38,7 +40,7 @@ use std::collections::HashMap;
/// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(())); /// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(()));
/// ``` /// ```
/// ///
#[tracing::instrument(name = "CM mechanism", skip_all)] #[tracing::instrument(name = "cm_fn_initiator", skip_all)]
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> { pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
let timer = tokio::time::Instant::now(); let timer = tokio::time::Instant::now();
@ -94,6 +96,7 @@ pub struct MonitoringImporter {
password : String, password : String,
access_token : String, access_token : String,
ts : String, ts : String,
timeout : usize,
} }
impl MonitoringImporter { impl MonitoringImporter {
@ -115,6 +118,7 @@ impl MonitoringImporter {
password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()), password : env::var("ENODE_MONITORING_PASSWORD").unwrap_or_else(|_| String::new()),
access_token : String::new(), access_token : String::new(),
ts : String::new(), ts : String::new(),
timeout : std::env::var("IM_CONNECTION_TIMEOUT").unwrap_or_else(|_| "10".to_string()).parse().unwrap_or_else(|_| 10)
} }
} }
/// Function that checks is current `MonitoringImporter` valid /// Function that checks is current `MonitoringImporter` valid
@ -140,7 +144,7 @@ impl MonitoringImporter {
/// ///
/// *Also* it saves ts and access-key in it's runtime environment, /// *Also* it saves ts and access-key in it's runtime environment,
/// there's no way to get access-key of session /// there's no way to get access-key of session
#[tracing::instrument(name = "CM-session mechanism", skip_all)] #[tracing::instrument(name = "cm_fn_session_start", skip_all)]
pub async fn start_session(&mut self) -> anyhow::Result<()> { pub async fn start_session(&mut self) -> anyhow::Result<()> {
if !self.is_valid().await { if !self.is_valid().await {
return Err(Error::msg("Invalid eNODE-Monitoring configuration")); return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
@ -153,6 +157,7 @@ impl MonitoringImporter {
loop { loop {
let client = client let client = client
.post(&url) .post(&url)
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
.json(&fortoken); .json(&fortoken);
// let resp = client.send().await?; // let resp = client.send().await?;
@ -191,6 +196,7 @@ impl MonitoringImporter {
let url = format!("http://{}/e-cmdb/api/query", self.ip); let url = format!("http://{}/e-cmdb/api/query", self.ip);
let client = client let client = client
.post(url) .post(url)
.timeout(tokio::time::Duration::from_secs(self.timeout as u64))
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
.header("access-token", &self.access_token) .header("access-token", &self.access_token)
.json(&Query::default()); .json(&Query::default());
@ -256,7 +262,12 @@ impl MonitoringImporter {
let _permit = permit.acquire().await.unwrap(); let _permit = permit.acquire().await.unwrap();
let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move { let jh: JoinHandle<anyhow::Result<PrometheusMetricsExtended>> = tokio::spawn(async move {
Self::process_endpoint(measure.clone(), client.clone(), arc.clone(), &hm).await Self::process_endpoint(
measure.clone(),
client.clone(),
arc.clone(),
&hm,
).await
}); });
jh_vec.push(jh); jh_vec.push(jh);
@ -289,10 +300,16 @@ impl MonitoringImporter {
/// a slice of measures in special format `%5B%22measure$1%22,%20%22measure$2%22%5D`. /// a slice of measures in special format `%5B%22measure$1%22,%20%22measure$2%22%5D`.
/// This is a neccesary measure to handle two types of requests and URL restrictions /// This is a neccesary measure to handle two types of requests and URL restrictions
/// ///
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> { async fn process_endpoint(
measure: Arc<String>,
client: Arc<Client>,
arc: Arc<Self>,
hm: &HashMap<String, String>,
) -> anyhow::Result<PrometheusMetricsExtended> {
tracing::trace!("Processing CM endpoint with one or more measure names"); tracing::trace!("Processing CM endpoint with one or more measure names");
let resp = client let resp = client
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure)) .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
.timeout(tokio::time::Duration::from_secs(arc.timeout as u64))
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
.header("access-token", &arc.access_token) .header("access-token", &arc.access_token)
.send().await? .send().await?