Compare commits
No commits in common. "2847a5a1e9f2637e97bd00a3ef0aa1605e589ecd" and "9d2e8100dfd006795b7b9316ebcf0b93a07deab0" have entirely different histories.
2847a5a1e9
...
9d2e8100df
|
|
@ -13,7 +13,3 @@ EXPORTER_URL = "http(s)://ip.ip.ip.ip:port"
|
||||||
ENODE_MONITORING_IP = "ip.ip.ip.ip"
|
ENODE_MONITORING_IP = "ip.ip.ip.ip"
|
||||||
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
|
ENODE_MONITORING_LOGIN = "admin_user_enode_monitoring" # admin user is required
|
||||||
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required
|
ENODE_MONITORING_PASSWORD = "admin_password_enode_monitoring" # # admin password is required
|
||||||
|
|
||||||
# IM configuration for max level of logging info
|
|
||||||
# for example DEBUG, INFO, WARN, ERROR, TRACE
|
|
||||||
IM_LOG_INFO = "INFO"
|
|
||||||
|
|
@ -1,24 +1,3 @@
|
||||||
def notify(
|
|
||||||
String context,
|
|
||||||
String giteaUser,
|
|
||||||
String giteaPass,
|
|
||||||
String repositoryUrl,
|
|
||||||
String repositoryName,
|
|
||||||
String commitHash,
|
|
||||||
String buildStatus
|
|
||||||
) {
|
|
||||||
def status = buildStatus == 'success' ? 'success' : 'failure'
|
|
||||||
def description = buildStatus == 'success' ? 'Build succeeded' : 'Build failed'
|
|
||||||
|
|
||||||
sh """
|
|
||||||
curl -X POST \
|
|
||||||
-u "${giteaUser}:${giteaPass}" \
|
|
||||||
-H "Content-Type: application/json" \
|
|
||||||
-d '{"context":"${context}","state": "${status}", "description": "${description}"}' \
|
|
||||||
${repositoryUrl}deployer3000/${repositoryName}/statuses/${commitHash}
|
|
||||||
"""
|
|
||||||
}
|
|
||||||
|
|
||||||
pipeline {
|
pipeline {
|
||||||
agent any
|
agent any
|
||||||
environment {
|
environment {
|
||||||
|
|
@ -87,10 +66,6 @@ pipeline {
|
||||||
http://git.entcor/api/v1/repos/deployer3000/integration-module/pulls/${prId}/merge
|
http://git.entcor/api/v1/repos/deployer3000/integration-module/pulls/${prId}/merge
|
||||||
"""
|
"""
|
||||||
echo "PR ${prId} merged successfully into master!"
|
echo "PR ${prId} merged successfully into master!"
|
||||||
|
|
||||||
def context = "test-org/integration-module/pipeline/pr-${env.CHANGE_TARGET}"
|
|
||||||
def commitHash = sh(script: "git rev-parse HEAD~1", returnStdout: true).trim()
|
|
||||||
notify(context, GITEA_USER, GITEA_PASS, env.GITEA_REPOSITORY_URL, "integration-module", commitHash, "success")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,8 @@ serde = { version = "1.0.217", features = ["derive"] }
|
||||||
serde_json = "1.0.135"
|
serde_json = "1.0.135"
|
||||||
tokio = { version = "1.43.0", features = ["full"] }
|
tokio = { version = "1.43.0", features = ["full"] }
|
||||||
integr-structs = {path = "../integr-structs"}
|
integr-structs = {path = "../integr-structs"}
|
||||||
|
env_logger = "0.11.6"
|
||||||
|
log = "0.4.25"
|
||||||
anyhow = "1.0.95"
|
anyhow = "1.0.95"
|
||||||
chrono = "0.4.39"
|
chrono = "0.4.39"
|
||||||
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
|
reqwest = { version = "0.12.12", features = ["rustls-tls", "json"] }
|
||||||
|
|
@ -18,5 +20,3 @@ md5 = "0.7.0"
|
||||||
rand = "0.9.0"
|
rand = "0.9.0"
|
||||||
sysinfo = "0.33.1"
|
sysinfo = "0.33.1"
|
||||||
openssl = { version = "0.10", features = ["vendored"] }
|
openssl = { version = "0.10", features = ["vendored"] }
|
||||||
tracing-subscriber = "0.3.19"
|
|
||||||
tracing = "0.1.41"
|
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@
|
||||||
// 1) check changes in unix-socket
|
// 1) check changes in unix-socket
|
||||||
// 2) save changes in local config file
|
// 2) save changes in local config file
|
||||||
use anyhow::{Error, Ok, Result};
|
use anyhow::{Error, Ok, Result};
|
||||||
use tracing::{info, warn, error};
|
use log::{info, warn, error};
|
||||||
use std::{fs, path::Path};
|
use std::{fs, path::Path};
|
||||||
use serde_json::from_str;
|
use serde_json::from_str;
|
||||||
use tokio::{io::AsyncReadExt, net::UnixListener};
|
use tokio::{io::AsyncReadExt, net::UnixListener};
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ use reqwest::Client;
|
||||||
use tokio_postgres::NoTls;
|
use tokio_postgres::NoTls;
|
||||||
use std::env;
|
use std::env;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use tracing::{debug, error, info};
|
use log::{debug, error, info};
|
||||||
use std::ops::Drop;
|
use std::ops::Drop;
|
||||||
|
|
||||||
/// An entity which handles DB connections.
|
/// An entity which handles DB connections.
|
||||||
|
|
@ -89,7 +89,6 @@ impl Exporter {
|
||||||
}
|
}
|
||||||
/// Exports data in `&str` jsonb format to DB using connection from the pool
|
/// Exports data in `&str` jsonb format to DB using connection from the pool
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
#[tracing::instrument(name = "PostgreSQL export")]
|
|
||||||
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
pub async fn export_data(client: PgClient, metrics: &str) -> Result<()> {
|
||||||
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
let query = client.prepare_cached("INSERT INTO metrics (body) VALUES ($1);").await?;
|
||||||
let _ = client.query(&query, &[&metrics]).await?;
|
let _ = client.query(&query, &[&metrics]).await?;
|
||||||
|
|
@ -97,7 +96,6 @@ impl Exporter {
|
||||||
}
|
}
|
||||||
/// Exports metrics in `PrometheusMetrics` format to Exporter defined
|
/// Exports metrics in `PrometheusMetrics` format to Exporter defined
|
||||||
/// as env var $EXORPTER_URL
|
/// as env var $EXORPTER_URL
|
||||||
#[tracing::instrument(name = "Prometheus export")]
|
|
||||||
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
pub async fn export_metrics(metrics: PrometheusMetrics) -> Result<usize> {
|
||||||
let url = env::var("EXPORTER_URL")?;
|
let url = env::var("EXPORTER_URL")?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,9 @@
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
use chrono::Local;
|
use chrono::Local;
|
||||||
|
use env_logger::Builder;
|
||||||
|
use log::LevelFilter;
|
||||||
|
use std::io::Write;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use tracing::info;
|
use log::info;
|
||||||
|
|
||||||
/// # Fn `setup_logger`
|
/// # Fn `setup_logger`
|
||||||
///
|
///
|
||||||
|
|
@ -21,34 +22,20 @@ use tracing::info;
|
||||||
/// *depends on* : -
|
/// *depends on* : -
|
||||||
///
|
///
|
||||||
pub async fn setup_logger() -> Result<()> {
|
pub async fn setup_logger() -> Result<()> {
|
||||||
// Builder::new()
|
Builder::new()
|
||||||
// .format(move |buf, record| {
|
.format(move |buf, record| {
|
||||||
// writeln!(
|
writeln!(
|
||||||
// buf,
|
buf,
|
||||||
// "|{}| {} [{}] - {}",
|
"|{}| {} [{}] - {}",
|
||||||
// "api-grubber",
|
"api-grubber",
|
||||||
// Local::now().format("%d-%m-%Y %H:%M:%S"),
|
Local::now().format("%d-%m-%Y %H:%M:%S"),
|
||||||
// record.level(),
|
record.level(),
|
||||||
// record.args(),
|
record.args(),
|
||||||
// )
|
)
|
||||||
// })
|
})
|
||||||
// .filter(None, LevelFilter::Info)
|
.filter(None, LevelFilter::Info)
|
||||||
// .target(env_logger::Target::Stdout)
|
.target(env_logger::Target::Stdout)
|
||||||
// .init();
|
.init();
|
||||||
let log_level = std::env::var("IM_LOG_INFO").unwrap_or_else(|_| String::from("INFO"));
|
|
||||||
|
|
||||||
tracing_subscriber::fmt()
|
|
||||||
.with_max_level(
|
|
||||||
tracing::Level::from_str(&log_level)
|
|
||||||
.unwrap_or_else(|_| tracing::Level::INFO))
|
|
||||||
.with_writer(std::io::stdout)
|
|
||||||
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::NEW)
|
|
||||||
// .with_timer(Local::now().format("%d-%m-%Y %H:%M:%S"))
|
|
||||||
.with_line_number(false)
|
|
||||||
.with_target(false)
|
|
||||||
.with_file(false)
|
|
||||||
.compact()
|
|
||||||
.init();
|
|
||||||
|
|
||||||
info!("Logger configured");
|
info!("Logger configured");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -58,17 +45,22 @@ pub async fn setup_logger() -> Result<()> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod logger_unittests {
|
mod logger_unittests {
|
||||||
use tokio::test;
|
use tokio::test;
|
||||||
|
use super::*;
|
||||||
#[test]
|
#[test]
|
||||||
async fn check_logger_builder() {
|
async fn check_logger_builder() {
|
||||||
tracing_subscriber::fmt()
|
Builder::new()
|
||||||
.with_max_level(tracing::Level::INFO)
|
.format(move |buf, record| {
|
||||||
.with_test_writer()
|
writeln!(
|
||||||
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::NEW)
|
buf,
|
||||||
// .with_timer(Local::now().format("%d-%m-%Y %H:%M:%S"))
|
"|{}| {} [{}] - {}",
|
||||||
.with_line_number(false)
|
"api-grubber",
|
||||||
.with_target(false)
|
Local::now().format("%d-%m-%Y %H:%M:%S"),
|
||||||
.with_file(false)
|
record.level(),
|
||||||
.compact()
|
record.args(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.filter(None, LevelFilter::Info)
|
||||||
|
.target(env_logger::Target::Stdout)
|
||||||
.init();
|
.init();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -11,7 +11,7 @@ use logger::setup_logger;
|
||||||
use config::{pull_local_config, init_config_grub_mechanism};
|
use config::{pull_local_config, init_config_grub_mechanism};
|
||||||
use net::init_api_grub_mechanism;
|
use net::init_api_grub_mechanism;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use monitoring::get_metrics_from_monitoring;
|
use monitoring::get_metrics_from_monitoring;
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ use tokio::task::JoinHandle;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
|
use integr_structs::api::v3::{MetricOutputExtended, PrometheusMetricsExtended};
|
||||||
use tracing::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
/// # Fn `get_metrics_from_monitoring`
|
/// # Fn `get_metrics_from_monitoring`
|
||||||
|
|
@ -38,18 +38,15 @@ use std::collections::HashMap;
|
||||||
/// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(()));
|
/// assert_eq!(get_metrics_from_monitoring(0, 5).await, Ok(()));
|
||||||
/// ```
|
/// ```
|
||||||
///
|
///
|
||||||
#[tracing::instrument(name = "CM mechanism", skip_all)]
|
|
||||||
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyhow::Result<()> {
|
||||||
|
|
||||||
let timer = tokio::time::Instant::now();
|
let timer = tokio::time::Instant::now();
|
||||||
let mut a = MonitoringImporter::new().await;
|
let mut a = MonitoringImporter::new().await;
|
||||||
'outer: loop {
|
'outer: loop {
|
||||||
// let mut a = MonitoringImporter::new().await;
|
// let mut a = MonitoringImporter::new().await;
|
||||||
a.start_session().await?;
|
a.start_session().await?;
|
||||||
tracing::debug!("CM creds struct - {:#?}", a);
|
info!("Started a new CM session");
|
||||||
|
|
||||||
let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
|
let vec = Arc::new(a.get_metrics_list().await.unwrap_or_else(|_| vec![]));
|
||||||
tracing::debug!("Measures Vec - {:#?}", vec);
|
|
||||||
|
|
||||||
'inner: loop {
|
'inner: loop {
|
||||||
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
|
if duration != 0 && timer.elapsed() >= tokio::time::Duration::from_secs(duration as u64) {
|
||||||
|
|
@ -66,7 +63,6 @@ pub async fn get_metrics_from_monitoring(duration: usize, delay: usize) -> anyho
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
tokio::time::sleep(tokio::time::Duration::from_secs(delay as u64)).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -140,7 +136,6 @@ impl MonitoringImporter {
|
||||||
///
|
///
|
||||||
/// *Also* it saves ts and access-key in it's runtime environment,
|
/// *Also* it saves ts and access-key in it's runtime environment,
|
||||||
/// there's no way to get access-key of session
|
/// there's no way to get access-key of session
|
||||||
#[tracing::instrument(name = "CM-session mechanism", skip_all)]
|
|
||||||
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
pub async fn start_session(&mut self) -> anyhow::Result<()> {
|
||||||
if !self.is_valid().await {
|
if !self.is_valid().await {
|
||||||
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
return Err(Error::msg("Invalid eNODE-Monitoring configuration"));
|
||||||
|
|
@ -148,32 +143,18 @@ impl MonitoringImporter {
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
let url = format!("http://{}/e-data-front/auth/login", self.ip);
|
||||||
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
let fortoken = ForTokenCredentials::new(&self.login, &self.password);
|
||||||
let mut delay = 1;
|
|
||||||
|
|
||||||
loop {
|
let client = client
|
||||||
let client = client
|
.post(url)
|
||||||
.post(&url)
|
|
||||||
.header("Content-Type", "application/json")
|
.header("Content-Type", "application/json")
|
||||||
.json(&fortoken);
|
.json(&fortoken);
|
||||||
// let resp = client.send().await?;
|
let resp = client.send().await?;
|
||||||
if let Ok(resp) = client.send().await {
|
let auth = resp.json::<AuthResponse>().await?;
|
||||||
// let auth = resp.json::<AuthResponse>().await?;
|
|
||||||
|
self.set_ts(&fortoken.ts).await;
|
||||||
|
|
||||||
|
self.access_token = auth.access_token.to_owned();
|
||||||
|
|
||||||
match resp.json::<AuthResponse>().await {
|
|
||||||
Ok(auth) => {
|
|
||||||
self.set_ts(&fortoken.ts).await;
|
|
||||||
self.access_token = auth.access_token.to_owned();
|
|
||||||
tracing::trace!("Access key was changed");
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
Err(er) => error!("Error with extracting access-key from CM response due to {}", er),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
error!("Error while trying to create a new session, waiting {} secs and retrying ...", delay);
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
|
|
||||||
delay = delay * 2;
|
|
||||||
}
|
|
||||||
info!("Started a new CM session");
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -183,9 +164,7 @@ impl MonitoringImporter {
|
||||||
/// and returning measures in format of `Ok(Vec<(String, String)>)`
|
/// and returning measures in format of `Ok(Vec<(String, String)>)`
|
||||||
/// , where `(String, String)` is a tuple of measure `id` and `description`
|
/// , where `(String, String)` is a tuple of measure `id` and `description`
|
||||||
/// (`name`)
|
/// (`name`)
|
||||||
#[tracing::instrument(name = "CM get metrics list mechanism", skip_all)]
|
|
||||||
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
|
pub async fn get_metrics_list(&self) -> anyhow::Result<Vec<(String, String)>> {
|
||||||
tracing::trace!("Trying ti get measures list from CM ...");
|
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let mut vec: Vec<(String, String)> = Vec::new();
|
let mut vec: Vec<(String, String)> = Vec::new();
|
||||||
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
let url = format!("http://{}/e-cmdb/api/query", self.ip);
|
||||||
|
|
@ -233,9 +212,7 @@ impl MonitoringImporter {
|
||||||
/// 3) spawns async tasks-grabbers to get measures info which
|
/// 3) spawns async tasks-grabbers to get measures info which
|
||||||
/// exprots all data by itselfs
|
/// exprots all data by itselfs
|
||||||
///
|
///
|
||||||
#[tracing::instrument(name = "CM get measures info mechanism", skip_all)]
|
|
||||||
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
|
pub async fn get_measure_info(&self, measures: Arc<Vec<(String, String)>>) -> anyhow::Result<()> {
|
||||||
tracing::trace!("Trying ti get info about each measure ...");
|
|
||||||
let mut sys = sysinfo::System::new();
|
let mut sys = sysinfo::System::new();
|
||||||
sys.refresh_cpu_all();
|
sys.refresh_cpu_all();
|
||||||
// adaptive permition on task spawm to prevent system overload
|
// adaptive permition on task spawm to prevent system overload
|
||||||
|
|
@ -290,7 +267,6 @@ impl MonitoringImporter {
|
||||||
/// This is a neccesary measure to handle two types of requests and URL restrictions
|
/// This is a neccesary measure to handle two types of requests and URL restrictions
|
||||||
///
|
///
|
||||||
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
|
async fn process_endpoint(measure: Arc<String>, client: Arc<Client>, arc: Arc<Self>, hm: &HashMap<String, String>) -> anyhow::Result<PrometheusMetricsExtended> {
|
||||||
tracing::trace!("Processing CM endpoint with one or more measure names");
|
|
||||||
let resp = client
|
let resp = client
|
||||||
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
.get(format!("http://{}/e-nms/mirror/measure/{}", arc.ip, &measure))
|
||||||
.header("Content-Type", "application/json")
|
.header("Content-Type", "application/json")
|
||||||
|
|
@ -345,7 +321,6 @@ impl MonitoringImporter {
|
||||||
/// Searches for certain fields and aggregates it in the `MetricOutputExtended`
|
/// Searches for certain fields and aggregates it in the `MetricOutputExtended`
|
||||||
/// object
|
/// object
|
||||||
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
|
async fn process_value(obj : &Map<String, Value>, hm: &HashMap<String, String>) -> anyhow::Result<MetricOutputExtended> {
|
||||||
tracing::trace!("Processing atomic Object value in CM JSON-response");
|
|
||||||
let id = obj.get("$id");
|
let id = obj.get("$id");
|
||||||
let val = obj.get("value");
|
let val = obj.get("value");
|
||||||
let description = {
|
let description = {
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
// module to handle unix-socket connection + pulling info from api
|
// module to handle unix-socket connection + pulling info from api
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use tracing::{error, info};
|
use log::{error, info};
|
||||||
use rand::random;
|
use rand::random;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue