Compare commits

..

No commits in common. "1d31dc6c5908598cc86373dfb26e27b3208d6095" and "e952a07318cd8a106b2ec52615daaffa4548b877" have entirely different histories.

3 changed files with 10 additions and 60 deletions

View File

@ -38,11 +38,6 @@ pub struct Exporter {
}
impl Exporter {
/// Fills `deadpool_postgres::Config` object with values from ENV VARS:
/// - `DB_HOST`
/// - `DB_DBNAME`
/// - `DB_USER`
/// - `DB_PASSWORD`
fn config_construct() -> Result<Config> {
let mut cfg = Config::new();
cfg.host = Some(env::var("DB_HOST")?);
@ -51,9 +46,6 @@ impl Exporter {
cfg.password = Some(env::var("DB_PASSWORD")?);
Ok(cfg)
}
/// Uses `deadpool_postgres::Config` object to create DB connections
/// pool to share between async tasks and to restrict a count of parallel
/// connections
fn pool_construct() -> Option<Pool> {
return match Self::config_construct() {
Ok(config) => {
@ -69,7 +61,6 @@ impl Exporter {
},
}
}
/// Checks if DB connections pool is empty
#[allow(unused)]
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
pub fn init() -> Self {
@ -77,9 +68,6 @@ impl Exporter {
pool : Self::pool_construct(),
}
}
/// Shares a connection `deadpool_postgres::Client as PgClient`
///
/// Function awaits til the moment it can return `Option<deadpool_postgres::Client as PgClient>`
#[allow(unused)]
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
if let Some(pool) = &self.pool {
@ -87,15 +75,12 @@ impl Exporter {
}
None
}
/// Exports data in `&str` jsonb format to DB using connection from the pool
#[allow(unused)]
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
let _ = client.query(&query, &[&metrics]).await?;
Ok(())
}
/// Exports metrics in `PrometheusMetrics` format to Exporter defined
/// as env var $EXORPTER_URL
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
let url = env::var("EXPORTER_URL")?;
@ -107,8 +92,6 @@ impl Exporter {
req?;
Ok(metrics.get_bytes_len())
}
/// Exports metrics in `PrometheusMetricsExtended` format to Exporter defined
/// as env var $EXORPTER_URL
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
let url = env::var("EXPORTER_URL")?;
let req = Client::new()
@ -122,7 +105,6 @@ impl Exporter {
}
impl Drop for Exporter {
// Custom destructor to log deinitializing of the `Exporter`
fn drop(&mut self) {
info!("Deinitializng Exporter and DB connection pool ...")
}

View File

@ -6,8 +6,10 @@ mod export;
mod monitoring;
use anyhow::Result;
// use integr_structs::api::ApiConfigV2;
use integr_structs::api::v3::Config;
use logger::setup_logger;
// use log::{info, warn};
use config::{pull_local_config, init_config_grub_mechanism};
use net::init_api_grub_mechanism;
use tokio::sync::mpsc;
@ -21,6 +23,10 @@ async fn main() -> Result<()>{
let config = get_config().await;
// config update channel
let (tx, mut rx) = mpsc::channel::<Config>(1);
// futures
// todo : rewrite with spawn
// let config_fut = init_config_grub_mechanism(&tx);
// let grub_fut = init_api_grub_mechanism(config, &mut rx);
let event_config = tokio::spawn(async move {
match init_config_grub_mechanism(&tx).await {
@ -57,7 +63,8 @@ async fn main() -> Result<()>{
for event in events_handler {
let _ = event.await;
}
// let _ = tokio::join!(config_fut, grub_fut);
Ok(())
}

View File

@ -76,7 +76,7 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho
/// assert_eq!(a.get_measure_info(vec.clone()).await, Ok(()));
/// ```
///
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct MonitoringImporter {
ip : String,
login : String,
@ -86,17 +86,6 @@ pub struct MonitoringImporter {
}
impl MonitoringImporter {
/// The most simple constructor for `MonitoringImporter`
///
/// Returns `Self` object that is constructing according to
/// env vars:
/// - `ENODE_MONITORING_IP`
/// - `ENODE_MONITORING_LOGIN`
/// - `ENODE_MONITORING_PASSWORD`
///
/// If env vars will not be set, it returns `Self` with
/// empty fields
///
pub async fn new() -> Self {
MonitoringImporter {
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
@ -106,29 +95,12 @@ impl MonitoringImporter {
ts : String::new(),
}
}
/// Function that checks is current `MonitoringImporter` valid
/// and can be used to pull and push info to and from CM
///
async fn is_valid(&self) -> bool {
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
}
/// A setter of `timestamp`
///
/// This function is needed to set a `timestamp` after
/// CM session creation.
///
/// This `timestamp` is a date of creation a session
/// on the CM Server
async fn set_ts(&mut self, ts: &str) {
self.ts = ts.to_owned();
}
/// A function for creation CM session
///
/// Returns OK(()) if session was created and there were
/// no errors (neither internal no external)
///
/// *Also* it saves ts and access-key in it's runtime environment,
/// there's no way to get access-key of session
pub async fn start_session(&mut self) -> anyhow::Result<()> {
if !self.is_valid().await {
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
@ -168,6 +140,7 @@ impl MonitoringImporter {
let cls = measure.get("cls");
let name = measure.get("name");
if id.is_some() && cls.is_some() {
// todo: later wait for Vaitaliy call of classification
let id = id.unwrap().as_i64().unwrap_or_default();
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
@ -305,16 +278,4 @@ impl MonitoringImporter {
value : val.clone()
})
}
}
impl std::fmt::Debug for MonitoringImporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MonitoringImporter")
.field("ip", &self.ip)
.field("login", &self.login)
.field("password", &"****")
.field("access_key", &"HIDDEN")
.field("ts", &self.ts)
.finish()
}
}