-dbg +refactor
parent
4b730be85b
commit
5e62e98c6e
|
|
@ -70,34 +70,21 @@ impl MonitoringImporter {
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
||||||
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
||||||
// dbg!(&fortoken);
|
|
||||||
let client = client
|
let client = client
|
||||||
.post(url)
|
.post(url)
|
||||||
.header("Content-Type", "application/json")
|
.header("Content-Type", "application/json")
|
||||||
.json(&fortoken);
|
.json(&fortoken);
|
||||||
let resp = client.send().await?;
|
let resp = client.send().await?;
|
||||||
let auth = resp.json::<AuthResponse>().await?;
|
let auth = resp.json::<AuthResponse>().await?;
|
||||||
// dbg!(&auth);
|
|
||||||
self.set_ts(&fortoken.ts).await;
|
self.set_ts(&fortoken.ts).await;
|
||||||
|
|
||||||
self.access_token = auth.access_token.to_owned();
|
self.access_token = auth.access_token.to_owned();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
// [ DEPRECATED ]
|
|
||||||
// pub async fn close_session(&mut self) -> anyhow::Result<()> {
|
|
||||||
// let client = Client::new();
|
|
||||||
// let url = format!("http://{}/e-data-front/auth/logout", self.ip);
|
|
||||||
// let client = client
|
|
||||||
// .post(url)
|
|
||||||
// .header("Content-Type", "application/json")
|
|
||||||
// .header("access-token", &self.access_token);
|
|
||||||
|
|
||||||
// let _ = client.send().await?;
|
|
||||||
|
|
||||||
// self.access_token.clear();
|
|
||||||
// Ok(())
|
|
||||||
// }
|
|
||||||
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
|
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let mut vec: Vec<(String, String)> = Vec::new();
|
let mut vec: Vec<(String, String)> = Vec::new();
|
||||||
|
|
@ -108,7 +95,6 @@ impl MonitoringImporter {
|
||||||
.header("access-token", &self.access_token)
|
.header("access-token", &self.access_token)
|
||||||
.json(&Query::default());
|
.json(&Query::default());
|
||||||
let resp = client.send().await?.text().await?;
|
let resp = client.send().await?.text().await?;
|
||||||
// dbg!(&resp.text().await);
|
|
||||||
let resp: Value = serde_json::from_str(&resp)?;
|
let resp: Value = serde_json::from_str(&resp)?;
|
||||||
if let Some(arr) = resp.as_array() {
|
if let Some(arr) = resp.as_array() {
|
||||||
for measure in arr {
|
for measure in arr {
|
||||||
|
|
@ -123,11 +109,9 @@ impl MonitoringImporter {
|
||||||
if cls.is_empty() {
|
if cls.is_empty() {
|
||||||
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
|
return Err(Error::msg("Invalid JSON in response. Wrong types of fields (`id` or `cls`)"));
|
||||||
}
|
}
|
||||||
// let measure_name = format!("{}${}", cls, id);
|
|
||||||
vec.push((format!("{}${}", cls, id), name.to_string()));
|
vec.push((format!("{}${}", cls, id), name.to_string()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// dbg!(vec);
|
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::msg("Invalid JSON in response"));
|
return Err(Error::msg("Invalid JSON in response"));
|
||||||
}
|
}
|
||||||
|
|
@ -143,9 +127,6 @@ impl MonitoringImporter {
|
||||||
let client = Arc::new(Client::new());
|
let client = Arc::new(Client::new());
|
||||||
let measures = measures.clone();
|
let measures = measures.clone();
|
||||||
let arc = Arc::new(self.clone());
|
let arc = Arc::new(self.clone());
|
||||||
// dbg!(&measures.display());
|
|
||||||
|
|
||||||
// dbg!(&measures.len());
|
|
||||||
let chunk_size = get_chunk_size(measures.len());
|
let chunk_size = get_chunk_size(measures.len());
|
||||||
info!("List of measures was divided by chunks with len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size);
|
info!("List of measures was divided by chunks with len {}, preparing for {} requests ...", chunk_size, measures.len() / chunk_size);
|
||||||
|
|
||||||
|
|
@ -163,26 +144,20 @@ impl MonitoringImporter {
|
||||||
});
|
});
|
||||||
jh_vec.push(jh);
|
jh_vec.push(jh);
|
||||||
}
|
}
|
||||||
// let mut vals = Vec::new();
|
|
||||||
let mut counter = 0;
|
|
||||||
for event in jh_vec {
|
for event in jh_vec {
|
||||||
match event.await {
|
match event.await {
|
||||||
Ok(val) => {
|
Ok(val) => {
|
||||||
if let Ok(val) = val {
|
if let Ok(val) = val {
|
||||||
counter+=1;
|
|
||||||
match crate::export::Exporter::export_extended_metrics(val).await {
|
match crate::export::Exporter::export_extended_metrics(val).await {
|
||||||
Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes)},
|
Ok(bytes) => {info!("Successfully transmitted {} bytes to the Prometehus exporter", bytes)},
|
||||||
Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er),
|
Err(er) => error!("Cannot export data to the Prometehus exporter due to : `{}`", er),
|
||||||
}
|
}
|
||||||
// vals.push(val);
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(er) => println!("Fatal error on async task: {}", er),
|
Err(er) => println!("Fatal error on async task: {}", er),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("Total preproccessed metrics - {}", counter);
|
|
||||||
// dbg!(&vals);
|
|
||||||
// dbg!(&vals.len());
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
|
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
|
||||||
|
|
@ -195,20 +170,14 @@ impl MonitoringImporter {
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
let resp: Value = serde_json::from_str(&resp)?;
|
let resp: Value = serde_json::from_str(&resp)?;
|
||||||
// let a = Self::extract_metric_data(resp);
|
|
||||||
// dbg!(&resp);
|
|
||||||
Ok(
|
Ok(
|
||||||
PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
|
PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
|
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
|
||||||
// dbg!(hm);
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
dbg!(&json);
|
|
||||||
return match json {
|
return match json {
|
||||||
Value::Object(obj) => {
|
Value::Object(obj) => {
|
||||||
// let resp: Value = serde_json::from_str(&obj)?;
|
|
||||||
// let desc =
|
|
||||||
return Ok(vec![Self::process_value(&obj, hm).await?])
|
return Ok(vec![Self::process_value(&obj, hm).await?])
|
||||||
},
|
},
|
||||||
Value::Array(arr) => {
|
Value::Array(arr) => {
|
||||||
|
|
@ -219,9 +188,6 @@ impl MonitoringImporter {
|
||||||
vec.append(&mut val);
|
vec.append(&mut val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// dbg!(&vec);
|
|
||||||
// dbg!(vec.len());
|
|
||||||
// todo!();
|
|
||||||
return Ok(vec)
|
return Ok(vec)
|
||||||
},
|
},
|
||||||
_ => Err(Error::msg("Invalid JSON format")),
|
_ => Err(Error::msg("Invalid JSON format")),
|
||||||
|
|
@ -229,9 +195,6 @@ impl MonitoringImporter {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
|
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
|
||||||
// dbg!(obj);
|
|
||||||
// dbg!(hm);
|
|
||||||
// todo!();
|
|
||||||
let id = obj.get("$id");
|
let id = obj.get("$id");
|
||||||
let val = obj.get("value");
|
let val = obj.get("value");
|
||||||
let description = {
|
let description = {
|
||||||
|
|
@ -246,7 +209,6 @@ impl MonitoringImporter {
|
||||||
.unwrap_or_else(|| &zero)
|
.unwrap_or_else(|| &zero)
|
||||||
.to_owned()
|
.to_owned()
|
||||||
}
|
}
|
||||||
// dbg!(dola_ip)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if id.is_none() || val.is_none() {
|
if id.is_none() || val.is_none() {
|
||||||
|
|
@ -258,8 +220,6 @@ impl MonitoringImporter {
|
||||||
if id.is_empty() {
|
if id.is_empty() {
|
||||||
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
|
return Err(Error::msg("Empty `id` field. Invalid JSON response"))
|
||||||
}
|
}
|
||||||
// dbg!(obj);
|
|
||||||
// todo!();
|
|
||||||
|
|
||||||
Ok(MetricOutputExtended {
|
Ok(MetricOutputExtended {
|
||||||
id : id.to_owned(),
|
id : id.to_owned(),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue