diff --git a/.env b/.env index 2cbea56..b406691 100644 --- a/.env +++ b/.env @@ -1,3 +1,8 @@ CONFIG_SERVER_CREDS = "ws://127.0.0.1:8080" API_GRUBBER_SOCKET = "api-grub.sock" -PREPROC_SOCKET = "preproc.sock" \ No newline at end of file +PREPROC_SOCKET = "preproc.sock" + +DB_HOST = "192.168.2.37" +DB_USER = "vlad" +DB_PASSWORD = "vlad" +DB_DBNAME = "vks" \ No newline at end of file diff --git a/README.md b/README.md index c8897c7..4bfcad2 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,6 @@ | Crate (submodule) | Progress | |---|---| |`api-grub` | ✅✅✅✅✅✅✅🔲🔲🔲 | -|`config-delivery` | ✅✅✅✅🔲🔲🔲🔲🔲🔲 | +|`config-delivery` | ✅✅✅✅✅✅🔲🔲🔲🔲 | |`integrs-structs` | ✅✅✅✅✅✅🔲🔲🔲🔲 | -|`preproc` | 🔲🔲🔲🔲🔲🔲🔲🔲🔲🔲 | +|`preproc` | ✅✅✅🔲🔲🔲🔲🔲🔲🔲 | diff --git a/config_api.json b/config_api.json index 9136b29..24e96c4 100644 --- a/config_api.json +++ b/config_api.json @@ -1,13 +1,31 @@ -{ - "endpoints" : [ - { - "url" : "http://127.0.0.1:8081/ping", - "method" : "GET" - }, - { - "url" : "http://127.0.0.1:8081/", - "method" : "GET" - } - ], - "delay" : 5 -} \ No newline at end of file +{ + "id" : 1 , + "template" : + [{ + "id" :"mock_api_1", + "name" : "Mock / ", + "url" : "http://127.0.0.1:8081/", + "method" : "GET", + "measure" : + [ + "operation", "response" + ] + }, + { + "id" :"mock_api_2", + "name" : "Mock /ping ", + "url" : "http://127.0.0.1:8081/ping", + "method" : "GET", + "measure" : + [ + "operation", "response", "empty_field" + ] + } + ], + "ip_address" : "127.0.0.1:8081", + "login" : "", + "pass" : "" , + "api_key" : "908c709827bd40n98r7209837x98273", + "period" : 10, + "timeout" : 10 +} diff --git a/crates/api-grub/Cargo.toml b/crates/api-grub/Cargo.toml index 8c51835..b5e4a92 100644 --- a/crates/api-grub/Cargo.toml +++ b/crates/api-grub/Cargo.toml @@ -12,4 +12,7 @@ env_logger = "0.11.6" log = "0.4.25" anyhow = "1.0.95" chrono = "0.4.39" -reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] } \ No newline at end of file +reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] } +deadpool-postgres = "0.14.1" +tokio-postgres = "0.7.12" +dotenv = "0.15.0" diff --git a/crates/api-grub/src/config.rs b/crates/api-grub/src/config.rs index 9a30354..f68e77e 100644 --- a/crates/api-grub/src/config.rs +++ b/crates/api-grub/src/config.rs @@ -1,7 +1,7 @@ // mod to communicate with api-grub config file // 1) check changes in unix-socket // 2) save changes in local config file -use integr_structs::api::ApiConfig; +use integr_structs::api::{ApiConfig, ApiConfigV2}; use anyhow::{Error, Ok, Result}; use log::{info, warn, error}; use std::{fs, path::Path}; @@ -15,7 +15,7 @@ const CONFIG_PATH: &str = "config_api.json"; const SOCKET_PATH: &str = "api-grub.sock"; // todo! rewrite to use current_exe -pub async fn pull_local_config() -> Result { +pub async fn pull_local_config() -> Result { // let conf_path = std::env::current_exe()?; let path = Path::new(CONFIG_PATH); // return match conf_path.parent() { @@ -28,7 +28,7 @@ pub async fn pull_local_config() -> Result { // None => Err(Error::msg("No local conf was found")) // } if path.exists() && path.is_file() { - let config: ApiConfig = from_str( + let config: ApiConfigV2 = from_str( &fs::read_to_string(CONFIG_PATH)? )?; Ok(config) @@ -39,7 +39,7 @@ pub async fn pull_local_config() -> Result { // for config pulling // ++++ reader to channel -pub async fn init_config_grub_mechanism(tx: &Sender) -> Result<()> { +pub async fn init_config_grub_mechanism(tx: &Sender) -> Result<()> { info!("Initializing Unix-Socket listening for pulling new configs..."); let server = init_unix_listener().await?; // @@ -51,7 +51,7 @@ pub async fn init_config_grub_mechanism(tx: &Sender) -> Result<()> { if let Err(er) = stream.read_to_string(&mut buffer).await { warn!("Cannot read config from stream due to {}", er); } else { - let config: Result = from_str(&buffer); + let config: Result = from_str(&buffer); if let stdOk(conf) = config { info!("New config was pulled from Unix-Stream. Saving it locally and sharing with API-grub module..."); if let Err(er) = save_new_config(&buffer).await { @@ -97,13 +97,13 @@ mod config_unittests { #[test] async fn check_save_new_config() { use std::fs; - use integr_structs::api::ApiConfig; + use integr_structs::api::ApiConfigV2; use serde_json::to_string; let test_config_path = "test_config_api.json"; // config gen - let config = to_string::(&ApiConfig::default()); + let config = to_string::(&ApiConfigV2::default()); assert!(config.is_ok()); let config = config.unwrap(); diff --git a/crates/api-grub/src/main.rs b/crates/api-grub/src/main.rs index 208d646..2fe4a95 100644 --- a/crates/api-grub/src/main.rs +++ b/crates/api-grub/src/main.rs @@ -3,7 +3,7 @@ mod net; mod logger; use anyhow::Result; -use integr_structs::api::ApiConfig; +use integr_structs::api::ApiConfigV2; use logger::setup_logger; use log::{info, warn}; use config::{pull_local_config, init_config_grub_mechanism}; @@ -19,8 +19,9 @@ async fn main() -> Result<()>{ setup_logger().await?; let config = get_config().await; // config update channel - let (tx, mut rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::(1); // futures + // todo : rewrite with spawn let config_fut = init_config_grub_mechanism(&tx); let grub_fut = init_api_grub_mechanism(config, &mut rx); @@ -29,7 +30,7 @@ async fn main() -> Result<()>{ Ok(()) } -async fn get_config() -> ApiConfig { +async fn get_config() -> ApiConfigV2 { return match pull_local_config().await { Ok(conf) => { info!("Local config was loaded"); @@ -37,7 +38,7 @@ async fn get_config() -> ApiConfig { }, Err(er) => { warn!("Cannot get local config due to {}", er); - ApiConfig::default() + ApiConfigV2::default() } } } \ No newline at end of file diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs index 62ddb5c..ee13e5f 100644 --- a/crates/api-grub/src/net.rs +++ b/crates/api-grub/src/net.rs @@ -1,10 +1,70 @@ // module to handle unix-socket connection + pulling info from api use anyhow::{Error, Result}; -use integr_structs::api::ApiConfig; +use integr_structs::api::{ApiConfigV2, ProcessedEndpoint}; use log::{error, info}; +use serde_json::Value; use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, Duration}; use reqwest::{Client, Method}; +use std::sync::Arc; +use tokio::task::JoinHandle; +// use tokio::sync::Mutex; +use tokio_postgres::types::ToSql; +use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient}; +use tokio_postgres::NoTls; +use dotenv::dotenv; +use std::env; + +// type BufferType = Arc>>; + +struct Exporter { + pool : Option, +} + +impl Exporter { + fn config_construct() -> Result { + let mut cfg = Config::new(); + cfg.host = Some(env::var("DB_HOST")?); + cfg.dbname = Some(env::var("DB_DBNAME")?); + cfg.user = Some(env::var("DB_USER")?); + cfg.password = Some(env::var("DB_PASSWORD")?); + Ok(cfg) + } + fn pool_construct() -> Option { + return match Self::config_construct() { + Ok(config) => { + if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) { + info!("Connected to PostgreSQL"); + return Some(pool); + } + None + }, + Err(_) => { + error!("Bad DB credentials or it's unreachable"); + None + }, + } + } + pub fn is_no_connection(&self) -> bool { self.pool.is_none() } + pub fn init() -> Self { + Self { + pool : Self::pool_construct() + } + } + pub async fn get_connection_from_pool(&self) -> Option { + if let Some(pool) = &self.pool { + return Some(pool.get().await.ok()?); + } + None + } + pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> { + // client. + let query = "INSERT INTO metrics (body) VALUES ($1);"; + let _ = client.execute(query, &[&query]).await?; + Ok(()) + } + +} struct RestMethod; @@ -15,69 +75,113 @@ impl RestMethod { "patch" => Method::PATCH, "put" => Method::PUT, "delete" => Method::DELETE, + "head" => Method::HEAD, + "trace" => Method::TRACE, + "options" => Method::OPTIONS, + "connect" => Method::CONNECT, "get" | _ => Method::GET } } } struct ApiPoll<'a> { - config : &'a mut ApiConfig, + config : &'a mut ApiConfigV2, client : Client, } impl<'a> ApiPoll<'a> { - pub async fn new(poll_cfg : &'a mut ApiConfig) -> Self { + pub async fn new(poll_cfg : &'a mut ApiConfigV2) -> Self { Self { config : poll_cfg, client : Client::new(), } } // can be weak and with bug test needed - pub async fn change_config(&mut self, conf: ApiConfig) { + pub async fn change_config(&mut self, conf: ApiConfigV2) { *self.config = conf; } pub async fn is_default(&self) -> bool { - self.config.endpoints.len() == 0 + self.config.template.len() == 0 } - pub async fn process_polling(&self) -> Result> { - let mut buffer: Vec = vec![]; + pub async fn process_polling(&self, exporter: Arc) -> Result<()> { + // let buffer: BufferType = Arc::new(Mutex::new(vec![])); + let mut join_handles: Vec>> = vec![]; + let client = Arc::new(self.client.clone()); + let template = Arc::new(self.config.template.clone()); + // TODO: rewrite nextly to async - for point in &self.config.endpoints { - // let a = self.client.get(&point.url).send().await.unwrap(); - // a.text().await.unwrap(); - match self.client.request(RestMethod::from_str(&point.method).await, &point.url).send().await { - Ok(resp) => { - if !resp.status().is_success() { - error!("ErrorCode in Response from API. Check configuration"); - continue; - } - if let Ok(text) = resp.text().await { - info!("{}: {} - Successfull grubbing info", &point.method.to_uppercase(), &point.url); - buffer.push(text); - } else { - error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url); - } - }, - Err(er) => { - error!("{}: {} - Query crushed due to {}", &point.method.to_uppercase(), &point.url, er); - }, - } + for point in template.iter() { + let point = Arc::new(point.clone()); + // let buffer = buffer.clone(); + let client = client.clone(); + let exporter = exporter.clone(); + let endpoint_processer = tokio::spawn(async move { + let point = point.clone(); + match client.request(RestMethod::from_str(&point.method).await, &point.url).send().await { + Ok(resp) => { + if !resp.status().is_success() { + error!("ErrorCode in Response from API. Check configuration"); + return Err(Error::msg("Error during sending request")); + } + if let Ok(text) = resp.text().await { + // + let metrics = ProcessedEndpoint::from_target_response(&text, &point)?; + // + if let Some(conn) = exporter.get_connection_from_pool().await { + if let Err(er) = Exporter::export_data(conn, &metrics).await { + error!("Cannot export data to DB during to: {}", er); + return Err(Error::msg("Error during exporting data to DB")); + } + } else { + if !exporter.is_no_connection() { + return Err(Error::msg("Error during getting connection from pool")); + } + } + + // let mut buffer = buffer.lock().await; + // buffer.push(text); + } else { + error!("{}: {} - Error with extracting text field from Response", &point.method.to_uppercase(), &point.url); + } + }, + Err(_) => { + error!("{}: {} endpoint is unreachable", &point.method.to_uppercase(), &point.url); + }, + } + Ok(()) + }); + join_handles.push(endpoint_processer); } - match &buffer.len() { - 0 => Err(Error::msg("Error due to API grubbing. Check config" )), - _ => Ok(buffer), + + for i in join_handles { + let _ = i.await; } + + // let buffer = buffer.lock().await; + // match &buffer.len() { + // 0 => Err(Error::msg("Error due to API grubbing. Check config" )), + // _ => { + // Ok(()) + // }, + // } + Ok(()) } pub async fn get_delay(&self) -> u32 { - self.config.delay + self.config.timeout } } // for api info pulling -pub async fn init_api_grub_mechanism(config: ApiConfig, rx: &mut Receiver) -> Result<()> { +pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver) -> Result<()> { info!("Initializing API-info grubbing mechanism..."); + info!("Loading vars from .env file if exists..."); + let _ = dotenv().ok(); + let mut config = config; let mut poller = ApiPoll::new(&mut config).await; + let client = Exporter::init(); + let shared_pool = Arc::new(client); loop { + let shared_pool = shared_pool.clone(); if poller.is_default().await { sleep(Duration::from_secs(5)).await; } else { @@ -87,7 +191,7 @@ pub async fn init_api_grub_mechanism(config: ApiConfig, rx: &mut Receiver; + +pub async fn init_delivery_mech(rx: ConfigGateway) -> Result<()> { + let mut rx = rx; + loop { + while rx.is_empty() { + continue; + }; + info!("New config was pulled. Redirecting it ..."); + let config = rx.recv().await.unwrap(); + match UnixSocketConsumer::ApiGrubber.get_stream_object().await { + Ok(socket) => { + socket.try_write(to_string_pretty(&config)?.as_bytes())?; + info!("New api config was pulled"); + }, + Err(er) => { + error!("Cannot coonect to ApiGrubber Unix-Socket due to {er}"); + } + } + sleep(Duration::from_secs(3)).await; + } + // let api_sock = UnixSocketConsumer::ApiGrubber.get_stream_object().await?; + // let preproc_sock = UnixSocketConsumer::Preproc.get_stream_object().await?; +} impl UnixSocketConsumer { pub async fn get_stream_object(&self) -> Result { @@ -20,16 +49,17 @@ impl UnixSocketConsumer { UnixSocketConsumer::ApiGrubber => env::var("API_GRUBBER_SOCKET")?, UnixSocketConsumer::Preproc => env::var("PREPROC_SOCKET")?, }; - UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg("Cannot create Unix-Socket client"))) + UnixStream::connect(socket_file).await.or_else(|_| Err(Error::msg(format!("Cannot create {:?} Unix-Socket client", self)))) } } -async fn check_unix_socket_file(path: &str) -> bool { - std::path::Path::new(path).exists() -} +// async fn check_unix_socket_file(path: &str) -> bool { +// std::path::Path::new(path).exists() +// } -#[cfg(tests)] +#[cfg(test)] mod delivery_unittests { + #[allow(unused_imports)] use super::*; use tokio::test; diff --git a/crates/config-delivery/src/integration.rs b/crates/config-delivery/src/integration.rs index e69de29..bf883a7 100644 --- a/crates/config-delivery/src/integration.rs +++ b/crates/config-delivery/src/integration.rs @@ -0,0 +1,20 @@ +use std::sync::Arc; + +use tokio::sync::mpsc::Sender; +use integr_structs::api::ApiConfigV2; +use anyhow::Result; +use tokio::time::{sleep, Duration}; +// mock + +type Worker = Arc>; + +#[allow(dead_code, unused_variables)] +pub async fn init_integration_mech(tx: Worker) -> Result<()> { + loop { + sleep(Duration::from_secs(5)).await; + let conf = ApiConfigV2::default(); + tx.send(conf).await?; + } + // Ok(()) +} + diff --git a/crates/config-delivery/src/main.rs b/crates/config-delivery/src/main.rs index 99eded1..e5955b7 100644 --- a/crates/config-delivery/src/main.rs +++ b/crates/config-delivery/src/main.rs @@ -2,11 +2,15 @@ mod delivery; mod integration; mod logger; +use integr_structs::api::ApiConfigV2; use logger::setup_logger; use dotenv::dotenv; use anyhow::Result; use tokio::sync::mpsc; -use log::info; +use log::{info, error}; +use integration::init_integration_mech; +use delivery::init_delivery_mech; +use std::sync::Arc; // Arch : // 1) 2 Unix-Socket client (for api grub and preproc) :: i think its a continious process for events when services are unavailable @@ -16,10 +20,27 @@ use log::info; #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let _ = setup_logger().await?; - dotenv().ok(); info!("Pulling env vars from .env file if exists ..."); + dotenv().ok(); - // - + let (tx, rx) = mpsc::channel::(1); + let tx = Arc::new(tx); + let integr_jh = tokio::spawn( + async move { + if let Err(er) = init_integration_mech(tx).await { + error!("Error in integration submodule during to: {}", er); + } + } + ); + let deliv_future = tokio::spawn(async move { + if let Err(er) = init_delivery_mech(rx).await { + error!("Error in delivery submodule during to: {}", er); + } + }); + // for deliv adding + // let action_vec: Vec> = vec![integr_jh, deliv_future]; + for event in vec![integr_jh, deliv_future] { + let _ = event.await; + } Ok(()) } diff --git a/crates/integr-structs/Cargo.toml b/crates/integr-structs/Cargo.toml index fdf9743..9183deb 100644 --- a/crates/integr-structs/Cargo.toml +++ b/crates/integr-structs/Cargo.toml @@ -4,5 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1.0.95" serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.135" diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index 70f07a4..de77a5e 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -1,4 +1,7 @@ +use std::collections::HashMap; use serde::{Serialize, Deserialize}; +use serde_json::{ to_string_pretty, Value }; +use anyhow::Result; #[derive(Serialize, Deserialize, Debug)] @@ -21,4 +24,124 @@ impl Default for ApiConfig { delay : 0, } } +} + +// v2 +#[derive(Serialize, Deserialize, Debug)] +pub struct ApiConfigV2 { + pub id : u64, + #[serde(default)] + pub template : Vec