net refactor
test-org/integration-module/pipeline/pr-rc This commit looks good Details

feature/1117
prplV 2025-03-04 15:47:14 +03:00
parent 7efdfb39ba
commit f7ec3f2402
1 changed file with 9 additions and 65 deletions

View File

@ -1,22 +1,18 @@
// module to handle unix-socket connection + pulling info from api // module to handle unix-socket connection + pulling info from api
use anyhow::Result; use anyhow::Result;
// use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
use log::{error, info}; use log::{error, info};
use rand::random; use rand::random;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use reqwest::{Client, Method}; use reqwest::Client;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::sync::Arc; use std::sync::Arc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
// use tokio::sync::Mutex;
use dotenv::dotenv; use dotenv::dotenv;
use crate::json::JsonParser; use crate::json::JsonParser;
use crate::export::Exporter; use crate::export::Exporter;
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics}; use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
// use md5::compute;
// type BufferType = Arc<Mutex<Vec<String>>>;
// for api info pulling // for api info pulling
pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> { pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {
@ -42,32 +38,13 @@ pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>)
} }
let shared_pool = shared_pool.clone(); let shared_pool = shared_pool.clone();
info!("Data from API: {:?}", poller.process_polling(shared_pool).await); info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
// sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
}
}
// Ok(())
}
struct RestMethod;
impl RestMethod {
pub async fn from_str(method: &str) -> Method {
return match method.trim().to_lowercase().as_str() {
"post" => Method::POST,
"patch" => Method::PATCH,
"put" => Method::PUT,
"delete" => Method::DELETE,
"head" => Method::HEAD,
"trace" => Method::TRACE,
"options" => Method::OPTIONS,
"connect" => Method::CONNECT,
"get" | _ => Method::GET
} }
} }
} }
struct ApiPoll<'a> { struct ApiPoll<'a> {
config : &'a mut Config, config : &'a mut Config,
#[allow(unused)]
client : Client, client : Client,
} }
@ -78,26 +55,18 @@ impl<'a> ApiPoll<'a> {
client : Client::new(), client : Client::new(),
} }
} }
// can be weak and with bug test needed
pub async fn change_config(&mut self, conf: Config) { pub async fn change_config(&mut self, conf: Config) {
*self.config = conf; *self.config = conf;
} }
pub async fn is_default(&self) -> bool { pub async fn is_default(&self) -> bool {
self.config.is_default().await self.config.is_default().await
} }
// pub async fn get_delay(&self) -> u32 {
// self.config.timeout
// }
pub async fn process_metrics( pub async fn process_metrics(
service_id: Arc<String>, service_id: Arc<String>,
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
creds: Credentials, creds: Credentials,
// exporter: Arc<Exporter>
) -> Result<()> { ) -> Result<()> {
// processing metrics // processing metrics
// let mut req = Client::new()
// // .user_agent("api_grub/integration_module")
// .get(&metrics.url);
use std::hash::DefaultHasher; use std::hash::DefaultHasher;
let rand = random::<char>(); let rand = random::<char>();
@ -113,32 +82,15 @@ impl<'a> ApiPoll<'a> {
let api_key = &creds.endpoint.api_key; let api_key = &creds.endpoint.api_key;
if !login.is_empty() && !password.is_empty() { if !login.is_empty() && !password.is_empty() {
// dbg!("kjgbkasgksjd");
req = req.basic_auth(login, Some(password)); req = req.basic_auth(login, Some(password));
} }
if !api_key.is_empty() { if !api_key.is_empty() {
// req = req.bearer_auth(&api_key);
// req = req.header("authorization", "bearer ");
req = req.header("accept", "application/json"); req = req.header("accept", "application/json");
req = req.header("x-api-key", api_key); req = req.header("x-api-key", api_key);
// req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
} }
// dbg!(&req);
// let (client, res) = req.build_split();
// let res = res.unwrap();
// res.url_mut().is_special()
// dbg!(client);
// dbg!(res);
// todo!();
match req.send().await { match req.send().await {
Ok(resp) => { Ok(resp) => {
// dbg!(&resp.text().await);
if let Ok(response) = resp.text().await { if let Ok(response) = resp.text().await {
match serde_json::to_value(&response) { match serde_json::to_value(&response) {
Err(er) => { Err(er) => {
@ -147,7 +99,6 @@ impl<'a> ApiPoll<'a> {
Ok(_) => { Ok(_) => {
let endpoint_name = &metrics.name; let endpoint_name = &metrics.name;
let preproc = JsonParser::parse(&metrics.measure, &response); let preproc = JsonParser::parse(&metrics.measure, &response);
// dbg!(serde_json::to_string_pretty(&preproc));
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc); let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
match Exporter::export_metrics(preproc).await { match Exporter::export_metrics(preproc).await {
Ok(bytes) => { Ok(bytes) => {
@ -170,18 +121,15 @@ impl<'a> ApiPoll<'a> {
Ok(()) Ok(())
} }
pub async fn process_endpoint( pub async fn process_endpoint(
// client : Arc<Client>,
config : Arc<ConfigEndpoint>, config : Arc<ConfigEndpoint>,
creds : Credentials, creds : Credentials,
// exporter : Arc<Exporter>
) -> Result<()> { ) -> Result<()> {
// // TODO: HAVE TO BE USED
let period = config.get_period().unwrap_or(0); let _period = config.get_period().unwrap_or(0);
let timeout = config.get_timeout().unwrap_or(5); let timeout = config.get_timeout().unwrap_or(5);
let metrics = Arc::new(config.metrics.clone()); let metrics = Arc::new(config.metrics.clone());
let service_id = Arc::new(config.id.clone()); let service_id = Arc::new(config.id.clone());
loop { loop {
// let exporter = exporter.clone();
let creds = creds.clone(); let creds = creds.clone();
let metrics = metrics.clone(); let metrics = metrics.clone();
let service_id = service_id.clone(); let service_id = service_id.clone();
@ -211,28 +159,23 @@ impl<'a> ApiPoll<'a> {
// processing // processing
sleep(Duration::from_secs(timeout)).await sleep(Duration::from_secs(timeout)).await
} }
Ok(())
} }
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> { pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
// let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
// let client = Arc::new(self.client.clone());
let config = Arc::new(self.config.clone()); let config = Arc::new(self.config.clone());
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone()); let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![]; let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
for (idx, _) in config.config.iter().enumerate() { for (idx, _) in config.config.iter().enumerate() {
// let for_creds = endpoints[idx].clone();
let creds = Credentials::from_config_endpoint(endpoints[idx].clone()); let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
let endpoint = endpoints[idx].clone(); let endpoint = endpoints[idx].clone();
// let client = client.clone();
// TODO: USE EXPORTER
#[allow(unused)]
let exporter = exporter.clone(); let exporter = exporter.clone();
let join_handler = tokio::spawn(async move { let join_handler = tokio::spawn(async move {
Self::process_endpoint( Self::process_endpoint(
// client,
endpoint, endpoint,
creds, creds,
// exporter.clone()
).await ).await
}); });
join_handles.push(join_handler); join_handles.push(join_handler);
@ -245,6 +188,7 @@ impl<'a> ApiPoll<'a> {
} }
} }
// TODO: FIX TESTS
// #[cfg(test)] // #[cfg(test)]
// mod net_unittests { // mod net_unittests {
// use super::*; // use super::*;