rc #12
|
|
@ -15,6 +15,20 @@ const CONFIG_PATH: &str = "config_api.json";
|
||||||
const SOCKET_PATH: &str = "api-grub.sock";
|
const SOCKET_PATH: &str = "api-grub.sock";
|
||||||
|
|
||||||
// TODO: rewrite to use current_exe
|
// TODO: rewrite to use current_exe
|
||||||
|
/// # Fn `pull_local_config`
|
||||||
|
///
|
||||||
|
/// ## function to one-time pulling local config by straight reading
|
||||||
|
///
|
||||||
|
/// ### Dev-Info :
|
||||||
|
///
|
||||||
|
/// *input* : -
|
||||||
|
///
|
||||||
|
/// *output* : `anyhow::Result<integr_structs::api::v3::Config>`
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `main`
|
||||||
|
///
|
||||||
|
/// *managing* : -
|
||||||
|
///
|
||||||
pub async fn pull_local_config() -> Result<Config> {
|
pub async fn pull_local_config() -> Result<Config> {
|
||||||
let path = Path::new(CONFIG_PATH);
|
let path = Path::new(CONFIG_PATH);
|
||||||
if path.exists() && path.is_file() {
|
if path.exists() && path.is_file() {
|
||||||
|
|
@ -28,6 +42,22 @@ pub async fn pull_local_config() -> Result<Config> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// for config pulling
|
// for config pulling
|
||||||
|
/// # Fn `pull_local_config`
|
||||||
|
///
|
||||||
|
/// ## function to init Unix-Socket listening for grabbing new configs
|
||||||
|
///
|
||||||
|
/// ### Dev-Info :
|
||||||
|
///
|
||||||
|
/// *input* : `&tokio::sync::mpsc::Sender<integr_structs::api::v3::Config>`
|
||||||
|
///
|
||||||
|
/// *output* : `anyhow::Result<()>`
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `main`
|
||||||
|
///
|
||||||
|
/// *managing* : -
|
||||||
|
///
|
||||||
|
/// *depends on* : `const SOCKET_PATH`
|
||||||
|
///
|
||||||
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
|
pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
|
||||||
info!("Initializing Unix-Socket listening for pulling new configs...");
|
info!("Initializing Unix-Socket listening for pulling new configs...");
|
||||||
let server = init_unix_listener().await?;
|
let server = init_unix_listener().await?;
|
||||||
|
|
@ -57,12 +87,43 @@ pub async fn init_config_grub_mechanism(tx: &Sender<Config>) -> Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// saving new config locally
|
// saving new config locally
|
||||||
|
/// # Fn `save_new_config`
|
||||||
|
///
|
||||||
|
/// ## function for saving new config locally
|
||||||
|
///
|
||||||
|
/// ### Dev-Info :
|
||||||
|
///
|
||||||
|
/// *input* : `&String | &str`
|
||||||
|
///
|
||||||
|
/// *output* : `anyhow::Result<()>`
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `main`
|
||||||
|
///
|
||||||
|
/// *managing* : -
|
||||||
|
///
|
||||||
|
/// *depends on* : `const CONFIG_PATH`
|
||||||
|
///
|
||||||
async fn save_new_config(config: &String) -> Result<()> {
|
async fn save_new_config(config: &String) -> Result<()> {
|
||||||
fs::write(CONFIG_PATH, config)?;
|
fs::write(CONFIG_PATH, config)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// # Fn `pull_local_config`
|
||||||
|
///
|
||||||
|
/// ## function for saving new config locally
|
||||||
|
///
|
||||||
|
/// ### Dev-Info :
|
||||||
|
///
|
||||||
|
/// *input* : `&tokio::sync::mpsc::Sender<Config>`
|
||||||
|
///
|
||||||
|
/// *output* : `anyhow::Result<tokio::net::UnixListener>`
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `init_config_grub_mechanism`
|
||||||
|
///
|
||||||
|
/// *managing* : -
|
||||||
|
///
|
||||||
|
/// *depends on* : `const SOCKET_PATH`
|
||||||
|
///
|
||||||
async fn init_unix_listener() -> Result<UnixListener> {
|
async fn init_unix_listener() -> Result<UnixListener> {
|
||||||
let _ = fs::remove_file(SOCKET_PATH);
|
let _ = fs::remove_file(SOCKET_PATH);
|
||||||
Ok(UnixListener::bind(SOCKET_PATH)?)
|
Ok(UnixListener::bind(SOCKET_PATH)?)
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,45 @@ use reqwest::Client;
|
||||||
use tokio_postgres::NoTls;
|
use tokio_postgres::NoTls;
|
||||||
use std::env;
|
use std::env;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use log::{info, error};
|
use log::{debug, error, info};
|
||||||
|
use std::ops::Drop;
|
||||||
|
|
||||||
|
/// An entity which handles DB connections.
|
||||||
|
///
|
||||||
|
/// This struct is used to be a layer between API-grab workers
|
||||||
|
/// and the `pool` of DB connections to save grabbing processes statuses
|
||||||
|
/// (now `PostgreSQL`, slowly migrating to `ClickHouse` support)
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use api-grub::export::Expoter;
|
||||||
|
///
|
||||||
|
/// let exporter = Expoter::init();
|
||||||
|
///
|
||||||
|
/// assert!(exporter.get_connection_from_pool().is_some());
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// # Hint:
|
||||||
|
///
|
||||||
|
/// - use `export_data` method to export metrics to DB
|
||||||
|
///
|
||||||
|
/// - use `export_metrics` method to export metrics (`PrometehusMetrics`
|
||||||
|
/// type) to the Prometehus exporter (`$EXPORTER_URL`)
|
||||||
|
///
|
||||||
|
/// - use `export_metrics_extended` method to export metrics (
|
||||||
|
/// `*PrometehusMetricsExtended*` type with `desc` field) to the
|
||||||
|
/// Prometehus exporter (`$EXPORTER_URL`)
|
||||||
pub struct Exporter {
|
pub struct Exporter {
|
||||||
pool : Option<Pool>,
|
pool : Option<Pool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Exporter {
|
impl Exporter {
|
||||||
|
/// Fills `deadpool_postgres::Config` object with values from ENV VARS:
|
||||||
|
/// - `DB_HOST`
|
||||||
|
/// - `DB_DBNAME`
|
||||||
|
/// - `DB_USER`
|
||||||
|
/// - `DB_PASSWORD`
|
||||||
fn config_construct() -> Result<Config> {
|
fn config_construct() -> Result<Config> {
|
||||||
let mut cfg = Config::new();
|
let mut cfg = Config::new();
|
||||||
cfg.host = Some(env::var("DB_HOST")?);
|
cfg.host = Some(env::var("DB_HOST")?);
|
||||||
|
|
@ -19,6 +51,9 @@ impl Exporter {
|
||||||
cfg.password = Some(env::var("DB_PASSWORD")?);
|
cfg.password = Some(env::var("DB_PASSWORD")?);
|
||||||
Ok(cfg)
|
Ok(cfg)
|
||||||
}
|
}
|
||||||
|
/// Uses `deadpool_postgres::Config` object to create DB connections
|
||||||
|
/// pool to share between async tasks and to restrict a count of parallel
|
||||||
|
/// connections
|
||||||
fn pool_construct() -> Option<Pool> {
|
fn pool_construct() -> Option<Pool> {
|
||||||
return match Self::config_construct() {
|
return match Self::config_construct() {
|
||||||
Ok(config) => {
|
Ok(config) => {
|
||||||
|
|
@ -34,6 +69,7 @@ impl Exporter {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Checks if DB connections pool is empty
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
pub fn is_no_connection(&self) -> bool { self.pool.is_none() }
|
||||||
pub fn init() -> Self {
|
pub fn init() -> Self {
|
||||||
|
|
@ -41,6 +77,9 @@ impl Exporter {
|
||||||
pool : Self::pool_construct(),
|
pool : Self::pool_construct(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Shares a connection `deadpool_postgres::Client as PgClient`
|
||||||
|
///
|
||||||
|
/// Function awaits til the moment it can return `Option<deadpool_postgres::Client as PgClient>`
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
pub async fn get_connection_from_pool(&self) -> Option<PgClient> {
|
||||||
if let Some(pool) = &self.pool {
|
if let Some(pool) = &self.pool {
|
||||||
|
|
@ -48,15 +87,20 @@ impl Exporter {
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
/// Exports data in `&str` jsonb format to DB using connection from the pool
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||||
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
||||||
let _ = client.query(&query, &[&metrics]).await?;
|
let _ = client.query(&query, &[&metrics]).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
/// Exports metrics in `PrometheusMetrics` format to Exporter defined
|
||||||
|
/// as env var $EXORPTER_URL
|
||||||
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
|
|
||||||
|
debug!("Exporting: {:?}", &metrics);
|
||||||
|
|
||||||
let req = Client::new()
|
let req = Client::new()
|
||||||
.post(url)
|
.post(url)
|
||||||
.json(&metrics)
|
.json(&metrics)
|
||||||
|
|
@ -65,8 +109,13 @@ impl Exporter {
|
||||||
req?;
|
req?;
|
||||||
Ok(metrics.get_bytes_len())
|
Ok(metrics.get_bytes_len())
|
||||||
}
|
}
|
||||||
|
/// Exports metrics in `PrometheusMetricsExtended` format to Exporter defined
|
||||||
|
/// as env var $EXORPTER_URL
|
||||||
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
|
pub async fn export_extended_metrics(metrics: PrometheusMetricsExtended) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
|
|
||||||
|
debug!("Exporting: {:?}", &metrics);
|
||||||
|
|
||||||
let req = Client::new()
|
let req = Client::new()
|
||||||
.post(url)
|
.post(url)
|
||||||
.json(&metrics)
|
.json(&metrics)
|
||||||
|
|
@ -76,3 +125,10 @@ impl Exporter {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for Exporter {
|
||||||
|
// Custom destructor to log deinitializing of the `Exporter`
|
||||||
|
fn drop(&mut self) {
|
||||||
|
info!("Deinitializng Exporter and DB connection pool ...")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -2,6 +2,21 @@
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
use integr_structs::api::v3::{Metric, MetricOutput};
|
use integr_structs::api::v3::{Metric, MetricOutput};
|
||||||
|
|
||||||
|
/// A JSON-parser struct
|
||||||
|
///
|
||||||
|
/// Using in metric extracting from Server Response
|
||||||
|
/// with metrics mechanism
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use api-grub::json::JsonParser;
|
||||||
|
/// use use integr_structs::api::v3::Metric;
|
||||||
|
///
|
||||||
|
/// let json = b""flat1" : { "room1" : { "rt_tempo" : "+16" }}".to_vec();
|
||||||
|
///
|
||||||
|
/// assert!(!JsonParser::parse(vec![Metric::template()], json).is_empty());
|
||||||
|
/// ```
|
||||||
pub struct JsonParser;
|
pub struct JsonParser;
|
||||||
|
|
||||||
impl JsonParser {
|
impl JsonParser {
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,22 @@ use std::io::Write;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use log::info;
|
use log::info;
|
||||||
|
|
||||||
|
/// # Fn `setup_logger`
|
||||||
|
///
|
||||||
|
/// ## function to init terminal logger
|
||||||
|
///
|
||||||
|
/// ### Dev-Info :
|
||||||
|
///
|
||||||
|
/// *input* : -
|
||||||
|
///
|
||||||
|
/// *output* : `anyhow::Result<()>`
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `main`
|
||||||
|
///
|
||||||
|
/// *managing* : -
|
||||||
|
///
|
||||||
|
/// *depends on* : -
|
||||||
|
///
|
||||||
pub async fn setup_logger() -> Result<()> {
|
pub async fn setup_logger() -> Result<()> {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.format(move |buf, record| {
|
.format(move |buf, record| {
|
||||||
|
|
@ -17,7 +33,7 @@ pub async fn setup_logger() -> Result<()> {
|
||||||
record.args(),
|
record.args(),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.filter(None, LevelFilter::Info)
|
.filter(None, LevelFilter::Debug)
|
||||||
.target(env_logger::Target::Stdout)
|
.target(env_logger::Target::Stdout)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,8 @@ mod export;
|
||||||
mod monitoring;
|
mod monitoring;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
// use integr_structs::api::ApiConfigV2;
|
|
||||||
use integr_structs::api::v3::Config;
|
use integr_structs::api::v3::Config;
|
||||||
use logger::setup_logger;
|
use logger::setup_logger;
|
||||||
// use log::{info, warn};
|
|
||||||
use config::{pull_local_config, init_config_grub_mechanism};
|
use config::{pull_local_config, init_config_grub_mechanism};
|
||||||
use net::init_api_grub_mechanism;
|
use net::init_api_grub_mechanism;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
@ -23,10 +21,6 @@ async fn main() -> Result<()>{
|
||||||
let config = get_config().await;
|
let config = get_config().await;
|
||||||
// config update channel
|
// config update channel
|
||||||
let (tx, mut rx) = mpsc::channel::<Config>(1);
|
let (tx, mut rx) = mpsc::channel::<Config>(1);
|
||||||
// futures
|
|
||||||
// todo : rewrite with spawn
|
|
||||||
// let config_fut = init_config_grub_mechanism(&tx);
|
|
||||||
// let grub_fut = init_api_grub_mechanism(config, &mut rx);
|
|
||||||
|
|
||||||
let event_config = tokio::spawn(async move {
|
let event_config = tokio::spawn(async move {
|
||||||
match init_config_grub_mechanism(&tx).await {
|
match init_config_grub_mechanism(&tx).await {
|
||||||
|
|
@ -63,7 +57,6 @@ async fn main() -> Result<()>{
|
||||||
for event in events_handler {
|
for event in events_handler {
|
||||||
let _ = event.await;
|
let _ = event.await;
|
||||||
}
|
}
|
||||||
// let _ = tokio::join!(config_fut, grub_fut);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,31 @@ use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
/// # Fn `get_metrics_from_monitoring`
|
||||||
|
///
|
||||||
|
/// A function to init pulling and exporting metrics mechanism
|
||||||
|
/// from CM to Exporter. It spawns async tasks to get measures
|
||||||
|
/// and their values and then extract needed info to export
|
||||||
|
///
|
||||||
|
/// ### Dev-Info :
|
||||||
|
///
|
||||||
|
/// *input* : duration and delay as `usize` (in secs)
|
||||||
|
///
|
||||||
|
/// *output* : `anyhow::Result<()>`
|
||||||
|
///
|
||||||
|
/// *initiator* : fn `main`
|
||||||
|
///
|
||||||
|
/// *managing* : runtime of N async tasks (N - count of chunks)
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use api-grub::monitoring::get_metrics_from_monitoring;
|
||||||
|
///
|
||||||
|
/// // exec func without time restriction but with delay in 5 secs
|
||||||
|
/// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(()));
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
||||||
let timer = tokio::time::Instant::now();
|
let timer = tokio::time::Instant::now();
|
||||||
'outer: loop {
|
'outer: loop {
|
||||||
|
|
@ -34,7 +59,24 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
/// An entity which handle CM creds
|
||||||
|
///
|
||||||
|
/// Used to capture measures and there values, to preprocess all measures to
|
||||||
|
/// relevant Exporter's structure
|
||||||
|
///
|
||||||
|
/// # Example:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use api-grub::monitoring::MonitoringImporter;
|
||||||
|
///
|
||||||
|
/// let mut a = MonitoringImporter::new().await;
|
||||||
|
/// a.start_session().await?;
|
||||||
|
/// let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
|
||||||
|
///
|
||||||
|
/// assert_eq!(a.get_measure_info(vec.clone()).await, Ok(()));
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct MonitoringImporter {
|
pub struct MonitoringImporter {
|
||||||
ip : String,
|
ip : String,
|
||||||
login : String,
|
login : String,
|
||||||
|
|
@ -44,6 +86,17 @@ pub struct MonitoringImporter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MonitoringImporter {
|
impl MonitoringImporter {
|
||||||
|
/// The most simple constructor for `MonitoringImporter`
|
||||||
|
///
|
||||||
|
/// Returns `Self` object that is constructing according to
|
||||||
|
/// env vars:
|
||||||
|
/// - `ENODE_MONITORING_IP`
|
||||||
|
/// - `ENODE_MONITORING_LOGIN`
|
||||||
|
/// - `ENODE_MONITORING_PASSWORD`
|
||||||
|
///
|
||||||
|
/// If env vars will not be set, it returns `Self` with
|
||||||
|
/// empty fields
|
||||||
|
///
|
||||||
pub async fn new() -> Self {
|
pub async fn new() -> Self {
|
||||||
MonitoringImporter {
|
MonitoringImporter {
|
||||||
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
|
ip : env::var("ENODE_MONITORING_IP").unwrap_or_else(|_| String::new()),
|
||||||
|
|
@ -53,12 +106,29 @@ impl MonitoringImporter {
|
||||||
ts : String::new(),
|
ts : String::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Function that checks is current `MonitoringImporter` valid
|
||||||
|
/// and can be used to pull and push info to and from CM
|
||||||
|
///
|
||||||
async fn is_valid(&self) -> bool {
|
async fn is_valid(&self) -> bool {
|
||||||
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
|
!self.ip.is_empty() && !self.login.is_empty() && !self.password.is_empty()
|
||||||
}
|
}
|
||||||
|
/// A setter of `timestamp`
|
||||||
|
///
|
||||||
|
/// This function is needed to set a `timestamp` after
|
||||||
|
/// CM session creation.
|
||||||
|
///
|
||||||
|
/// This `timestamp` is a date of creation a session
|
||||||
|
/// on the CM Server
|
||||||
async fn set_ts(&mut self, ts: &str) {
|
async fn set_ts(&mut self, ts: &str) {
|
||||||
self.ts = ts.to_owned();
|
self.ts = ts.to_owned();
|
||||||
}
|
}
|
||||||
|
/// A function for creation CM session
|
||||||
|
///
|
||||||
|
/// Returns OK(()) if session was created and there were
|
||||||
|
/// no errors (neither internal no external)
|
||||||
|
///
|
||||||
|
/// *Also* it saves ts and access-key in it's runtime environment,
|
||||||
|
/// there's no way to get access-key of session
|
||||||
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
||||||
if !self.is_valid().await {
|
if !self.is_valid().await {
|
||||||
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
||||||
|
|
@ -81,6 +151,12 @@ impl MonitoringImporter {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A function for pulling measures list
|
||||||
|
///
|
||||||
|
/// Used with actual credentials for current CM session
|
||||||
|
/// and returning measures in format of `Ok(Vec<(String, String)>)`
|
||||||
|
/// , where `(String, String)` is a tuple of measure `id` and `description`
|
||||||
|
/// (`name`)
|
||||||
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
|
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let mut vec: Vec<(String, String)> = Vec::new();
|
let mut vec: Vec<(String, String)> = Vec::new();
|
||||||
|
|
@ -98,7 +174,6 @@ impl MonitoringImporter {
|
||||||
let cls = measure.get("cls");
|
let cls = measure.get("cls");
|
||||||
let name = measure.get("name");
|
let name = measure.get("name");
|
||||||
if id.is_some() && cls.is_some() {
|
if id.is_some() && cls.is_some() {
|
||||||
// todo: later wait for Vaitaliy call of classification
|
|
||||||
let id = id.unwrap().as_i64().unwrap_or_default();
|
let id = id.unwrap().as_i64().unwrap_or_default();
|
||||||
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
let cls = cls.unwrap().as_str().unwrap_or_else(|| "");
|
||||||
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
|
let name = name.unwrap_or_else(|| &Value::Null).as_str().unwrap_or_else(|| "null");
|
||||||
|
|
@ -114,6 +189,22 @@ impl MonitoringImporter {
|
||||||
info!("List of measures was pulled, total - {}", &vec.len());
|
info!("List of measures was pulled, total - {}", &vec.len());
|
||||||
Ok(vec)
|
Ok(vec)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A function to get realtime data
|
||||||
|
///
|
||||||
|
/// It pulles info about 1 measure or a slice of measures and
|
||||||
|
/// exports all data to Prometehus exporter
|
||||||
|
///
|
||||||
|
/// # How it works
|
||||||
|
/// 1) creates a restriction for max count of async
|
||||||
|
/// tasks (`tokio::sync::Semaphore`)
|
||||||
|
///
|
||||||
|
/// 2) divides vec of measures in case of creating chunks with
|
||||||
|
/// the most optimal sizes to optimize self and server load
|
||||||
|
///
|
||||||
|
/// 3) spawns async tasks-grabbers to get measures info which
|
||||||
|
/// exprots all data by itselfs
|
||||||
|
///
|
||||||
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
|
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
|
||||||
let mut sys = sysinfo::System::new();
|
let mut sys = sysinfo::System::new();
|
||||||
sys.refresh_cpu_all();
|
sys.refresh_cpu_all();
|
||||||
|
|
@ -156,6 +247,17 @@ impl MonitoringImporter {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An async task-grabber
|
||||||
|
///
|
||||||
|
/// Used to create request to the CM server and
|
||||||
|
/// get all measure(s) data
|
||||||
|
///
|
||||||
|
/// # Also
|
||||||
|
/// An argument `measure: Arc<String>` can be a single measure like `measure$1` or
|
||||||
|
/// a slice of measures in special format `%5B%22measure$1%22,%20%22measure$2%22%5D`.
|
||||||
|
/// This is a neccesary measure to handle two types of requests and URL restrictions
|
||||||
|
///
|
||||||
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
|
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
|
||||||
let resp = client
|
let resp = client
|
||||||
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
||||||
|
|
@ -170,6 +272,21 @@ impl MonitoringImporter {
|
||||||
PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
|
PrometheusMetricsExtended::new_zvks(Self::extract_metric_data(resp, hm).await?).await
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An recursive extractor of data
|
||||||
|
///
|
||||||
|
/// Uses target-json CM-Server response as `Value` and HashMap of
|
||||||
|
/// measures' `id`s and their appropriate `description`s
|
||||||
|
///
|
||||||
|
/// # How it works
|
||||||
|
/// 1) if `Value` is an `Object` -> executes `Self::process_value` on it and
|
||||||
|
/// returns result of the function as `Vec`
|
||||||
|
///
|
||||||
|
/// 2) if `Value` is an `Array` -> self-executes for each pat of the array
|
||||||
|
/// and aggregates all data in the `Vec` by using `.append(&mut Vec<...>)`
|
||||||
|
///
|
||||||
|
/// 3) if `Value` is `_` -> returns error **Invalid JSON format**
|
||||||
|
///
|
||||||
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
|
fn extract_metric_data(json: Value, hm: &HashMap<String, String>) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<MetricOutputExtended>>> + Send + '_>> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
return match json {
|
return match json {
|
||||||
|
|
@ -190,6 +307,11 @@ impl MonitoringImporter {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A function-extractor for single measure object
|
||||||
|
///
|
||||||
|
/// Searches for certain fields and aggregates it in the `MetricOutputExtended`
|
||||||
|
/// object
|
||||||
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
|
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
|
||||||
let id = obj.get("$id");
|
let id = obj.get("$id");
|
||||||
let val = obj.get("value");
|
let val = obj.get("value");
|
||||||
|
|
@ -237,3 +359,15 @@ impl MonitoringImporter {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for MonitoringImporter {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("MonitoringImporter")
|
||||||
|
.field("ip", &self.ip)
|
||||||
|
.field("login", &self.login)
|
||||||
|
.field("password", &"****")
|
||||||
|
.field("access_key", &"HIDDEN")
|
||||||
|
.field("ts", &self.ts)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -160,6 +160,16 @@ pub mod v3 {
|
||||||
pub json_type : String,
|
pub json_type : String,
|
||||||
pub addr : String,
|
pub addr : String,
|
||||||
}
|
}
|
||||||
|
impl Metric {
|
||||||
|
pub fn template() -> Self {
|
||||||
|
Self {
|
||||||
|
id : "room_tempo".to_string(),
|
||||||
|
json_type : "String".to_string(),
|
||||||
|
addr : "flat1.room1.rt_tempo".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
pub struct Metrics {
|
pub struct Metrics {
|
||||||
pub name : String,
|
pub name : String,
|
||||||
|
|
@ -411,7 +421,6 @@ pub mod enode_monitoring {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// "{\"access_token\":\"5BNQsmiGFQRNA651HeQxZekYgYUAWZ4e\",\"role\":\"administrator\",\"startRT\":\"2025-02-25T09:03:27.581Z\",\"login\":\"admin\",\"push_active\":null,\"$id\":\"systemuser$1\"}",
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
pub struct AuthResponse {
|
pub struct AuthResponse {
|
||||||
pub access_token : String,
|
pub access_token : String,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue