api grub set up
parent
fe7aaaaef3
commit
32ab234236
|
|
@ -6,7 +6,7 @@
|
||||||
"pass" : "",
|
"pass" : "",
|
||||||
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
"api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711",
|
||||||
"period" : "",
|
"period" : "",
|
||||||
"timeout" : "10",
|
"timeout" : "5",
|
||||||
"metrics" : [
|
"metrics" : [
|
||||||
{
|
{
|
||||||
"name": "conferences",
|
"name": "conferences",
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ impl Exporter {
|
||||||
let _ = client.query(&query, &[&metrics]).await?;
|
let _ = client.query(&query, &[&metrics]).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn export_metrics(metrics: &str) -> Result<()> {
|
pub async fn export_metrics(metrics: &str) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
// let req = Request::new(Method::PUT,
|
// let req = Request::new(Method::PUT,
|
||||||
// Url::parse(metrics)?);
|
// Url::parse(metrics)?);
|
||||||
|
|
@ -60,7 +60,7 @@ impl Exporter {
|
||||||
.json(metrics)
|
.json(metrics)
|
||||||
.send().await;
|
.send().await;
|
||||||
req?;
|
req?;
|
||||||
Ok(())
|
Ok(metrics.as_bytes().len())
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -12,7 +12,7 @@ use tokio::task::JoinHandle;
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use crate::json::JsonParser;
|
use crate::json::JsonParser;
|
||||||
use crate::export::{self, Exporter};
|
use crate::export::{self, Exporter};
|
||||||
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics};
|
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
|
||||||
|
|
||||||
// type BufferType = Arc<Mutex<Vec<String>>>;
|
// type BufferType = Arc<Mutex<Vec<String>>>;
|
||||||
|
|
||||||
|
|
@ -139,15 +139,24 @@ impl<'a> ApiPoll<'a> {
|
||||||
let endpoint_name = &metrics.name;
|
let endpoint_name = &metrics.name;
|
||||||
let preproc = JsonParser::parse(&metrics.measure, &response);
|
let preproc = JsonParser::parse(&metrics.measure, &response);
|
||||||
// dbg!(serde_json::to_string_pretty(&preproc));
|
// dbg!(serde_json::to_string_pretty(&preproc));
|
||||||
|
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
|
||||||
|
|
||||||
|
|
||||||
let metrics = serde_json::to_string_pretty(&preproc)
|
let metrics = serde_json::to_string_pretty(&preproc)
|
||||||
.unwrap_or_else(|_| {
|
.unwrap_or_else(|_| {
|
||||||
error!("Cannot parse grabbed metrics data to String");
|
error!("Cannot parse grabbed metrics data to String");
|
||||||
String::from(r#""value" : null"#)
|
String::from(r#"
|
||||||
|
{
|
||||||
|
"service_name" : null,
|
||||||
|
"endpoint_name" : null,
|
||||||
|
"value" : null
|
||||||
|
}
|
||||||
|
"#)
|
||||||
});
|
});
|
||||||
|
println!("{}", &metrics);
|
||||||
match Exporter::export_metrics(&metrics).await {
|
match Exporter::export_metrics(&metrics).await {
|
||||||
Ok(_) => {
|
Ok(bytes) => {
|
||||||
info!("Successfully imported metrics data to Prometheus");
|
info!("Successfully imported {} bytes of metrics data to Prometheus", bytes);
|
||||||
},
|
},
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
error!("Failed to export data to Prometheus due to {}", er);
|
error!("Failed to export data to Prometheus due to {}", er);
|
||||||
|
|
@ -184,7 +193,6 @@ impl<'a> ApiPoll<'a> {
|
||||||
let mut jh = Vec::<JoinHandle::<Result<()>>>::new();
|
let mut jh = Vec::<JoinHandle::<Result<()>>>::new();
|
||||||
|
|
||||||
for idx in 0..metrics.len() {
|
for idx in 0..metrics.len() {
|
||||||
// let exporter = exporter.clone();
|
|
||||||
let creds = creds.clone();
|
let creds = creds.clone();
|
||||||
let metrics = metrics.clone();
|
let metrics = metrics.clone();
|
||||||
let service_id = service_id.clone();
|
let service_id = service_id.clone();
|
||||||
|
|
@ -193,7 +201,6 @@ impl<'a> ApiPoll<'a> {
|
||||||
service_id.clone(),
|
service_id.clone(),
|
||||||
metrics[idx].clone().into(),
|
metrics[idx].clone().into(),
|
||||||
creds.clone(),
|
creds.clone(),
|
||||||
// exporter.clone()
|
|
||||||
).await
|
).await
|
||||||
});
|
});
|
||||||
jh.push(event);
|
jh.push(event);
|
||||||
|
|
@ -239,82 +246,6 @@ impl<'a> ApiPoll<'a> {
|
||||||
for i in join_handles {
|
for i in join_handles {
|
||||||
let _ = i.await;
|
let _ = i.await;
|
||||||
}
|
}
|
||||||
// let template = Arc::new(self.config.template.clone());
|
|
||||||
|
|
||||||
// if self.is_default().await { return Err(Error::msg("Default config with no endpoints")) }
|
|
||||||
|
|
||||||
// // TODO: rewrite nextly to async
|
|
||||||
// for point in template.iter() {
|
|
||||||
// let point = Arc::new(point.clone());
|
|
||||||
// // let buffer = buffer.clone();
|
|
||||||
// let client = client.clone();
|
|
||||||
// let exporter = exporter.clone();
|
|
||||||
// let endpoint_processer = tokio::spawn(async move {
|
|
||||||
// let point = point.clone();
|
|
||||||
// match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
|
|
||||||
// Ok(resp) => {
|
|
||||||
// if !resp.status().is_success() {
|
|
||||||
// error!("ErrorCode in Response from API. Check configuration");
|
|
||||||
// return Err(Error::msg("Error during sending request"));
|
|
||||||
// }
|
|
||||||
// if let Ok(text) = resp.text().await {
|
|
||||||
// //
|
|
||||||
// let metrics = ProcessedEndpoint::from_target_response(&text, &point)?;
|
|
||||||
// // dbg!(&metrics);
|
|
||||||
// println!("{}", &metrics);
|
|
||||||
// //
|
|
||||||
// if let Some(conn) = exporter.get_connection_from_pool().await {
|
|
||||||
|
|
||||||
// // TEST: to exporter
|
|
||||||
// let res = client.request(
|
|
||||||
// RestMethod::from_str("post").await,
|
|
||||||
// "http://192.168.2.34:9101/update")
|
|
||||||
// .json(&metrics)
|
|
||||||
// .send().await;
|
|
||||||
// if let Err(er) = res {
|
|
||||||
// error!("Cannot send data to exporter due to: {}", er);
|
|
||||||
// } else {
|
|
||||||
// println!("{:?}", res.unwrap().text().await);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if let Err(er) = Exporter::export_data(conn, &metrics).await {
|
|
||||||
// error!("Cannot export data to DB during to: {}", er);
|
|
||||||
// return Err(Error::msg("Error during exporting data to DB"));
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// if !exporter.is_no_connection() {
|
|
||||||
// return Err(Error::msg("Error during getting connection from pool"));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // let mut buffer = buffer.lock().await;
|
|
||||||
// // buffer.push(text);
|
|
||||||
// } else {
|
|
||||||
// error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
|
|
||||||
// return Err(Error::msg("Error with extracting text field from Response"));
|
|
||||||
// }
|
|
||||||
// },
|
|
||||||
// Err(_) => {
|
|
||||||
// error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url);
|
|
||||||
// return Err(Error::msg("Endpoint is unreachable"));
|
|
||||||
// },
|
|
||||||
// }
|
|
||||||
// Ok(())
|
|
||||||
// });
|
|
||||||
// join_handles.push(endpoint_processer);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// for i in join_handles {
|
|
||||||
// let _ = i.await;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // let buffer = buffer.lock().await;
|
|
||||||
// // match &buffer.len() {
|
|
||||||
// // 0 => Err(Error::msg("Error due to API grubbing. Check config" )),
|
|
||||||
// // _ => {
|
|
||||||
// // Ok(())
|
|
||||||
// // },
|
|
||||||
// // }
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -258,5 +258,13 @@ pub mod v3 {
|
||||||
endpoint_name: String,
|
endpoint_name: String,
|
||||||
metrics: Value,
|
metrics: Value,
|
||||||
}
|
}
|
||||||
|
impl PrometheusMetrics {
|
||||||
|
pub fn new(service: &str, endpoint: &str, metrics: Value) -> Self {
|
||||||
|
Self {
|
||||||
|
service_name: service.to_string(),
|
||||||
|
endpoint_name: endpoint.to_string(),
|
||||||
|
metrics: metrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue