integration-module/crates/api-grub/src/export.rs

111 lines
3.3 KiB
Rust

use deadpool_postgres::{Config, Pool, Runtime, Client as PgClient};
use integr_structs::api::v3::{PrometheusMetrics, PrometheusMetricsExtended};
use reqwest::Client;
use tokio_postgres::NoTls;
use std::env;
use anyhow::Result;
use log::{info, error};
use std::ops::Drop;
/// An entity which handles DB connections and metrics export.
///
/// This struct is a layer between the API-grab workers and the `pool` of DB
/// connections used to save the statuses of grabbing processes
/// (currently `PostgreSQL`, slowly migrating to `ClickHouse` support).
///
/// The pool is optional: it is `None` when it could not be built during
/// [`Exporter::init`], which [`Exporter::is_no_connection`] reports.
///
/// # Examples
///
/// The example needs the `DB_HOST`, `DB_DBNAME`, `DB_USER` and `DB_PASSWORD`
/// environment variables and a reachable database, so it is not compiled or
/// run as a doctest.
///
/// ```ignore
/// use api_grub::export::Exporter;
///
/// let exporter = Exporter::init();
///
/// // `get_connection_from_pool` is async and must be awaited.
/// assert!(exporter.get_connection_from_pool().await.is_some());
/// ```
///
/// # Hints
///
/// - use the `export_data` method to export metrics to the DB
///
/// - use the `export_metrics` method to export metrics (`PrometheusMetrics`
///   type) to the Prometheus exporter (`$EXPORTER_URL`)
///
/// - use the `export_extended_metrics` method to export metrics
///   (`PrometheusMetricsExtended` type with a `desc` field) to the
///   Prometheus exporter (`$EXPORTER_URL`)
pub struct Exporter {
    /// Connection pool; `None` when the pool could not be constructed.
    pool: Option<Pool>,
}
impl Exporter {
fn config_construct() -> Result<Config> {
let mut cfg = Config::new();
cfg.host = Some(env::var("DB_HOST")?);
cfg.dbname = Some(env::var("DB_DBNAME")?);
cfg.user = Some(env::var("DB_USER")?);
cfg.password = Some(env::var("DB_PASSWORD")?);
Ok(cfg)
}
fn pool_construct() -> Option<Pool> {
return match Self::config_construct() {
Ok(config) => {
if let Ok(pool) = config.create_pool(Some(Runtime::Tokio1), NoTls) {
info!("Connected to PostgreSQL");
return Some(pool);
}
None
},
Err(_) => {
error!("Bad DB credentials or it's unreachable");
None
},
}
}
#[allow(unused)]
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
pub fn init() -> Self {
Self {
pool : Self::pool_construct(),
}
}
#[allow(unused)]
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
if let Some(pool) = &self.pool {
return Some(pool.get().await.ok()?);
}
None
}
#[allow(unused)]
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
let _ = client.query(&query, &[&metrics]).await?;
Ok(())
}
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
let url = env::var("EXPORTER_URL")?;
let req = Client::new()
.post(url)
.json(&metrics)
.send().await;
req?;
Ok(metrics.get_bytes_len())
}
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
let url = env::var("EXPORTER_URL")?;
let req = Client::new()
.post(url)
.json(&metrics)
.send().await;
req?;
Ok(metrics.get_bytes_len())
}
}
impl Drop for Exporter {
    /// Emits an informational log line when the exporter is dropped.
    ///
    /// Nothing is released by hand here: the `pool` field is dropped
    /// automatically once this method returns.
    fn drop(&mut self) {
        info!("Deinitializng Exporter and DB connection pool ...");
    }
}