diff --git a/crates/api-grub/src/export.rs b/crates/api-grub/src/export.rs new file mode 100644 index 0000000..ee73421 --- /dev/null +++ b/crates/api-grub/src/export.rs @@ -0,0 +1,54 @@ +use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient}; +use tokio_postgres::NoTls; +use std::env; +use anyhow::{Error, Result}; +use log::{info, error}; + +pub struct Exporter { + pool : Option, +} + +impl Exporter { + fn config_construct() -> Result { + let mut cfg = Config::new(); + cfg.host = Some(env::var("DB_HOST")?); + cfg.dbname = Some(env::var("DB_DBNAME")?); + cfg.user = Some(env::var("DB_USER")?); + cfg.password = Some(env::var("DB_PASSWORD")?); + Ok(cfg) + } + fn pool_construct() -> Option { + return match Self::config_construct() { + Ok(config) => { + if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) { + info!("Connected to PostgreSQL"); + return Some(pool); + } + None + }, + Err(_) => { + error!("Bad DB credentials or it's unreachable"); + None + }, + } + } + pub fn is_no_connection(&self) -> bool { self.pool.is_none() } + pub fn init() -> Self { + Self { + pool : Self::pool_construct() + } + } + pub async fn get_connection_from_pool(&self) -> Option { + if let Some(pool) = &self.pool { + return Some(pool.get().await.ok()?); + } + None + } + pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> { + // client. + let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?; + let _ = client.query(&query, &[&metrics]).await?; + Ok(()) + } + +} \ No newline at end of file diff --git a/crates/api-grub/src/json.rs b/crates/api-grub/src/json.rs index 58bb968..c8c6c12 100644 --- a/crates/api-grub/src/json.rs +++ b/crates/api-grub/src/json.rs @@ -1,22 +1,22 @@ // use serde::{de::value, Serialize}; use serde_json::{json, Value}; -use integr_structs::api::v3::{TestMetric, TestMetricOutput}; +use integr_structs::api::v3::{Metric, MetricOutput}; pub struct JsonParser; impl JsonParser { - pub fn parse(targets: &Vec, json: &str) -> Value { - let mut res_vec: Vec = Vec::new(); + pub fn parse(targets: &Vec, json: &str) -> Value { + let mut res_vec: Vec = Vec::new(); for target in targets { let metric = match target.addr.contains("[") { true => JsonParser::get_sum_of_metrics_in_array(target, json), false => JsonParser::get_metric(target, json), }; - res_vec.push(TestMetricOutput::new_with_slices(&target.id, &target.json_type, metric)); + res_vec.push(MetricOutput::new_with_slices(&target.id, &target.json_type, metric)); } serde_json::to_value(res_vec).unwrap_or(Value::Null) } - fn get_sum_of_metrics_in_array(target: &TestMetric, json: &str) -> Value { + fn get_sum_of_metrics_in_array(target: &Metric, json: &str) -> Value { if target.addr.is_empty() { return Value::Null; } @@ -133,7 +133,7 @@ impl JsonParser { values } - fn get_metric(target: &TestMetric, json: &str) -> Value { + fn get_metric(target: &Metric, json: &str) -> Value { if target.addr.is_empty() { return Value::Null; } @@ -151,26 +151,3 @@ impl JsonParser { value_json } } - - -// pub struct TestMetric { -// pub id : String, -// pub json_type : String, -// pub addr : String, -// } - -// #[derive(Serialize)] -// struct TestMetricOutput { -// id : String, -// json_type : String, -// value : Value, -// } -// impl TestMetricOutput { -// fn new_with_slices(id : &str, json_type : &str, value : Value) -> Self { -// TestMetricOutput { -// id : id.to_string(), -// json_type : json_type.to_string(), -// value : value, -// } -// } -// } \ No newline at end of file diff --git a/crates/api-grub/src/main.rs b/crates/api-grub/src/main.rs index 5997ae8..f35511c 100644 --- a/crates/api-grub/src/main.rs +++ b/crates/api-grub/src/main.rs @@ -2,14 +2,16 @@ mod config; mod net; mod logger; mod json; +mod export; use anyhow::Result; use integr_structs::api::ApiConfigV2; use logger::setup_logger; -use log::{info, warn}; +// use log::{info, warn}; use config::{pull_local_config, init_config_grub_mechanism}; use net::init_api_grub_mechanism; use tokio::sync::mpsc; +use log::{error, info, warn}; #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()>{ @@ -23,10 +25,34 @@ async fn main() -> Result<()>{ let (tx, mut rx) = mpsc::channel::(1); // futures // todo : rewrite with spawn - let config_fut = init_config_grub_mechanism(&tx); - let grub_fut = init_api_grub_mechanism(config, &mut rx); + // let config_fut = init_config_grub_mechanism(&tx); + // let grub_fut = init_api_grub_mechanism(config, &mut rx); - let _ = tokio::join!(config_fut, grub_fut); + let event_config = tokio::spawn(async move { + match init_config_grub_mechanism(&tx).await { + Ok(_) => { + info!("Config task deinitialized"); + }, + Err(er) => { + error!("Config task returned an error : {}", er); + }, + } + }); + let event_grub = tokio::spawn(async move { + match init_api_grub_mechanism(config, &mut rx).await { + Ok(_) => { + info!("Grabing task deinitialized"); + }, + Err(er) => { + error!("Grabing task returned an error : {}", er); + }, + } + }); + let events_handler = vec![event_config, event_grub]; + for event in events_handler { + let _ = event.await; + } + // let _ = tokio::join!(config_fut, grub_fut); Ok(()) } diff --git a/crates/api-grub/src/net.rs b/crates/api-grub/src/net.rs index b7b48bd..61a394a 100644 --- a/crates/api-grub/src/net.rs +++ b/crates/api-grub/src/net.rs @@ -8,62 +8,42 @@ use reqwest::{Client, Method}; use std::sync::Arc; use tokio::task::JoinHandle; // use tokio::sync::Mutex; -use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient}; -use tokio_postgres::NoTls; use dotenv::dotenv; -use std::env; +use crate::json::JsonParser; +use crate::export::Exporter; +use integr_structs::api::v3::Config; // type BufferType = Arc>>; -struct Exporter { - pool : Option, -} +// for api info pulling +pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver) -> Result<()> { + info!("Initializing API-info grubbing mechanism..."); + info!("Loading vars from .env file if exists..."); + let _ = dotenv().ok(); -impl Exporter { - fn config_construct() -> Result { - let mut cfg = Config::new(); - cfg.host = Some(env::var("DB_HOST")?); - cfg.dbname = Some(env::var("DB_DBNAME")?); - cfg.user = Some(env::var("DB_USER")?); - cfg.password = Some(env::var("DB_PASSWORD")?); - Ok(cfg) - } - fn pool_construct() -> Option { - return match Self::config_construct() { - Ok(config) => { - if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) { - info!("Connected to PostgreSQL"); - return Some(pool); + let mut config = config; + let mut poller = ApiPoll::new(&mut config).await; + let client = Exporter::init(); + let shared_pool = Arc::new(client); + loop { + let shared_pool = shared_pool.clone(); + if poller.is_default().await { + sleep(Duration::from_secs(5)).await; + } else { + if rx.len() > 0 { + if let Some(conf) = rx.recv().await { + poller.change_config(conf).await; + info!("Config changed"); } - None - }, - Err(_) => { - error!("Bad DB credentials or it's unreachable"); - None - }, - } - } - pub fn is_no_connection(&self) -> bool { self.pool.is_none() } - pub fn init() -> Self { - Self { - pool : Self::pool_construct() + } + info!("Data from API: {:?}", poller.process_polling(shared_pool).await); + sleep(Duration::from_secs(poller.get_delay().await as u64)).await; } } - pub async fn get_connection_from_pool(&self) -> Option { - if let Some(pool) = &self.pool { - return Some(pool.get().await.ok()?); - } - None - } - pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> { - // client. - let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?; - let _ = client.query(&query, &[&metrics]).await?; - Ok(()) - } - + // Ok(()) } + struct RestMethod; impl RestMethod { @@ -187,34 +167,6 @@ impl<'a> ApiPoll<'a> { } } -// for api info pulling -pub async fn init_api_grub_mechanism(config: ApiConfigV2, rx: &mut Receiver) -> Result<()> { - info!("Initializing API-info grubbing mechanism..."); - info!("Loading vars from .env file if exists..."); - let _ = dotenv().ok(); - - let mut config = config; - let mut poller = ApiPoll::new(&mut config).await; - let client = Exporter::init(); - let shared_pool = Arc::new(client); - loop { - let shared_pool = shared_pool.clone(); - if poller.is_default().await { - sleep(Duration::from_secs(5)).await; - } else { - if rx.len() > 0 { - if let Some(conf) = rx.recv().await { - poller.change_config(conf).await; - info!("Config changed"); - } - } - info!("Data from API: {:?}", poller.process_polling(shared_pool).await); - sleep(Duration::from_secs(poller.get_delay().await as u64)).await; - } - } - // Ok(()) -} - #[cfg(test)] mod net_unittests { use super::*; diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index 97e96f5..4563655 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -151,23 +151,49 @@ pub mod v3 { // in config #[derive(Deserialize)] - pub struct TestMetric { + pub struct Metric { pub id : String, #[serde(rename = "type")] pub json_type : String, pub addr : String, } - + #[derive(Deserialize)] + pub struct Metrics { + pub name : String, + pub url : String, + #[serde(default)] + pub measure : Vec + } + + #[derive(Deserialize)] + pub struct ConfigEndpoint { + ip : String, + login : String, + #[serde(rename = "pass")] + password : String, + api_key : String, + period : String, + timeout : String, + #[serde(default)] + metrics : Vec, + } + + #[derive(Deserialize)] + pub struct Config { + config : Vec, + } + + // to prometheus and nmns #[derive(Serialize, Deserialize)] - pub struct TestMetricOutput { + pub struct MetricOutput { id : String, #[serde(rename = "type")] json_type : String, value : Value, } - impl TestMetricOutput { + impl MetricOutput { pub fn new_with_slices(id : &str, json_type : &str, value : Value) -> Self { - TestMetricOutput { + MetricOutput { id : id.to_string(), json_type : json_type.to_string(), value : value, diff --git a/template_global_config.json b/template_global_config.json index fe911ce..0b5e33f 100644 --- a/template_global_config.json +++ b/template_global_config.json @@ -1,31 +1,32 @@ -{ - "id" : 1 , - "template" : - [{ - "id" :"mock_api_1", - "name" : "Mock / ", - "url" : "http://127.0.0.1:8081/", - "method" : "GET", - "measure" : - [ - "operation", "response", "empty_field" - ] - }, +{ + "config": [ { - "id" :"mock_api_2", - "name" : "Mock /ping ", - "url" : "http://127.0.0.1:8081/ping", - "method" : "GET", - "measure" : - [ - "operation", "response", "empty_field" - ] - } - ], - "ip_address" : "127.0.0.1:8081", - "login" : "", - "pass" : "" , - "api_key" : "908c709827bd40n98r7209837x98273", - "period" : 10, - "timeout" : 2 -} + "id":"demo_vcs_vinteo_dev_api", + "login" : "", + "pass" : "", + "api_key" : "6fe8b0db-62b4-4065-9c1e-441ec4228341.9acec20bd17d7178f332896f8c006452877a22b8627d089105ed39c5baef9711", + "period" : "", + "timeout" : "3", + "metrics" : [ + { + "name": "conferences", + "url": "https://demo.vcs.vinteo.dev/api/v1/conferences", + "measure": [ + { "id":"number", "type": "text", "addr": "data.conferences[].number" }, + { "id":"total", "type": "integer", "addr": "data.total" }, + { "id":"participants_total", "type": "integer", "addr": "data.conferences[].participants.total" }, + { "id":"parts_total_in_each", "type": "integer", "addr": "data.conferences[description].participants.total" }, + { "id":"participants_online", "type": "integer", "addr": "data.conferences[].participants.online" } + ] + }, + { + "name": "abonents", + "url": "https://demo.vcs.vinteo.dev/api/v1/accounts", + "measure": [ + { "id":"total", "type": "integer", "addr": "data.total" } + ] + } + ] + } + ] +} \ No newline at end of file