use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient}; use integr_structs::api::v3::{PrometheusMetrics, PrometheusMetricsExtended}; use reqwest::Client; use tokio_postgres::NoTls; use std::env; use anyhow::Result; use tracing::{debug, error, info, trace}; use std::ops::Drop; /// An entity which handles DB connections. /// /// This struct is used to be a layer between API-grab workers /// and the `pool` of DB connections to save grabbing processes statuses /// (now `PostgreSQL`, slowly migrating to `ClickHouse` support) /// /// # Examples /// /// ``` /// use api-grub::export::Expoter; /// /// let exporter = Expoter::init(); /// /// assert!(exporter.get_connection_from_pool().is_some()); /// ``` /// /// # Hint: /// /// - use `export_data` method to export metrics to DB /// /// - use `export_metrics` method to export metrics (`PrometehusMetrics` /// type) to the Prometehus exporter (`$EXPORTER_URL`) /// /// - use `export_metrics_extended` method to export metrics ( /// `*PrometehusMetricsExtended*` type with `desc` field) to the /// Prometehus exporter (`$EXPORTER_URL`) pub struct Exporter { pool : Option, } impl Exporter { /// Fills `deadpool_postgres::Config` object with values from ENV VARS: /// - `DB_HOST` /// - `DB_DBNAME` /// - `DB_USER` /// - `DB_PASSWORD` fn config_construct() -> Result { let mut cfg = Config::new(); cfg.host = Some(env::var("DB_HOST")?); cfg.dbname = Some(env::var("DB_DBNAME")?); cfg.user = Some(env::var("DB_USER")?); cfg.password = Some(env::var("DB_PASSWORD")?); Ok(cfg) } /// Uses `deadpool_postgres::Config` object to create DB connections /// pool to share between async tasks and to restrict a count of parallel /// connections fn pool_construct() -> Option { return match Self::config_construct() { Ok(config) => { if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) { info!("Connected to PostgreSQL"); return Some(pool); } None }, Err(_) => { error!("Bad DB credentials or it's unreachable"); None }, } } /// Checks if DB connections pool is empty #[allow(unused)] pub fn is_no_connection(&self) -> bool { self.pool.is_none() } pub fn init() -> Self { Self { pool : Self::pool_construct(), } } /// Shares a connection `deadpool_postgres::Client as PgClient` /// /// Function awaits til the moment it can return `Option` #[allow(unused)] pub async fn get_connection_from_pool(&self) -> Option { if let Some(pool) = &self.pool { return Some(pool.get().await.ok()?); } None } /// Exports data in `&str` jsonb format to DB using connection from the pool #[allow(unused)] #[tracing::instrument(name = "PostgreSQL export")] pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> { let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?; let _ = client.query(&query, &[&metrics]).await?; Ok(()) } /// Exports metrics in `PrometheusMetrics` format to Exporter defined /// as env var $EXORPTER_URL #[tracing::instrument(name = "Prometheus export")] pub async fn export_metrics(metrics: PrometheusMetrics) -> Result { let url = env::var("EXPORTER_URL")?; debug!("Exporting: {:?}", &metrics); let req = Client::new() .post(url) .json(&metrics) .send().await; req?; Ok(metrics.get_bytes_len()) } /// Exports metrics in `PrometheusMetricsExtended` format to Exporter defined /// as env var $EXORPTER_URL #[tracing::instrument(name = "Prometheus/Status System export")] pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result { // let url = env::var("EXPORTER_URL")?; let url = env::var("STATUS_SYSTEM_URL").or_else(|err| { trace!("cannot fetch $STATUS_SYSTEM_URL var due to {}. working only with Prometheus exporter link", err); env::var("EXPORTER_URL") })?; debug!("Exporting: {:?}", &metrics); let req = Client::new() .post(&url) .json(&metrics) .send().await; req?; Ok(metrics.get_bytes_len()) } } impl Drop for Exporter { // Custom destructor to log deinitializing of the `Exporter` fn drop(&mut self) { info!("Deinitializng Exporter and DB connection pool ...") } }