rc #17

Merged
deployer3000 merged 4 commits from rc into master 2025-03-11 13:04:26 +03:00
9 changed files with 113 additions and 49 deletions


@@ -12,4 +12,8 @@ EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
 # eNODE.Monitoring configuration
 ENODE_MONITORING_IP = "ip.ip.ip.ip"
 ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
 ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # admin password is required
+# IM configuration: maximum logging level,
+# for example DEBUG, INFO, WARN, ERROR, TRACE
+IM_LOG_INFO = "INFO"

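For reference, a minimal sketch of how a consumer reads `IM_LOG_INFO`; the parsing below mirrors the `setup_logger` change later in this PR, and the fallback to `INFO` matches the default shipped in this config:

use std::str::FromStr;

fn resolve_log_level() -> tracing::Level {
    // Read IM_LOG_INFO; fall back to "INFO" when the variable is unset.
    let level = std::env::var("IM_LOG_INFO").unwrap_or_else(|_| String::from("INFO"));
    // tracing::Level parses the documented values (DEBUG, INFO, WARN, ERROR, TRACE).
    tracing::Level::from_str(&level).unwrap_or(tracing::Level::INFO)
}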
Jenkinsfile

@@ -1,3 +1,24 @@
+def notify(
+    String context,
+    String giteaUser,
+    String giteaPass,
+    String repositoryUrl,
+    String repositoryName,
+    String commitHash,
+    String buildStatus
+) {
+    def status = buildStatus == 'success' ? 'success' : 'failure'
+    def description = buildStatus == 'success' ? 'Build succeeded' : 'Build failed'
+    sh """
+        curl -X POST \
+        -u "${giteaUser}:${giteaPass}" \
+        -H "Content-Type: application/json" \
+        -d '{"context":"${context}","state": "${status}", "description": "${description}"}' \
+        ${repositoryUrl}deployer3000/${repositoryName}/statuses/${commitHash}
+    """
+}
 pipeline {
     agent any
     environment {
@@ -66,6 +87,10 @@ pipeline {
                     http://git.entcor/api/v1/repos/deployer3000/integration-module/pulls/${prId}/merge
                 """
                 echo "PR ${prId} merged successfully into master!"
+                def context = "test-org/integration-module/pipeline/pr-${env.CHANGE_TARGET}"
+                def commitHash = sh(script: "git rev-parse HEAD~1", returnStdout: true).trim()
+                notify(context, GITEA_USER, GITEA_PASS, env.GITEA_REPOSITORY_URL, "integration-module", commitHash, "success")
            }
        }
    }
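The `notify` helper drives Gitea's commit-status API (`POST /api/v1/repos/{owner}/{repo}/statuses/{sha}` with `state`, `context`, and `description` fields). For reference, a minimal sketch of the same request in Rust with reqwest; the URL, credentials, and SHA below are placeholders, not values from this pipeline:

// Sketch only: posts a commit status to Gitea, mirroring the curl call above.
async fn post_status(client: &reqwest::Client) -> anyhow::Result<()> {
    let body = serde_json::json!({
        "context": "test-org/integration-module/pipeline",
        "state": "success",          // or "failure", "pending", "error"
        "description": "Build succeeded",
    });
    client
        .post("http://git.example/api/v1/repos/owner/repo/statuses/<commit-sha>")
        .basic_auth("gitea_user", Some("gitea_pass"))
        .json(&body)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}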


@@ -8,8 +8,6 @@ serde = { version = "1.0.217", features = ["derive"] }
 serde_json = "1.0.135"
 tokio = { version = "1.43.0", features = ["full"] }
 integr-structs = {path = "../integr-structs"}
-env_logger = "0.11.6"
-log = "0.4.25"
 anyhow = "1.0.95"
 chrono = "0.4.39"
 reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
@@ -20,3 +18,5 @@ md5 = "0.7.0"
 rand = "0.9.0"
 sysinfo = "0.33.1"
 openssl = { version = "0.10", features = ["vendored"] }
+tracing-subscriber = "0.3.19"
+tracing = "0.1.41"


@@ -2,7 +2,7 @@
 // 1) check changes in unix-socket
 // 2) save changes in local config file
 use anyhow::{Error, Ok, Result};
-use log::{info, warn, error};
+use tracing::{info, warn, error};
 use std::{fs, path::Path};
 use serde_json::from_str;
 use tokio::{io::AsyncReadExt, net::UnixListener};


@@ -4,7 +4,7 @@ use reqwest::Client;
 use tokio_postgres::NoTls;
 use std::env;
 use anyhow::Result;
-use log::{debug, error, info};
+use tracing::{debug, error, info};
 use std::ops::Drop;
 /// An entity which handles DB connections.
@@ -89,6 +89,7 @@ impl Exporter {
     }
     /// Exports data in `&str` jsonb format to DB using connection from the pool
     #[allow(unused)]
+    #[tracing::instrument(name = "PostgreSQL export")]
     pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
         let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
         let _ = client.query(&query, &[&metrics]).await?;
@@ -96,6 +97,7 @@ impl Exporter {
     }
     /// Exports metrics in `PrometheusMetrics` format to Exporter defined
     /// as env var $EXPORTER_URL
+    #[tracing::instrument(name = "Prometheus export")]
     pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
         let url = env::var("EXPORTER_URL")?;

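`#[tracing::instrument]` wraps each call of the annotated function in a span with the given name, so events logged inside carry that span's context. A minimal illustration (the function and field names are made up for the example):

#[tracing::instrument(name = "demo export", skip_all)]
async fn demo_export(rows: usize) {
    // With span events enabled (see the logger change below), the subscriber
    // prints a line when this span is created; the event below appears inside it.
    tracing::info!(rows, "exporting");
}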

@@ -1,9 +1,8 @@
+use std::str::FromStr;
 use chrono::Local;
-use env_logger::Builder;
-use log::LevelFilter;
-use std::io::Write;
 use anyhow::Result;
-use log::info;
+use tracing::info;
 /// # Fn `setup_logger`
 ///
@@ -22,20 +21,34 @@ use log::info;
 /// *depends on* : -
 ///
 pub async fn setup_logger() -> Result<()> {
-    Builder::new()
-        .format(move |buf, record| {
-            writeln!(
-                buf,
-                "|{}| {} [{}] - {}",
-                "api-grubber",
-                Local::now().format("%d-%m-%Y %H:%M:%S"),
-                record.level(),
-                record.args(),
-            )
-        })
-        .filter(None, LevelFilter::Info)
-        .target(env_logger::Target::Stdout)
-        .init();
+    // Builder::new()
+    //     .format(move |buf, record| {
+    //         writeln!(
+    //             buf,
+    //             "|{}| {} [{}] - {}",
+    //             "api-grubber",
+    //             Local::now().format("%d-%m-%Y %H:%M:%S"),
+    //             record.level(),
+    //             record.args(),
+    //         )
+    //     })
+    //     .filter(None, LevelFilter::Info)
+    //     .target(env_logger::Target::Stdout)
+    //     .init();
+    let log_level = std::env::var("IM_LOG_INFO").unwrap_or_else(|_| String::from("INFO"));
+    tracing_subscriber::fmt()
+        .with_max_level(
+            tracing::Level::from_str(&log_level)
+                .unwrap_or_else(|_| tracing::Level::INFO))
+        .with_writer(std::io::stdout)
+        .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NEW)
+        // .with_timer(Local::now().format("%d-%m-%Y %H:%M:%S"))
+        .with_line_number(false)
+        .with_target(false)
+        .with_file(false)
+        .compact()
+        .init();
     info!("Logger configured");
     Ok(())
@@ -45,22 +58,17 @@ pub async fn setup_logger() -> Result<()> {
 #[cfg(test)]
 mod logger_unittests {
     use tokio::test;
-    use super::*;
     #[test]
     async fn check_logger_builder() {
-        Builder::new()
-            .format(move |buf, record| {
-                writeln!(
-                    buf,
-                    "|{}| {} [{}] - {}",
-                    "api-grubber",
-                    Local::now().format("%d-%m-%Y %H:%M:%S"),
-                    record.level(),
-                    record.args(),
-                )
-            })
-            .filter(None, LevelFilter::Info)
-            .target(env_logger::Target::Stdout)
+        tracing_subscriber::fmt()
+            .with_max_level(tracing::Level::INFO)
+            .with_test_writer()
+            .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NEW)
+            // .with_timer(Local::now().format("%d-%m-%Y %H:%M:%S"))
+            .with_line_number(false)
+            .with_target(false)
+            .with_file(false)
+            .compact()
             .init();
     }
 }

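Note on `.with_span_events(FmtSpan::NEW)` above: the subscriber emits one extra event whenever a span is created (variants like `FmtSpan::CLOSE` would instead report timing when a span ends). A small sketch of the effect:

use tracing_subscriber::fmt::format::FmtSpan;

fn main() {
    tracing_subscriber::fmt()
        .with_span_events(FmtSpan::NEW) // emit an event when each span is created
        .compact()
        .init();

    let span = tracing::info_span!("cm_session");
    let _guard = span.enter();
    // Output (roughly): a "new" line for cm_session, then the event below.
    tracing::info!("inside the span");
}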

@@ -11,7 +11,7 @@ use logger::setup_logger;
 use config::{pull_local_config, init_config_grub_mechanism};
 use net::init_api_grub_mechanism;
 use tokio::sync::mpsc;
-use log::{error, info, warn};
+use tracing::{error, info, warn};
 use monitoring::get_metrics_from_monitoring;
 #[tokio::main(flavor = "multi_thread")]


@@ -10,7 +10,7 @@ use tokio::task::JoinHandle;
 use std::pin::Pin;
 use std::future::Future;
 use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
-use log::{error, info, warn};
+use tracing::{error, info, warn};
 use std::collections::HashMap;
 /// # Fn `get_metrics_from_monitoring`
@@ -38,15 +38,18 @@ use std::collections::HashMap;
 /// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(()));
 /// ```
 ///
+#[tracing::instrument(name = "CM mechanism", skip_all)]
 pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
     let timer = tokio::time::Instant::now();
     let mut a = MonitoringImporter::new().await;
     'outer: loop {
         // let mut a = MonitoringImporter::new().await;
         a.start_session().await?;
-        info!("Started a new CM session");
+        tracing::debug!("CM creds struct - {:#?}", a);
         let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
+        tracing::debug!("Measures Vec - {:#?}", vec);
         'inner: loop {
             if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
@@ -63,6 +66,7 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho
                 tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
             }
         }
     }
     Ok(())
 }
@@ -136,6 +140,7 @@ impl MonitoringImporter {
     ///
     /// *Also* it saves ts and access-key in its runtime environment;
     /// there's no way to get the access-key of a session
+    #[tracing::instrument(name = "CM-session mechanism", skip_all)]
     pub async fn start_session(&mut self) -> anyhow::Result<()> {
         if !self.is_valid().await {
             return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
@@ -143,18 +148,32 @@ impl MonitoringImporter {
         let client = Client::new();
         let url = format!("http://{}/e-data-front/auth/login", self.ip);
         let fortoken = ForTokenCredentials::new(&self.login, &self.password);
-        let client = client
-            .post(url)
-            .header("Content-Type", "application/json")
-            .json(&fortoken);
-        let resp = client.send().await?;
-        let auth = resp.json::<AuthResponse>().await?;
-        self.set_ts(&fortoken.ts).await;
-        self.access_token = auth.access_token.to_owned();
+        let mut delay = 1;
+        loop {
+            let client = client
+                .post(&url)
+                .header("Content-Type", "application/json")
+                .json(&fortoken);
+            // let resp = client.send().await?;
+            if let Ok(resp) = client.send().await {
+                // let auth = resp.json::<AuthResponse>().await?;
+                match resp.json::<AuthResponse>().await {
+                    Ok(auth) => {
+                        self.set_ts(&fortoken.ts).await;
+                        self.access_token = auth.access_token.to_owned();
+                        tracing::trace!("Access key was changed");
+                        break;
+                    },
+                    Err(er) => error!("Error with extracting access-key from CM response due to {}", er),
+                }
+            }
+            error!("Error while trying to create a new session, waiting {} secs and retrying ...", delay);
+            tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
+            delay = delay * 2;
+        }
+        info!("Started a new CM session");
         Ok(())
     }
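One note on the retry loop above: `delay` doubles without an upper bound, so a long CM outage makes the sleep grow indefinitely. A common variant caps the backoff; a minimal sketch, where `try_once()` is a hypothetical stand-in for the login request and the 64-second ceiling is an arbitrary choice, not part of this PR:

let mut delay: u64 = 1;
loop {
    // try_once() stands in for the POST /e-data-front/auth/login attempt.
    if try_once().await.is_ok() {
        break;
    }
    tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
    delay = (delay * 2).min(64); // cap the wait between attempts at 64 s
}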
@@ -164,7 +183,9 @@ impl MonitoringImporter {
     /// and returning measures in format of `Ok(Vec<(String, String)>)`,
     /// where `(String, String)` is a tuple of measure `id` and `description`
     /// (`name`)
+    #[tracing::instrument(name = "CM get metrics list mechanism", skip_all)]
     pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
+        tracing::trace!("Trying to get measures list from CM ...");
         let client = Client::new();
         let mut vec: Vec<(String, String)> = Vec::new();
         let url = format!("http://{}/e-cmdb/api/query", self.ip);
@@ -212,7 +233,9 @@ impl MonitoringImporter {
     /// 3) spawns async task-grabbers to get measures info, each of which
     ///    exports all data by itself
     ///
+    #[tracing::instrument(name = "CM get measures info mechanism", skip_all)]
     pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
+        tracing::trace!("Trying to get info about each measure ...");
         let mut sys = sysinfo::System::new();
         sys.refresh_cpu_all();
         // adaptive permission on task spawn to prevent system overload
@@ -267,6 +290,7 @@ impl MonitoringImporter {
     /// This is a necessary measure to handle two types of requests and URL restrictions
     ///
     async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
+        tracing::trace!("Processing CM endpoint with one or more measure names");
         let resp = client
             .get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
             .header("Content-Type", "application/json")
@@ -321,6 +345,7 @@ impl MonitoringImporter {
     /// Searches for certain fields and aggregates them in the `MetricOutputExtended`
     /// object
     async fn process_value(obj: &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
+        tracing::trace!("Processing atomic Object value in CM JSON-response");
         let id = obj.get("$id");
         let val = obj.get("value");
         let description = {


@@ -1,7 +1,7 @@
 // module to handle unix-socket connection + pulling info from api
 use anyhow::Result;
-use log::{error, info};
+use tracing::{error, info};
 use rand::random;
 use tokio::sync::mpsc::Receiver;
 use tokio::time::{sleep, Duration};