Compare commits

..

4 Commits

Author SHA1 Message Date
prplV 1d31dc6c59 debug rework to hide creds
test-org/integration-module/pipeline/pr-rc This commit looks good Details
2025-03-06 18:10:30 +03:00
prplV 39f901e6a6 main refactor 2025-03-06 17:53:13 +03:00
prplV 4c828c968e export docs fix 2025-03-06 17:52:57 +03:00
prplV c028699bab - useless comment 2025-03-06 11:00:17 +03:00
3 changed files with 60 additions and 10 deletions

View File

@ -38,6 +38,11 @@ pub struct Exporter {
}
impl Exporter {
/// Fills `deadpool_postgres::Config` object with values from ENV VARS:
/// - `DB_HOST`
/// - `DB_DBNAME`
/// - `DB_USER`
/// - `DB_PASSWORD`
fn config_construct() -> Result<Config> {
let mut cfg = Config::new();
cfg.host = Some(env::var("DB_HOST")?);
@ -46,6 +51,9 @@ impl Exporter {
cfg.password = Some(env::var("DB_PASSWORD")?);
Ok(cfg)
}
/// Uses `deadpool_postgres::Config` object to create DB connections
/// pool to share between async tasks and to restrict a count of parallel
/// connections
fn pool_construct() -> Option<Pool> {
return match Self::config_construct() {
Ok(config) => {
@ -61,6 +69,7 @@ impl Exporter {
},
}
}
/// Reports whether the exporter holds no DB connection pool
/// (i.e. pool construction failed or was never attempted).
#[allow(unused)]
pub fn is_no_connection(&self) -> bool {
    self.pool.is_none()
}
pub fn init() -> Self {
@ -68,6 +77,9 @@ impl Exporter {
pool : Self::pool_construct(),
}
}
/// Shares a connection `deadpool_postgres::Client as PgClient`
///
/// Function awaits until it can return `Option<deadpool_postgres::Client as PgClient>`
#[allow(unused)]
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
if let Some(pool) = &self.pool {
@ -75,12 +87,15 @@ impl Exporter {
}
None
}
/// Exports data in `&str` jsonb format to DB using connection from the pool
///
/// Prepares (and caches) the INSERT statement, then runs it with the
/// metrics payload bound as parameter `$1`.
///
/// # Errors
/// Returns an error if statement preparation or execution fails.
#[allow(unused)]
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
    let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
    // `execute` (not `query`) is the right call for a statement that
    // returns no rows: it yields the affected-row count instead of
    // collecting an empty row vector that was previously discarded.
    client.execute(&query, &[&metrics]).await?;
    Ok(())
}
/// Exports metrics in `PrometheusMetrics` format to Exporter defined
/// as env var $EXPORTER_URL
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
let url = env::var("EXPORTER_URL")?;
@ -92,6 +107,8 @@ impl Exporter {
req?;
Ok(metrics.get_bytes_len())
}
/// Exports metrics in `PrometheusMetricsExtended` format to Exporter defined
/// as env var $EXPORTER_URL
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
let url = env::var("EXPORTER_URL")?;
let req = Client::new()
@ -105,6 +122,7 @@ impl Exporter {
}
impl Drop for Exporter {
// Log teardown of the `Exporter` (and its pooled DB connections)
// so deinitialization is visible in the application log.
fn drop(&mut self) {
    info!("Deinitializng Exporter and DB connection pool ...");
}

View File

@ -6,10 +6,8 @@ mod export;
mod monitoring;
use anyhow::Result;
// use integr_structs::api::ApiConfigV2;
use integr_structs::api::v3::Config;
use logger::setup_logger;
// use log::{info, warn};
use config::{pull_local_config, init_config_grub_mechanism};
use net::init_api_grub_mechanism;
use tokio::sync::mpsc;
@ -23,10 +21,6 @@ async fn main() -> Result<()>{
let config = get_config().await;
// config update channel
let (tx, mut rx) = mpsc::channel::<Config>(1);
// futures
// todo : rewrite with spawn
// let config_fut = init_config_grub_mechanism(&tx);
// let grub_fut = init_api_grub_mechanism(config, &mut rx);
let event_config = tokio::spawn(async move {
match init_config_grub_mechanism(&tx).await {
@ -63,7 +57,6 @@ async fn main() -> Result<()>{
for event in events_handler {
let _ = event.await;
}
// let _ = tokio::join!(config_fut, grub_fut);
Ok(())
}

View File

@ -76,7 +76,7 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho
/// assert_eq!(a.get_measure_info(vec.clone()).await, Ok(()));
/// ```
///
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct MonitoringImporter {
ip : String,
login : String,
@ -86,6 +86,17 @@ pub struct MonitoringImporter {
}
impl MonitoringImporter {
/// The most simple constructor for `MonitoringImporter`
///
/// Returns a `Self` object that is constructed according to
/// env vars:
/// - `ENODE_MONITORING_IP`
/// - `ENODE_MONITORING_LOGIN`
/// - `ENODE_MONITORING_PASSWORD`
///
/// If the env vars are not set, it returns `Self` with
/// empty fields
///
pub async fn new() -> Self {
MonitoringImporter {
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
@ -95,12 +106,29 @@ impl MonitoringImporter {
ts : String::new(),
}
}
/// Checks whether this `MonitoringImporter` is configured well enough
/// to pull and push info to and from CM: the ip, login and password
/// fields must all be non-empty.
async fn is_valid(&self) -> bool {
    // De Morgan form of the original check: invalid as soon as any
    // of the three credential fields is empty.
    !(self.ip.is_empty() || self.login.is_empty() || self.password.is_empty())
}
/// A setter of `timestamp`
///
/// Stores the CM-session creation timestamp; intended to be called
/// right after a session has been created on the CM server.
async fn set_ts(&mut self, ts: &str) {
    // Copy the slice into the existing field, reusing its
    // allocation when capacity allows.
    self.ts.clear();
    self.ts.push_str(ts);
}
/// A function for creation CM session
///
/// Returns `Ok(())` if the session was created and there were
/// no errors (neither internal nor external)
///
/// *Also* it saves the ts and access-key in its runtime environment;
/// there's no way to get the access-key of a session
pub async fn start_session(&mut self) -> anyhow::Result<()> {
if !self.is_valid().await {
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
@ -140,7 +168,6 @@ impl MonitoringImporter {
let cls = measure.get("cls");
let name = measure.get("name");
if id.is_some() && cls.is_some() {
// todo: later wait for Vaitaliy call of classification
let id = id.unwrap().as_i64().unwrap_or_default();
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
@ -279,3 +306,15 @@ impl MonitoringImporter {
})
}
}
// Hand-written `Debug` so that credentials never leak into logs:
// `password` and `access_key` are rendered as fixed placeholders
// while the non-sensitive fields are shown verbatim.
impl std::fmt::Debug for MonitoringImporter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("MonitoringImporter");
        dbg.field("ip", &self.ip);
        dbg.field("login", &self.login);
        dbg.field("password", &"****");
        dbg.field("access_key", &"HIDDEN");
        dbg.field("ts", &self.ts);
        dbg.finish()
    }
}