diff --git a/crates/api-grub/src/config.rs b/crates/api-grub/src/config.rs index 2d8c835..c0daf03 100644 --- a/crates/api-grub/src/config.rs +++ b/crates/api-grub/src/config.rs @@ -15,6 +15,20 @@ const CONFIG_PATH: &str = "config_api.json"; const SOCKET_PATH: &str = "api-grub.sock"; // TODO: rewrite to use current_exe +/// # Fn `pull_local_config` +/// +/// ## function to one-time pulling local config by straight reading +/// +/// ### Dev-Info : +/// +/// *input* : - +/// +/// *output* : `anyhow::Result` +/// +/// *initiator* : fn `main` +/// +/// *managing* : - +/// pub async fn pull_local_config() -> Result { let path = Path::new(CONFIG_PATH); if path.exists() && path.is_file() { @@ -28,6 +42,22 @@ pub async fn pull_local_config() -> Result { } // for config pulling +/// # Fn `pull_local_config` +/// +/// ## function to init Unix-Socket listening for grabbing new configs +/// +/// ### Dev-Info : +/// +/// *input* : `&tokio::sync::mpsc::Sender` +/// +/// *output* : `anyhow::Result<()>` +/// +/// *initiator* : fn `main` +/// +/// *managing* : - +/// +/// *depends on* : `const SOCKET_PATH` +/// pub async fn init_config_grub_mechanism(tx: &Sender) -> Result<()> { info!("Initializing Unix-Socket listening for pulling new configs..."); let server = init_unix_listener().await?; @@ -57,12 +87,43 @@ pub async fn init_config_grub_mechanism(tx: &Sender) -> Result<()> { } // saving new config locally +/// # Fn `save_new_config` +/// +/// ## function for saving new config locally +/// +/// ### Dev-Info : +/// +/// *input* : `&String | &str` +/// +/// *output* : `anyhow::Result<()>` +/// +/// *initiator* : fn `main` +/// +/// *managing* : - +/// +/// *depends on* : `const CONFIG_PATH` +/// async fn save_new_config(config: &String) -> Result<()> { fs::write(CONFIG_PATH, config)?; Ok(()) } - +/// # Fn `pull_local_config` +/// +/// ## function for saving new config locally +/// +/// ### Dev-Info : +/// +/// *input* : `&tokio::sync::mpsc::Sender` +/// +/// *output* : `anyhow::Result` +/// +/// *initiator* : fn `init_config_grub_mechanism` +/// +/// *managing* : - +/// +/// *depends on* : `const SOCKET_PATH` +/// async fn init_unix_listener() -> Result { let _ = fs::remove_file(SOCKET_PATH); Ok(UnixListener::bind(SOCKET_PATH)?) diff --git a/crates/api-grub/src/export.rs b/crates/api-grub/src/export.rs index a984f7f..bc006e6 100644 --- a/crates/api-grub/src/export.rs +++ b/crates/api-grub/src/export.rs @@ -4,13 +4,45 @@ use reqwest::Client; use tokio_postgres::NoTls; use std::env; use anyhow::Result; -use log::{info, error}; +use log::{debug, error, info}; +use std::ops::Drop; +/// An entity which handles DB connections. +/// +/// This struct is used to be a layer between API-grab workers +/// and the `pool` of DB connections to save grabbing processes statuses +/// (now `PostgreSQL`, slowly migrating to `ClickHouse` support) +/// +/// # Examples +/// +/// ``` +/// use api-grub::export::Expoter; +/// +/// let exporter = Expoter::init(); +/// +/// assert!(exporter.get_connection_from_pool().is_some()); +/// ``` +/// +/// # Hint: +/// +/// - use `export_data` method to export metrics to DB +/// +/// - use `export_metrics` method to export metrics (`PrometehusMetrics` +/// type) to the Prometehus exporter (`$EXPORTER_URL`) +/// +/// - use `export_metrics_extended` method to export metrics ( +/// `*PrometehusMetricsExtended*` type with `desc` field) to the +/// Prometehus exporter (`$EXPORTER_URL`) pub struct Exporter { pool : Option, } impl Exporter { + /// Fills `deadpool_postgres::Config` object with values from ENV VARS: + /// - `DB_HOST` + /// - `DB_DBNAME` + /// - `DB_USER` + /// - `DB_PASSWORD` fn config_construct() -> Result { let mut cfg = Config::new(); cfg.host = Some(env::var("DB_HOST")?); @@ -19,6 +51,9 @@ impl Exporter { cfg.password = Some(env::var("DB_PASSWORD")?); Ok(cfg) } + /// Uses `deadpool_postgres::Config` object to create DB connections + /// pool to share between async tasks and to restrict a count of parallel + /// connections fn pool_construct() -> Option { return match Self::config_construct() { Ok(config) => { @@ -34,6 +69,7 @@ impl Exporter { }, } } + /// Checks if DB connections pool is empty #[allow(unused)] pub fn is_no_connection(&self) -> bool { self.pool.is_none() } pub fn init() -> Self { @@ -41,6 +77,9 @@ impl Exporter { pool : Self::pool_construct(), } } + /// Shares a connection `deadpool_postgres::Client as PgClient` + /// + /// Function awaits til the moment it can return `Option` #[allow(unused)] pub async fn get_connection_from_pool(&self) -> Option { if let Some(pool) = &self.pool { @@ -48,15 +87,20 @@ impl Exporter { } None } + /// Exports data in `&str` jsonb format to DB using connection from the pool #[allow(unused)] pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> { let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?; let _ = client.query(&query, &[&metrics]).await?; Ok(()) } + /// Exports metrics in `PrometheusMetrics` format to Exporter defined + /// as env var $EXORPTER_URL pub async fn export_metrics(metrics: PrometheusMetrics) -> Result { let url = env::var("EXPORTER_URL")?; + debug!("Exporting: {:?}", &metrics); + let req = Client::new() .post(url) .json(&metrics) @@ -65,8 +109,13 @@ impl Exporter { req?; Ok(metrics.get_bytes_len()) } + /// Exports metrics in `PrometheusMetricsExtended` format to Exporter defined + /// as env var $EXORPTER_URL pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result { let url = env::var("EXPORTER_URL")?; + + debug!("Exporting: {:?}", &metrics); + let req = Client::new() .post(url) .json(&metrics) @@ -75,4 +124,11 @@ impl Exporter { Ok(metrics.get_bytes_len()) } +} + +impl Drop for Exporter { + // Custom destructor to log deinitializing of the `Exporter` + fn drop(&mut self) { + info!("Deinitializng Exporter and DB connection pool ...") + } } \ No newline at end of file diff --git a/crates/api-grub/src/json.rs b/crates/api-grub/src/json.rs index c6ecedd..8e30b83 100644 --- a/crates/api-grub/src/json.rs +++ b/crates/api-grub/src/json.rs @@ -2,6 +2,21 @@ use serde_json::{json, Value}; use integr_structs::api::v3::{Metric, MetricOutput}; +/// A JSON-parser struct +/// +/// Using in metric extracting from Server Response +/// with metrics mechanism +/// +/// # Example +/// +/// ``` +/// use api-grub::json::JsonParser; +/// use use integr_structs::api::v3::Metric; +/// +/// let json = b""flat1" : { "room1" : { "rt_tempo" : "+16" }}".to_vec(); +/// +/// assert!(!JsonParser::parse(vec![Metric::template()], json).is_empty()); +/// ``` pub struct JsonParser; impl JsonParser { diff --git a/crates/api-grub/src/logger.rs b/crates/api-grub/src/logger.rs index de3e8fa..f69fe22 100644 --- a/crates/api-grub/src/logger.rs +++ b/crates/api-grub/src/logger.rs @@ -5,6 +5,22 @@ use std::io::Write; use anyhow::Result; use log::info; +/// # Fn `setup_logger` +/// +/// ## function to init terminal logger +/// +/// ### Dev-Info : +/// +/// *input* : - +/// +/// *output* : `anyhow::Result<()>` +/// +/// *initiator* : fn `main` +/// +/// *managing* : - +/// +/// *depends on* : - +/// pub async fn setup_logger() -> Result<()> { Builder::new() .format(move |buf, record| { @@ -17,7 +33,7 @@ pub async fn setup_logger() -> Result<()> { record.args(), ) }) - .filter(None, LevelFilter::Info) + .filter(None, LevelFilter::Debug) .target(env_logger::Target::Stdout) .init(); diff --git a/crates/api-grub/src/main.rs b/crates/api-grub/src/main.rs index cac81e3..aedd48c 100644 --- a/crates/api-grub/src/main.rs +++ b/crates/api-grub/src/main.rs @@ -6,10 +6,8 @@ mod export; mod monitoring; use anyhow::Result; -// use integr_structs::api::ApiConfigV2; use integr_structs::api::v3::Config; use logger::setup_logger; -// use log::{info, warn}; use config::{pull_local_config, init_config_grub_mechanism}; use net::init_api_grub_mechanism; use tokio::sync::mpsc; @@ -23,10 +21,6 @@ async fn main() -> Result<()>{ let config = get_config().await; // config update channel let (tx, mut rx) = mpsc::channel::(1); - // futures - // todo : rewrite with spawn - // let config_fut = init_config_grub_mechanism(&tx); - // let grub_fut = init_api_grub_mechanism(config, &mut rx); let event_config = tokio::spawn(async move { match init_config_grub_mechanism(&tx).await { @@ -63,8 +57,7 @@ async fn main() -> Result<()>{ for event in events_handler { let _ = event.await; } - // let _ = tokio::join!(config_fut, grub_fut); - + Ok(()) } diff --git a/crates/api-grub/src/monitoring.rs b/crates/api-grub/src/monitoring.rs index aef585e..8203c4c 100644 --- a/crates/api-grub/src/monitoring.rs +++ b/crates/api-grub/src/monitoring.rs @@ -13,6 +13,31 @@ use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended}; use log::{error, info, warn}; use std::collections::HashMap; +/// # Fn `get_metrics_from_monitoring` +/// +/// A function to init pulling and exporting metrics mechanism +/// from CM to Exporter. It spawns async tasks to get measures +/// and their values and then extract needed info to export +/// +/// ### Dev-Info : +/// +/// *input* : duration and delay as `usize` (in secs) +/// +/// *output* : `anyhow::Result<()>` +/// +/// *initiator* : fn `main` +/// +/// *managing* : runtime of N async tasks (N - count of chunks) +/// +/// # Example +/// +/// ``` +/// use api-grub::monitoring::get_metrics_from_monitoring; +/// +/// // exec func without time restriction but with delay in 5 secs +/// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(())); +/// ``` +/// pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> { let timer = tokio::time::Instant::now(); 'outer: loop { @@ -34,7 +59,24 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho Ok(()) } -#[derive(Debug, Clone)] +/// An entity which handle CM creds +/// +/// Used to capture measures and there values, to preprocess all measures to +/// relevant Exporter's structure +/// +/// # Example: +/// +/// ``` +/// use api-grub::monitoring::MonitoringImporter; +/// +/// let mut a = MonitoringImporter::new().await; +/// a.start_session().await?; +/// let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![])); +/// +/// assert_eq!(a.get_measure_info(vec.clone()).await, Ok(())); +/// ``` +/// +#[derive(Clone)] pub struct MonitoringImporter { ip : String, login : String, @@ -44,6 +86,17 @@ pub struct MonitoringImporter { } impl MonitoringImporter { + /// The most simple constructor for `MonitoringImporter` + /// + /// Returns `Self` object that is constructing according to + /// env vars: + /// - `ENODE_MONITORING_IP` + /// - `ENODE_MONITORING_LOGIN` + /// - `ENODE_MONITORING_PASSWORD` + /// + /// If env vars will not be set, it returns `Self` with + /// empty fields + /// pub async fn new() -> Self { MonitoringImporter { ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()), @@ -53,12 +106,29 @@ impl MonitoringImporter { ts : String::new(), } } + /// Function that checks is current `MonitoringImporter` valid + /// and can be used to pull and push info to and from CM + /// async fn is_valid(&self) -> bool { !self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty() } + /// A setter of `timestamp` + /// + /// This function is needed to set a `timestamp` after + /// CM session creation. + /// + /// This `timestamp` is a date of creation a session + /// on the CM Server async fn set_ts(&mut self, ts: &str) { self.ts = ts.to_owned(); } + /// A function for creation CM session + /// + /// Returns OK(()) if session was created and there were + /// no errors (neither internal no external) + /// + /// *Also* it saves ts and access-key in it's runtime environment, + /// there's no way to get access-key of session pub async fn start_session(&mut self) -> anyhow::Result<()> { if !self.is_valid().await { return Err(Error::msg("Invalid eNODE-Monitoring configuration")); @@ -81,6 +151,12 @@ impl MonitoringImporter { Ok(()) } + /// A function for pulling measures list + /// + /// Used with actual credentials for current CM session + /// and returning measures in format of `Ok(Vec<(String, String)>)` + /// , where `(String, String)` is a tuple of measure `id` and `description` + /// (`name`) pub async fn get_metrics_list(&self) -> anyhow::Result> { let client = Client::new(); let mut vec: Vec<(String, String)> = Vec::new(); @@ -98,7 +174,6 @@ impl MonitoringImporter { let cls = measure.get("cls"); let name = measure.get("name"); if id.is_some() && cls.is_some() { - // todo: later wait for Vaitaliy call of classification let id = id.unwrap().as_i64().unwrap_or_default(); let cls = cls.unwrap().as_str().unwrap_or_else(|| ""); let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null"); @@ -114,6 +189,22 @@ impl MonitoringImporter { info!("List of measures was pulled, total - {}", &vec.len()); Ok(vec) } + + /// A function to get realtime data + /// + /// It pulles info about 1 measure or a slice of measures and + /// exports all data to Prometehus exporter + /// + /// # How it works + /// 1) creates a restriction for max count of async + /// tasks (`tokio::sync::Semaphore`) + /// + /// 2) divides vec of measures in case of creating chunks with + /// the most optimal sizes to optimize self and server load + /// + /// 3) spawns async tasks-grabbers to get measures info which + /// exprots all data by itselfs + /// pub async fn get_measure_info(&self, measures: Arc>) -> anyhow::Result<()> { let mut sys = sysinfo::System::new(); sys.refresh_cpu_all(); @@ -156,6 +247,17 @@ impl MonitoringImporter { } Ok(()) } + + /// An async task-grabber + /// + /// Used to create request to the CM server and + /// get all measure(s) data + /// + /// # Also + /// An argument `measure: Arc` can be a single measure like `measure$1` or + /// a slice of measures in special format `%5B%22measure$1%22,%20%22measure$2%22%5D`. + /// This is a neccesary measure to handle two types of requests and URL restrictions + /// async fn process_endpoint(measure: Arc, client: Arc, arc: Arc, hm: &HashMap) -> anyhow::Result { let resp = client .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure)) @@ -170,6 +272,21 @@ impl MonitoringImporter { PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await ) } + + /// An recursive extractor of data + /// + /// Uses target-json CM-Server response as `Value` and HashMap of + /// measures' `id`s and their appropriate `description`s + /// + /// # How it works + /// 1) if `Value` is an `Object` -> executes `Self::process_value` on it and + /// returns result of the function as `Vec` + /// + /// 2) if `Value` is an `Array` -> self-executes for each pat of the array + /// and aggregates all data in the `Vec` by using `.append(&mut Vec<...>)` + /// + /// 3) if `Value` is `_` -> returns error **Invalid JSON format** + /// fn extract_metric_data(json: Value, hm: &HashMap) -> Pin>> + Send + '_>> { Box::pin(async move { return match json { @@ -190,6 +307,11 @@ impl MonitoringImporter { } }) } + + /// A function-extractor for single measure object + /// + /// Searches for certain fields and aggregates it in the `MetricOutputExtended` + /// object async fn process_value(obj : &Map, hm: &HashMap) -> anyhow::Result { let id = obj.get("$id"); let val = obj.get("value"); @@ -236,4 +358,16 @@ impl MonitoringImporter { value : val.clone() }) } +} + +impl std::fmt::Debug for MonitoringImporter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MonitoringImporter") + .field("ip", &self.ip) + .field("login", &self.login) + .field("password", &"****") + .field("access_key", &"HIDDEN") + .field("ts", &self.ts) + .finish() + } } \ No newline at end of file diff --git a/crates/integr-structs/src/api.rs b/crates/integr-structs/src/api.rs index d96feeb..4c72b68 100644 --- a/crates/integr-structs/src/api.rs +++ b/crates/integr-structs/src/api.rs @@ -160,6 +160,16 @@ pub mod v3 { pub json_type : String, pub addr : String, } + impl Metric { + pub fn template() -> Self { + Self { + id : "room_tempo".to_string(), + json_type : "String".to_string(), + addr : "flat1.room1.rt_tempo".to_string(), + } + } + } + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Metrics { pub name : String, @@ -411,7 +421,6 @@ pub mod enode_monitoring { } } - // "{\"access_token\":\"5BNQsmiGFQRNA651HeQxZekYgYUAWZ4e\",\"role\":\"administrator\",\"startRT\":\"2025-02-25T09:03:27.581Z\",\"login\":\"admin\",\"push_active\":null,\"$id\":\"systemuser$1\"}", #[derive(Debug, Deserialize)] pub struct AuthResponse { pub access_token : String,