// integration-module/crates/api-grub/src/net.rs (370 lines, 14 KiB, Rust)

// module to handle unix-socket connection + pulling info from api
use anyhow::{Error, Result};
use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
use log::{error, info};
use serde_json::Value;
use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, Duration};
use reqwest::{Client, Method, RequestBuilder};
use std::sync::Arc;
use tokio::task::JoinHandle;
// use tokio::sync::Mutex;
use dotenv::dotenv;
use crate::json::JsonParser;
use crate::export::{self, Exporter};
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics};
// type BufferType = Arc<Mutex<Vec<String>>>;
// for api info pulling
/// Entry point of the API-info grubbing mechanism.
///
/// Loads optional `.env` overrides, builds the poller and the exporter,
/// then loops forever: while the config is still the default placeholder it
/// just sleeps; otherwise it applies any pending config update from `rx`
/// (non-blocking) and runs one polling pass.
///
/// NOTE: the loop never terminates, so `Ok(())` is never reached — the
/// `Result` return type is kept for caller compatibility.
pub async fn init_api_grub_mechanism(mut config: Config, rx: &mut Receiver<Config>) -> Result<()> {
    info!("Initializing API-info grubbing mechanism :");
    info!("1) Loading vars from .env file if exists...");
    // `.env` is optional: ignore the error when the file is absent.
    dotenv().ok();
    let mut poller = ApiPoll::new(&mut config).await;
    info!("2) Api-Poller has initialized");
    let client = Exporter::init();
    info!("3) Exporter has initialized");
    let shared_pool = Arc::new(client);
    loop {
        if poller.is_default().await {
            // Nothing useful to poll yet; wait until a real config arrives.
            sleep(Duration::from_secs(5)).await;
        } else {
            // Pick up a pending config update without blocking on the channel
            // (replaces the racy `len() > 0` check + blocking `recv().await`).
            if let Ok(conf) = rx.try_recv() {
                poller.change_config(conf).await;
                info!("Config changed");
            }
            info!(
                "Data from API: {:?}",
                poller.process_polling(shared_pool.clone()).await
            );
        }
    }
}
/// Maps textual HTTP method names onto `reqwest::Method` values.
struct RestMethod;

impl RestMethod {
    /// Parses `method` case-insensitively (surrounding whitespace ignored).
    /// Anything unrecognized — including "get" itself — falls back to `GET`.
    ///
    /// Kept `async` for compatibility with existing `.await` call sites.
    pub async fn from_str(method: &str) -> Method {
        match method.trim().to_lowercase().as_str() {
            "post" => Method::POST,
            "patch" => Method::PATCH,
            "put" => Method::PUT,
            "delete" => Method::DELETE,
            "head" => Method::HEAD,
            "trace" => Method::TRACE,
            "options" => Method::OPTIONS,
            "connect" => Method::CONNECT,
            // `"get" | _` was a redundant pattern; the wildcard alone covers it.
            _ => Method::GET,
        }
    }
}
// Poller that owns a mutable view of the active config plus a reusable HTTP client.
struct ApiPoll<'a> {
    // Borrowed mutably so `change_config` can replace the config in place.
    config : &'a mut Config,
    // NOTE(review): this client is currently unused — the associated functions
    // build their own `Client`s. Confirm whether it is still needed.
    client : Client,
}
impl<'a> ApiPoll<'a> {
pub async fn new(poll_cfg : &'a mut Config) -> Self {
Self {
config : poll_cfg,
client : Client::new(),
}
}
// can be weak and with bug test needed
pub async fn change_config(&mut self, conf: Config) {
*self.config = conf;
}
pub async fn is_default(&self) -> bool {
self.config.is_default().await
}
// pub async fn get_delay(&self) -> u32 {
// self.config.timeout
// }
pub async fn process_metrics(
service_id: Arc<String>,
metrics: Arc<Metrics>,
creds: Credentials,
// exporter: Arc<Exporter>
) -> Result<()> {
// processing metrics
// let mut req = Client::new()
// // .user_agent("api_grub/integration_module")
// .get(&metrics.url);
let client = Client::builder()
.user_agent("api_grub/integration_module");
let mut req = client.build().unwrap().get(&metrics.url);
let login = creds.endpoint.login.clone();
let password = creds.endpoint.password.clone();
let api_key = creds.endpoint.api_key.clone();
// if !login.is_empty() && !password.is_empty() {
// dbg!("kjgbkasgksjd");
// req = req.basic_auth(login, Some(password));
// }
if !api_key.is_empty() {
// req = req.bearer_auth(&api_key);
// req = req.header("authorization", "bearer ");
req = req.header("accept", "application/json");
req = req.header("x-api-key", &api_key);
// req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
}
// dbg!(&req);
// let (client, res) = req.build_split();
// let res = res.unwrap();
// res.url_mut().is_special()
// dbg!(client);
// dbg!(res);
// todo!();
match req.send().await {
Ok(resp) => {
// dbg!(&resp.text().await);
if let Ok(response) = resp.text().await {
match serde_json::to_value(&response) {
Err(er) => {
error!("Bad JSON in response. Error: {}", er);
},
Ok(_) => {
let endpoint_name = &metrics.name;
let preproc = JsonParser::parse(&metrics.measure, &response);
// dbg!(serde_json::to_string_pretty(&preproc));
let metrics = serde_json::to_string_pretty(&preproc)
.unwrap_or_else(|_| {
error!("Cannot parse grabbed metrics data to String");
String::from(r#""value" : null"#)
});
// dbg!(&metrics);
match Exporter::export_metrics(&metrics).await {
Ok(_) => {
info!("Successfully imported metrics data to Prometheus");
},
Err(er) => {
error!("Failed to export data to Prometheus due to {}", er);
},
}
},
}
} else {
error!("Bad response from {}. No data", &metrics.url);
}
},
Err(er) => {
error!("Cannot API data from {} due to : {}", &metrics.url, er);
},
}
Ok(())
}
pub async fn process_endpoint(
// client : Arc<Client>,
config : Arc<ConfigEndpoint>,
creds : Credentials,
// exporter : Arc<Exporter>
) -> Result<()> {
//
let period = config.get_period().unwrap_or(0);
let timeout = config.get_timeout().unwrap_or(5);
let metrics = Arc::new(config.metrics.clone());
let service_id = Arc::new(config.id.clone());
loop {
// let exporter = exporter.clone();
let creds = creds.clone();
let metrics = metrics.clone();
let service_id = service_id.clone();
let mut jh = Vec::<JoinHandle::<Result<()>>>::new();
for idx in 0..metrics.len() {
// let exporter = exporter.clone();
let creds = creds.clone();
let metrics = metrics.clone();
let service_id = service_id.clone();
let event = tokio::spawn(async move {
Self::process_metrics(
service_id.clone(),
metrics[idx].clone().into(),
creds.clone(),
// exporter.clone()
).await
});
jh.push(event);
}
info!("Initializing another {} subjob(s) for `{}` service",
jh.len(),
&service_id
);
for i in jh {
let _ = i.await;
}
// processing
sleep(Duration::from_secs(timeout)).await
}
Ok(())
}
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
// let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
// let client = Arc::new(self.client.clone());
let config = Arc::new(self.config.clone());
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
for (idx, _) in config.config.iter().enumerate() {
// let for_creds = endpoints[idx].clone();
let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
let endpoint = endpoints[idx].clone();
// let client = client.clone();
let exporter = exporter.clone();
let join_handler = tokio::spawn(async move {
Self::process_endpoint(
// client,
endpoint,
creds,
// exporter.clone()
).await
});
join_handles.push(join_handler);
}
info!("Initializing {} task(s) for current config", join_handles.len());
for i in join_handles {
let _ = i.await;
}
// let template = Arc::new(self.config.template.clone());
// if self.is_default().await { return Err(Error::msg("Default config with no endpoints")) }
// // TODO: rewrite nextly to async
// for point in template.iter() {
// let point = Arc::new(point.clone());
// // let buffer = buffer.clone();
// let client = client.clone();
// let exporter = exporter.clone();
// let endpoint_processer = tokio::spawn(async move {
// let point = point.clone();
// match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await {
// Ok(resp) => {
// if !resp.status().is_success() {
// error!("ErrorCode in Response from API. Check configuration");
// return Err(Error::msg("Error during sending request"));
// }
// if let Ok(text) = resp.text().await {
// //
// let metrics = ProcessedEndpoint::from_target_response(&text, &point)?;
// // dbg!(&metrics);
// println!("{}", &metrics);
// //
// if let Some(conn) = exporter.get_connection_from_pool().await {
// // TEST: to exporter
// let res = client.request(
// RestMethod::from_str("post").await,
// "http://192.168.2.34:9101/update")
// .json(&metrics)
// .send().await;
// if let Err(er) = res {
// error!("Cannot send data to exporter due to: {}", er);
// } else {
// println!("{:?}", res.unwrap().text().await);
// }
// if let Err(er) = Exporter::export_data(conn, &metrics).await {
// error!("Cannot export data to DB during to: {}", er);
// return Err(Error::msg("Error during exporting data to DB"));
// }
// } else {
// if !exporter.is_no_connection() {
// return Err(Error::msg("Error during getting connection from pool"));
// }
// }
// // let mut buffer = buffer.lock().await;
// // buffer.push(text);
// } else {
// error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url);
// return Err(Error::msg("Error with extracting text field from Response"));
// }
// },
// Err(_) => {
// error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url);
// return Err(Error::msg("Endpoint is unreachable"));
// },
// }
// Ok(())
// });
// join_handles.push(endpoint_processer);
// }
// for i in join_handles {
// let _ = i.await;
// }
// // let buffer = buffer.lock().await;
// // match &buffer.len() {
// // 0 => Err(Error::msg("Error due to API grubbing. Check config" )),
// // _ => {
// // Ok(())
// // },
// // }
Ok(())
}
}
// #[cfg(test)]
// mod net_unittests {
// use super::*;
// use tokio::test;
// #[test]
// async fn check_str_to_rest_method() {
// assert_eq!(RestMethod::from_str("get").await, Method::GET);
// assert_eq!(RestMethod::from_str("post").await, Method::POST);
// assert_eq!(RestMethod::from_str("patch").await, Method::PATCH);
// assert_eq!(RestMethod::from_str("put").await, Method::PUT);
// assert_eq!(RestMethod::from_str("delete").await, Method::DELETE);
// assert_eq!(RestMethod::from_str("invalid_method").await, Method::GET);
// }
// #[test]
// async fn check_api_poll_change_config() {
// let mut conf1 = ApiConfigV2::default();
// let conf2 = ApiConfigV2::pattern();
// let mut poll = ApiPoll::new(&mut conf1).await;
// poll.change_config(conf2).await;
// assert_eq!(poll.config.timeout, 1)
// }
// #[test]
// async fn check_api_poll_is_default() {
// let mut conf1 = ApiConfigV2::default();
// let poll = ApiPoll::new(&mut conf1).await;
// assert!(poll.is_default().await)
// }
// #[test]
// async fn check_api_grubbing_mechanism_on_public_one() {
// use log::{set_max_level, LevelFilter};
// set_max_level(LevelFilter::Off);
// let mut conf1 = ApiConfigV2::pattern();
// let conf2 = ApiConfigV2::default();
// let exporter = Arc::new(Exporter::init());
// let mut poll = ApiPoll::new(&mut conf1).await;
// assert!(poll.process_polling(exporter.clone()).await.is_ok());
// dbg!(&poll.config);
// poll.change_config(conf2).await;
// dbg!(&poll.config);
// assert!(poll.process_polling(exporter.clone()).await.is_err());
// }
// }