Compare commits
4 Commits
e952a07318
...
1d31dc6c59
| Author | SHA1 | Date |
|---|---|---|
|
|
1d31dc6c59 | |
|
|
39f901e6a6 | |
|
|
4c828c968e | |
|
|
c028699bab |
|
|
@ -38,6 +38,11 @@ pub struct Exporter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Exporter {
|
impl Exporter {
|
||||||
|
/// Fills `deadpool_postgres::Config` object with values from ENV VARS:
|
||||||
|
/// - `DB_HOST`
|
||||||
|
/// - `DB_DBNAME`
|
||||||
|
/// - `DB_USER`
|
||||||
|
/// - `DB_PASSWORD`
|
||||||
fn config_construct() -> Result<Config> {
|
fn config_construct() -> Result<Config> {
|
||||||
let mut cfg = Config::new();
|
let mut cfg = Config::new();
|
||||||
cfg.host = Some(env::var("DB_HOST")?);
|
cfg.host = Some(env::var("DB_HOST")?);
|
||||||
|
|
@ -46,6 +51,9 @@ impl Exporter {
|
||||||
cfg.password = Some(env::var("DB_PASSWORD")?);
|
cfg.password = Some(env::var("DB_PASSWORD")?);
|
||||||
Ok(cfg)
|
Ok(cfg)
|
||||||
}
|
}
|
||||||
|
/// Uses `deadpool_postgres::Config` object to create DB connections
|
||||||
|
/// pool to share between async tasks and to restrict a count of parallel
|
||||||
|
/// connections
|
||||||
fn pool_construct() -> Option<Pool> {
|
fn pool_construct() -> Option<Pool> {
|
||||||
return match Self::config_construct() {
|
return match Self::config_construct() {
|
||||||
Ok(config) => {
|
Ok(config) => {
|
||||||
|
|
@ -61,6 +69,7 @@ impl Exporter {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Checks if DB connections pool is empty
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
||||||
pub fn init() -> Self {
|
pub fn init() -> Self {
|
||||||
|
|
@ -68,6 +77,9 @@ impl Exporter {
|
||||||
pool : Self::pool_construct(),
|
pool : Self::pool_construct(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Shares a connection `deadpool_postgres::Client as PgClient`
|
||||||
|
///
|
||||||
|
/// Function awaits til the moment it can return `Option<deadpool_postgres::Client as PgClient>`
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
||||||
if let Some(pool) = &self.pool {
|
if let Some(pool) = &self.pool {
|
||||||
|
|
@ -75,12 +87,15 @@ impl Exporter {
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
/// Exports data in `&str` jsonb format to DB using connection from the pool
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||||
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
||||||
let _ = client.query(&query, &[&metrics]).await?;
|
let _ = client.query(&query, &[&metrics]).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
/// Exports metrics in `PrometheusMetrics` format to Exporter defined
|
||||||
|
/// as env var $EXORPTER_URL
|
||||||
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
|
|
||||||
|
|
@ -92,6 +107,8 @@ impl Exporter {
|
||||||
req?;
|
req?;
|
||||||
Ok(metrics.get_bytes_len())
|
Ok(metrics.get_bytes_len())
|
||||||
}
|
}
|
||||||
|
/// Exports metrics in `PrometheusMetricsExtended` format to Exporter defined
|
||||||
|
/// as env var $EXORPTER_URL
|
||||||
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
|
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
let req = Client::new()
|
let req = Client::new()
|
||||||
|
|
@ -105,6 +122,7 @@ impl Exporter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Exporter {
|
impl Drop for Exporter {
|
||||||
|
// Custom destructor to log deinitializing of the `Exporter`
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
info!("Deinitializng Exporter and DB connection pool ...")
|
info!("Deinitializng Exporter and DB connection pool ...")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,8 @@ mod export;
|
||||||
mod monitoring;
|
mod monitoring;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
// use integr_structs::api::ApiConfigV2;
|
|
||||||
use integr_structs::api::v3::Config;
|
use integr_structs::api::v3::Config;
|
||||||
use logger::setup_logger;
|
use logger::setup_logger;
|
||||||
// use log::{info, warn};
|
|
||||||
use config::{pull_local_config, init_config_grub_mechanism};
|
use config::{pull_local_config, init_config_grub_mechanism};
|
||||||
use net::init_api_grub_mechanism;
|
use net::init_api_grub_mechanism;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
@ -23,10 +21,6 @@ async fn main() -> Result<()>{
|
||||||
let config = get_config().await;
|
let config = get_config().await;
|
||||||
// config update channel
|
// config update channel
|
||||||
let (tx, mut rx) = mpsc::channel::<Config>(1);
|
let (tx, mut rx) = mpsc::channel::<Config>(1);
|
||||||
// futures
|
|
||||||
// todo : rewrite with spawn
|
|
||||||
// let config_fut = init_config_grub_mechanism(&tx);
|
|
||||||
// let grub_fut = init_api_grub_mechanism(config, &mut rx);
|
|
||||||
|
|
||||||
let event_config = tokio::spawn(async move {
|
let event_config = tokio::spawn(async move {
|
||||||
match init_config_grub_mechanism(&tx).await {
|
match init_config_grub_mechanism(&tx).await {
|
||||||
|
|
@ -63,8 +57,7 @@ async fn main() -> Result<()>{
|
||||||
for event in events_handler {
|
for event in events_handler {
|
||||||
let _ = event.await;
|
let _ = event.await;
|
||||||
}
|
}
|
||||||
// let _ = tokio::join!(config_fut, grub_fut);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -76,7 +76,7 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho
|
||||||
/// assert_eq!(a.get_measure_info(vec.clone()).await, Ok(()));
|
/// assert_eq!(a.get_measure_info(vec.clone()).await, Ok(()));
|
||||||
/// ```
|
/// ```
|
||||||
///
|
///
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MonitoringImporter {
|
pub struct MonitoringImporter {
|
||||||
ip : String,
|
ip : String,
|
||||||
login : String,
|
login : String,
|
||||||
|
|
@ -86,6 +86,17 @@ pub struct MonitoringImporter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MonitoringImporter {
|
impl MonitoringImporter {
|
||||||
|
/// The most simple constructor for `MonitoringImporter`
|
||||||
|
///
|
||||||
|
/// Returns `Self` object that is constructing according to
|
||||||
|
/// env vars:
|
||||||
|
/// - `ENODE_MONITORING_IP`
|
||||||
|
/// - `ENODE_MONITORING_LOGIN`
|
||||||
|
/// - `ENODE_MONITORING_PASSWORD`
|
||||||
|
///
|
||||||
|
/// If env vars will not be set, it returns `Self` with
|
||||||
|
/// empty fields
|
||||||
|
///
|
||||||
pub async fn new() -> Self {
|
pub async fn new() -> Self {
|
||||||
MonitoringImporter {
|
MonitoringImporter {
|
||||||
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
|
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
|
||||||
|
|
@ -95,12 +106,29 @@ impl MonitoringImporter {
|
||||||
ts : String::new(),
|
ts : String::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Function that checks is current `MonitoringImporter` valid
|
||||||
|
/// and can be used to pull and push info to and from CM
|
||||||
|
///
|
||||||
async fn is_valid(&self) -> bool {
|
async fn is_valid(&self) -> bool {
|
||||||
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
|
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
|
||||||
}
|
}
|
||||||
|
/// A setter of `timestamp`
|
||||||
|
///
|
||||||
|
/// This function is needed to set a `timestamp` after
|
||||||
|
/// CM session creation.
|
||||||
|
///
|
||||||
|
/// This `timestamp` is a date of creation a session
|
||||||
|
/// on the CM Server
|
||||||
async fn set_ts(&mut self, ts: &str) {
|
async fn set_ts(&mut self, ts: &str) {
|
||||||
self.ts = ts.to_owned();
|
self.ts = ts.to_owned();
|
||||||
}
|
}
|
||||||
|
/// A function for creation CM session
|
||||||
|
///
|
||||||
|
/// Returns OK(()) if session was created and there were
|
||||||
|
/// no errors (neither internal no external)
|
||||||
|
///
|
||||||
|
/// *Also* it saves ts and access-key in it's runtime environment,
|
||||||
|
/// there's no way to get access-key of session
|
||||||
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
||||||
if !self.is_valid().await {
|
if !self.is_valid().await {
|
||||||
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
||||||
|
|
@ -140,7 +168,6 @@ impl MonitoringImporter {
|
||||||
let cls = measure.get("cls");
|
let cls = measure.get("cls");
|
||||||
let name = measure.get("name");
|
let name = measure.get("name");
|
||||||
if id.is_some() && cls.is_some() {
|
if id.is_some() && cls.is_some() {
|
||||||
// todo: later wait for Vaitaliy call of classification
|
|
||||||
let id = id.unwrap().as_i64().unwrap_or_default();
|
let id = id.unwrap().as_i64().unwrap_or_default();
|
||||||
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
||||||
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
|
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
|
||||||
|
|
@ -278,4 +305,16 @@ impl MonitoringImporter {
|
||||||
value : val.clone()
|
value : val.clone()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for MonitoringImporter {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("MonitoringImporter")
|
||||||
|
.field("ip", &self.ip)
|
||||||
|
.field("login", &self.login)
|
||||||
|
.field("password", &"****")
|
||||||
|
.field("access_key", &"HIDDEN")
|
||||||
|
.field("ts", &self.ts)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue