Compare commits

...

3 Commits

Author SHA1 Message Date
prplV f7ec3f2402 net refactor
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-04 15:47:14 +03:00
prplV 7efdfb39ba config refactor 2025-03-04 15:47:09 +03:00
prplV 58dedc1adf refactor 2025-03-04 15:43:04 +03:00
3 changed files with 18 additions and 94 deletions

View File

@@ -1,7 +1,6 @@
// mod to communicate with api-grub config file // mod to communicate with api-grub config file
// 1) check changes in unix-socket // 1) check changes in unix-socket
// 2) save changes in local config file // 2) save changes in local config file
use integr_structs::api::ApiConfigV2;
use anyhow::{Error, Ok, Result}; use anyhow::{Error, Ok, Result};
use log::{info, warn, error}; use log::{info, warn, error};
use std::{fs, path::Path}; use std::{fs, path::Path};
@@ -15,19 +14,9 @@ use integr_structs::api::v3::Config;
const CONFIG_PATH: &str = "config_api.json"; const CONFIG_PATH: &str = "config_api.json";
const SOCKET_PATH: &str = "api-grub.sock"; const SOCKET_PATH: &str = "api-grub.sock";
// todo! rewrite to use current_exe // TODO: rewrite to use current_exe
pub async fn pull_local_config() -> Result<Config> { pub async fn pull_local_config() -> Result<Config> {
// let conf_path = std::env::current_exe()?;
let path = Path::new(CONFIG_PATH); let path = Path::new(CONFIG_PATH);
// return match conf_path.parent() {
// Some(dir) => {
// let config: ApiConfig = from_str(
// &fs::read_to_string(dir.join(CONFIG_PATH))?
// )?;
// Ok(config)
// },
// None => Err(Error::msg("No local conf was found"))
// }
if path.exists() && path.is_file() { if path.exists() && path.is_file() {
let config: Config = from_str( let config: Config = from_str(
&fs::read_to_string(CONFIG_PATH)? &fs::read_to_string(CONFIG_PATH)?
@@ -39,14 +28,13 @@ pub async fn pull_local_config() -> Result<Config> {
} }
// for config pulling // for config pulling
// ++++ reader to channel
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> { pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
info!("Initializing Unix-Socket listening for pulling new configs..."); info!("Initializing Unix-Socket listening for pulling new configs...");
let server = init_unix_listener().await?; let server = init_unix_listener().await?;
//
info!("Listening Unix-Socket..."); info!("Listening Unix-Socket...");
let mut buffer = String::new(); let mut buffer = String::new();
//
loop { loop {
if let stdOk((mut stream, _)) = server.accept().await { if let stdOk((mut stream, _)) = server.accept().await {
if let Err(er) = stream.read_to_string(&mut buffer).await { if let Err(er) = stream.read_to_string(&mut buffer).await {

View File

@@ -3,7 +3,7 @@ use integr_structs::api::v3::{PrometheusMetrics, PrometheusMetricsExtended};
use reqwest::Client; use reqwest::Client;
use tokio_postgres::NoTls; use tokio_postgres::NoTls;
use std::env; use std::env;
use anyhow::{Result}; use anyhow::Result;
use log::{info, error}; use log::{info, error};
pub struct Exporter { pub struct Exporter {
@@ -34,51 +34,43 @@ impl Exporter {
}, },
} }
} }
#[allow(unused)]
pub fn is_no_connection(&self) -> bool { self.pool.is_none() } pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
pub fn init() -> Self { pub fn init() -> Self {
Self { Self {
pool : Self::pool_construct(), pool : Self::pool_construct(),
} }
} }
#[allow(unused)]
pub async fn get_connection_from_pool(&self) -> Option<PgClient> { pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
if let Some(pool) = &self.pool { if let Some(pool) = &self.pool {
return Some(pool.get().await.ok()?); return Some(pool.get().await.ok()?);
} }
None None
} }
#[allow(unused)]
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> { pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
// client.
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?; let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
let _ = client.query(&query, &[&metrics]).await?; let _ = client.query(&query, &[&metrics]).await?;
Ok(()) Ok(())
} }
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> { pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
let url = env::var("EXPORTER_URL")?; let url = env::var("EXPORTER_URL")?;
// let req = Request::new(Method::PUT,
// Url::parse(metrics)?);
// dbg!(&metrics);
let req = Client::new() let req = Client::new()
.post(url) .post(url)
.json(&metrics) .json(&metrics)
.send().await; .send().await;
// dbg!(&req);
// dbg!(&req.unwrap().text().await);
// todo : rewrite with status code wrapping
req?; req?;
Ok(metrics.get_bytes_len()) Ok(metrics.get_bytes_len())
} }
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> { pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
let url = env::var("EXPORTER_URL")?; let url = env::var("EXPORTER_URL")?;
// let req = Request::new(Method::PUT,
// Url::parse(metrics)?);
// dbg!(&metrics);
let req = Client::new() let req = Client::new()
.post(url) .post(url)
.json(&metrics) .json(&metrics)
.send().await; .send().await;
// dbg!(&req);
// dbg!(&req.unwrap().text().await);
// todo : rewrite with status code wrapping
req?; req?;
Ok(metrics.get_bytes_len()) Ok(metrics.get_bytes_len())
} }

View File

@@ -1,22 +1,18 @@
// module to handle unix-socket connection + pulling info from api // module to handle unix-socket connection + pulling info from api
use anyhow::Result; use anyhow::Result;
// use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
use log::{error, info}; use log::{error, info};
use rand::random; use rand::random;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use reqwest::{Client, Method}; use reqwest::Client;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::sync::Arc; use std::sync::Arc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
// use tokio::sync::Mutex;
use dotenv::dotenv; use dotenv::dotenv;
use crate::json::JsonParser; use crate::json::JsonParser;
use crate::export::Exporter; use crate::export::Exporter;
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics}; use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
// use md5::compute;
// type BufferType = Arc<Mutex<Vec<String>>>;
// for api info pulling // for api info pulling
pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> { pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {
@@ -42,32 +38,13 @@ pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>)
} }
let shared_pool = shared_pool.clone(); let shared_pool = shared_pool.clone();
info!("Data from API: {:?}", poller.process_polling(shared_pool).await); info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
// sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
}
}
// Ok(())
}
struct RestMethod;
impl RestMethod {
pub async fn from_str(method: &str) -> Method {
return match method.trim().to_lowercase().as_str() {
"post" => Method::POST,
"patch" => Method::PATCH,
"put" => Method::PUT,
"delete" => Method::DELETE,
"head" => Method::HEAD,
"trace" => Method::TRACE,
"options" => Method::OPTIONS,
"connect" => Method::CONNECT,
"get" | _ => Method::GET
} }
} }
} }
struct ApiPoll<'a> { struct ApiPoll<'a> {
config : &'a mut Config, config : &'a mut Config,
#[allow(unused)]
client : Client, client : Client,
} }
@@ -78,26 +55,18 @@ impl<'a> ApiPoll<'a> {
client : Client::new(), client : Client::new(),
} }
} }
// can be weak and with bug test needed
pub async fn change_config(&mut self, conf: Config) { pub async fn change_config(&mut self, conf: Config) {
*self.config = conf; *self.config = conf;
} }
pub async fn is_default(&self) -> bool { pub async fn is_default(&self) -> bool {
self.config.is_default().await self.config.is_default().await
} }
// pub async fn get_delay(&self) -> u32 {
// self.config.timeout
// }
pub async fn process_metrics( pub async fn process_metrics(
service_id: Arc<String>, service_id: Arc<String>,
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
creds: Credentials, creds: Credentials,
// exporter: Arc<Exporter>
) -> Result<()> { ) -> Result<()> {
// processing metrics // processing metrics
// let mut req = Client::new()
// // .user_agent("api_grub/integration_module")
// .get(&metrics.url);
use std::hash::DefaultHasher; use std::hash::DefaultHasher;
let rand = random::<char>(); let rand = random::<char>();
@@ -113,32 +82,15 @@ impl<'a> ApiPoll<'a> {
let api_key = &creds.endpoint.api_key; let api_key = &creds.endpoint.api_key;
if !login.is_empty() && !password.is_empty() { if !login.is_empty() && !password.is_empty() {
// dbg!("kjgbkasgksjd");
req = req.basic_auth(login, Some(password)); req = req.basic_auth(login, Some(password));
} }
if !api_key.is_empty() { if !api_key.is_empty() {
// req = req.bearer_auth(&api_key);
// req = req.header("authorization", "bearer ");
req = req.header("accept", "application/json"); req = req.header("accept", "application/json");
req = req.header("x-api-key", api_key); req = req.header("x-api-key", api_key);
// req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
} }
// dbg!(&req);
// let (client, res) = req.build_split();
// let res = res.unwrap();
// res.url_mut().is_special()
// dbg!(client);
// dbg!(res);
// todo!();
match req.send().await { match req.send().await {
Ok(resp) => { Ok(resp) => {
// dbg!(&resp.text().await);
if let Ok(response) = resp.text().await { if let Ok(response) = resp.text().await {
match serde_json::to_value(&response) { match serde_json::to_value(&response) {
Err(er) => { Err(er) => {
@@ -147,7 +99,6 @@ impl<'a> ApiPoll<'a> {
Ok(_) => { Ok(_) => {
let endpoint_name = &metrics.name; let endpoint_name = &metrics.name;
let preproc = JsonParser::parse(&metrics.measure, &response); let preproc = JsonParser::parse(&metrics.measure, &response);
// dbg!(serde_json::to_string_pretty(&preproc));
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc); let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
match Exporter::export_metrics(preproc).await { match Exporter::export_metrics(preproc).await {
Ok(bytes) => { Ok(bytes) => {
@@ -170,18 +121,15 @@ impl<'a> ApiPoll<'a> {
Ok(()) Ok(())
} }
pub async fn process_endpoint( pub async fn process_endpoint(
// client : Arc<Client>,
config : Arc<ConfigEndpoint>, config : Arc<ConfigEndpoint>,
creds : Credentials, creds : Credentials,
// exporter : Arc<Exporter>
) -> Result<()> { ) -> Result<()> {
// // TODO: HAVE TO BE USED
let period = config.get_period().unwrap_or(0); let _period = config.get_period().unwrap_or(0);
let timeout = config.get_timeout().unwrap_or(5); let timeout = config.get_timeout().unwrap_or(5);
let metrics = Arc::new(config.metrics.clone()); let metrics = Arc::new(config.metrics.clone());
let service_id = Arc::new(config.id.clone()); let service_id = Arc::new(config.id.clone());
loop { loop {
// let exporter = exporter.clone();
let creds = creds.clone(); let creds = creds.clone();
let metrics = metrics.clone(); let metrics = metrics.clone();
let service_id = service_id.clone(); let service_id = service_id.clone();
@@ -211,28 +159,23 @@ impl<'a> ApiPoll<'a> {
// processing // processing
sleep(Duration::from_secs(timeout)).await sleep(Duration::from_secs(timeout)).await
} }
Ok(())
} }
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> { pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
// let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
// let client = Arc::new(self.client.clone());
let config = Arc::new(self.config.clone()); let config = Arc::new(self.config.clone());
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone()); let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![]; let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
for (idx, _) in config.config.iter().enumerate() { for (idx, _) in config.config.iter().enumerate() {
// let for_creds = endpoints[idx].clone();
let creds = Credentials::from_config_endpoint(endpoints[idx].clone()); let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
let endpoint = endpoints[idx].clone(); let endpoint = endpoints[idx].clone();
// let client = client.clone();
// TODO: USE EXPORTER
#[allow(unused)]
let exporter = exporter.clone(); let exporter = exporter.clone();
let join_handler = tokio::spawn(async move { let join_handler = tokio::spawn(async move {
Self::process_endpoint( Self::process_endpoint(
// client,
endpoint, endpoint,
creds, creds,
// exporter.clone()
).await ).await
}); });
join_handles.push(join_handler); join_handles.push(join_handler);
@@ -245,6 +188,7 @@ impl<'a> ApiPoll<'a> {
} }
} }
// TODO: FIX TESTS
// #[cfg(test)] // #[cfg(test)]
// mod net_unittests { // mod net_unittests {
// use super::*; // use super::*;