Compare commits
No commits in common. "f7ec3f24027d19cea5f9feadeb9e9ca5fbb3dcf6" and "45305d4c356904044ddf15ad66de2b2298e313b3" have entirely different histories.
f7ec3f2402
...
45305d4c35
|
|
@ -1,6 +1,7 @@
|
||||||
// mod to communicate with api-grub config file
|
// mod to communicate with api-grub config file
|
||||||
// 1) check changes in unix-socket
|
// 1) check changes in unix-socket
|
||||||
// 2) save changes in local config file
|
// 2) save changes in local config file
|
||||||
|
use integr_structs::api::ApiConfigV2;
|
||||||
use anyhow::{Error, Ok, Result};
|
use anyhow::{Error, Ok, Result};
|
||||||
use log::{info, warn, error};
|
use log::{info, warn, error};
|
||||||
use std::{fs, path::Path};
|
use std::{fs, path::Path};
|
||||||
|
|
@ -14,9 +15,19 @@ use integr_structs::api::v3::Config;
|
||||||
const CONFIG_PATH: &str = "config_api.json";
|
const CONFIG_PATH: &str = "config_api.json";
|
||||||
const SOCKET_PATH: &str = "api-grub.sock";
|
const SOCKET_PATH: &str = "api-grub.sock";
|
||||||
|
|
||||||
// TODO: rewrite to use current_exe
|
// todo! rewrite to use current_exe
|
||||||
pub async fn pull_local_config() -> Result<Config> {
|
pub async fn pull_local_config() -> Result<Config> {
|
||||||
|
// let conf_path = std::env::current_exe()?;
|
||||||
let path = Path::new(CONFIG_PATH);
|
let path = Path::new(CONFIG_PATH);
|
||||||
|
// return match conf_path.parent() {
|
||||||
|
// Some(dir) => {
|
||||||
|
// let config: ApiConfig = from_str(
|
||||||
|
// &fs::read_to_string(dir.join(CONFIG_PATH))?
|
||||||
|
// )?;
|
||||||
|
// Ok(config)
|
||||||
|
// },
|
||||||
|
// None => Err(Error::msg("No local conf was found"))
|
||||||
|
// }
|
||||||
if path.exists() && path.is_file() {
|
if path.exists() && path.is_file() {
|
||||||
let config: Config = from_str(
|
let config: Config = from_str(
|
||||||
&fs::read_to_string(CONFIG_PATH)?
|
&fs::read_to_string(CONFIG_PATH)?
|
||||||
|
|
@ -28,13 +39,14 @@ pub async fn pull_local_config() -> Result<Config> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// for config pulling
|
// for config pulling
|
||||||
|
// ++++ reader to channel
|
||||||
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
|
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
|
||||||
info!("Initializing Unix-Socket listening for pulling new configs...");
|
info!("Initializing Unix-Socket listening for pulling new configs...");
|
||||||
let server = init_unix_listener().await?;
|
let server = init_unix_listener().await?;
|
||||||
|
//
|
||||||
info!("Listening Unix-Socket...");
|
info!("Listening Unix-Socket...");
|
||||||
let mut buffer = String::new();
|
let mut buffer = String::new();
|
||||||
|
//
|
||||||
loop {
|
loop {
|
||||||
if let stdOk((mut stream, _)) = server.accept().await {
|
if let stdOk((mut stream, _)) = server.accept().await {
|
||||||
if let Err(er) = stream.read_to_string(&mut buffer).await {
|
if let Err(er) = stream.read_to_string(&mut buffer).await {
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ use integr_structs::api::v3::{PrometheusMetrics, PrometheusMetricsExtended};
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use tokio_postgres::NoTls;
|
use tokio_postgres::NoTls;
|
||||||
use std::env;
|
use std::env;
|
||||||
use anyhow::Result;
|
use anyhow::{Result};
|
||||||
use log::{info, error};
|
use log::{info, error};
|
||||||
|
|
||||||
pub struct Exporter {
|
pub struct Exporter {
|
||||||
|
|
@ -34,43 +34,51 @@ impl Exporter {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[allow(unused)]
|
|
||||||
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
||||||
pub fn init() -> Self {
|
pub fn init() -> Self {
|
||||||
Self {
|
Self {
|
||||||
pool : Self::pool_construct(),
|
pool : Self::pool_construct(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[allow(unused)]
|
|
||||||
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
||||||
if let Some(pool) = &self.pool {
|
if let Some(pool) = &self.pool {
|
||||||
return Some(pool.get().await.ok()?);
|
return Some(pool.get().await.ok()?);
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
#[allow(unused)]
|
|
||||||
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||||
|
// client.
|
||||||
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
||||||
let _ = client.query(&query, &[&metrics]).await?;
|
let _ = client.query(&query, &[&metrics]).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
|
// let req = Request::new(Method::PUT,
|
||||||
|
// Url::parse(metrics)?);
|
||||||
|
// dbg!(&metrics);
|
||||||
let req = Client::new()
|
let req = Client::new()
|
||||||
.post(url)
|
.post(url)
|
||||||
.json(&metrics)
|
.json(&metrics)
|
||||||
.send().await;
|
.send().await;
|
||||||
|
// dbg!(&req);
|
||||||
|
// dbg!(&req.unwrap().text().await);
|
||||||
|
// todo : rewrite with status code wrapping
|
||||||
req?;
|
req?;
|
||||||
Ok(metrics.get_bytes_len())
|
Ok(metrics.get_bytes_len())
|
||||||
}
|
}
|
||||||
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
|
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
|
// let req = Request::new(Method::PUT,
|
||||||
|
// Url::parse(metrics)?);
|
||||||
|
// dbg!(&metrics);
|
||||||
let req = Client::new()
|
let req = Client::new()
|
||||||
.post(url)
|
.post(url)
|
||||||
.json(&metrics)
|
.json(&metrics)
|
||||||
.send().await;
|
.send().await;
|
||||||
|
// dbg!(&req);
|
||||||
|
// dbg!(&req.unwrap().text().await);
|
||||||
|
// todo : rewrite with status code wrapping
|
||||||
req?;
|
req?;
|
||||||
Ok(metrics.get_bytes_len())
|
Ok(metrics.get_bytes_len())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,18 +1,22 @@
|
||||||
// module to handle unix-socket connection + pulling info from api
|
// module to handle unix-socket connection + pulling info from api
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
// use integr_structs::api::{ApiConfigV2, ProcessedEndpoint};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use rand::random;
|
use rand::random;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use reqwest::Client;
|
use reqwest::{Client, Method};
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
// use tokio::sync::Mutex;
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use crate::json::JsonParser;
|
use crate::json::JsonParser;
|
||||||
use crate::export::Exporter;
|
use crate::export::Exporter;
|
||||||
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
|
use integr_structs::api::v3::{Config, ConfigEndpoint, Credentials, Metrics, PrometheusMetrics};
|
||||||
|
// use md5::compute;
|
||||||
|
|
||||||
|
// type BufferType = Arc<Mutex<Vec<String>>>;
|
||||||
|
|
||||||
// for api info pulling
|
// for api info pulling
|
||||||
pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {
|
pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>) -> Result<()> {
|
||||||
|
|
@ -38,13 +42,32 @@ pub async fn init_api_grub_mechanism(config: Config, rx: &mut Receiver<Config>)
|
||||||
}
|
}
|
||||||
let shared_pool = shared_pool.clone();
|
let shared_pool = shared_pool.clone();
|
||||||
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
|
info!("Data from API: {:?}", poller.process_polling(shared_pool).await);
|
||||||
|
// sleep(Duration::from_secs(poller.get_delay().await as u64)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct RestMethod;
|
||||||
|
|
||||||
|
impl RestMethod {
|
||||||
|
pub async fn from_str(method: &str) -> Method {
|
||||||
|
return match method.trim().to_lowercase().as_str() {
|
||||||
|
"post" => Method::POST,
|
||||||
|
"patch" => Method::PATCH,
|
||||||
|
"put" => Method::PUT,
|
||||||
|
"delete" => Method::DELETE,
|
||||||
|
"head" => Method::HEAD,
|
||||||
|
"trace" => Method::TRACE,
|
||||||
|
"options" => Method::OPTIONS,
|
||||||
|
"connect" => Method::CONNECT,
|
||||||
|
"get" | _ => Method::GET
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ApiPoll<'a> {
|
struct ApiPoll<'a> {
|
||||||
config : &'a mut Config,
|
config : &'a mut Config,
|
||||||
#[allow(unused)]
|
|
||||||
client : Client,
|
client : Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -55,18 +78,26 @@ impl<'a> ApiPoll<'a> {
|
||||||
client : Client::new(),
|
client : Client::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// can be weak and with bug test needed
|
||||||
pub async fn change_config(&mut self, conf: Config) {
|
pub async fn change_config(&mut self, conf: Config) {
|
||||||
*self.config = conf;
|
*self.config = conf;
|
||||||
}
|
}
|
||||||
pub async fn is_default(&self) -> bool {
|
pub async fn is_default(&self) -> bool {
|
||||||
self.config.is_default().await
|
self.config.is_default().await
|
||||||
}
|
}
|
||||||
|
// pub async fn get_delay(&self) -> u32 {
|
||||||
|
// self.config.timeout
|
||||||
|
// }
|
||||||
pub async fn process_metrics(
|
pub async fn process_metrics(
|
||||||
service_id: Arc<String>,
|
service_id: Arc<String>,
|
||||||
metrics: Arc<Metrics>,
|
metrics: Arc<Metrics>,
|
||||||
creds: Credentials,
|
creds: Credentials,
|
||||||
|
// exporter: Arc<Exporter>
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// processing metrics
|
// processing metrics
|
||||||
|
// let mut req = Client::new()
|
||||||
|
// // .user_agent("api_grub/integration_module")
|
||||||
|
// .get(&metrics.url);
|
||||||
use std::hash::DefaultHasher;
|
use std::hash::DefaultHasher;
|
||||||
|
|
||||||
let rand = random::<char>();
|
let rand = random::<char>();
|
||||||
|
|
@ -82,15 +113,32 @@ impl<'a> ApiPoll<'a> {
|
||||||
let api_key = &creds.endpoint.api_key;
|
let api_key = &creds.endpoint.api_key;
|
||||||
|
|
||||||
if !login.is_empty() && !password.is_empty() {
|
if !login.is_empty() && !password.is_empty() {
|
||||||
|
// dbg!("kjgbkasgksjd");
|
||||||
req = req.basic_auth(login, Some(password));
|
req = req.basic_auth(login, Some(password));
|
||||||
}
|
}
|
||||||
if !api_key.is_empty() {
|
if !api_key.is_empty() {
|
||||||
|
// req = req.bearer_auth(&api_key);
|
||||||
|
// req = req.header("authorization", "bearer ");
|
||||||
|
|
||||||
req = req.header("accept", "application/json");
|
req = req.header("accept", "application/json");
|
||||||
req = req.header("x-api-key", api_key);
|
req = req.header("x-api-key", api_key);
|
||||||
|
|
||||||
|
// req = req.query(&["Bearer", "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711"])
|
||||||
}
|
}
|
||||||
|
// dbg!(&req);
|
||||||
|
// let (client, res) = req.build_split();
|
||||||
|
// let res = res.unwrap();
|
||||||
|
// res.url_mut().is_special()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// dbg!(client);
|
||||||
|
// dbg!(res);
|
||||||
|
// todo!();
|
||||||
|
|
||||||
match req.send().await {
|
match req.send().await {
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
|
// dbg!(&resp.text().await);
|
||||||
if let Ok(response) = resp.text().await {
|
if let Ok(response) = resp.text().await {
|
||||||
match serde_json::to_value(&response) {
|
match serde_json::to_value(&response) {
|
||||||
Err(er) => {
|
Err(er) => {
|
||||||
|
|
@ -99,6 +147,7 @@ impl<'a> ApiPoll<'a> {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
let endpoint_name = &metrics.name;
|
let endpoint_name = &metrics.name;
|
||||||
let preproc = JsonParser::parse(&metrics.measure, &response);
|
let preproc = JsonParser::parse(&metrics.measure, &response);
|
||||||
|
// dbg!(serde_json::to_string_pretty(&preproc));
|
||||||
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
|
let preproc = PrometheusMetrics::new(&service_id, endpoint_name, preproc);
|
||||||
match Exporter::export_metrics(preproc).await {
|
match Exporter::export_metrics(preproc).await {
|
||||||
Ok(bytes) => {
|
Ok(bytes) => {
|
||||||
|
|
@ -121,15 +170,18 @@ impl<'a> ApiPoll<'a> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn process_endpoint(
|
pub async fn process_endpoint(
|
||||||
|
// client : Arc<Client>,
|
||||||
config : Arc<ConfigEndpoint>,
|
config : Arc<ConfigEndpoint>,
|
||||||
creds : Credentials,
|
creds : Credentials,
|
||||||
|
// exporter : Arc<Exporter>
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// TODO: HAVE TO BE USED
|
//
|
||||||
let _period = config.get_period().unwrap_or(0);
|
let period = config.get_period().unwrap_or(0);
|
||||||
let timeout = config.get_timeout().unwrap_or(5);
|
let timeout = config.get_timeout().unwrap_or(5);
|
||||||
let metrics = Arc::new(config.metrics.clone());
|
let metrics = Arc::new(config.metrics.clone());
|
||||||
let service_id = Arc::new(config.id.clone());
|
let service_id = Arc::new(config.id.clone());
|
||||||
loop {
|
loop {
|
||||||
|
// let exporter = exporter.clone();
|
||||||
let creds = creds.clone();
|
let creds = creds.clone();
|
||||||
let metrics = metrics.clone();
|
let metrics = metrics.clone();
|
||||||
let service_id = service_id.clone();
|
let service_id = service_id.clone();
|
||||||
|
|
@ -159,23 +211,28 @@ impl<'a> ApiPoll<'a> {
|
||||||
// processing
|
// processing
|
||||||
sleep(Duration::from_secs(timeout)).await
|
sleep(Duration::from_secs(timeout)).await
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
|
pub async fn process_polling(&self, exporter: Arc<Exporter>) -> Result<()> {
|
||||||
|
// let buffer: BufferType = Arc::new(Mutex::new(vec![]));
|
||||||
|
// let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
|
// let client = Arc::new(self.client.clone());
|
||||||
let config = Arc::new(self.config.clone());
|
let config = Arc::new(self.config.clone());
|
||||||
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
|
let endpoints: Vec<Arc<ConfigEndpoint>> = ConfigEndpoint::from_config(config.clone());
|
||||||
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
let mut join_handles: Vec<JoinHandle<Result<()>>> = vec![];
|
||||||
|
|
||||||
for (idx, _) in config.config.iter().enumerate() {
|
for (idx, _) in config.config.iter().enumerate() {
|
||||||
|
// let for_creds = endpoints[idx].clone();
|
||||||
let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
|
let creds = Credentials::from_config_endpoint(endpoints[idx].clone());
|
||||||
let endpoint = endpoints[idx].clone();
|
let endpoint = endpoints[idx].clone();
|
||||||
|
// let client = client.clone();
|
||||||
// TODO: USE EXPORTER
|
|
||||||
#[allow(unused)]
|
|
||||||
let exporter = exporter.clone();
|
let exporter = exporter.clone();
|
||||||
let join_handler = tokio::spawn(async move {
|
let join_handler = tokio::spawn(async move {
|
||||||
Self::process_endpoint(
|
Self::process_endpoint(
|
||||||
|
// client,
|
||||||
endpoint,
|
endpoint,
|
||||||
creds,
|
creds,
|
||||||
|
// exporter.clone()
|
||||||
).await
|
).await
|
||||||
});
|
});
|
||||||
join_handles.push(join_handler);
|
join_handles.push(join_handler);
|
||||||
|
|
@ -188,7 +245,6 @@ impl<'a> ApiPoll<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: FIX TESTS
|
|
||||||
// #[cfg(test)]
|
// #[cfg(test)]
|
||||||
// mod net_unittests {
|
// mod net_unittests {
|
||||||
// use super::*;
|
// use super::*;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue