diff --git a/.env b/.env index 2cbea56..b406691 100644 --- a/.env +++ b/.env @@ -1,3 +1,8 @@ CONFIG_SERVER_CREDS = "ws://127.0.0.1:8080" API_GRUBBER_SOCKET = "api-grub.sock" -PREPROC_SOCKET = "preproc.sock" \ No newline at end of file +PREPROC_SOCKET = "preproc.sock" + +DB_HOST = "192.168.2.37" +DB_USER = "vlad" +DB_PASSWORD = "vlad" +DB_DBNAME = "vks" \ No newline at end of file diff --git a/crates/api-grub/Cargo.toml b/crates/api-grub/Cargo.toml index 8c51835..b5e4a92 100644 --- a/crates/api-grub/Cargo.toml +++ b/crates/api-grub/Cargo.toml @@ -12,4 +12,7 @@ env_logger = "0.11.6" log = "0.4.25" anyhow = "1.0.95" chrono = "0.4.39" -reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] } \ No newline at end of file +reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] } +deadpool-postgres = "0.14.1" +tokio-postgres = "0.7.12" +dotenv = "0.15.0" diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs index ba587cb..ee13e5f 100644 --- a/crates/api-grub/src/net.rs +++ b/crates/api-grub/src/net.rs @@ -2,14 +2,70 @@ use anyhow::{Error, Result}; use integr_structs::api::{ApiConfigV2, ProcessedEndpoint}; use log::{error, info}; +use serde_json::Value; use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, Duration}; use reqwest::{Client, Method}; use std::sync::Arc; use tokio::task::JoinHandle; -use tokio::sync::Mutex; +// use tokio::sync::Mutex; +use tokio_postgres::types::ToSql; +use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient}; +use tokio_postgres::NoTls; +use dotenv::dotenv; +use std::env; + +// type BufferType = Arc>>; + +struct Exporter { + pool : Option, +} + +impl Exporter { + fn config_construct() -> Result { + let mut cfg = Config::new(); + cfg.host = Some(env::var("DB_HOST")?); + cfg.dbname = Some(env::var("DB_DBNAME")?); + cfg.user = Some(env::var("DB_USER")?); + cfg.password = Some(env::var("DB_PASSWORD")?); + Ok(cfg) + } + fn pool_construct() -> Option { + return match Self::config_construct() { + Ok(config) => { + if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) { + info!("Connected to PostgreSQL"); + return Some(pool); + } + None + }, + Err(_) => { + error!("Bad DB credentials or it's unreachable"); + None + }, + } + } + pub fn is_no_connection(&self) -> bool { self.pool.is_none() } + pub fn init() -> Self { + Self { + pool : Self::pool_construct() + } + } + pub async fn get_connection_from_pool(&self) -> Option { + if let Some(pool) = &self.pool { + return Some(pool.get().await.ok()?); + } + None + } + pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> { + // client. + let query = "INSERT INTO metrics (body) VALUES ($1);"; + let _ = client.execute(query, &[&query]).await?; + Ok(()) + } + +} -type BufferType = Arc>>; struct RestMethod; impl RestMethod { @@ -46,34 +102,43 @@ impl<'a> ApiPoll<'a> { pub async fn is_default(&self) -> bool { self.config.template.len() == 0 } - pub async fn process_polling(&self) -> Result<()> { - let buffer: BufferType = Arc::new(Mutex::new(vec![])); - let mut join_handles: Vec> = vec![]; + pub async fn process_polling(&self, exporter: Arc) -> Result<()> { + // let buffer: BufferType = Arc::new(Mutex::new(vec![])); + let mut join_handles: Vec>> = vec![]; let client = Arc::new(self.client.clone()); let template = Arc::new(self.config.template.clone()); // TODO: rewrite nextly to async for point in template.iter() { let point = Arc::new(point.clone()); - let buffer = buffer.clone(); + // let buffer = buffer.clone(); let client = client.clone(); - // let a = self.client.get(&point.url).send().await.unwrap(); - // a.text().await.unwrap(); + let exporter = exporter.clone(); let endpoint_processer = tokio::spawn(async move { let point = point.clone(); match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await { Ok(resp) => { if !resp.status().is_success() { error!("ErrorCode in Response from API. Check configuration"); - return; + return Err(Error::msg("Error during sending request")); } if let Ok(text) = resp.text().await { // - let a = ProcessedEndpoint::from_target_response(&text, &point); - println!("{}", a.unwrap()); + let metrics = ProcessedEndpoint::from_target_response(&text, &point)?; // - let mut buffer = buffer.lock().await; - buffer.push(text); + if let Some(conn) = exporter.get_connection_from_pool().await { + if let Err(er) = Exporter::export_data(conn, &metrics).await { + error!("Cannot export data to DB during to: {}", er); + return Err(Error::msg("Error during exporting data to DB")); + } + } else { + if !exporter.is_no_connection() { + return Err(Error::msg("Error during getting connection from pool")); + } + } + + // let mut buffer = buffer.lock().await; + // buffer.push(text); } else { error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url); } @@ -82,6 +147,7 @@ impl<'a> ApiPoll<'a> { error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url); }, } + Ok(()) }); join_handles.push(endpoint_processer); } @@ -90,14 +156,14 @@ impl<'a> ApiPoll<'a> { let _ = i.await; } - let buffer = buffer.lock().await; - match &buffer.len() { - 0 => Err(Error::msg("Error due to API grubbing. Check config" )), - _ => { - Ok(()) - }, - } - // Ok(()) + // let buffer = buffer.lock().await; + // match &buffer.len() { + // 0 => Err(Error::msg("Error due to API grubbing. Check config" )), + // _ => { + // Ok(()) + // }, + // } + Ok(()) } pub async fn get_delay(&self) -> u32 { self.config.timeout @@ -107,10 +173,15 @@ impl<'a> ApiPoll<'a> { // for api info pulling pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver) -> Result<()> { info!("Initializing API-info grubbing mechanism..."); + info!("Loading vars from .env file if exists..."); + let _ = dotenv().ok(); + let mut config = config; let mut poller = ApiPoll::new(&mut config).await; - // let arc_poller = Arc::new(poller); + let client = Exporter::init(); + let shared_pool = Arc::new(client); loop { + let shared_pool = shared_pool.clone(); if poller.is_default().await { sleep(Duration::from_secs(5)).await; } else { @@ -120,7 +191,7 @@ pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver